【SpringBoot】SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列

  📝个人主页:哈__

期待您的关注 

目录

 一、🔥死信队列

RabbitMQ的工作模式

 死信队列的工作模式

 二、🍉RabbitMQ相关的安装 

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

2.创建队列和交换器

2.1 变量声明 

2.2 创建延迟交换器

2.3 创建延迟队列

2.4 延迟队列绑定延迟交换器

2.5 死信队列配置

3. 添加application.yml

4. 添加RabbitMQListener (消费者)

5. 创建DelayMessageSender 

6. 创建Controller 

7.测试 

四、🍌死信队列的应用场景


 一、🔥死信队列

RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。

当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。

死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。

死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:

  1. 延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。

  2. 消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。

  3. 异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。

  4. 消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。

  5. 消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。

  6. 消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。


RabbitMQ的工作模式


 死信队列的工作模式

今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。

 二、🍉RabbitMQ相关的安装 

win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客

我这里直接引用别人的文章了,下载需要大家去看一看。

RabbitMQ延迟插件的安装。

[超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency>

2.创建队列和交换器

这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:

  1. 延迟队列
  2. 延迟队列的交换器
  3. 死信队列
  4. 死信队列的交换器

在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。

我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。


2.1 变量声明 

 接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。

  // 延迟队列交换器名称public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";// 延迟队列A名称public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";// 延迟队列B名称public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";// 延迟队列routingA名称public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";// 延迟队列routingB名称public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";// 死信队列public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 创建延迟交换器

// 注册延迟交换器delayExchange@Bean("delayExchange")public DirectExchange delayExchange(){return  new DirectExchange(DELAY_EXCHANGE_NAME);}

2.3 创建延迟队列

这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。

// 注册延迟队列A   还要绑定死信交换器和死信routingA@Bean("delayQueueA")public Queue delayQueueA(){Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);//args.put("x-message-ttl",6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();}// 注册延迟队列B   还要绑定死信交换器和死信routingB@Bean("delayQueueB")public Queue delayQueueB(){Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);args.put("x-message-ttl",10000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();}

2.4 延迟队列绑定延迟交换器

 // 延迟队列A绑定交换器@Beanpublic Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);}// 延迟队列B绑定交换器@Beanpublic Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);}

2.5 死信队列配置

与延迟队列不同的是,死信队列并没有配置延迟参数。

// 注册死信队列A@Bean("deadLetterQueueA")public Queue deadLetterQueueA(){return new Queue(DEAD_LETTER_QUEUE_A_NAME);}// 注册死信队列B@Bean("deadLetterQueueB")public Queue deadLetterQueueB(){return new Queue(DEAD_LETTER_QUEUE_B_NAME);}// 注册死信交换器@Beanpublic DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 死信队列A绑定死信交换器@Beanpublic Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}// 死信队列B绑定死信交换器@Beanpublic Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);}

到此为止,RabbitMQ的组件配置完成。


3. 添加application.yml

server:port: 8081
spring:application:name: test-rabbitmq-producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

4. 添加RabbitMQListener (消费者)

下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。

@Component
public class DeadLetterQueueConsumer {public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")public void receiveA(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);System.out.println(message.getMessageProperties().getDeliveryTag());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")public void receiveB(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

5. 创建DelayMessageSender 

这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。

@Component
public class DelayMessageSender {@ResourceRabbitTemplate rabbitTemplate;public void sendMessage(String msg,Integer delayTimes){switch (delayTimes){case 6:rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(String.valueOf(6000));return message;}});break;case 10:rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);break;}}
}

6. 创建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {@AutowiredDelayMessageSender messageSender;@RequestMapping("/send-message")public String sendMessage(String msg,Integer delayTimes){System.out.println(new Date());messageSender.sendMessage(msg,delayTimes);return "发送成功";}
}

7.测试 

在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。

http://localhost:15672/

 先来看看我们的初始队列。这里是什么都没有的。


然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。


 使用PostMan发送一次请求。


 我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。


接下来我们测试10s的延迟队列。


 10s后死信队列B成功的接收到了消息。

四、🍌死信队列的应用场景

