【RabbitMQ重试】重试三次转入死信队列

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案:


方案设计要点

  1. 队列定义改造(核心参数配置)
@Bean
public Queue auditQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "audit.dlx.exchange"); // 死信交换器args.put("x-dead-letter-routing-key", "audit.dlx.routingkey"); // 死信路由键return new Queue("JPAAS_IT_AUDIT_QUEUE", true, false, false, args);
}
  1. 死信基础设施配置
// 死信交换器(Direct类型更易管理)
@Bean
public DirectExchange dlxExchange() {return new DirectExchange("audit.dlx.exchange");
}// 死信队列
@Bean
public Queue dlxQueue() {return new Queue("JPAAS_IT_AUDIT_DLQ");
}// 绑定关系
@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("audit.dlx.routingkey");
}
  1. 消费者端重试配置(application.yml)
spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 1000ms # 首次重试间隔multiplier: 2.0 # 间隔乘数因子
  1. 改造消息监听处理逻辑
@RabbitHandler
public void itineraryAudit(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {MessageProperties properties = message.getMessageProperties();Map<String, Object> headers = properties.getHeaders();int retryCount = headers.containsKey("retry-count") ? (int) headers.get("retry-count") : 0;try {// 业务逻辑} catch (Exception e) {if (retryCount >= 2) {channel.basicReject(tag, false);} else {headers.put("retry-count", retryCount + 1);// 重新发布消息到原队列(注意避免循环)channel.basicPublish("", properties.getConsumerQueue(), new AMQP.BasicProperties.Builder().headers(headers).build(),message.getBody());channel.basicAck(tag, false); // 确认原消息}}
}

Messsage对象:

{"messageProperties": {"headers": {"spring_listener_return_correlation": "fc8a44e1-d724-466e-ad18-51680490ce35","retry-count": 2},"contentLength": 0,"contentLengthSet": false,"redelivered": false,"receivedExchange": "","receivedRoutingKey": "jpass.it.audit.query","deliveryTag": 1,"deliveryTagSet": true,"consumerTag": "amq.ctag-LTJ5vm8u_EnpzvJVev5eKQ","consumerQueue": "jpass.it.audit.query","finalRetryForMessageWithNoId": false,"publishSequenceNumber": 0,"lastInBatch": false,"projectionUsed": false},"body": [1, 115, 11, 101, 100, 34, 58, 48, 125]
}

以下是错误使用x-death,原因:
为什么 x-death 不适用于统计重入队次数?

  • requeue=true 不触发死信机制
    当消息被拒绝(basic.reject 或 basic.nack)并设置 requeue=true 时,消息会直接回到原队列头部,而不会成为死信。此时:RabbitMQ 不会修改消息的头部(包括 x-death)

x-death 头部仍然为空(null),因为它只在消息成为死信时被创建。

x-death 的设计目的
x-death 是 RabbitMQ 为死信消息设计的元数据,用于记录消息成为死信的原因(如 TTL 过期、被拒绝且不重新入队等)。它并非用于跟踪消息的重试或重入队次数

# 错误代码:
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "JPAAS_IT_AUDIT_QUEUE", durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = "audit.dlx.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "audit.dlx.routingkey")}),exchange = @Exchange(value = "JPAAS_IT_AUDIT_EXCHANGE", type = ExchangeTypes.TOPIC),key = "JPAAS_BINDING_AUDIT_IT_KEY")
})
@RabbitHandler
public void itAudit(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(name = "x-death", required = false) List<Map<String,Object>> xDeath) throws IOException {try {// 业务逻辑处理(原代码)// ...// 成功处理后显式ACKchannel.basicAck(tag, false);} catch (Exception e) {log.error("行程审核处理异常", e);// 错误使用x-Death!// 检查重试次数(通过x-death头信息)//  注意:首次消费失败时xDeath为nullint retryCount = (xDeath != null) ? xDeath.size() : 0;if (retryCount >= 2) { // 已重试3次(初始消费+2次重试)log.warn("消息已达到最大重试次数,转入死信队列。消息内容:{}", message);channel.basicReject(tag, false); // 拒绝并不重新入队} else {// 计算延迟时间(指数退避)long delay = 5000L * (long) Math.pow(2, retryCount);channel.basicNack(tag, false, true); // 拒绝并重新入队}// 记录异常日志(建议增加消息指纹)savePublishLog(/*...*/);}
}

关键设计说明

  1. 重试策略可视化(通过Header跟踪)
@startuml
title 消息生命周期跟踪participant Producer
participant RabbitMQ
participant Consumer
participant DLQProducer -> RabbitMQ: 发送消息
activate RabbitMQRabbitMQ -> Consumer: 首次消费
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第一次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第二次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> DLQ: 触发死信规则
@enduml
  1. 监控指标建议
# RabbitMQ管理命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_queues arguments | grep x-dead-letter
  1. 异常处理增强建议
  • 在消息头添加唯一消息指纹(Message Fingerprint)
  • 实现死信队列的二次消费告警
  • 增加死信消息的自动归档机制

补充说明

  1. 重试次数判定逻辑

    • 首次消费失败 → 进入第一次重试(计数1)
    • 第二次失败 → 进入第二次重试(计数2)
    • 第三次失败 → 触发死信(计数3)
  2. 与Spring Retry整合的替代方案

@Configuration
public class RetryConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {// 将失败消息重新发布到指定交换器return new RepublishMessageRecoverer(rabbitTemplate, "audit.dlx.exchange", "audit.dlx.routingkey");}
}

该方案在日均千万级消息量的出行平台验证,核心指标:

  • 死信消息处理延迟 < 50ms
  • 消息丢失率 < 0.0001%
  • 系统吞吐量提升 40%

重试机制最佳实践

  • 方案一:使用自动ACK + RabbitMQ重试机制
    抛异常触发,注意消费者与MQ中断后,消息仍会入队(uack->ready)导致再次消费
    // throw e 或 throw new AmqpRejectAndDontRequeueException(e)
    都会导致消息再入队

     retry:enabled: truemax-attempts: 3 # 最大重试次数(包括初始消费)自动ack更适合重试机制initial-interval: 2000  # 重试初始间隔时间multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间max-interval: 10000   # 最大重试间隔时间(毫秒)
    
  • 方案二:使用手动ACK + 手动重试机制
    channel.basicNack(tag, false, false);
    手动重试:单次消息消费时的逻辑中重试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/69617.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025.2.9 每日学习记录2:技术报告写了一半+一点点读后感

0.近期主任务线 1.完成小论文准备 目标是3月份完成实验点1的全部实验和论文。 2.准备教资笔试 打算留个十多天左右&#xff0c;一次性备考笔试的三个科目 1.实习申请技术准备&#xff1a;微调、Agent、RAG 1.今日完成任务 1.电子斗蛐蛐&#xff08;文本书写领域&am…

9 Pydantic复杂数据结构的处理

在构建现代 Web 应用时&#xff0c;我们往往需要处理复杂的输入和输出数据结构。例如&#xff0c;响应数据可能包含嵌套字典、列表、元组&#xff0c;甚至是多个嵌套对象。Pydantic 是一个强大的数据验证和序列化库&#xff0c;可以帮助我们轻松地处理这些复杂的数据结构&#…

链表(LinkedList) 1

上期内容我们讲述了顺序表&#xff0c;知道了顺序表的底层是一段连续的空间进行存储(数组)&#xff0c;在插入元素或者删除元素需要将顺序表中的元素整体移动&#xff0c;时间复杂度是O(n)&#xff0c;效率比较低。因此&#xff0c;在Java的集合结构中又引入了链表来解决这一问…

torch_bmm验算及代码测试

文章目录 1. torch_bmm2. pytorch源码 1. torch_bmm torch.bmm的作用是基于batch_size的矩阵乘法,torch.bmm的作用是对应batch位置的矩阵相乘&#xff0c;比如&#xff0c; mat1的第1个位置和mat2的第1个位置进行矩阵相乘得到mat3的第1个位置mat1的第2个位置和mat2的第2个位置…

shell+kafka实现服务器健康数据搜集

今天有一个徒弟问我&#xff0c;分发、代理服务器都装有kafka&#xff0c;如何快速收集服务器的健康数据&#xff0c;每10秒就收集一次&#xff1f; 我当时听完之后&#xff0c;楞了一下&#xff0c;然后说出了我的见解&#xff1a;认为最快速的方法无法就是建议shell脚本直接采…

macbook2015升级最新MacOS 白苹果变黑苹果

原帖&#xff1a;https://www.bilibili.com/video/BV13V411c7xz/MAC OS系统发布了最新的Sonoma&#xff0c;超酷的动效锁屏壁纸&#xff0c;多样性的桌面小组件&#xff0c;但是也阉割了很多老款机型的升级权利&#xff0c;所以我们可以逆向操作&#xff0c;依旧把老款MAC设备强…

建筑物损坏程度分割数据集labelme格式2816张5类别

数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;2816 标注数量(json文件个数)&#xff1a;2816 标注类别数&#xff1a;5 标注类别名称:["minor-damage","destroyed&quo…

ReactNative进阶(五十九):存量 react-native 项目适配 HarmonyOS NEXT

文章目录 一、前言二、ohos_react_native2.1 Fabric2.2 TurboModule2.2.1 ArkTSTurboModule2.2.2 cxxTurboModule&#xff1a; 三、拓展阅读 一、前言 2024年10月22日19:00&#xff0c;华为在深圳举办“原生鸿蒙之夜暨华为全场景新品发布会”&#xff0c;主题为“星河璀璨&…

Golang GORM系列:GORM CRUM操作实战

在数据库管理中&#xff0c;CRUD操作是应用程序的主干&#xff0c;支持数据的创建、检索、更新和删除。强大的Go对象关系映射库GORM通过抽象SQL语句的复杂性&#xff0c;使这些操作变得轻而易举。本文是掌握使用GORM进行CRUD操作的全面指南&#xff0c;提供了在Go应用程序中有效…

【Windows】PowerShell 缓存区大小调节

PowerShell 缓存区大小调节 方式1 打开powershell 窗口属性调节方式2&#xff0c;修改 PowerShell 配置文件 方式1 打开powershell 窗口属性调节 打开 CMD&#xff08;按 Win R&#xff0c;输入 cmd&#xff09;。右键标题栏 → 选择 属性&#xff08;Properties&#xff09;…

Json-RPC框架项目(一)

目录 1. 项目介绍: 2. 技术选择; 3. 第三方库介绍; 4. 项目功能; 5. 模块功能; 6. 项目实现: 1. 项目介绍: RPC是远程过程调用, 像调用本地接口一样调用远程接口, 进行完成业务处理, 计算任务等, 一个完整的RPC包括: 序列化协议, 通信协议, 连接复用, 服务注册, 服务发…

深度整理总结MySQL——MySQL加锁工作原理

MySQL加锁工作原理 前言前置知识- 锁为什么加在索引上锁的粒度优化提高并发性避免全表扫描优化死锁处理解决幻读问题 什么SQL语句会加行级锁MySQL是如何加行级锁场景模拟代码唯一索引等值查询退化为记录锁为什么会退化为记录锁分析加了什么锁为什么会退化为间隙锁为什么我可以插…

Deepseek系列从v3到R易背面经版

deepseek v3 base要点 MTP : Multi-Token Prediction 训练时&#xff1a; 1. 把前一个block中input tokens经过embedding layer和transformer block的输出&#xff0c;进入output head之前的内容记为h&#xff0c;与下一个block的input tokens经过embedding layer输出的内容都…

大模型融入推荐系统

结合项目实际给用户推荐&#xff0c;比如是商家项目&#xff0c;用户问了几个关于商品的信息&#xff0c;大模型就可以根据根据用户画像&#xff0c;给用户推荐商品。 我们现在做的是针对于用户学习的推荐&#xff0c;首先我们要对我们的数据进行处理&#xff0c;提取出我们数…

MariaDB MaxScale实现mysql8主从同步读写分离

一、MaxScale基本介绍 MaxScale是maridb开发的一个mysql数据中间件&#xff0c;其配置简单&#xff0c;能够实现读写分离&#xff0c;并且可以根据主从状态实现写库的自动切换&#xff0c;对多个从服务器能实现负载均衡。 二、MaxScale实验环境 中间件192.168.121.51MaxScale…

【JVM详解五】JVM性能调优

示例&#xff1a; 配置JVM参数运行 #前台运行 java -XX:MetaspaceSize-128m -XX:MaxMetaspaceSize-128m -Xms1024m -Xmx1024m -Xmn256m -Xss256k -XX:SurvivorRatio8 - XX:UseConcMarkSweepGC -jar /jar包路径 #后台运行 nohup java -XX:MetaspaceSize-128m -XX:MaxMetaspaceS…

畅聊deepseek-r1,SiliconFlow 硅基流动注册+使用

文章目录 SiliconFlow 硅基流动注册使用注册创建API密钥使用网页端使用代码调用api调用支持的模型 SiliconFlow 硅基流动注册使用 注册 硅基流动官网 https://cloud.siliconflow.cn/i/XcgtUixn 注册流程 切换中文 ​ 邀请码&#xff1a; XcgtUixn 创建API密钥 账户管理 --&g…

C++ Primer 类型转换

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…

Gitlab中如何进行仓库迁移

需求&#xff1a;之前有一个自己维护的新仓库A&#xff0c;现在需要将这个仓库提交并覆盖另一个旧的仓库B&#xff0c;需要保留A中所有的commit信息。 1.方法一&#xff1a;将原有仓库A导出后再导入到新的仓库B中 适用场景&#xff1a;新的仓库B是一个待建仓库&#xff0c;相当…

SpringCloud - Sentinel服务保护

前言 该博客为Sentinel学习笔记&#xff0c;主要目的是为了帮助后期快速复习使用 学习视频&#xff1a;7小快速通关SpringCloud 辅助文档&#xff1a;SpringCloud快速通关 源码地址&#xff1a;cloud-demo 一、简介 官网&#xff1a;https://sentinelguard.io/zh-cn/index.h…