RabbitMQ 之 死信队列

目录

​编辑一、死信的概念

二、死信的来源

三、死信实战

1、代码架构图

2、消息 TTL 过期

(1)消费者

(2)生产者

(3)结果展示​编辑

 3、队列达到最大长度

(1)消费者

(2)生产者

(3)结果展示

4、消息被拒

(1)消费者

(2)生产者

(3)结果展示


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.


三、死信实战

1、代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。

但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。


2、消息 TTL 过期

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}

// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

(3)结果展示


 3、队列达到最大长度

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示


4、消息被拒

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制// arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer01 接收的消息是: " + msg + ": 此消息是被 C1 拒绝的");// 拒绝,且不放囧普通队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            }};CancelCallback cancelCallback = consumerTag->{};// 开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/842649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3DF Zephyr v7 解锁版安装教程 (照片转三维模型软件)

前言 3DF Zephyr是一款照片转三维模型软件&#xff0c;可以导出许多常见的3D格式&#xff0c;甚至无需外部工具即可生成无损视频。此外&#xff0c;可以生成真正的正射影像&#xff0c;数字高程模型&#xff08;DTM&#xff09;&#xff0c;甚至可以计算面积&#xff0c;体积&…

软件技术架构全面详解

软件架构全面详解 软件架构 这个与建筑设计架构类似,建筑设计架构师负责设计建筑物的整体结构、布局和功能分配。 而软件架构师,负责设计软件系统的整体组织结构、模块划分、和功能分配。 两者都需要考虑到业务功能、性能、可扩展性、安全性、以及用户体验等方面。 软件架…

【深度学习】ultralytics, yolo seg,实例分割图绘制,核对yolo seg 的txt标记对不对

这段代码的作用是从指定路径读取图像和标签文件&#xff0c;然后在图像上绘制分割区域和相关点&#xff0c;并保存最终的图像。以下是每个函数的具体作用及其解释&#xff1a; read_labels(label_path): 读取指定路径的标签文件。标签文件的每一行表示一个物体的分割信息&#…

硬盘的分区

目录 概念 硬盘的分区 实操 创建分区 fdisk&#xff08;<2T&#xff09; 创建文件系统 挂载 自动挂载&#xff08;永久挂载&#xff09; gpt区分 swap 交换分区 如何删除已挂载的分区 概念 硬盘&#xff1a;计算机的存储设备。&#xff08;如无特殊说明&#xff0…

sklearn线性回归--岭回归

sklearn线性回归--岭回归 岭回归也是一种用于回归的线性模型&#xff0c;因此它的预测公式与普通最小二乘法相同。但在岭回归中&#xff0c;对系数&#xff08;w&#xff09;的选择不仅要在训练数据上得到好的预测结果&#xff0c;而且还要拟合附加约束&#xff0c;使系数尽量小…

基于springboot+vue的班级综合测评管理系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

git中忽略文件的配置

git中忽略文件的配置 一、在项目根目录下创建.gitignore文件二、配置规则如果在配置之前已经提交过文件了&#xff0c;要删除提交过的&#xff0c;如何修改&#xff0c;参考下面的 一、在项目根目录下创建.gitignore文件 .DS_Store node_modules/ /dist# local env files .env…

找一个区间内两个数最大公约数的最大值(24年gdcpc省赛G题)

题目大意就是t组数据,每组一个左右边界l,r,问区间内的两个是xi,yi是区间内max(gcd(xi,yi)),数据范围是1e12. 答案就是找到第一个a*x<b*x(a<b),他们两在l到r之间且x最大,那么x就是答案,可以知道,要使两个数在区间内,那么他们之间的差值要小于min(R-L,[R/2]),[]表示向下取…

【安装】VMware虚拟机安装windows操作系统,VM的相关操作

目录 引出报错&#xff1a;press any key to boot form cd激活调整显示 在VMware上新建虚拟机&#xff0c;并安装Windows1、新建虚拟机2、装载 ISO 镜像3、安装Windows server 20164、开机初始化 虚拟机操作1、虚拟机基本操作2、虚拟机快照3、虚拟机克隆4、VMware Tools 总结 引…

消费增值:国家支持的消费新零售模型

