RabbitMQ的死信队列和延迟队列

文章目录

    • 死信队列
    • 如何配置死信队列
      • 死信队列的应用场景
      • Spring Boot实现RabbitMQ的死信队列
    • 延迟队列
      • 方案优劣:
      • 延迟队列的实现有两种方式:

死信队列

1)“死信”是RabbitMQ中的一种消息机制。
2)消息变成死信,可能是由于以下的原因
● 消息被拒绝
● 消息过期
● 队列达到最大长度
3)死信队列
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX(Dead-Letter-Exchange ) ,绑定 DLX 的队列就称之为死信队列。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

如何配置死信队列

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列
    注意:并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
    有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
    具体因为队列消息过期而被投递到死信队列的流程:
    在这里插入图片描述

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息的生命周期:
1)业务消息被投入业务队列
2)消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
3)被nck或reject的消息由RabbitMQ投递到死信交换机中
4)死信交换机将消息投入相应的死信队列
5)死信队列的消费者消费死信消息
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费

死信队列的应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。
通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

Spring Boot实现RabbitMQ的死信队列

当您在使用Spring Boot实现RabbitMQ的死信队列时,您需要完成以下步骤:

  1. 添加Maven依赖
    确保您的pom.xml文件中包含以下Maven依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这将为您的应用程序提供RabbitMQ的基本支持。
2. 配置RabbitMQ连接信息
在application.properties或application.yml文件中添加RabbitMQ的连接信息,包括主机地址、用户名、密码等。
3. 创建RabbitMQConfig类
创建一个配置类,用于定义交换机、队列以及它们之间的绑定关系。在这个类中,您需要定义普通队列、死信队列、死信交换机,并将它们进行绑定。
4. 创建消费者类
创建一个消费者类,使用@RabbitListener注解标记需要监听的死信队列,并在方法上使用@RabbitHandler注解来处理接收到的死信消息。
5. 发送消息到普通队列
在需要发送消息的地方,使用RabbitTemplate发送消息到普通队列中。当消息因为过期或被拒绝接收时,会被标记为死信消息,并根据参数设置转发到死信交换机中,然后路由到死信队列中。

下面是一个简单的示例代码,演示了如何在Spring Boot中实现RabbitMQ的死信队列,并消费死信队列的消息:

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {@Beanpublic Queue myQueue() {return QueueBuilder.durable("my_queue").withArgument("x-dead-letter-exchange", "dlx_exchange").withArgument("x-dead-letter-routing-key", "dlq_queue").build();}@Beanpublic Queue dlqQueue() {return QueueBuilder.durable("dlq_queue").build();}@Beanpublic Exchange dlxExchange() {return ExchangeBuilder.directExchange("dlx_exchange").durable(true).build();}@Beanpublic Binding binding(Queue myQueue, Exchange dlxExchange) {return BindingBuilder.bind(myQueue).to(dlxExchange).with("my_queue").noargs();}
}// DeadLetterQueueConsumer.java
@Component
@RabbitListener(queues = "dlq_queue")
public class DeadLetterQueueConsumer {@RabbitHandlerpublic void processDeadLetterMessage(String message) {System.out.println("Received message from dead letter queue: " + message);// 处理接收到的死信消息}
}// RabbitMQService.java
@Service
public class RabbitMQService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {rabbitTemplate.convertAndSend("my_queue", "Hello, RabbitMQ!");}
}

通过以上步骤,您就可以在Spring Boot项目中实现RabbitMQ的死信队列,并消费死信队列的消息。

延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:

在这里插入图片描述

RabbitMQ 的基因中没有延时队列这回事,它不能直接指定一个队列类型为延时队列,然后去延时处理,但是经过上面两节的铺垫,我们可以将 TTL+DLX 相结合,这就能组成一个延时队列。
设想一个场景,下完订单之后 15 分钟未付款我们就要将订单关闭,这就是一个很经典的演示消费的场景,如果拿 RabbitMQ 来做,我们就需要结合 TTL+DLX 了。
先把订单消息设置好 15 分钟过期时间,然后过期后队列将消息转发给我们设置好的 DLX-Exchange,DLX-Exchange 再将分发给它绑定的队列,我们的消费者再消费这个队列中的消息,就做到了延时十五分钟消费。

