RabbitMQ常见问题之延迟消息

文章目录

  • 一、死信交换机
  • 二、TTL
      • 1. Queue指定死信交换机并设置TTL
      • 2. 消息设置TTL
  • 三、延迟队列
      • 1. SpringAMQP创建延迟队列
      • 2. 设置消息延迟
      • 3. 测试

一、死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而
这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
在这里插入图片描述

二、TTL

在这里插入图片描述

如果messagequeue都有ttl,采用更小的一方。

1. Queue指定死信交换机并设置TTL

@Configuration
public class CommonConfig {@Beanpublic DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
}

2. 消息设置TTL

@Test
public void testTTLMessage(){Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);log.info("ttl消息已发送");
}

借助TTL机制可以用死信交换机模拟延迟队列,但是设计上比较牵强,性能不好。

三、延迟队列

这是官方提供的一些额外插件
https://www.rabbitmq.com/community-plugins.html

下载其中的DelayExchange插件,把.ez文件挂载到RabbitMQ容器的/plugins目录下,然后进入容器,执行

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_prometheusrabbitmq_web_dispatch
Applying plugin configuration to rabbit@7c4ba266e5bc...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.

1. SpringAMQP创建延迟队列

基于@RabbitListener或者基于@Bean都可以。

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String msg){log.info("消费者接收到delay.queue的延迟消息:【" + msg + "】");}

在这里插入图片描述

2. 设置消息延迟

这个插件只能在消息上设置延迟时间,没有队列设置延迟时间的概念,不过都是一样的。
message要在Header上添加一个x-delay

    @Testpublic void testDelayMessage(){Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();// confirm callbackCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.info("发送消息成功");}

3. 测试

直接运行测试,可能会报错,因为rabbitmq意识到消息到了exchange却没有立即到queue,被认为错误,回调returnback,所以我们在ReturnCallBack中绕过这个限制。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{//check if is delay messageif (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {return;}log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",replyCode, replyText, exchange, routingKey, message.toString());});}
}

运行Test测试,可以看到Test方面,消息发送的时间为21:09:13

21:09:13:516  INFO 25468 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470]
21:09:13:557  INFO 25468 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 发送消息成功

listener方面消息消费的时间为21:09:18,刚好5s。

21:08:31:952  INFO 19532 --- [           main] cn.itcast.mq.ConsumerApplication         : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583  INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到delay.queue的延迟消息:【hello delay】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

行列转化【附加面试题】

在MySQL中,行列转换是一种常见的操作。它包括行转列和列转行两种情况。 行转列:行转列是将表中的某些行转换成列,以提供更为清晰、易读的数据视图。例如,假设我们有一个包含科目和分数的表,我们可以使用SUM和CASE语句…

python使用Apache+mod_wsgi部署Flask

python使用Apachemod_wsgi部署Flask 一、安装python环境(V3.10.10)二、安装mod_wsgi三、安装Apache1、下载2、解压3、配置 四、安装项目依赖五、启动六、基于多端口部署多个flask项目 一、安装python环境(V3.10.10) 安装时勾选&q…

Spring重要知识点

一、Spring中相关概念 1.IOC 控制反转 IoC(Inverse of Control:控制反转)是⼀种设计思想,就是将原本在程序中⼿动创建对象的控制权,交由Spring框架来管理。IoC 在其他语⾔中也有应⽤,并⾮ Spring 所独有。 IoC 容器…

06-数据容器(字典)基础知识0基础来学

为什么需要字典 可以提供通过某个东西找到某个东西 """ 演示数据容器字典的定义 ​ """ #定义字典 my_dict1{"王力宏":99,"周结论":88,"林俊杰":77} #定义空字典 my_dict2{} my_dict3dict() print(f"字典1的…

软件设计师6--流水线技术

软件设计师6--流水线技术 考点1:流水线--概念考点2:流水线--流水线计算考点3:流水线--流水线吞吐率计算例题: 考点1:流水线–概念 相关参数计算: 流水线执行时间计算、流水线吞吐率、流水线加速比、流水线…

Verilog刷题笔记15

题目: An adder-subtractor can be built from an adder by optionally negating one of the inputs, which is equivalent to inverting the input then adding 1. The net result is a circuit that can do two operations: (a b 0) and (a ~b 1). See Wikipe…

力扣刷MySQL-第四弹(详细讲解)

🎉欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克🍹 ✨博客主页:小小恶斯法克的博客 🎈该系列文章专栏:力扣刷题讲解-MySQL 🍹文章作者技术和水平很有限,如果文中出…