在当下的消费时代&#xff0c;一个全新的概念——消费增值&#xff0c;正逐渐走进大众视野。它不仅仅是一种消费模式&#xff0c;更是一种全新的财富增长途径。那么&#xff0c;消费增值究竟是什么&#xff1f; 首先&#xff0c;消费增值的本质在于将消费行为与投资行为相结合…

无人机技术:倾转旋翼飞行器的关键技术详解

一、总体设计 倾转旋翼飞行器作为一种独特的垂直起降与水平巡航的航空器&#xff0c;其总体设计是关键技术之一。总体设计涵盖了飞行器的整体布局、重量分配、气动性能、机械结构设计等多个方面。在总体设计中&#xff0c;需要充分考虑飞行器的垂直起降、悬停、过渡飞行和水平…

二分例题(D.负重越野,I.路径规划)

这两天的训练赛都有一道二分的题&#xff0c;但是都没往二分上面想&#xff0c;同样不知道怎么二分。 D. Fast and Fat 思路 二分的关键也就是check函数怎么写了&#xff0c;求队伍最大速度&#xff0c;可以分为速度>mid和<mid两部分&#xff0c;再判断&#xff0c;能不…

流量分析入门

什么是流量分析 通过捕捉网络中流动的数据包&#xff0c;查看里面的数据和协议&#xff0c;流量分析和各种数据的统计来发现网络运行中的问题&#xff0c;在ctf中一般是一个包含流量数据的 PCAP 文件。 [陇剑杯 2021]签到 1.题目问我们正在进行的事什么协议的攻击 2.打开wire…

【vue与iframe通讯】

vue 与 iframe 通讯 发送数据vue 向 iframe 发送数据iframe 向 vue 发送数据接收信息( vue & iframe 通用) 实现相互通讯通讯流程图实现代码vue 页面iframe页面iframe 内部重定向访问地址,更新 vue 路由 访问跨域代码下载 前言&#xff1a;vue嵌套iframe实现步骤 发送数据…

基于Pytorch框架的深度学习ShufflenetV2神经网络十七种猴子动物识别分类系统源码

第一步&#xff1a;准备数据 17种猴子动物数据&#xff1a; self.class_indict ["白头卷尾猴", "弥猴", "山魈", "松鼠猴", "叶猴", "银色绒猴", "印度乌叶猴", "疣猴", "侏绒"…

做抖音小店不懂这四个“重点”!那就别怪你的店铺,做不长久!

我相信大家做抖音小店&#xff0c;都去抖音刷过知识点&#xff0c;也去浏览器学习过技巧 但在这里&#xff0c;我给大家泼盆冷水 方法再多&#xff01;这四点不搞明白&#xff0c;那你的店铺出几天单&#xff0c;也就再也做不起来了 哪四点&#xff1f;请认真的看下去&#…

django-celery-beat自动调度异步任务

Celery是一个简单、灵活且可靠的分布式系统&#xff0c;专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务&#xff08;如发送邮件、文件上传、图像处理等耗时操作&#xff09;&#xff0c;还支持…

2024.05.27学习记录

1、面经复习&#xff1a; 实际工作经验章节 2、代码随想录刷题&#xff1a;动态规划剩下部分和单调栈 3、rosebush 组件库完成Input 和 AutoComplete部分内容

2024甘肃省三支一扶报名流程详细图解

预计报名时间&#xff1a;2024年5月27日9:00至5月31日18&#xff1a;00 2024甘肃省三支一扶报名流程 登录甘肃人力人力资源考试中心&#xff0c;选择网上报名 进入账户登录&#xff0c;首次登录同学请先注册账号。 注册账号&#xff0c;认真填写&#xff0c;仔细核对信息。…

惠海 H6901B升压恒流3.7V 7.4V 12V 24V 30V 36V 48V 60V 80V 100V调光无频闪细腻顺滑

H6901B是一款升压型LED恒流驱动芯片&#xff0c;具有良好稳定性的特点。H6901B的主要特点包括宽输入电压范围&#xff08;2.7V-100V&#xff09;、高工作频率&#xff08;1MHz&#xff09;以及多种保护功能&#xff08;如芯片供电欠压保护、过温保护、软启动等&#xff09;。此…