RabbitMQ 有两个特性,一个是 Time-To-Live Extensions,另一个是 Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后 “死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
Dead Letter Exchanges
在 RabbitMQ 中,一共有三种消息的 “死亡” 形式:
● 消息被拒绝。通过调用 basic.reject 或者 basic.nack 并且设置的 requeue 参数为 false;
● 消息因为设置了TTL而过期;
● 队列达到最大长度。

DLX同一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有 DLX 消息时,RabbitMQ就会自动的将 DLX 消息重新发布到设置的 Exchange 中去,进而被路由到另一个队列,publish 可以监听这个队列中消息做相应的处理。
由上简介大家可以看出,RabbitMQ本身是不支持延迟队列的,只是他的特性让勤劳的 中国脱发群体 急中生智(为了完成任务)弄出了这么一套可用的方案。
可用的方案就是:

  1. 如果有事件需要延迟那么将该事件发送到MQ 队列中,为需要延迟的消息设置一个TTL;
  2. TTL到期后就会自动进入设置好的DLX,然后由DLX转发到配置好的实际消费队列;
  3. 消费该队列的延迟消息,处理事件。

方案优劣:

优点:
大品牌组件,用的放心。如果面临大数据量需求可以很容易的横向扩展,同时消息支持持久化,有问题可回滚。
缺点:

  1. 配置麻烦,额外增加一个死信交换机和一个死信队列的配置;
  2. RabbitMQ 是一个消息中间件,TTL 和 DLX 只是他的一个特性,将延迟队列绑定在一个功能软件的某一个特性上,可能会有风险。不要杠,当你们组不用 RabbitMQ 的时候迁移很痛苦;
  3. 消息队列具有先进先出的特点,如果第一个进入队列的消息 A 的延迟是10分钟,第二个进入队列的消息B 的延迟是5分钟,期望的是谁先到 TTL谁先出,但是事实是B已经到期了,而还要等到 A 的延迟10分钟结束A先出之后,B 才能出。所以在设计的时候需要考虑不同延迟的消息要放到不同的队列。另外该问题官方已经给出了插件来支持:插件地址。

延迟队列的实现有两种方式:

通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。
针对任务丢失的代价过大,高并发的场景
优点: 支持集群,分布式,高并发场景;
缺点: 引入额外的消息队列,增加项目的部署和维护的复杂度。
场景:为一个委托指定期限,委托到期后,委托关系终止,相关业务权限移交回原拥有者 这里采用的是RabbitMq的死信队列加TTL消息转化为延迟队列的方式(RabbitMq没有延时队列)

①声明一个队列设定其的死信队列  
@Configuration
public class MqConfig {public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";public static final String DLX_EXCHANGE_NAME = "dlxExchange";public static final String AUTH_EXCHANGE_NAME = "authExchange";public static final String DLX_QUEUE_NAME = "dlxQueue";public static final String AUTH_QUEUE_NAME = "authQueue";public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";@Bean@Qualifier(GLOBAL_RABBIT_TEMPLATE)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Bean@Qualifier(AUTH_EXCHANGE_NAME)public Exchange authExchange() {return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();}/*** 死信交换机* @return*/@Bean@Qualifier(DLX_EXCHANGE_NAME)public Exchange dlxExchange() {return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();}/*** 记录日志的死信队列* @return*/@Bean@Qualifier(DLX_QUEUE_NAME)public Queue dlxQueue() {// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)return QueueBuilder.durable (DLX_QUEUE_NAME).build ();}/*** 委托授权专用队列* @return*/@Bean@Qualifier(AUTH_QUEUE_NAME)public Queue authQueue() {return QueueBuilder.durable (AUTH_QUEUE_NAME).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).withArgument("x-dead-letter-routing-key", "dlx_auth").build ();}/*** 委托授权专用死信队列* @return*/@Bean@Qualifier(DLX_AUTH_QUEUE_NAME)public Queue dlxAuthQueue() {// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)return QueueBuilder.durable (DLX_AUTH_QUEUE_NAME).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).withArgument("x-dead-letter-routing-key", "dlx_key").build ();}@Beanpublic Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();}/*** 委托授权专用死信队列绑定关系* @param dlxAuthQueue* @param dlxExchange* @return*/@Beanpublic Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();}/*** 委托授权专用队列绑定关系* @param authQueue* @param authExchange* @return*/@Beanpublic Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();}}②发送含过期时间的消息  向授权交换机,发送路由为"auth"的消息(指定了业务所需的超时时间) =》发向MqConfig.AUTH_QUEUE_NAME 队列  
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "类型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {/*** MessagePostProcessor:消息后置处理* 为消息设置属性,然后返回消息,相当于包装消息的类*///业务逻辑:过期时间=xxxxString ttl = "5000";//设置消息的过期时间message.getMessageProperties ().setExpiration (ttl);return message;});
复制代码
③超时后队列MqConfig.AUTH_QUEUE_NAME会将消息转发至其配置的死信路由"dlx_auth",监听该死信队列即可消费定时的消息/*** 授权定时处理* @param channel* @param message*/@RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)public void dlxAuthQ(Channel channel, Message message) throws IOException {System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));//1.判断消息类型:1.BEGIN 2.ENDtry {//2.1 类型为授权到期(END)//2.1.1 修改报件办理人//2.1.2 修改授权状态为0(失效)//2.2 类型为授权开启(BEGIN)//2.2.1 修改授权状态为1(开启)System.out.println (new String(message.getBody (), Charset.forName ("utf8")));channel.basicAck (message.getMessageProperties ().getDeliveryTag (),  false);System.out.println ("已处理,授权相关信息修改成功");} catch (Exception e) {//拒签消息channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);System.out.println ("授权相关信息处理失败, 进入死信队列记录日志");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/700044.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

