rabbitmq死信队列详解与使用

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说rabbitmq的死信队列

对rabbitmq来说,产生死信的来源大致有如下几种:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.

  • 消息TTL过期

  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

  • 死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

  • 综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,关于这一点,也是本篇要重点讲述的,下面将用代码演示一下死信的产生及路由,即上面提到的三种方式,网上可供参考的资料比较多,但大多不全面,下面提供比较完整的demo,将各种场景的产生和过程进行列举,

640?wx_fmt=png

方式1:消息超时进入死信队列

这是一种在实际生产中应用场景比较多的一种方式,比如我们熟知的订单业务场景,当用户购买商品产生了一个订单的时候,可以设置过期时间,如果在这段时间内,消息还没有被消费,将会被路由到死信队列,专业术语来讲,即消息的TTL,TTL过期了消息将进入死信队列,下面是一段演示代码,这里包括两部分,生产者和消费者。

producer代码,此处模拟生产者产生订单,推送到队列中,消息有效时间是10S,过了10S如果没有被消费将会被路由到死信队列

public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String orderExchangeName = "order_exchange";	String orderQueueName = "order_queue";	String orderRoutingKey = "order.#";	Map<String, Object> arguments = new HashMap<String, Object>(16);	//死信队列配置  ----------------	String dlxExchangeName = "dlx.exchange";	String dlxQueueName = "dlx.queue";	String dlxRoutingKey = "#";	// 为队列设置队列交换器	arguments.put("x-dead-letter-exchange",dlxExchangeName);	// 设置队列中的消息 10s 钟后过期	arguments.put("x-message-ttl", 10000);	//正常的队列绑定	channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);	channel.queueDeclare(orderQueueName, true, false, false, arguments);	channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);	String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";	// 创建死信交换器和队列	channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);	channel.queueDeclare(dlxQueueName, true, false, false, null);	channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);	channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());	System.err.println("消息发送完成......");	}

consumer代码,消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了

