RabbitMQ 之 死信队列

目录

​编辑一、死信的概念

二、死信的来源

三、死信实战

1、代码架构图

2、消息 TTL 过期

(1)消费者

(2)生产者

(3)结果展示​编辑

 3、队列达到最大长度

(1)消费者

(2)生产者

(3)结果展示

4、消息被拒

(1)消费者

(2)生产者

(3)结果展示


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.


三、死信实战

1、代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。

但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。


2、消息 TTL 过期

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}

// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

(3)结果展示


 3、队列达到最大长度

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示


4、消息被拒

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制// arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer01 接收的消息是: " + msg + ": 此消息是被 C1 拒绝的");// 拒绝,且不放囧普通队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            }};CancelCallback cancelCallback = consumerTag->{};// 开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/842649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI智能体研发之路-模型篇(五):pytorch vs tensorflow框架DNN网络结构源码级对比

博客导读&#xff1a; 《AI—工程篇》 AI智能体研发之路-工程篇&#xff08;一&#xff09;&#xff1a;Docker助力AI智能体开发提效 AI智能体研发之路-工程篇&#xff08;二&#xff09;&#xff1a;Dify智能体开发平台一键部署 AI智能体研发之路-工程篇&#xff08;三&am…

3DF Zephyr v7 解锁版安装教程 (照片转三维模型软件)

前言 3DF Zephyr是一款照片转三维模型软件&#xff0c;可以导出许多常见的3D格式&#xff0c;甚至无需外部工具即可生成无损视频。此外&#xff0c;可以生成真正的正射影像&#xff0c;数字高程模型&#xff08;DTM&#xff09;&#xff0c;甚至可以计算面积&#xff0c;体积&…

软件技术架构全面详解

软件架构全面详解 软件架构 这个与建筑设计架构类似,建筑设计架构师负责设计建筑物的整体结构、布局和功能分配。 而软件架构师,负责设计软件系统的整体组织结构、模块划分、和功能分配。 两者都需要考虑到业务功能、性能、可扩展性、安全性、以及用户体验等方面。 软件架…

TypeScript 函数

函数基础 在 TypeScript 中&#xff0c;函数的定义方式与 JavaScript 类似&#xff0c;但是可以添加类型注解来增强函数的类型安全性。例如&#xff1a; function greet(name: string): string {return Hello, ${name}!; }上面的例子定义了一个名为 greet 的函数&#xff0c;…

【深度学习】ultralytics, yolo seg,实例分割图绘制,核对yolo seg 的txt标记对不对

这段代码的作用是从指定路径读取图像和标签文件&#xff0c;然后在图像上绘制分割区域和相关点&#xff0c;并保存最终的图像。以下是每个函数的具体作用及其解释&#xff1a; read_labels(label_path): 读取指定路径的标签文件。标签文件的每一行表示一个物体的分割信息&#…

【python深度学习】——torch.expand()广播机制|torch.norm()

【python深度学习】——torch.expand广播机制|torch.norm 1. torch.expand()与广播机制2. torch.norm()计算范数3. 结合使用的完整示例代码 1. torch.expand()与广播机制 在处理3D点云时, 有时需要对两帧点云进行逐点的三维坐标相加减、做点积等运算, 但是读入的PCD文件中,点云…

硬盘的分区

目录 概念 硬盘的分区 实操 创建分区 fdisk&#xff08;<2T&#xff09; 创建文件系统 挂载 自动挂载&#xff08;永久挂载&#xff09; gpt区分 swap 交换分区 如何删除已挂载的分区 概念 硬盘&#xff1a;计算机的存储设备。&#xff08;如无特殊说明&#xff0…

sklearn线性回归--岭回归

sklearn线性回归--岭回归 岭回归也是一种用于回归的线性模型&#xff0c;因此它的预测公式与普通最小二乘法相同。但在岭回归中&#xff0c;对系数&#xff08;w&#xff09;的选择不仅要在训练数据上得到好的预测结果&#xff0c;而且还要拟合附加约束&#xff0c;使系数尽量小…

基于springboot+vue的班级综合测评管理系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

操作系统 实验18 批处理操作接口8:函数

1、建立文件func2.sh&#xff0c;输出文件内容各行及行数 脚本&#xff1a; #!/bin/bash echo -n "请输入一个文件名及路径&#xff1a;" read FILE statisfile(){local i0while read linedo let iecho "$i $line"done < $FILEecho "$FILE有$i行…