绿盾限制终端网络访问权限会恢复后,别的网站访问正常就是无法访问钉钉网站和下载东西

环境&#xff1a; Win10 专业版 钉钉7.5.5 绿盾7.0 问题描述&#xff1a; 绿盾限制终端网络访问权限会恢复后&#xff0c;别的网站访问正常就是无法访问钉钉网站和下载东西 解决方案&#xff1a; 排查方法 1.重置浏览器或者更换浏览器测试&#xff08;未解决&#xff09…

【Java】输入输出流(实验八)

目录 一、实验目的 二、实验内容 三、实验小结 一、实验目的 1、掌握java I/O的基本原理。 2、掌握标准输入输出流和Scanner类的基本使用方法。 3、掌握FileInputStream、FileOutStream、FileReader、FileWriter、BufferedReader 、BufferedWriter类的常用方法。 二、实验…

VR系统的开发流程

虚拟现实&#xff08;Virtual Reality&#xff0c;VR&#xff09;系统是一种通过计算机技术模拟出的具有三维视角和交互性的虚拟环境&#xff0c;使用户能够沉浸在其中并与虚拟环境进行交互。这种技术通常利用头戴式显示器和手柄等设备&#xff0c;使用户能够感觉到仿佛身临其境…

【kubernetes】kubeadm部署k8s集群(3主3从+keepalived/nginx负载均衡高可用)

目录 一、完成系统初始化 步骤一&#xff1a;常规环境初始化 步骤二&#xff1a;内核版本升级以及内核限制文件参数修改 步骤三&#xff1a;提前准备好负载均衡器和keepalived(接着之前的二进制部署修改的) 二、所有节点部署docker&#xff0c;以及指定版本的kubeadm 步骤…

Mysql系列之命令行登录、连接工具登录、数据库表常用命令

登录与常用命令 连接工具登录命令行登录数据库1、查看数据库2、指定数据库3、查看当前数据库4、建库语句 数据表1、查看数据表2、查看表结构信息3、查看建表语句4、建表语句 连接工具登录 首先下载mysql连接工具&#xff0c;解压后直接打开软件&#xff0c;按以下步骤操作&…

CSS实现半边边框(只有边框的部分可见)

CSS实现半边边框&#xff08;只有边框的部分可见&#xff09; <div class"part box"><h1>内容</h1><!-- 绘出下面两个对角边框--><div class"part-footer"></div> </div>主要代码 .box {width: 100px;height:…

RabbitMq:什么是RabbitMq? ①

一、RabbitMq定位 RabbitMq是一个基于消息订阅发布的一款消息中间件。 二、技术原理 核心概念 server&#xff1a;又称broker&#xff0c;接受客户端连接&#xff0c;实现AMQP实体服务。缓存代理&#xff0c;Kafka集群中的一台或多台服务器统称broker.connection&#xff1a;…

工作中常见问题总结

