RabbitMQ 死信队列应用

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.15</version>
</dependency>

5.2 RabbitMQ配置

@Configuration
public class RabbitConfig {/*** 死信队列消息模型构建----------------------------------------------------------------------------------**/// 创建普通队列@Beanpublic Queue basicQueue() {Map<String, Object> params = new HashMap<>(8);// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);// 注意这里是毫秒单位,这里我们给10秒params.put("x-message-ttl", 10*1000);return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);}//创建“基本消息模型”的基本交换机,面向生产者@Beanpublic TopicExchange basicExchange() {//创建并返回基本交换机实例return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);}//创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者@Beanpublic Binding basicBinding() {//创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);}// 创建死信交换机@Beanpublic TopicExchange deadLetterExchange() {//创建并返回死信交换机实例return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);}// 创建第二个中转站// 创建死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);}// 创建死信路由及其绑定@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);}public static class Exchange {public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";}public static class RoutingKey {//交换机与报表队列绑定的RoutingKeypublic static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";}/*** 队列名称* @author peng.zhang* @date 2024/01/30*/public static class MyQueue {//报表队列名称public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";//死信队列名称public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";}
}

5.3 消息生产者

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息到死信队列*/@PostMapping("/testDeadQueue")public String testDeadQueue() {// 设置生产者到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);});// 设置消息未被队列接收时的返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),replyCode, replyText, ex, routing);});// 生成关联数据并发送消息到交换机CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 消息内容String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);return "OK";}}

5.4 消息消费者

@Component
@Slf4j
public class DeadLetterConsumer {/*** 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。* 为测试目的抛出 IOException 以模拟异常。** @param messageBody 消息负载* @param headers     消息头* @param channel     用于消息确认的通道* @throws IOException 如果抛出异常*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)@RabbitHandlerpublic void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {/*** Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。* RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。*/Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);/***  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认*  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认*/// ACK,确认一条消息已经被消费
//        channel.basicAck(deliveryTag, false);// 对应的业务操作。。。。。// doBusiness();// 模拟消息拒绝channel.basicNack(tag, false, false);}/*** 处理业务逻辑*/private void doBusiness() {System.out.println("here do some business code");}/*** 监听死信队列并处理消息。** @param data    消息内容* @param tag     消息标签* @param channel 通道*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)@RabbitHandlerpublic void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);// 对应的业务操作。。。。。}
}

5.5 YML配置

spring:rabbitmq:username: rabbitmqpassword: rabbitmqport: 5672host: 127.0.0.1#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。publisher-confirm-type: correlatedtemplate:receive-timeout: 1800000reply-timeout: 1800000retry:enabled: falselistener:direct:retry:enabled: truedefault-requeue-rejected: falsesimple:retry:# 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)enabled: true# 最大重试次数max-attempts: 1# 重试间隔时间(单位毫秒)initial-interval: 10000# 重试最大时间间隔(单位毫秒)max-interval: 300000# 应用于前一重试间隔的乘法器multiplier: 5default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/659280.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据分享】1929-2023年全球站点的逐月最高气温数据(Shp\Excel\无需转发)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;其中又以气温指标最为常用&#xff01;说到气温数据&#xff0c;最详细的气温数据是具体到气象监测站点的气温数据&#xff01; 之前我们分享过1929-2023年全球气象站…

Leetcode1109. 航班预订统计

Every day a Leetcode 题目来源&#xff1a;1109. 航班预订统计 解法1&#xff1a;差分数组 注意到一个预订记录实际上代表了一个区间的增量。我们的任务是将这些增量叠加得到答案。因此&#xff0c;我们可以使用差分解决本题。 代码&#xff1a; /** lc appleetcode.cn i…

asp.net core监听本地ip地址

开发asp.net core的时候遇到一个问题我想提供访问供其他同事测试&#xff0c;但是默认都是localhost或者127.0.0.1。我想换成我的Ip地址访问但是不行&#xff0c;百度搜索需要更换监听的地址即修改launchSettings.json&#xff0c;修改为0.0.0.0:5248&#xff0c;这样不管local…

力扣hot100 跳跃游戏 II 贪心 思维

Problem: 45. 跳跃游戏 II 思路 &#x1f468;‍&#x1f3eb; 参考 每次在上次能跳到的范围&#xff08;end&#xff09;内选择一个能跳的最远的位置&#xff08;也就是能跳到max_far位置的点&#xff09;作为下次的起跳点 &#xff01; Code ⏰ 时间复杂度: O ( n ) O(n…

DVI接口主机连接VGA显示器解决方案:DVI转VGA转换器DV

DVI转VGA转换器概述 DVI转VGA转换器能够将DVI数字信号转换成VGA模拟信号&#xff0c;通过VGA线缆传输给VGA显示设备使用&#xff0c;这样就能实现DVI接口主机连接VGA接口的显示器。 DVI转VGA转换器DV DVI转VGA转换器DV接口说明 DVI转VGA转换器DV接口介绍 DVI转VGA转换器连接示…

Spring Boot集成RocketMQ

