RabbitMQ常见问题之延迟消息

文章目录

  • 一、死信交换机
  • 二、TTL
      • 1. Queue指定死信交换机并设置TTL
      • 2. 消息设置TTL
  • 三、延迟队列
      • 1. SpringAMQP创建延迟队列
      • 2. 设置消息延迟
      • 3. 测试

一、死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而
这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
在这里插入图片描述

二、TTL

在这里插入图片描述

如果messagequeue都有ttl,采用更小的一方。

1. Queue指定死信交换机并设置TTL

@Configuration
public class CommonConfig {@Beanpublic DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
}

2. 消息设置TTL

@Test
public void testTTLMessage(){Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);log.info("ttl消息已发送");
}

借助TTL机制可以用死信交换机模拟延迟队列,但是设计上比较牵强,性能不好。

三、延迟队列

这是官方提供的一些额外插件
https://www.rabbitmq.com/community-plugins.html

下载其中的DelayExchange插件,把.ez文件挂载到RabbitMQ容器的/plugins目录下,然后进入容器,执行

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_prometheusrabbitmq_web_dispatch
Applying plugin configuration to rabbit@7c4ba266e5bc...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.

1. SpringAMQP创建延迟队列

基于@RabbitListener或者基于@Bean都可以。

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String msg){log.info("消费者接收到delay.queue的延迟消息:【" + msg + "】");}

在这里插入图片描述

2. 设置消息延迟

这个插件只能在消息上设置延迟时间,没有队列设置延迟时间的概念,不过都是一样的。
message要在Header上添加一个x-delay

    @Testpublic void testDelayMessage(){Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();// confirm callbackCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.info("发送消息成功");}

3. 测试

直接运行测试,可能会报错,因为rabbitmq意识到消息到了exchange却没有立即到queue,被认为错误,回调returnback,所以我们在ReturnCallBack中绕过这个限制。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{//check if is delay messageif (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {return;}log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",replyCode, replyText, exchange, routingKey, message.toString());});}
}

运行Test测试,可以看到Test方面,消息发送的时间为21:09:13

21:09:13:516  INFO 25468 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470]
21:09:13:557  INFO 25468 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 发送消息成功

listener方面消息消费的时间为21:09:18,刚好5s。

21:08:31:952  INFO 19532 --- [           main] cn.itcast.mq.ConsumerApplication         : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583  INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到delay.queue的延迟消息:【hello delay】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

行列转化【附加面试题】

在MySQL中,行列转换是一种常见的操作。它包括行转列和列转行两种情况。 行转列:行转列是将表中的某些行转换成列,以提供更为清晰、易读的数据视图。例如,假设我们有一个包含科目和分数的表,我们可以使用SUM和CASE语句…

python使用Apache+mod_wsgi部署Flask

python使用Apachemod_wsgi部署Flask 一、安装python环境(V3.10.10)二、安装mod_wsgi三、安装Apache1、下载2、解压3、配置 四、安装项目依赖五、启动六、基于多端口部署多个flask项目 一、安装python环境(V3.10.10) 安装时勾选&q…

Spring重要知识点

一、Spring中相关概念 1.IOC 控制反转 IoC(Inverse of Control:控制反转)是⼀种设计思想,就是将原本在程序中⼿动创建对象的控制权,交由Spring框架来管理。IoC 在其他语⾔中也有应⽤,并⾮ Spring 所独有。 IoC 容器…

06-数据容器(字典)基础知识0基础来学

为什么需要字典 可以提供通过某个东西找到某个东西 """ 演示数据容器字典的定义 ​ """ #定义字典 my_dict1{"王力宏":99,"周结论":88,"林俊杰":77} #定义空字典 my_dict2{} my_dict3dict() print(f"字典1的…

软件设计师6--流水线技术

软件设计师6--流水线技术 考点1:流水线--概念考点2:流水线--流水线计算考点3:流水线--流水线吞吐率计算例题: 考点1:流水线–概念 相关参数计算: 流水线执行时间计算、流水线吞吐率、流水线加速比、流水线…

Verilog刷题笔记15

题目: An adder-subtractor can be built from an adder by optionally negating one of the inputs, which is equivalent to inverting the input then adding 1. The net result is a circuit that can do two operations: (a b 0) and (a ~b 1). See Wikipe…

力扣刷MySQL-第四弹(详细讲解)

🎉欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克🍹 ✨博客主页:小小恶斯法克的博客 🎈该系列文章专栏:力扣刷题讲解-MySQL 🍹文章作者技术和水平很有限,如果文中出…