工作中常见错误清单 1、springboot实现无数据库启动 问题 springboot往往是作为b/s系统的server端的架子来使用&#xff0c;但是有些时候&#xff0c;是作为静默的server&#xff0c;并没有界面和数据库&#xff0c;但是springboot默认是链接数据库的&#xff0c;如何解决这个…

2024年初中生古诗文大会备考:选择题往年真题练习和解析

今天我们继续来做初中古诗文大会的选择题真题&#xff0c;让大家了解初中生古诗文大会的考察内容&#xff0c;并且提供了我独家的题目解析和答案&#xff0c;供孩子们参考。 Tips&#xff1a;通过对古诗文大会题目的解析发现&#xff0c;古诗文大会的许多题目都来自于中考、高…

Druid无法登录监控页面

问题表现&#xff1a;在配置和依赖都正确的情况下&#xff0c;无法通过配置的用户名密码登录Druid的监控页面 检查配置发现 配置的用户名和密码和请求中参数是一致的&#x1f914; Debug发现 ResourceServlet 是Druid的登录实现&#xff0c; 且调试发现usernameParam是null&am…

python程序设计基础:字符串与正则表达式

第四章&#xff1a;字符串与正则表达式 4.1字符串 最早的字符串编码是美国标准信息交换码ASCII&#xff0c;仅对10个数字、26个大写英文字母、26个小写英文字母及一些其他符号进行了编码。ASCII码采用1个字节来对字符进行编码&#xff0c;最多只能表示256个符号。 随着信息技…

【数据结构与算法初学者指南】【冲击蓝桥篇】String与StringBuilder的区别和用法

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《数据结构与算法&#xff1a;初学者入门指南》&#x1f4d8;&am…

docker打包当前dinky项目

以下是我的打包过程&#xff0c;大家可以借鉴。我也是第一次慢慢摸索&#xff0c;打包一个公共项目&#xff0c;自己上传。 如果嫌麻烦&#xff0c;可以直接使用我的镜像&#xff0c;直接跳到拉取镜像&#xff01; <可以在任何地方的服务器进行拉取> docker打包当前din…

LLMs之Gemma:Gemma(Google开发的新一代领先的开源模型)的简介、安装、使用方法之详细攻略

LLMs之Gemma&#xff1a;Gemma(Google开发的新一代领先的开源模型)的简介、安装、使用方法之详细攻略 导读&#xff1a;此文章介绍了Google推出的新一代开源模型Gemma&#xff0c;旨在帮助研发人员负责任地开发AI。 背景&#xff1a; >> Google长期致力于为开发者和研究人…

Vue(学习笔记)

什么是Vue Vue是一套构建用户界面的渐进式框架 构建用户界面&#xff1a; 基于数据渲染出用户可以看到的界面 渐进式&#xff1a; 所谓渐进式就是循序渐进&#xff0c;不一定非得把Vue中的所有API都学完才能开发Vue&#xff0c;可以学一点开发一点 创建Vue实例 比如就上面…

猫头虎分享已解决Bug || ImportError: Keras requires TensorFlow 2.2 or higher

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

openai sora 只能根据文本生成视频?不,TA 是通用物理世界模拟器

视频生成模型作为世界模拟器 我们探索了在视频数据上进行大规模生成模型的训练。 具体来说&#xff0c;我们联合在可变持续时间、分辨率和长宽比的视频和图像上训练文本条件扩散模型。 我们利用了一个在视频和图像潜在编码的时空补丁上操作的变压器架构。 我们最大的模型So…

C++的string容器->基本概念、构造函数、赋值操作、字符串拼接、查找和替换、字符串比较、字符存取、插入和删除、子串

#include<iostream> using namespace std; #include <string> //string的构造函数 /* -string(); //创建一个空的字符串 例如: string str; -string(const char* s); //使用字符串s初始化 -string(const string& str); //使…

Leetcode 209.长度最小的子数组

给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件的子数组&#xff0c;返回 0 。 示例 1&#xff1a; 输入&…

Linux基础命令-文件管理

目录结构 1.存放命令相关的目录 /bin #普通用户使用的命令 /bin/ls, /bin/date /sbin #管理员使用的命令 /sbin/service2.存放用户相关数据的家目录 /home #普通用户的家目录, 默认为/home/username /root #超级管理员root的家目录, 普通用户无权操作3.系统文件目录 /usr #相当…