延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。

  • 1.订单创建

    • 用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。
    • 这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。
  • 2.等待支付

    • 在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。
  • 3.订单超时处理

    • 如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。
  • 4.取消订单

    • 系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。
  • 5.定时任务(可选):

    • 虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/24666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在群晖上通过Docker部署DB-GPT

最近一直有网友在后台私信&#xff0c;发的内容高度统一&#xff0c;只有后面 8 位数字不一样&#xff0c;都是 &#xff03;22232 xxxxxxxx&#xff0c;有谁知道是什么意思吗&#xff1f;在我印象中&#xff0c;这是第二次这么大规模的发类似的字符串了 什么是 DB-GPT ? DB-G…

Linux lvm卷扩容之SSM

介绍 SSM&#xff08;System Storage Manager&#xff09;是系统存储管理器&#xff0c;它是一种统一的命令行界面&#xff0c;用于管理各种存储设备。通过SSM&#xff0c;用户可以方便地管理、配置和监控存储系统。检查关于可用硬驱和LVM卷的信息。显示关于现有磁盘存储设备、…

O2OA(翱途)开发应用平台(v9)开发实战(3)-如何做信息发布

内容管理就是用来发布信息的&#xff0c;比如说发布单位的内部信息&#xff1a;像公司新闻、通知公告、规章制度等等。 接下来我们来介绍一下如何创建&#xff0c;比如我要创建一个栏目&#xff0c;专门用来发布公司的规章制度 需求 规章制度 首先从菜单打开“内容管理设置…

平衡二叉树AVL

平衡二叉树是一种特殊的二叉查找树&#xff0c;其中每个节点的左右子树的高度差不超过1。这种树的平衡性质使其在多种操作下保持较高的效率。 平衡二叉树的定义与性质 严格定义&#xff1a;在平衡二叉树中&#xff0c;任一节点的两个子树的高度最大差别为一&#xff0c;这使得…

Linux卸载RocketMQ教程【带图文命令巨详细】

巨详细Linux卸载RocketMQ教程 #查询rocketmq进程 ps -ef | grep rocketmq #杀掉相关进程 kill -9 进程id #查找安装目录 find / -name runbroker.sh #删除rocketMQ目录 rm -rf 安装目录框起来的就是进程id&#xff0c;全部杀掉 这里就是我的安装目录&#xff0c;我的删除命令…

SwiftUI五视图动画和转场

代码下载 使用SwiftUI可以把视图状态的改变转成动画过程&#xff0c;SwiftUI会处理所有复杂的动画细节。在这篇中&#xff0c;会给跟踪用户徒步的图表视图添加动画&#xff0c;使用animation(_:)修改器给一个视图添加动画效果非常容易。 下载起步项目并跟着本篇教程一步步实践…

AI 写高考作文丨10 款大模型 “交卷”,实力水平如何?

前言 在科技日新月异的今天&#xff0c;人工智能&#xff08;AI&#xff09;已不再是遥不可及的未来科技&#xff0c;而是逐渐融入我们日常生活的实用工具。从智能语音助手到自动驾驶汽车&#xff0c;从智能家居系统到精准医疗诊断&#xff0c;AI技术正以其强大的计算能力和数…

Rust基础学习-Rust宏

Rust中的宏是生成另一段代码的一段代码。可以根据输入生成代码&#xff0c;简化重复模式&#xff0c;使得代码更加简洁。比如我们一直在用的println!,vec!,panic!都是宏。 创建宏 可以使用macro_rules!创建一个宏&#xff1a; macro_rules! macro_name {(...) > {...} }这…

c#与汇川plc通信 使用官网API库

前言 上位机开发中有时会要求与PLC进行通信&#xff0c;汇川官网也有好用的API库方便大家使用。记录一下开发过程。 1.下载资料 汇川官网地址&#xff1a;汇川技术 - 推进工业文明 共创美好生活 打开后选择&#xff1a;服务与支持-》资料下载-》 资料下载 这里可以直接搜索&am…

C++学习插曲:“name“的初始化操作由“case“标签跳过

问题 "name"的初始化操作由"case"标签跳过 问题代码 case 3: // 3、删除联系人string name;cout << "请输入删除联系人姓名&#xff1a;" << endl;cin >> name;if (isExistPerson(&abs, name) -1){cout << "…

【刷题篇】分治-归并排序