高德地图之获取经纬度并且根据获取经纬度渲染到路线规划

话不多说&#xff0c;直接给兄弟们上干货&#xff0c;修改了好多次&#xff0c;之前都是获取到路线但是无法删除&#xff0c;这次是根据点击的单双判断开始和结束 <!DOCTYPE html> <html><head><meta charset"utf-8"><meta http-equiv&q…

解读 MySQL 容器信息:`docker inspect` 字段详解

前言 在使用 Docker 时&#xff0c;docker inspect 命令是一个非常有用的工具&#xff0c;它能够返回容器或镜像的详细配置信息和状态。以下是对 docker inspect mysql 命令输出的字段的详细解释&#xff0c;这些信息可以帮助您更好地了解容器的内部工作机制。 容器基础信息 …

回顾Java中的算术运算符、关系运算符和逻辑运算符,并解释下和的区别

一、在Java中&#xff0c;存在多种类型的运算符&#xff0c;包括算术运算符、关系运算符和逻辑运算符。以下是这些运算符的列表和简要说明&#xff1a; 算术运算符 加法运算符 (): 用于将两个操作数相加。减法运算符 (-): 用于从一个操作数中减去另一个操作数。乘法运算符 (*…

【前端】CSS弹性布局 display-flex

一、display-flex弹性布局 Flex&#xff0c;用来为盒装模型提供最大的灵活性。任何一个容器都可以指定为Flex布局。 在父DIV中使用 display: -webkit-flex; /*在webkit内核的浏览器上使用要加前缀*/ display: flex; //将对象作为弹性伸缩盒显示// 沿水平主轴让元素从左向右排…

git中忽略文件的配置

git中忽略文件的配置 一、在项目根目录下创建.gitignore文件二、配置规则如果在配置之前已经提交过文件了&#xff0c;要删除提交过的&#xff0c;如何修改&#xff0c;参考下面的 一、在项目根目录下创建.gitignore文件 .DS_Store node_modules/ /dist# local env files .env…

找一个区间内两个数最大公约数的最大值(24年gdcpc省赛G题)

题目大意就是t组数据,每组一个左右边界l,r,问区间内的两个是xi,yi是区间内max(gcd(xi,yi)),数据范围是1e12. 答案就是找到第一个a*x<b*x(a<b),他们两在l到r之间且x最大,那么x就是答案,可以知道,要使两个数在区间内,那么他们之间的差值要小于min(R-L,[R/2]),[]表示向下取…

【安装】VMware虚拟机安装windows操作系统,VM的相关操作

目录 引出报错&#xff1a;press any key to boot form cd激活调整显示 在VMware上新建虚拟机&#xff0c;并安装Windows1、新建虚拟机2、装载 ISO 镜像3、安装Windows server 20164、开机初始化 虚拟机操作1、虚拟机基本操作2、虚拟机快照3、虚拟机克隆4、VMware Tools 总结 引…

消费增值:国家支持的消费新零售模型

在当下的消费时代&#xff0c;一个全新的概念——消费增值&#xff0c;正逐渐走进大众视野。它不仅仅是一种消费模式&#xff0c;更是一种全新的财富增长途径。那么&#xff0c;消费增值究竟是什么&#xff1f; 首先&#xff0c;消费增值的本质在于将消费行为与投资行为相结合…

大模型日报2024-05-27

大模型日报 2024-05-27 大模型资讯 芝加哥大学研究AI在金融分析中的应用 摘要: 芝加哥大学的研究探索了大型语言模型&#xff08;LLMs&#xff09;在金融分析中的能力。GPT-4等大型语言模型在文本分析、解释和生成方面表现出色。 AI帮助揭示海洋和肠道中病毒的动态 摘要: 病毒在…

无人机技术:倾转旋翼飞行器的关键技术详解

一、总体设计 倾转旋翼飞行器作为一种独特的垂直起降与水平巡航的航空器&#xff0c;其总体设计是关键技术之一。总体设计涵盖了飞行器的整体布局、重量分配、气动性能、机械结构设计等多个方面。在总体设计中&#xff0c;需要充分考虑飞行器的垂直起降、悬停、过渡飞行和水平…