死信队列理解与使用

一、简介

在rabbitMQ中常用的交换机有三种,直连交换机、广播交换机、主题交换机;

直连交换机中队列与交换机需要约定好routingKey去进行绑定;

广播交换机并不需要routingKey绑定,只需队列与交换机绑定即可;

主题交换机最大的特点可以通过*和#去匹配队列;

而死信队列其实就是平常的队列的一种,通常我会使用直连交换机来作为死信队列;所以说,死信队列其实就是我们在处理业务中慢慢衍生出来的一个名词、一种方案;它和普通的队列是一样的。

二、使用场景

        我们知道在使用队列时有几种应答模式,比如自动应答(auto)、手动应答(manual)等,而在使用自动应答时,无论消息是否成功消费,达到重试次数后就会自动的把此消息给删除掉了,当然我们是不想把没有消费成功的消息给删除掉的。而开启手动应答时,配置的重试机制会失效 当有消费失败的消息时 会进入死循环。

        那么为了解决此场景,就引入了死信队列。当有不能正常消费的消息时 就把此消息给打到死信队列中,然后再根据实际情况去处理此信息。

关于自动应答和手动应答可参考这篇博客: 

rabbitMQ手动应答与自动应答_骑着蜗牛打天下的博客-CSDN博客

 

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时。 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。

3. 消息的 TTL 过期时。

4. 消息队列达到最大长度。

三、代码实现

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>--><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq</name><properties><java.version>8</java.version><hutool.version>5.8.3</hutool.version><lombok.version>1.18.24</lombok.version></properties><description>spring-boot-rabbitmq</description><packaging>pom</packaging><modules><module>direct-exchange</module><module>fanout-exchange</module><module>topic-exchange</module><module>game-exchange</module><module>dead-letter-queue</module><module>delay-queue</module><module>delay-queue2</module></modules><dependencyManagement><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies></dependencyManagement></project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../pom.xml</relativePath></parent><artifactId>dead-letter-queue</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

server.port=8084
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

正常队列config

package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}@Beanpublic DirectExchange directExchange(){return  new DirectExchange("DirectExchange",true,false);}@Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("DirectQueue").deadLetterExchange("DeadLetterExchange").deadLetterRoutingKey("dead")//20s还没消费就打到死信队列中.ttl(20000)//当队列中长度有500个消息,也打入死信队列.maxLength(500).build();}@Beanpublic Binding binding(){return  BindingBuilder.bind(directQueueLong()).to(directExchange()).with("direct123");}
}

死信队列config

package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 死信队列 一般由运维在rebbitMQ服务创建交换机和队列 不需要代码配置*/
//@Configuration
public class DeadLetterConfig {@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}@Beanpublic DirectExchange directExchange() {DirectExchange directExchange = new DirectExchange("DeadLetterExchange");return directExchange;}@Beanpublic Queue queue() {Queue deadLetterQueue = QueueBuilder.durable("DeadLetterQueue").build();return deadLetterQueue;}@Beanpublic Binding binding() {Binding binding = BindingBuilder.bind(queue()).to(directExchange()).with("dead");return binding;}}

生产者