JavaScripts笔记I(基础)

JavaScripts基础 js是一门编程语言,可以实现很多的网页交互效果。 JavaScript脚本语言具有以下特点: (1)脚本语言。JavaScript是一种解释型的脚本语言,C、C等语言先编译后执行,而JavaScript是在程序的运行过程中逐行…

macOS - md5 | md5sum

文章目录 简单使用介绍文档Linux - md5summacOS - md5 大文件传输是否完整,你可以使用 md5 进行校验 linux 上使用 md5sum 命令,在macOS 上 md5 命令是和 md5sum 等效的 简单使用介绍 参考:https://blog.csdn.net/cnds123321/article/detail…

[go语言]数据类型

目录 知识结构 整型、浮点型 1.整型 2.浮点型 复数、布尔类型 1.复数 2.布尔类型 字符与字符串 1.字符串的格式化 2.字符串的截取 3.格式化好的字符串赋值给量 4.字符串的转换 5.strings包 知识结构 整型、浮点型 1.整型 在Go语言中,整型数据是一种基…

计算机基础之输入设备,输出设备及中央处理器

输入、输出设备和中央处理器 输入设备 输入设备用于接收用户输入的命令、程序、图像和视频等,专门负责将现实中的信息转换成计算机能识别的二进制编码,并放入内存。是计算机与用户或其他设备通信的桥梁。 常用的输入设备: 键盘&#xff1…

MySQL三大日志

1. redo log 1.1 特点 InnoDB存储引擎独有物理日志,记录在数据页上做的修改让MySQL拥有了崩溃恢复能力,保证事务的持久性 1.2 刷盘时机 事务提交时log buffer 空间使用大约一半时事务日志缓冲区满InnoDB 定期执行检查点Checkpoint后台刷新线程&#…

短视频代运营抖音项目规划管理计划模板

【干货资料持续更新,以防走丢】 短视频代运营抖音项目规划管理计划模板 部分资料预览 资料部分是网络整理,仅供学习参考。 短视频代运营模板(完整资料包含以下内容) 目录 具体的表格设计和内容可能因不同的情况和需求而有所变…

移动端开发进阶之蓝牙通讯(四)

移动端开发进阶之蓝牙通讯(四) 在移动端开发实践中,可能会要求在不同的设备之间切换,从而提升用户体验; 或者为了提升设备的利用率,实现设备之间的连接和协同工作; 不得不通过多端连接,将多个设备连接在一起,实现设备之间的数据共享、远程控制等功能,根据具体的应用…

STC8H8K蓝牙智能巡线小车——1. 环境搭建(基于RTX51操作系统)

1. 基本介绍 开发环境准备:Keil uVision5 烧录软件:STC-ISP(V6.92A) 芯片: STC8H8K64U-45I-LQFP64 芯片引脚: 2.创建项目 打开Keil,点击【Project】,选择【new uVersion proje…

LeetCode刷题16:滑动窗口解决209. 长度最小的子数组

题目陈述: 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] ,并返回其长度。如果不存在符合条件的子数组,返回 0 。 示例 1&a…

RPM命令详解2---查询验证

一、RPM查询 rpm {-q|–query} [select-options] [query-options] select-options [PACKAGE_NAME] [-a,–all] [-f,–file FILE] [-g,–group GROUP] {-p,–package PACKAGE_FILE] [–hdrid SHA1] [–pkgid MD5] [–tid TID] [–querybynumber HDRNUM] [–triggeredby PACKAGE…

【控制篇 / 分流】(7.4) ❀ 03. 对国内和国际IP网段访问进行分流 ❀ FortiGate 防火墙

【简介】公司有两条宽带用来上网,一条电信,一条IPLS国际专线,由于IPLS仅有2M,且价格昂贵,领导要求,访问国内IP走电信,国际IP走IPLS,那么应该怎么做? 国内IP地址组 我们已…

Spring之BeanDefinition

BeanDefinition BeanDefinition表示Bean定义,BeanDefinition中存在很多属性用来描述一个Bean的特点。 比如: - class,表示Bean类型 - scope,表示Bean作用域,单例或原型等 - lazyInit:表示Bean是否是懒加载…

JavaScript之判断是否整数、取余、取整、进制、位或、ES6

MENU 方法一方式二方式三方式四方式五结束语 方法一 使用取余运算符判断,利用任何整数都会被1整除的原理,即余数是0的特点,通过这个规则来判断是否是整数。 let isInteger (val) > val % 1 0;// true isInteger(5); // false isInteger(…