[go语言]数据类型

目录 知识结构 整型、浮点型 1.整型 2.浮点型 复数、布尔类型 1.复数 2.布尔类型 字符与字符串 1.字符串的格式化 2.字符串的截取 3.格式化好的字符串赋值给量 4.字符串的转换 5.strings包 知识结构 整型、浮点型 1.整型 在Go语言中,整型数据是一种基…

MySQL三大日志

1. redo log 1.1 特点 InnoDB存储引擎独有物理日志,记录在数据页上做的修改让MySQL拥有了崩溃恢复能力,保证事务的持久性 1.2 刷盘时机 事务提交时log buffer 空间使用大约一半时事务日志缓冲区满InnoDB 定期执行检查点Checkpoint后台刷新线程&#…

短视频代运营抖音项目规划管理计划模板

【干货资料持续更新,以防走丢】 短视频代运营抖音项目规划管理计划模板 部分资料预览 资料部分是网络整理,仅供学习参考。 短视频代运营模板(完整资料包含以下内容) 目录 具体的表格设计和内容可能因不同的情况和需求而有所变…

移动端开发进阶之蓝牙通讯(四)

移动端开发进阶之蓝牙通讯(四) 在移动端开发实践中,可能会要求在不同的设备之间切换,从而提升用户体验; 或者为了提升设备的利用率,实现设备之间的连接和协同工作; 不得不通过多端连接,将多个设备连接在一起,实现设备之间的数据共享、远程控制等功能,根据具体的应用…

STC8H8K蓝牙智能巡线小车——1. 环境搭建(基于RTX51操作系统)

1. 基本介绍 开发环境准备:Keil uVision5 烧录软件:STC-ISP(V6.92A) 芯片: STC8H8K64U-45I-LQFP64 芯片引脚: 2.创建项目 打开Keil,点击【Project】,选择【new uVersion proje…

LeetCode刷题16:滑动窗口解决209. 长度最小的子数组

题目陈述: 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] ,并返回其长度。如果不存在符合条件的子数组,返回 0 。 示例 1&a…

【控制篇 / 分流】(7.4) ❀ 03. 对国内和国际IP网段访问进行分流 ❀ FortiGate 防火墙

【简介】公司有两条宽带用来上网,一条电信,一条IPLS国际专线,由于IPLS仅有2M,且价格昂贵,领导要求,访问国内IP走电信,国际IP走IPLS,那么应该怎么做? 国内IP地址组 我们已…

深度学习(2)--卷积神经网络(CNN)

卷积神经网络(Convolutional Neural Networks)是一种深度学习模型或类似于人工神经网络的多层感知器,常用来分析视觉图像。 一.卷积网络基础概念 传统网络是一维的,而卷积网络是三维的。 例如32x32x3的图片,在传统网…

滑动窗口经典入门题-——长度最小子数组

文章目录 算法原理题目解析暴力枚举法的代码优化第一步初始化第二步right右移第三步left右移 滑动窗口法的代码 算法原理 滑动窗口是一种在序列(例如数组或链表)上解决问题的算法模式。它通常用于解决子数组或子字符串的问题,其中滑动窗口表示…

C#: form 窗体的各种操作

说明:记录 C# form 窗体的各种操作 1. C# form 窗体居中显示 // 获取屏幕的宽度和高度 int screenWidth Screen.PrimaryScreen.Bounds.Width; int screenHeight Screen.PrimaryScreen.Bounds.Height;// 设置窗体的位置 this.StartPosition FormStartPosition.M…

2024.1.17每日一题

LeetCode 2744.最大字符串配对数目 2744. 最大字符串配对数目 - 力扣(LeetCode) 题目描述 给你一个下标从 0 开始的数组 words ,数组中包含 互不相同 的字符串。 如果字符串 words[i] 与字符串 words[j] 满足以下条件,我们称…

【ubuntu】docker中如何ping其他ip或外网

docker中如何ping其他ip或外网 示例图: 运行下面命令: docker run -it --namehei busybox看情况需要加权限 sudo,即: sudo docker run -it --namehei busyboxping 外网 ping -c 4 www.baidu.comping 内网 ping -c 4 192.168.…

【多线程】认识Thread类及其常用方法

📄前言: 本文是对以往多线程学习中 Thread类 的介绍,以及对其中的部分细节问题进行总结。 文章目录 一. 线程的 创建和启动🍆1. 通过继承 Thread 类创建线程🍅2. 通过实现 Runnable 接口创建线程🥦3. 其他方…