package com.chensir.provider;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class DirectProvider {@Resourceprivate RabbitTemplate rabbitTemplate;public  void  send(){// 死信队列
//        rabbitTemplate.convertAndSend("DeadLetterExchange", "dead","123");for (int i=1;i<7;i++){OrderIngOk orderIngOk = new OrderIngOk();orderIngOk.setOrderNo("202308289687-"+i);orderIngOk.setId(i);orderIngOk.setUserName("倪海杉");
//            String s = JSONUtil.toJsonStr(orderIngOk);rabbitTemplate.convertAndSend("DirectExchange", "direct123",orderIngOk);}}}

消费者

正常队列消费者

package com.chensir.consumer;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class DirectConsumer {@RabbitHandler@RabbitListener(queues = "DirectQueue" )public void  process(OrderIngOk orderIngOk) throws IOException {try {// 处理业务开始if(orderIngOk.getId().equals(5)){int a =  0;int b= 2/a;}System.out.println("接受到消息,并正常处理结束"+ JSONUtil.toJsonStr(orderIngOk));} catch (Exception ex){System.out.println(ex.getMessage());System.out.println("接受到消息,发生异常"+ JSONUtil.toJsonStr(orderIngOk));//自动应答 当消费者成功消费消息时会自动把消息删除,而没有成功消费消息时需要给重试机制抛出个异常 重试机制才会开启重试throw ex;//手动模式//channel.basicReject(deliveryTag,true);//channel.basicNack(deliveryTag,false,true);}}
}

死信队列消费者

package com.chensir.consumer;import com.chensir.model.OrderIngOk;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DeadConsumer {@RabbitHandler@RabbitListener(queues = "DeadLetterQueue")public void  process(OrderIngOk orderIngOk)  {System.out.println("这条信息在运行时发生了未知的异常,此信息被打到了死信队列,被死信队列消费者消费成功"+orderIngOk);}
}

结果

接受到消息,并正常处理结束{"id":1,"OrderNo":"202308289687-1","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":2,"OrderNo":"202308289687-2","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":3,"OrderNo":"202308289687-3","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":4,"OrderNo":"202308289687-4","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
2023-08-28 16:45:39.848  WARN 24432 --- [ntContainer#1-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@1a6e663a(byte[58])' MessageProperties [headers={__TypeId__=com.chensir.model.OrderIngOk}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DirectExchange, receivedRoutingKey=direct123, deliveryTag=5, consumerTag=amq.ctag-f9Up1UES-F3rDvb-AK16xw, consumerQueue=DirectQueue])这条信息在运行时发生了未知的异常,此信息被打到了死信队列,被死信队列消费者消费成功OrderIngOk(id=5, OrderNo=202308289687-5, userName=倪海杉)
接受到消息,并正常处理结束{"id":6,"OrderNo":"202308289687-6","userName":"倪海杉"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66171.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

​7.1 项目1 学生通讯录管理:文本文件增删改查(C++版本)(自顶向下设计+断点调试) (A)​

C自学精简教程 目录(必读) 作业目标&#xff1a; 这个作业中&#xff0c;你需要综合运用之前文章中的知识&#xff0c;来解决一个相对完整的应用程序。 作业描述&#xff1a; 1 在这个作业中你需要在文本文件中存储学生通讯录的信息&#xff0c;并在程序启动的时候加载这些…

python+requests实现接口自动化测试

这两天一直在找直接用python做接口自动化的方法&#xff0c;在网上也搜了一些博客参考&#xff0c;今天自己动手试了一下。 一、整体结构 上图是项目的目录结构&#xff0c;下面主要介绍下每个目录的作用。 Common:公共方法:主要放置公共的操作的类&#xff0c;比如数据库sql…

简单了解网络传输介质

目录 一、同轴电缆 二、双绞线 三、光纤 四、串口电缆 一、同轴电缆 10BASE前面的数字表示传输带宽为10M&#xff0c;由于带宽较低、现在已不再使用。 50Ω同轴电缆主要用来传送基带数字信号&#xff0c;因此也被称作为基带同轴电缆&#xff0c;在局域网中得到了广泛的应用…

Prompt GPT推荐社区

大家好&#xff0c;我是荷逸&#xff0c;这次给大家带来的是我日常学习Prompt社区推荐 Snack Prompt 访问地址&#xff1a;http://snackprompt.com Snack Prompt是一个采用的Prompts诱导填空式的社区&#xff0c;它提供了一种简单的prompt修改方式&#xff0c;你只需要输入关…

一款windows的终端神奇,类似mac的iTem2

终于找到了一款windows的终端神奇。类似mac的iTem2 来&#xff0c;上神器 cmder cmder是一款windows的命令行工具&#xff0c;就是我们的linux的终端&#xff0c;用起来和linux的命令一样。所以我们今天要做的是安装并配置cmder ![在这里插入图片描述](https://img-blog.csdni…

Python所有方向的学习路线图!!

学习路线图上面写的是某个方向建议学习和掌握的知识点汇总&#xff0c;举个例子&#xff0c;如果你要学习爬虫&#xff0c;那么你就去学Python爬虫学习路线图上面的知识点&#xff0c;这样学下来之后&#xff0c;你的知识体系是比较全面的&#xff0c;比起在网上找到什么就学什…

MATLAB中circshift函数转化为C语言

背景 有项目算法使用matlab中circshift函数进行运算&#xff0c;这里需要将转化为C语言&#xff0c;从而模拟算法运行&#xff0c;将算法移植到qt。 MATLAB中circshift简单介绍 circshift是循环移位函数。可以使用于数组和矩阵元素的循环移位。 当A是数组 Bcircshift(A,p);如果…

Axes3D绘制3d图不出图解决办法【Python】

运行下面一段代码​&#xff1a; import numpy as npimport matplotlib.pyplot as pltfrom mpl_toolkits.mplot3d import Axes3D#这里设函数为y3x2x_data [1.0,2.0,3.0]y_data [5.0,8.0,11.0]​def forward(x): return x * w b​def loss(x,y): y_pred forward(x) …

裸露土方智能识别算法 python

裸露土方智能识别算法通过opencvpython网络模型框架算法&#xff0c;裸露土方智能识别算法能够准确识别现场土堆的裸露情况&#xff0c;并对超过40%部分裸露的土堆进行抓拍预警。此次算法用到的Python是一种由Guido van Rossum开发的通用编程语言&#xff0c;它很快就变得非常流…

75 # koa 基本逻辑实现以及属性的扩展

准备工作 新建自己的 kaimo-koa 文件夹&#xff0c;结构如下&#xff1a; lib application.js&#xff1a;创建应用context.js&#xff1a;上下文request.js&#xff1a;koa 中自己实现的 request 的对象response.js&#xff1a;koa 中自己实现的 response 的对象 package.js…

小白学Go基础01-Go 语言的介绍

Go 语言对传统的面向对象开发进行了重新思考&#xff0c;并且提供了更高效的复用代码的手段。Go 语言还让用户能更高效地利用昂贵服务器上的所有核心&#xff0c;而且它编译大型项目的速度也很快。 用 Go 解决现代编程难题 Go 语言开发团队花了很长时间来解决当今软件开发人员…

面经:安卓学习笔记

文章目录 1. Android系统架构2. Activity2.0 定义2.1 生命周期2.2 生命状态2.3 启动模式 3. Service3.1 定义3.2 两种启动方式3.3 生命周期3.4 跨进程service3.5 IntentService 4. BroadCastReceiver4.1 概念4.2 组成4.3 广播接收器的分类4.4 生命周期4.5 静态注册和动态注册 5…

Blender界面学习02

学习视频 【基础篇】1.3 认识界面_哔哩哔哩_bilibili 基本的3d建模的流程是什么&#xff1f; 四个角现出加号时可以拆分窗口&#xff0c;也可以合并窗口 向自己的方向拉是合并&#xff0c;向不是自己的方向拉是合并 如果界面搞乱后需要回到原来的布局 然后在新建的布局上右击 …

探秘C语言扫雷游戏实现技巧

本篇博客会讲解&#xff0c;如何使用C语言实现扫雷小游戏。 0.思路及准备工作 使用2个二维数组mine和show&#xff0c;分别来存储雷的位置信息和排查出来的雷的信息&#xff0c;前者隐藏&#xff0c;后者展示给玩家。假设盘面大小是99&#xff0c;这2个二维数组都要开大一圈…

Java“牵手”1688淘口令转换API接口数据,1688API接口申请指南

1688平台商品淘口令接口是开放平台提供的一种API接口&#xff0c;通过调用API接口&#xff0c;开发者可以获取1688商品的标题、价格、库存、商品快递费用&#xff0c;宝贝ID&#xff0c;发货地&#xff0c;区域ID&#xff0c;快递费用&#xff0c;月销量、总销量、库存、详情描…

HarmonyOS/OpenHarmony(Stage模型)应用开发单一手势(一)

一、点击手势&#xff08;TapGesture&#xff09; TapGesture(value?:{count?:number; fingers?:number}) 点击手势支持单次点击和多次点击&#xff0c;拥有两个可选参数&#xff1a; count&#xff1a;非必填参数&#xff0c;声明该点击手势识别的连续点击次数。默认值为…

git submodule 子模块的基本使用

常用命令 命令说明git submodule add <url> <本地路径>添加子模块git submodule update --init --recursive添加子模块后&#xff0c;同步子模块内容git clone <url> --recurse-submodules克隆带有子模块的项目git submodule init初始化子模块git submodule…

图神经网络教程之HAN-异构图模型

异构图 包含不同类型节点和链接的异构图 异构图的定义&#xff1a;节点类别数量和边的类别数量加起来大于2就叫异构图。 meta-path元路径的定义&#xff1a;连接两个对象的复合关系&#xff0c;比如&#xff0c;节点类型A和节点类型B&#xff0c;A-B-A和B-A-B都是一种元路径。 …

Linux上git的简单使用

git的作用&#xff1a;版本控制多人协作 客户端 磁盘上的文件-->本地仓库-->远端仓库 服务端 gitee和GitHub是基于git的商业化网站 git的命令行如何使用&#xff1f; 1、新建一个仓库 .git ignore 是忽略带有某些后缀的文件的上传。 例如&#xff1a;里面有 .sln …

实战:基于卷积的MNIST手写体分类

前面实现了基于多层感知机的MNIST手写体识别&#xff0c;本章将实现以卷积神经网络完成的MNIST手写体识别。 1. 数据的准备 在本例中&#xff0c;依旧使用MNIST数据集&#xff0c;对这个数据集的数据和标签介绍&#xff0c;前面的章节已详细说明过了&#xff0c;相对于前面章…