rabbitmq死信队列详解与使用

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说rabbitmq的死信队列

对rabbitmq来说,产生死信的来源大致有如下几种:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.

  • 消息TTL过期

  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

  • 死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

  • 综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,关于这一点,也是本篇要重点讲述的,下面将用代码演示一下死信的产生及路由,即上面提到的三种方式,网上可供参考的资料比较多,但大多不全面,下面提供比较完整的demo,将各种场景的产生和过程进行列举,

640?wx_fmt=png

方式1:消息超时进入死信队列

这是一种在实际生产中应用场景比较多的一种方式,比如我们熟知的订单业务场景,当用户购买商品产生了一个订单的时候,可以设置过期时间,如果在这段时间内,消息还没有被消费,将会被路由到死信队列,专业术语来讲,即消息的TTL,TTL过期了消息将进入死信队列,下面是一段演示代码,这里包括两部分,生产者和消费者。

producer代码,此处模拟生产者产生订单,推送到队列中,消息有效时间是10S,过了10S如果没有被消费将会被路由到死信队列

public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String orderExchangeName = "order_exchange";	String orderQueueName = "order_queue";	String orderRoutingKey = "order.#";	Map<String, Object> arguments = new HashMap<String, Object>(16);	//死信队列配置  ----------------	String dlxExchangeName = "dlx.exchange";	String dlxQueueName = "dlx.queue";	String dlxRoutingKey = "#";	// 为队列设置队列交换器	arguments.put("x-dead-letter-exchange",dlxExchangeName);	// 设置队列中的消息 10s 钟后过期	arguments.put("x-message-ttl", 10000);	//正常的队列绑定	channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);	channel.queueDeclare(orderQueueName, true, false, false, arguments);	channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);	String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";	// 创建死信交换器和队列	channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);	channel.queueDeclare(dlxQueueName, true, false, false, null);	channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);	channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());	System.err.println("消息发送完成......");	}

consumer代码,消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了

