【RabbitMQ】死信队列、延迟队列

死信队列

死信,简单理解就是因为种种原因,无法被消费的消息。

有死信,自然就有死信队列。当一个消息在一个队列中变成死信消息之后,就会被重新发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定该交换器的队列,就被称为死信队列DLQ(Dead Letter Queue)。

消息变成死信消息一般是由于以下几条:

  • 队列达到最大长度
  • 消息过期
  • 消息被拒绝,即消息确认机中手动确认的两种拒绝情况,并且不允许重新入队

队列达到最大长度

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-test
@Configuration
public class DeadConfig {// 正常队列,当正常队列中的消息出现一些不确定情况时,消息就会进入死信交换机中@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead.maxLength(3) // 设置队列的最大长度为3.build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalQueueBind")public Binding normalQueueBind(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}// 死信队列@Bean("deadQueue")public Queue deadQueue() {return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadExchange")public Exchange deadExchange() {return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueueBind")public Binding deadQueueBind(@Qualifier("deadQueue") Queue queue,@Qualifier("deadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();}}
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信");System.out.println("正常队列发送消息成功");}}
@Configuration
public class DeadListener {@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg) {System.out.println("死信队列接收到消息:" + msg);}}

在上述代码中,主要内容是声明了正常队列、交换机和绑定关系以及声明死信队列、死信交换机以及其绑定关系、正常队列的生产者代码、死亡队列的消费者代码。

队列达到最大长度和死信消息要转发到的DLX和路由键都是由正常队列在声明时进行绑定的。

启动上述程序之后,当正常队列存在三条消息之时,假设再来消息,那么消息就要进入死信交换机,从而路由到死信队列了。如下图可以看出,当发送第四条消息之后,死信队列的消费者就消费了一条消息:

在上述图片中,D表示队列是持久化的,Lim表示队列有最大长度,DLX表示队列存在死信交换机、DLK表示队列存在路由键。把鼠标放在这些字母上方,详细的消息都会表示。

在下述代码中,主要是对上述代码改进之后地方的指出,并没有把所有的代码全部给出。

消息过期

消息过期分为两种,一种是设置队列过期时间让消息过期,另一种是设置消息过期时间让消息过期,都可以进行测试。

设置队列过期时间

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
//                .maxLength(3) // 设置队列的最大长度为3.ttl(5 * 1000) // 设置队列的过期时间为5秒.build();}

 由上图以及结合代码可以看出,将消息由正常生产者发送给Broker之后,大概5秒钟之后,消息过期。此时消息就会发送给死信交换机,从而交给其对应的消费者消费。

设置消息的过期时间