文章目录 1、排序数组2、交易逆序对的总数3、计算右侧小于当前元素的个数4、翻转对 1、排序数组 给你一个整数数组 nums&#xff0c;请你将该数组升序排列。 class Solution { public:vector<int> tmp;void mergeSort(vector<int>& nums,int left,int right){…

cocos creator3.7版本拖拽事件处理

前言&#xff1a;网上能找到的资料都太落后了&#xff0c;导致哥们用AI去写&#xff0c;全是瞎B写&#xff0c;版本都不对。贴点实际有用的。别老捣鼓你那破convertToNodeSpaceAR或者convertToNodeSpace了。 核心代码 touch.getDeltaX() touch.getDeltaY() 在cocoscreator3…

python-自幂数判断

[题目描述]&#xff1a; 自幂数是指&#xff0c;一个N 位数&#xff0c;满足各位数字N 次方之和是本身。例如&#xff0c;153153 是 33 位数&#xff0c;其每位数的 33 次方之和&#xff0c;135333153135333153&#xff0c;因此 153153 是自幂数&#xff1b;16341634 是 44 位数…

简单快速设置Windows和Ubuntu双系统双引导

一、参考资料 Windows和Ubuntu双系统安装教程 二、设置引导 1. 安装EasyBCD 下载并安装 EasyBCD 2. 设置Windows引导 3. 设置Ubuntu引导 4. 启动系统 遇到这种情况&#xff0c;直接Enter回车。 三、修复引导 如果引导区损坏&#xff0c;导致无法进入系统&#xff0c;可以…

FuTalk设计周刊-Vol.041

&#x1f525;AI漫谈 热点捕手 1、国产GPTs来了&#xff0c;基于智谱第4代大模型 全自研第四代基座大模型GLM-4&#xff0c;且所有更新迭代的能力全量上线。GLM-4性能相比GLM-3提升60%&#xff0c;逼近GPT-4&#xff08;11月6日最新版本效果&#xff09;。而同时推出的GLM-4-…

【漏洞复现】多客圈子论坛系统 httpGet 任意文件读取漏洞

0x01 产品简介 多客圈子论坛系统是一种面向特定人群或特定话题的社交网络&#xff0c;它提供了用户之间交流、分享、讨论的平台。在这个系统中&#xff0c;用户可以创建、加入不同的圈子&#xff0c;圈子可以是基于兴趣、地域、职业等不同主题的。用户可以在圈子中发帖、评论、…

算法分析与设计期末考试复习(更新ing)

重点内容&#xff1a; 绪论&#xff1a; 简单的递推方程求解 1.19(1)(2) 、 教材例题 多个函数按照阶的大小排序 1.18 分治法&#xff1a; 分治法解决芯片测试问题 计算a^n的复杂度为logn的算法&#xff08;快速幂&#xff09; 分治法解决平面最近点对问…

让 AI 写高考作文丨10 款大模型 “交卷”,实力水平如何?

文章部分素材来源 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09; 前言 在科技日新月异的今天&#xff0c;人工智能&#xff08;AI&#xff09;已不再是遥不可及的未来科技&#xff0c;而是逐渐融入我们日常生活的实用工具。从智能语音助手到自动驾驶汽车&#xff0c…

炫技来了!使用SDR设备成功抓到蓝牙air packet, 并且wireshark实时解析, 没错就是蓝牙空口抓包器

本文章主要介绍是用ZYNQ7020AD9361Gnu radio是搭建一个蓝牙抓包器的文章。 由于之前一直做蓝牙Host&#xff0c;对controller觉得是一个比较虚无缥缈的东西&#xff0c;得不到的总是在骚动&#xff0c;所以最近用我用吃灰了2年的SDR(Software Defined Radio&#xff09;设备研…

C语言scanf( ) 函数、fprintf( ) 函数与 scanf( ) 函数和printf( ) 函数有什么不同?

一、问题 fscanf( ) 函数、fprintf( ) 函数与 printf( ) 函数、scanf( ) 函数的作⽤相似&#xff0c;都是格式化读写函 数&#xff0c;那么这两个读写函数有什么不同呢&#xff1f; 二、解答 两者的区别就在于前⾯的字符“f”&#xff0c;即 fscanfQ函数和 fprintfD函数的读写…