public class Consumer {	//消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了	private static final String QUEUE_NAME = "dlx.queue";	public static void main(String[] args) throws Exception{	// 创建信道	final Channel channel = RabbitUtil.getChannel();	System.out.println("消费者启动 ..........");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	System.err.println("死信队列接收到消息:" + new String(body));	System.err.println("deliveryTag:" + envelope.getDeliveryTag());	channel.basicAck(envelope.getDeliveryTag(), false);	}	};	channel.basicConsume(QUEUE_NAME, consumer);	TimeUnit.SECONDS.sleep(10000000L);	}	}

然后我们分别运行两端的代码,这里提示一下,我们并没有提前在控制台去创建queue 和 exchange,这个在producer启动或者consumer启动的时候,如果没有创建过会自动创建以及建立queue和exchange的绑定关系,

启动producer,消息发送成功,同时可以通过控制台看到,exhange和相关的队列也帮我们创建了,要注意的是在dlx.queue中,有一个消息就绪,很明显,消息过了10S中没有任何消费者消费,就被路由到了死信队列dlx.queue中,

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

启动consumer,通过控制台打印结果,可以看到,由于消费端监听的是死信队列,已经从dlx.queue中成功获取到了这条信息,

640?wx_fmt=png

2、消息被拒绝,且requeue=false

没有细致研究过这个问题的可能会有点儿懵,其实就是在consumer端,当消费者要过滤某些消息的时候,那部分被过滤掉的消息如果不设置退回,即上一篇所讲的消息重回队列的话,这些消息就变成了死信,即在下面的代码中第三个参数设置成false即可,下面来看具体的代码,

有这样一个场景,一批消息中,当消费端从header中收到了num=0的消息将会被过滤掉,并且设置如上requeue=false,下面看具体的代码,

peoducer端代码,

/**	* 生产者	* 死信队列使用	*/	
public class Producer {	public static void main(String[] args) throws Exception{	Channel channel = RabbitUtil.getChannel();	String exchangeName = "test_ack_exchange";	String routingKey = "ack.save";	//通过在properties设置来标识消息的相关属性	for(int i=0;i<5;i++){	Map<String, Object> headers = new HashMap<String, Object>();	headers.put("num",i);	AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()	.deliveryMode(2)                    // 传送方式 2:持久化投递	.contentEncoding("UTF-8")           // 编码方式	//.expiration("10000")              // 过期时间	.headers(headers)                  //自定义属性	.build();	String message = "hello this is ack message ....."  + i;	System.out.println(message);	channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());	}	}	}

consumer端代码,

public class Consumer {	public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String exchangeName = "test_ack_exchange";	String exchangeType="topic";	final String queueName = "test_ack_queue";	String routingKey = "ack.#";	//死信队列配置  ----------------	String deadExchangeName = "dead_exchange";	String deadQueueName = "dead_queue";	String deadRoutingKey = "#";	//死信队列配置  ----------------	//如果需要将死信消息路由	Map<String,Object> arguments = new HashMap<String, Object>();	arguments.put("x-dead-letter-exchange",deadExchangeName);	channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);	channel.queueDeclare(queueName,false,false,false,arguments);	channel.queueBind(queueName,exchangeName,routingKey);	//死信队列绑定配置  ----------------	channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);	channel.queueDeclare(deadQueueName,true,false,false,null);	channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);	//死信队列配置  ----------------	System.out.println("consumer启动 .....");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	try{	Thread.sleep(2000);	}catch (Exception e){	}	Integer num = (Integer)properties.getHeaders().get("num");	if(num==0){	//未被ack的消息,并且requeue=false。即nack的 消息不再被退回队列而成为死信队列	channel.basicNack(envelope.getDeliveryTag(),false,false);	String message = new String(body, "UTF-8");	System.out.println("consumer端的Nack消息是:" + message);	}else {	channel.basicAck(envelope.getDeliveryTag(),false);	String message = new String(body, "UTF-8");	System.out.println("consumer端的ack消息是:" + message);	}	}	};	//消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收	channel.basicConsume(queueName,false,consumer);	}	
}

要关注的即下面的这处代码和第三个参数,

640?wx_fmt=png

启动生产者和消费者

640?wx_fmt=png

启动生产者,生产者成功发送5条消息

640?wx_fmt=png

再看消费端的控制台,这里num=0的这条消息由于设置了死信队列而不会重回原来的队列,在上一篇中,当参数设置成了true的时候,看到控制台一直会打印一条消息,

640?wx_fmt=png

同时,通过控制台也可以发现,在dead_queue中,有一条消息为就绪状态了,即死信消息,但这里并没有对这条消息做处理,目前一直存在队列里面,可以根据实际应用做后续的处理,

640?wx_fmt=png

3、队列达到最大长度

这个很好理解,比如我们设置某个队列的最大可承载消息的数量是100个,超出第100个的消息将会被路由到死信队列中,设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段,具有实际的业务意义,下面是代码演示,基本设置和上述的TTL类似,只是在参数中将TTL更换为如下配置,

生产者代码,这里我们设定order_queue这个队列的容量是5个,但是我们在程序中设置的x-max-length=3,那么按照这个猜想,将会有两个消息被路由到死信队列

public class Producer {	public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String orderExchangeName = "order_exchange";	String orderQueueName = "order_queue";	String orderRoutingKey = "order.#";	Map<String, Object> arguments = new HashMap<String, Object>(16);	//死信队列配置  ----------------	String dlxExchangeName = "dlx.exchange";	String dlxQueueName = "dlx.queue";	String dlxRoutingKey = "#";	// 为队列设置队列交换器	arguments.put("x-dead-letter-exchange",dlxExchangeName);	// 设置队列中的消息 10s 钟后过期	//arguments.put("x-message-ttl", 10000);	arguments.put("x-max-length",3);	//正常的队列绑定	channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);	channel.queueDeclare(orderQueueName, true, false, false, arguments);	channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);	String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";	// 创建死信交换器和队列	channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);	channel.queueDeclare(dlxQueueName, true, false, false, null);	channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);	for(int i=0;i<5;i++){	message = message + "========> " + i ;	System.out.println("发送的消息是:" + message);	channel.basicPublish(orderExchangeName, "order.save",null, message.getBytes());	}	System.err.println("消息发送完成......");	}	}

消费者代码

public class Consumer {	private static final String QUEUE_NAME = "order_queue";	public static void main(String[] args) throws Exception{	// 创建信道	final Channel channel = RabbitUtil.getChannel();	// 消费端消息限流。	// 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。	//channel.basicQos(1);	System.out.println("消费者启动 ..........");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	System.err.println("死信队列接收到消息:" + new String(body));	System.err.println("deliveryTag:" + envelope.getDeliveryTag());	channel.basicAck(envelope.getDeliveryTag(),false);	}	};	channel.basicConsume(QUEUE_NAME,false, consumer);	//TimeUnit.SECONDS.sleep(10000000L);	}	}

启动生产者,5条消息发送完毕

640?wx_fmt=png

再启动消费端,通过控制台可以看到,消费端只从order_queue中消费了3条消息,还剩2条消息去哪里了呢?

640?wx_fmt=png

我们再回到控制台观察一下,发现在dlx.queue这个死信队列中有两条就绪的消息,即剩下的2条消息被路由到了死信队列了

640?wx_fmt=png

以上便是关于死信队列常见的3种方式的处理程序和逻辑

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

归并排序算法 C++

感谢博客https://blog.csdn.net/a130737/article/details/38228369 归并排序的时间复杂度为o(nlgn)&#xff0c;空间复杂度为o(n)。是一种采用分治思想的排序方法。 逻辑分析&#xff1a; 假设输入数组a[] { 2, 1, 4, 5, 3, 8, 7, 9, 0, 6 }。 首先将数组二分成两个数组le…

使用ASP.NET Core 3.x 构建 RESTful API - 3.2 开始建立Controller和Action

Demo下面我们就来实践一下。打开之前的项目&#xff0c;并建立CompaniesController&#xff1a; 这里有6个地方比较关键&#xff0c;我们挨个看一下&#xff1a; RESTful API 或者其它Web API的Controller都应该继承于 ControllerBase 这个类&#xff08;点此查看详细的官方文档…

C++ 链表

线性表&#xff08;顺序表&#xff09;有两种存储方式&#xff1a;链式存储和顺式存储&#xff0c;顺式存储如数组&#xff0c;其内存连续分配&#xff0c;且是静态分配。链式存储&#xff0c;内存是不连续的&#xff0c;且是动态分配。前一个元素存储数据&#xff0c;后一个元…

波拉契尔数列 C++

题目&#xff1a;写一个函数&#xff0c;输入n, 求斐波那契数列的第n项。 分析&#xff1a;该题有两种实现方式递归或循环。当n比较大的时候f(n)结果也会比较大&#xff0c;故定义的时候可以采用long(int 也行)。递归会有大量的重复计算&#xff0c;而循环可以把f(n-1)和f(n-2)…

Deepin 下 使用 Rider 开发 .NET Core

国产的 Deepin 不错&#xff0c;安利一下。Deepin 用了也有一两年&#xff0c;也只是玩玩&#xff0c;没用在开发上面。后来 Win10 不太清真了&#xff0c;就想着能不能到 Deepin下撸码。要搞开发&#xff0c;首先少不了 IDE&#xff0c;VS2019 用不来&#xff0c;Vs Code 太复…

求旋转数组的最小数字C++

发现还是数组这种最简单的编码才适合我&#xff0c;遇到树&#xff0c;链表这些真的是一头雾水&#xff0c;自己也不知道怎么实现。言归正传&#xff0c;该篇文章介绍如何求旋转数组的最小值&#xff0c;求最大值可以适当改编即可。 什么是旋转数组呢&#xff0c;就是将一个数…

[视频演示].NET Core开发的iNeuOS物联网平台,实现从设备PLC、云平台、移动APP数据链路闭环...

此次我们团队人员对iNeuOS进行了全面升级&#xff0c;主要升级内容包括&#xff1a;&#xff08;1&#xff09; 设备容器增加设备驱动&#xff0c;包括&#xff1a;西门子&#xff08;S7-200smart、S7-300、S7-400、S7-1200、S7-1500&#xff09;、三菱&#xff08;FxSerial…

707 设计单链表

第一次完成这样的设计&#xff0c;一路磕磕碰碰&#xff0c;遇到了许多问题&#xff0c;最后终于一一解决了。感恩https://blog.csdn.net/lym940928/article/details/81276658 题目如下&#xff1a; 设计链表的实现。您可以选择使用单链表或双链表。单链表中的节点应该具有两…

选择开源项目什么最重要?

开发人员在决定是否使用某个开源项目时考虑到的最重要事项是什么&#xff1f;代码质量&#xff1f;安全性&#xff1f;好的文档&#xff1f;上述因素都很重要&#xff0c;但根据 Tidelift 和 The New Stack 的联合调查&#xff0c;控制着开源项目的开源许可证才是最需要考量的因…

leetcode 二进制求和 addBinary

题目描述&#xff1a; 给定两个二进制字符串&#xff0c;返回他们的和&#xff08;用二进制表示&#xff09;。 输入为非空字符串且只包含数字 1 和 0。 示例 1: 输入: a "11", b "1" 输出: "100" 示例 2: 输入: a "1010", b…

居然不知道和的区别?

前言那年刚找工作那会&#xff0c;就碰到过这么一个简单的题目“&和&&的区别” 那时知识面窄&#xff0c;大概也就知道1.都是作为逻辑与的运算符。2.&&具有短路功能&#xff0c;计算出前者false&#xff0c;就不需计算后者的true or false。后来在微信群里…

两数之和,输入有序数组 leetcode C++

给定一个已按照升序排列 的有序数组&#xff0c;找到两个数使得它们相加之和等于目标数。 函数应该返回这两个下标值 index1 和 index2&#xff0c;其中 index1 必须小于 index2。 说明: 返回的下标值&#xff08;index1 和 index2&#xff09;不是从零开始的。你可以假设每…

【DevOps进行时】自动化测试之单元测试

在DevOps建设中&#xff0c;主流的测试分层体系可以分为单元测试、接口测试和界面测试。Google曾提出一个经验法则&#xff1a;70%的小型测试&#xff0c;20%的中型测试&#xff0c;10%大型测试。当然&#xff0c;这个比例不是确定的&#xff0c;不同类型的项目&#xff0c;测试…

反转字符串中的单词 III leetcode

给定一个字符串&#xff0c;你需要反转字符串中每个单词的字符顺序&#xff0c;同时仍保留空格和单词的初始顺序。 示例 1: 输入: "Lets take LeetCode contest" 输出: "steL ekat edoCteeL tsetnoc" 注意&#xff1a;在字符串中&#xff0c;每个单词由单…

Zongsoft.Data 发布公告

很高兴我们的 ORM 数据访问框架(Zongsoft.Data)在历经两个 SaaS 产品的应用之后&#xff0c;今天正式宣布对外推广。它是一个类 GraphQL 风格的 ORM(Object/Relational Mapping) 数据访问框架。又一个轮子&#xff1f;在很长时间里&#xff0c;.NET 阵营似乎一直缺乏一个被普遍…

leetcode 旋转数组

给定一个数组&#xff0c;将数组中的元素向右移动 k 个位置&#xff0c;其中 k 是非负数。 示例 1:输入: [1,2,3,4,5,6,7] 和 k 3 输出: [5,6,7,1,2,3,4] 解释: 向右旋转 1 步: [7,1,2,3,4,5,6] 向右旋转 2 步: [6,7,1,2,3,4,5] 向右旋转 3 步: [5,6,7,1,2,3,4] 示例 2…

使用 .NET Core模板引擎创建自定义的模板和项目

本文要点.NET CLI 包含了一个模板引擎&#xff0c;它可以直接利用命令行创建新项目和项目项。这就是“dotnet new”命令。默认模板集涵盖了默认控制台和基于 ASP.NET 的应用程序以及测试项目所需的基本项目和文件类型。自定义模板可以创建更加有趣或定制化的项目和项目项&#…

leetcode 岛屿的个数

给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的的二维网格&#xff0c;计算岛屿的数量。一个岛被水包围&#xff0c;并且它是通过水平方向或垂直方向上相邻的陆地连接而成的。你可以假设网格的四个边均被水包围。 示例 1: 输入: 11110 11010…

.NET Core前后端分离快速开发框架(Core.3.0+AntdVue)

时间真快&#xff0c;转眼今年又要过去了。回想今年&#xff0c;依次开源发布了Colder.Fx.Net.AdminLTE(254Star)、Colder.Fx.Core.AdminLTE(335Star)、DotNettySocket(82Star)、IdHelper(47Star)&#xff0c;这些框架及组件都是本着以实际出发&#xff0c;实事求是的态度&…