本文目的是&#xff1a;教会你使用Spring Boot集成RocketMQ。 pom.xml文件引入rocketMQ依赖 <!-- rocketmq 依赖--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>&…

XCTF:warmup[WriteUP]

CtrlU查看页面源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible&q…

嵌入式学习第十四天!(结构体、共用体、枚举、位运算)

1. 结构体&#xff1a; 1. 结构体类型定义&#xff1a; 嵌入式学习第十三天&#xff01;&#xff08;const指针、函数指针和指针函数、构造数据类型&#xff09;-CSDN博客 2. 结构体变量的定义&#xff1a; 嵌入式学习第十三天&#xff01;&#xff08;const指针、函数指针和…

深度学习侧信道攻击的集成方法

深度学习侧信道攻击的集成方法 深度学习侧信道攻击的集成方法项目背景与意义摘要项目链接作者数据集CHES CTF 数据集ASCAD FIXED KEY 数据集ASCAD RANDOM KEY 数据集 代码代码执行神经网络 深度学习侧信道攻击的集成方法 项目背景与意义 在TCHES2020&#xff08;第4期&#x…

安卓线性布局LinearLayout

<?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools"http://schemas.android.com/tools"android:layout_width"match_parent"android:…

如何用MapTalks IDE来发布网站?

简介 MapTalks IDE 全称 MapTalks集成设计环境&#xff08;Integrated Design Environment&#xff09;&#xff0c;是由MapTalks技术团队开发的新一代web地图设计软件。 通过MapTalks IDE&#xff0c;您可以自由的创建二维和三维地图&#xff0c;在其中载入或创建地理数据&a…

【数据结构与算法】之哈希表系列-20240130

这里写目录标题 一、383. 赎金信二、387. 字符串中的第一个唯一字符三、389. 找不同四、409. 最长回文串五、448. 找到所有数组中消失的数字六、594. 最长和谐子序列 一、383. 赎金信 简单 给你两个字符串&#xff1a;ransomNote 和 magazine &#xff0c;判断 ransomNote 能不…

【Midjourney】新手指南:命令

1./ask 向Midjourney提问&#xff0c;不过问题和回答都是英文的&#xff0c;例如&#xff1a; 2./blend 将两张图片合并为一张 ​ 3./describe 上传一张图片&#xff0c;Midjourney会生成四组该图片相关的关键词&#xff0c;可以使用这些关键词再生成图片。 ​ 4./turbo …

力扣 55.跳跃游戏

思路&#xff1a; 从后往前遍历&#xff0c;遇到元素为0时&#xff0c;记录对应的下标位置&#xff0c;再向前遍历元素&#xff0c;看最大的跳跃步数能否跳过0的位置&#xff0c;不能则继续往前遍历 代码&#xff1a; class Solution { public:bool canJump(vector<int>…

毕业设计过程学习

传统的目标检测算法主要通过人工设计与纹理、颜色和形状相关的特征来进行目标区域特征的提取。随着深度学习和人工智能技术的飞速发展&#xff0c;目标检测技术也取得了很大的成就。早期基于深度学习的目标检测算法的研究方向仍然是将目标定位任务和图像分类任务分离开来的&…

uni-app在hbuilderx打开微信开发工具运行

一、运行设置配置微信开发者工具路径 运行-运行到小程序模拟器-运行设置 配置微信开发工具的安装路径&#xff08;可浏览文件位置选择&#xff09;&#xff1b;web服务器端口号在第二步骤获得&#xff1b; 二、打开微信开发者工具设置-安全设置 打开服务端口开关&#xff0…

使用ffmpeg madiamtx制作一个rtsp源

有很多人在跑rtsp解码的demo的时候, 苦于找不到一个可以拉流的源, 这里说一个简单的方法. 使用mediamtx, 加ffmpeg加mp4文件方式, 模拟一个rtsp的源. 基本架构就是这样. 在PC上, 这里说的PC可以是远程的服务器, 也可以是你的开发用的windows, 都行. 把mediamtx, 在pc上跑起来 …

书写触感细腻的电容触控笔,透明造型超好看,西圣Pencil2上手

iPad在配上手写笔之后&#xff0c;才能才能充分发挥优势&#xff0c;实现除看视频之外的更多功能。很多人入手iPad的初衷都是工作或者学习&#xff0c;如果只拿来观剧或玩游戏就太浪费了。当然了&#xff0c;现实情况下&#xff0c;Apple Pencil高昂的定价也是很多人望而却步的…

【2024-01-20】 瑞幸咖啡小程序-blackbox

需要联系主页V 瑞幸咖啡小程序 登入需要过同盾滑块下单需要balckbox参数 测试 下单 过滑块 登入发短信 加密参数

第九节HarmonyOS 常用基础组件20-Divider

1、描述 提供分割器组件&#xff0c;分割不同内容块或内容元素。 2、接口 Divider() 3、属性 名称 参数类型 描述 vertical boolean 使用水平分割线还是垂直分割线。 false&#xff1a;水平分割线 true&#xff1a;垂直分割线 color ResourceColor 分割线颜色 默认…