public class Consumer {	//消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了	private static final String QUEUE_NAME = "dlx.queue";	public static void main(String[] args) throws Exception{	// 创建信道	final Channel channel = RabbitUtil.getChannel();	System.out.println("消费者启动 ..........");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	System.err.println("死信队列接收到消息:" + new String(body));	System.err.println("deliveryTag:" + envelope.getDeliveryTag());	channel.basicAck(envelope.getDeliveryTag(), false);	}	};	channel.basicConsume(QUEUE_NAME, consumer);	TimeUnit.SECONDS.sleep(10000000L);	}	}

然后我们分别运行两端的代码,这里提示一下,我们并没有提前在控制台去创建queue 和 exchange,这个在producer启动或者consumer启动的时候,如果没有创建过会自动创建以及建立queue和exchange的绑定关系,

启动producer,消息发送成功,同时可以通过控制台看到,exhange和相关的队列也帮我们创建了,要注意的是在dlx.queue中,有一个消息就绪,很明显,消息过了10S中没有任何消费者消费,就被路由到了死信队列dlx.queue中,

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

启动consumer,通过控制台打印结果,可以看到,由于消费端监听的是死信队列,已经从dlx.queue中成功获取到了这条信息,

640?wx_fmt=png

2、消息被拒绝,且requeue=false

没有细致研究过这个问题的可能会有点儿懵,其实就是在consumer端,当消费者要过滤某些消息的时候,那部分被过滤掉的消息如果不设置退回,即上一篇所讲的消息重回队列的话,这些消息就变成了死信,即在下面的代码中第三个参数设置成false即可,下面来看具体的代码,

有这样一个场景,一批消息中,当消费端从header中收到了num=0的消息将会被过滤掉,并且设置如上requeue=false,下面看具体的代码,

peoducer端代码,

/**	* 生产者	* 死信队列使用	*/	
public class Producer {	public static void main(String[] args) throws Exception{	Channel channel = RabbitUtil.getChannel();	String exchangeName = "test_ack_exchange";	String routingKey = "ack.save";	//通过在properties设置来标识消息的相关属性	for(int i=0;i<5;i++){	Map<String, Object> headers = new HashMap<String, Object>();	headers.put("num",i);	AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()	.deliveryMode(2)                    // 传送方式 2:持久化投递	.contentEncoding("UTF-8")           // 编码方式	//.expiration("10000")              // 过期时间	.headers(headers)                  //自定义属性	.build();	String message = "hello this is ack message ....."  + i;	System.out.println(message);	channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());	}	}	}

consumer端代码,

public class Consumer {	public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String exchangeName = "test_ack_exchange";	String exchangeType="topic";	final String queueName = "test_ack_queue";	String routingKey = "ack.#";	//死信队列配置  ----------------	String deadExchangeName = "dead_exchange";	String deadQueueName = "dead_queue";	String deadRoutingKey = "#";	//死信队列配置  ----------------	//如果需要将死信消息路由	Map<String,Object> arguments = new HashMap<String, Object>();	arguments.put("x-dead-letter-exchange",deadExchangeName);	channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);	channel.queueDeclare(queueName,false,false,false,arguments);	channel.queueBind(queueName,exchangeName,routingKey);	//死信队列绑定配置  ----------------	channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);	channel.queueDeclare(deadQueueName,true,false,false,null);	channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);	//死信队列配置  ----------------	System.out.println("consumer启动 .....");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	try{	Thread.sleep(2000);	}catch (Exception e){	}	Integer num = (Integer)properties.getHeaders().get("num");	if(num==0){	//未被ack的消息,并且requeue=false。即nack的 消息不再被退回队列而成为死信队列	channel.basicNack(envelope.getDeliveryTag(),false,false);	String message = new String(body, "UTF-8");	System.out.println("consumer端的Nack消息是:" + message);	}else {	channel.basicAck(envelope.getDeliveryTag(),false);	String message = new String(body, "UTF-8");	System.out.println("consumer端的ack消息是:" + message);	}	}	};	//消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收	channel.basicConsume(queueName,false,consumer);	}	
}

要关注的即下面的这处代码和第三个参数,

640?wx_fmt=png

启动生产者和消费者

640?wx_fmt=png

启动生产者,生产者成功发送5条消息

640?wx_fmt=png

再看消费端的控制台,这里num=0的这条消息由于设置了死信队列而不会重回原来的队列,在上一篇中,当参数设置成了true的时候,看到控制台一直会打印一条消息,

640?wx_fmt=png

同时,通过控制台也可以发现,在dead_queue中,有一条消息为就绪状态了,即死信消息,但这里并没有对这条消息做处理,目前一直存在队列里面,可以根据实际应用做后续的处理,

640?wx_fmt=png

3、队列达到最大长度

这个很好理解,比如我们设置某个队列的最大可承载消息的数量是100个,超出第100个的消息将会被路由到死信队列中,设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段,具有实际的业务意义,下面是代码演示,基本设置和上述的TTL类似,只是在参数中将TTL更换为如下配置,

生产者代码,这里我们设定order_queue这个队列的容量是5个,但是我们在程序中设置的x-max-length=3,那么按照这个猜想,将会有两个消息被路由到死信队列

public class Producer {	public static void main(String[] args) throws Exception{	final Channel channel = RabbitUtil.getChannel();	String orderExchangeName = "order_exchange";	String orderQueueName = "order_queue";	String orderRoutingKey = "order.#";	Map<String, Object> arguments = new HashMap<String, Object>(16);	//死信队列配置  ----------------	String dlxExchangeName = "dlx.exchange";	String dlxQueueName = "dlx.queue";	String dlxRoutingKey = "#";	// 为队列设置队列交换器	arguments.put("x-dead-letter-exchange",dlxExchangeName);	// 设置队列中的消息 10s 钟后过期	//arguments.put("x-message-ttl", 10000);	arguments.put("x-max-length",3);	//正常的队列绑定	channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);	channel.queueDeclare(orderQueueName, true, false, false, arguments);	channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);	String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";	// 创建死信交换器和队列	channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);	channel.queueDeclare(dlxQueueName, true, false, false, null);	channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);	for(int i=0;i<5;i++){	message = message + "========> " + i ;	System.out.println("发送的消息是:" + message);	channel.basicPublish(orderExchangeName, "order.save",null, message.getBytes());	}	System.err.println("消息发送完成......");	}	}

消费者代码

public class Consumer {	private static final String QUEUE_NAME = "order_queue";	public static void main(String[] args) throws Exception{	// 创建信道	final Channel channel = RabbitUtil.getChannel();	// 消费端消息限流。	// 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。	//channel.basicQos(1);	System.out.println("消费者启动 ..........");	com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){	@Override	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	System.err.println("死信队列接收到消息:" + new String(body));	System.err.println("deliveryTag:" + envelope.getDeliveryTag());	channel.basicAck(envelope.getDeliveryTag(),false);	}	};	channel.basicConsume(QUEUE_NAME,false, consumer);	//TimeUnit.SECONDS.sleep(10000000L);	}	}

启动生产者,5条消息发送完毕

640?wx_fmt=png

再启动消费端,通过控制台可以看到,消费端只从order_queue中消费了3条消息,还剩2条消息去哪里了呢?

640?wx_fmt=png

我们再回到控制台观察一下,发现在dlx.queue这个死信队列中有两条就绪的消息,即剩下的2条消息被路由到了死信队列了

640?wx_fmt=png

以上便是关于死信队列常见的3种方式的处理程序和逻辑

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用ASP.NET Core 3.x 构建 RESTful API - 3.2 开始建立Controller和Action

Demo下面我们就来实践一下。打开之前的项目&#xff0c;并建立CompaniesController&#xff1a; 这里有6个地方比较关键&#xff0c;我们挨个看一下&#xff1a; RESTful API 或者其它Web API的Controller都应该继承于 ControllerBase 这个类&#xff08;点此查看详细的官方文档…

C++ 链表

线性表&#xff08;顺序表&#xff09;有两种存储方式&#xff1a;链式存储和顺式存储&#xff0c;顺式存储如数组&#xff0c;其内存连续分配&#xff0c;且是静态分配。链式存储&#xff0c;内存是不连续的&#xff0c;且是动态分配。前一个元素存储数据&#xff0c;后一个元…

波拉契尔数列 C++

题目&#xff1a;写一个函数&#xff0c;输入n, 求斐波那契数列的第n项。 分析&#xff1a;该题有两种实现方式递归或循环。当n比较大的时候f(n)结果也会比较大&#xff0c;故定义的时候可以采用long(int 也行)。递归会有大量的重复计算&#xff0c;而循环可以把f(n-1)和f(n-2)…

Deepin 下 使用 Rider 开发 .NET Core

国产的 Deepin 不错&#xff0c;安利一下。Deepin 用了也有一两年&#xff0c;也只是玩玩&#xff0c;没用在开发上面。后来 Win10 不太清真了&#xff0c;就想着能不能到 Deepin下撸码。要搞开发&#xff0c;首先少不了 IDE&#xff0c;VS2019 用不来&#xff0c;Vs Code 太复…

[视频演示].NET Core开发的iNeuOS物联网平台,实现从设备PLC、云平台、移动APP数据链路闭环...

此次我们团队人员对iNeuOS进行了全面升级&#xff0c;主要升级内容包括&#xff1a;&#xff08;1&#xff09; 设备容器增加设备驱动&#xff0c;包括&#xff1a;西门子&#xff08;S7-200smart、S7-300、S7-400、S7-1200、S7-1500&#xff09;、三菱&#xff08;FxSerial…

选择开源项目什么最重要?

开发人员在决定是否使用某个开源项目时考虑到的最重要事项是什么&#xff1f;代码质量&#xff1f;安全性&#xff1f;好的文档&#xff1f;上述因素都很重要&#xff0c;但根据 Tidelift 和 The New Stack 的联合调查&#xff0c;控制着开源项目的开源许可证才是最需要考量的因…

居然不知道和的区别?

前言那年刚找工作那会&#xff0c;就碰到过这么一个简单的题目“&和&&的区别” 那时知识面窄&#xff0c;大概也就知道1.都是作为逻辑与的运算符。2.&&具有短路功能&#xff0c;计算出前者false&#xff0c;就不需计算后者的true or false。后来在微信群里…

【DevOps进行时】自动化测试之单元测试

在DevOps建设中&#xff0c;主流的测试分层体系可以分为单元测试、接口测试和界面测试。Google曾提出一个经验法则&#xff1a;70%的小型测试&#xff0c;20%的中型测试&#xff0c;10%大型测试。当然&#xff0c;这个比例不是确定的&#xff0c;不同类型的项目&#xff0c;测试…

Zongsoft.Data 发布公告

很高兴我们的 ORM 数据访问框架(Zongsoft.Data)在历经两个 SaaS 产品的应用之后&#xff0c;今天正式宣布对外推广。它是一个类 GraphQL 风格的 ORM(Object/Relational Mapping) 数据访问框架。又一个轮子&#xff1f;在很长时间里&#xff0c;.NET 阵营似乎一直缺乏一个被普遍…

使用 .NET Core模板引擎创建自定义的模板和项目

本文要点.NET CLI 包含了一个模板引擎&#xff0c;它可以直接利用命令行创建新项目和项目项。这就是“dotnet new”命令。默认模板集涵盖了默认控制台和基于 ASP.NET 的应用程序以及测试项目所需的基本项目和文件类型。自定义模板可以创建更加有趣或定制化的项目和项目项&#…

.NET Core前后端分离快速开发框架(Core.3.0+AntdVue)

时间真快&#xff0c;转眼今年又要过去了。回想今年&#xff0c;依次开源发布了Colder.Fx.Net.AdminLTE(254Star)、Colder.Fx.Core.AdminLTE(335Star)、DotNettySocket(82Star)、IdHelper(47Star)&#xff0c;这些框架及组件都是本着以实际出发&#xff0c;实事求是的态度&…

.Net开发3年,应聘大厂惨遭淘汰,如何翻身打脸面试官?

(设计师忘记了&#xff0c;这里还有个双十一福利课&#xff0c;还能1元秒杀&#xff01;)

面对金九银十铜十一你真的准备好了吗?

作者&#xff1a;回首笑人间&#xff0c;高级Java工程师一枚&#xff0c;热爱研究开源技术&#xff0c;架构师社区合伙人&#xff01;前言&#xff1a;又是一年一度的金九银十跳槽季&#xff0c;回首在经历了半个月的求职奔波后&#xff0c;终于又能安稳的静下心来敲代码了&…

深入理解二叉搜索树

什么是二叉搜索树&#xff1f; 顾名思义&#xff0c;一颗二叉搜索树是基于二叉树来组织的&#xff0c;它包括许多动态集合操作&#xff08;Search&#xff0c;MiniNum, MaxiNum, Prodecessor, Successor, Insert 和Delete等&#xff09;。二叉搜索树上的基本操作所花费的时间与…

深入理解堆(最大堆,最小堆及堆排序)

基本概念&#xff1a; 1、完全二叉树&#xff1a;若二叉树的深度为h&#xff0c;则除第h层外&#xff0c;其他层的结点全部达到最大值&#xff0c;且第h层的所有结点都集中在左子树。 2、满二叉树&#xff1a;满二叉树是一种特殊的的完全二叉树&#xff0c;所有层的结点都是最…

王炸吐血整理60个Redis面试题,全网最全了

1.Redis 是一个基于内存的高性能key-value数据库。 2.Redis相比memcached有哪些优势&#xff1a; memcached所有的值均是简单的字符串&#xff0c;redis作为其替代者&#xff0c;支持更为丰富的数据类型redis的速度比memcached快很多redis可以持久化其数据3.Redis是单线程 redi…

H.266 参考软件VTM下载和安装

1、下载安装cmake &#xff0c;下载地址https://cmake.org/。 安装后打开控制面板-系统-高级系统设置-环境变量-PATH-编辑-输入cmake.exe的路径即可。 2、如果你之前&#xff08;HEVC&#xff09;时已经下载好了SVN&#xff0c;直接在桌面点击右键SVNcheckout&#xff0c;出来…

VTM编码结构框架

VTM流程差不多就是这样子的啦&#xff0c;后续会继续补充&#xff0c;由于能力有限&#xff0c;如有错误&#xff0c;欢迎指正。在后面会进一步分析每一个函数&#xff0c;并分析流程的细节。

AspNetCore应用注意这一点,CTO会对你刮目相看

背景已经有很多文章记录了Web程序中采用异步编程的优势和.Net异步编程的用法&#xff0c; 异步编程虽然不能解决查询数据库的瓶颈&#xff0c; 但是利用线程切换&#xff0c;能最大限度的弹性利用工作线程&#xff0c; 提高了web服务的响应能力。????9012年了&#xff0c;再…

ASP.NET Core如何限制请求频率

ASP.NET Core如何限制请求频率&#xff0c;为了防止恶意请求&#xff0c;我们往往会对接口请求的频率做限制&#xff0c;比如请求间隔&#xff0c;一段时间内请求的次数&#xff0c;针对部分IP做出不同的限制策略如何去限制请求频率不需要我们去实现&#xff0c;用上AspNetCore…