@Slf4j
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {// 设置消息的过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}};this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信", messagePostProcessor);log.info("死信队列发送成功");}}

同样,结合上图和代码来说,19秒的时候消息发送功,24秒的时候死信消费者消费消息成功。

消息被拒绝

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制,手动确认
@Slf4j
@Configuration
public class DeadListener {// 正常队列接收消息@RabbitListener(queues = Constants.NORMAL_QUEUE)public void normalListener(Channel channel, String msg, Message message) throws IOException {try {log.info("正常队列监听器接收消息:{}", msg);int num = 3 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {log.error("正常队列监听器接收消息异常:{}", e.getMessage());channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}// 死信队列接收消息@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg, Channel channel, Message message) throws IOException {try {log.info("死信队列监听器接收消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

由上图以及代码可以看到,当消息的确认机制是手动确认时,当出现异常并且拒绝消息重新入队以后,消息就会来到死信队列中。

使用场景

用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。

消息重试:将死信消息发送到原队列或另一个队列进行重试处理。

消息丢弃:直接丢弃这些无法处理的消息,避免占用系统资源。

日志收集:将死信消息做为日志收集起来,用户后续分析和问题定位。

延迟队列

概念

延迟队列就是消息发送之后,并不想让消费者立即拿到消息,而是在等待特定时间之后,消费者才能拿到消息进行消费

应用场景

  1. 用户发起退款后,24小时内商家未处理,默认退款
  2. 用户注册成功后,三天后发送短信,提高用户活跃度
  3. 预定会议后,在会议开始前15分钟提醒众人参加会议
  4. 用户通过手机远程遥控家里的智能设备在指定时间进行工作,这就可以使用延迟队列。用户发送消息到延迟队列,当指定时间到了再将指令推送到智能设备。

实现方法

  1. RabbitMQ本身并没有实现延迟队列,因此可以使用TTL + 死信队列的方式来实现延迟队列。
  2. 安装延迟队列插件来实现延迟队列。

TTL + 死信队列

@Configuration
public class MockDelayConfig {@Bean("mockDelayNormalQueue")public Queue mockDelayNormalQueue() {return QueueBuilder.durable(Constants.MOCk_DELAY_NORMAL_QUEUE).ttl(5000 * 10) // 设置消息过期时间为50秒.deadLetterExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("mock.delay.dead") // 设置死信路由键.build();}@Bean("mockDelayNormalExchange")public Exchange mockDelayNormalExchange() {return ExchangeBuilder.directExchange(Constants.MOCk_DELAY_NORMAL_EXCHANGE).durable(true).build();}@Bean("mockDelayNormalQueueBind")public Binding mockDelayNormalQueueBind(@Qualifier("mockDelayNormalQueue") Queue queue,@Qualifier("mockDelayNormalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.normal").noargs();}@Bean("mockDelayDeadQueue")public Queue mockDelayDeadQueue() {return QueueBuilder.durable(Constants.MOCK_DELAY_DEAD_QUEUE).build();}@Bean("mockDelayDeadExchange")public Exchange mockDelayDeadExchange() {return ExchangeBuilder.directExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE).durable(true).build();}@Bean("mockDelayDeadQueueBind")public Binding mockDelayDeadQueueBind(@Qualifier("mockDelayDeadQueue") Queue queue,@Qualifier("mockDelayDeadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.dead").noargs();}}
@Slf4j
@RestController
@RequestMapping("/mockDelay")
public class MockDelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void mockDelayQueue() {this.rabbitTemplate.convertAndSend(Constants.MOCk_DELAY_NORMAL_EXCHANGE,"mock.delay.normal", "hello 延迟队列");log.info("延迟队列生产者发送成功");}}
@Slf4j
@Configuration
public class MockDelayListener {@RabbitListener(queues = Constants.MOCK_DELAY_DEAD_QUEUE)public void mockDelayListener(String msg) {log.info("模拟延迟队列消费者接收到消息:" + msg);}}

在上述代码中,实现的功能是生产者发送消息后,消费者在50秒之后获得消息,对消息进行消费:

在TTL一文中,已经说明了RabbitMQ只会检查队首消息是否过期,不会扫描整个队列。因此如果想要放在模拟延迟队列中的消息过期时间不一致,那就会出现死信消息无法被及时处理的情况。因此,我们想要模拟实现延迟队列,就要确保队列中所有消息的过期时间是一致的。如果存在时间不一致的情况,我们就可以使用不同的模拟延迟队列来实现。

延迟队列插件

下载插件:官方网站进行下载(注意版本对应关系)

启动插件

rabbitma-plusins list // 查看插件列表rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启动插件service rabbitmq-server restart # 重启服务

如下图,当交换机中有了x-delayed-message就表示延迟插件安装成功 

代码测试

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed() // 延迟交换机.durable(true) // 持久化.build();}@Bean("delayQueueBind")public Binding delayQueueBind(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();}}
@Slf4j
@RestController
@RequestMapping("/delay")
public class DelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void delayQueue() {for(int i = 0; i < 5; i++) {// 随机生成延迟时间Random random = new Random();int time = random.nextInt(20);// 消息处理器,设置延迟时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong((long) (time * 1000)); // 设置延迟时间return message;}};// 发送消息到延迟队列this.rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "hello 延迟队列 " + i, messagePostProcessor);log.info("发送延迟队列第" + i + "消息成功,延迟时间为:" + time);}}}
@Slf4j
@Configuration
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void delayListener(String msg) {log.info("延迟队列监听器,接收到的消息:{}", msg);}}

本质上,延迟插件就是让消息停留在交换机中,等到延迟时间结束之后,再发送到对应的队列中去。 

两者对比

使用TTL + 死信队列的好处是不需要额外安装插件。缺点是受消息的延迟时间影响,同一个队列中的消息必须延迟时间相同。

使用延迟队列插件的好处是不受延迟时间影响,同一队列中的所有消息延迟时间可以不同,额外的插件使得延迟队列的实现比较容易。缺点是需要依赖特定的插件,并且插件的版本必须和对应的RabbitMQ相对应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/55002.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

连锁收银系统的五大功能 选择开源收银系统三要素

连锁收银系统的五大功能&#xff0c;很多新手是不清楚的&#xff0c;老手也只是知道一些大概&#xff0c;今天&#xff0c;商淘云为大家分享收银系统的五大功能&#xff0c;尤其是第五个&#xff0c;大部分人不清楚&#xff0c;有的企业前面选了不合适的收银系统&#xff0c;导…

如何在iPad上设置Chrome为默认浏览器

将Chrome设置为iPad上的默认浏览器&#xff0c;不仅能够享受到谷歌强大的搜索功能和丰富的扩展生态&#xff0c;还能通过一系列自定义设置来进一步提升浏览体验。本文将详细介绍如何在iPad上完成这一设置&#xff0c;并探讨如何通过优化Chrome浏览器的相关功能&#xff0c;让您…

c语言200例 64

大家好&#xff0c;欢迎来到无限大的频道。 今天带领大家来学习c语言。 题目要求&#xff1a; 设计一个进行候选人的选票程序。假设有三位候选人&#xff0c;在屏幕上输入要选择的候选人姓名&#xff0c; 有10次投票机会&#xff0c;最后输出每个人的得票结果。好的&#xff…

【LLM多模态】视频理解模型Cogvlm-video和MVBench评测基准

note Cogvlm-video模型通过视频抽帧&#xff08;24帧&#xff0c;每帧大小为224 x 224&#xff09;后经过ViT进行图像编码&#xff08;ViT中添加了2x2的卷积核更好的压缩视觉信息&#xff09;&#xff0c;使用adapter模块更好的将视觉特征和文本特征对齐&#xff0c;得到的图像…

5--苍穹外卖-SpringBoot项目中菜品管理 详解(一)

目录 公共字段自动填充 问题分析 实现思路 代码开发 步骤一 步骤二 功能测试 新增菜品 需求分析和设计 代码开发 文件上传接口 功能测试 1--苍穹外卖-SpringBoot项目介绍及环境搭建 详解-CSDN博客 2--苍穹外卖-SpringBoot项目中员工管理 详解&#xff08;一&#…

python绘制图像

柱状图 import os# 输入想要存储图像的路径 os.chdir(D:)import matplotlib.pyplot as plt import numpy as np # 改变绘图风格 import seaborn as snssns.set(color_codesTrue)cell [gen7, xgspon, 3081GB, vettel, totalplay, other] pvalue [21, 20, 18, 13, 7, 34]width…

【GUI设计】基于图像分割的GUI系统(6),matlab实现

博主简介&#xff1a; 如需获取设计的完整源代码或者有matlab图像代码项目需求/合作&#xff0c;可联系主页个人简介提供的联系方式或者文末的二维码。博客内容有疑问可联系沟通&#xff08;博主邮箱&#xff1a;3249726188qq.com&#xff09;。 ~~~~~~~~~~~~~~~~~~~~~~~…

实现简易 vuedraggable 的拖拽排序功能

一、案例效果 拖拽计数4实现手动排序 二、案例代码 <draggable:list"searchResult.indicator":group"{ name: indicators }"item-key"field"handle".drag-handle-icon"><divclass"field-item"v-for"(item…

快速创建第一个Spring Boot 项目

一、介绍 Spring Boot 是一个开源的 Java 基础框架&#xff0c;它基于 Spring 框架&#xff0c;用于创建独立、生产级别的基于 Spring 的应用程序&#xff0c;你可以“跑起来”&#xff08;run&#xff09;你的 Spring 应用程序。Spring Boot 让基于 Spring 的应用开发变得更容…

对onlyoffice进行定制化开发

基于onlyoffice8.0源码&#xff0c;进行二次开发&#xff0c;可实现包括但不限于以下的功能 1、内容控件的插入 2、内容空间的批量替换 3、插入文本 4、插入图片 5、添加&#xff0c;去除水印 6、修改同时在线人数限制 7、内容域的删除 8、页面UI的定制化 9、新增插件开发 10、…

生信初学者教程(四):软件

文章目录 RRstudioLinux系统其他软件本书是使用R语言编写的教程,用户需要下载R和RStudio软件用于进行分析。 版权归生信学习者所有,禁止商业和盗版使用,侵权必究 R R语言是一种免费的统计计算和图形化编程语言,是一种用于数据分析和统计建模的强大工具。它具有丰富的统计…

C语言 | Leetcode C语言题解之第429题N叉树的层序遍历

题目&#xff1a; 题解&#xff1a; #define MAX_LEVE_SIZE 1000 #define MAX_NODE_SIZE 10000int** levelOrder(struct Node* root, int* returnSize, int** returnColumnSizes) {int ** ans (int **)malloc(sizeof(int *) * MAX_LEVE_SIZE);*returnColumnSizes (int *)mal…

ArcGIS Desktop使用入门(三)常用工具条——拓扑(下篇:地理数据库拓扑)

系列文章目录 ArcGIS Desktop使用入门&#xff08;一&#xff09;软件初认识 ArcGIS Desktop使用入门&#xff08;二&#xff09;常用工具条——标准工具 ArcGIS Desktop使用入门&#xff08;二&#xff09;常用工具条——编辑器 ArcGIS Desktop使用入门&#xff08;二&#x…

WordPress最佳恶意软件扫描插件:入门级指南

在现代互联网环境中&#xff0c;网站安全已经成为每个网站管理员必须重视的问题。特别是对于使用WordPress的用户来说&#xff0c;由于其普及度高&#xff0c;WordPress网站常常成为黑客的首要攻击目标。幸运的是&#xff0c;有许多优秀的恶意软件扫描插件可以帮助我们保护网站…

[附源码]网上订餐系统+SpringBoot+前后端分离

今天带来一款优秀的项目&#xff1a;网上订餐系统源码 。 系统采用的流行的前后端分离结构&#xff0c;包含了“管理端”&#xff0c;“商家管理端”&#xff0c;“用户购买端” 如果您有任何问题&#xff0c;也请联系小编&#xff0c;小编是经验丰富的程序员&#xff01; 一.…

[000-002-01].第29节:MySQL数据库缓冲池

1、什么是数据缓冲池&#xff1a; 1.InnoDB 存储引擎是以页为单位来管理存储空间的&#xff0c;我们进行的增删改查操作其实本质上都是在访问页面&#xff08;包括读页面、写页面、创建新页面等操作&#xff09;&#xff0c;而磁盘 I/O 需要消耗的时间很多&#xff0c;而在内存…

【Python报错已解决】TypeError: tuple indices must be integers or slices, not str

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

华为HarmonyOS灵活高效的消息推送服务(Push Kit) -- 7 推送卡片刷新消息

场景介绍 如今衣食住行娱乐影音应用占据了大多数人的手机&#xff0c;一部手机可以满足日常大多需求&#xff0c;但对需要经常查看或进行简单操作的应用来说&#xff0c;总需要用户点开应用体验较繁琐。针对此种场景&#xff0c;HarmonyOS提供了Form Kit&#xff08;卡片开发服…

Python | Leetcode Python题解之第437题路径总和III

题目&#xff1a; 题解&#xff1a; class Solution:def pathSum(self, root: TreeNode, targetSum: int) -> int:prefix collections.defaultdict(int)prefix[0] 1def dfs(root, curr):if not root:return 0ret 0curr root.valret prefix[curr - targetSum]prefix[cu…

知识管理数据库

知识管理数据库&#xff0c;可以分为几类&#xff1a; 灵感库、卡片库、作品库。 灵感库&#xff0c;通常是素材&#xff0c;想法。 片库&#xff0c;是完整的&#xff0c;成段落的文字。 作品库&#xff0c;是文章、专栏&#xff0c;或者书籍。 这三者的关系&#xff0c;好比…