第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round 883 (Div. 3) G. Rudolf and CodeVid-23(Dijkstra最短路)

题目链接 Codeforces Round 883 (Div. 3) G. Rudolf and CodeVid-23 思路 因为 n n n最大值为 10 10 10,且只有 01 01 01两种状态,当作二进制数转化为十进制数后最多只有 1024 1024 1024种。 因为 m m m的最大值为 1 e 3 1e3 1e3,因此我们…

重新定义自动驾驶的动态视觉?谷歌提出几何优先的动态场景方法MonST3R

导读: 本文引入了Motion DUSt3R (MonST3R),这是一种几何优先的动态场景方法,它以点图的形式直接估计几何形状。相比以前的工作,MonST3R具有如下关键优势: 增强的稳健性,特别是在具有挑战性的场景中&#xf…

unity动态批处理

unity动态批处理 动态批处理要求和兼容性渲染管线兼容性 使用动态批处理网格的动态批处理限制动态生成几何体的动态批处理 动态批处理 动态批处理是一种绘制调用批处理方法,用于批处理移动的 GameObjects 以减少绘制调用。动态批处理在处理网格和 Unity 在运行时动…

【系统架构设计师】案例专题三:数据库系统考点梳理

更多内容请见: 备考系统架构设计师-核心总结目录 摘要:本文主要梳理系统架构设计师 - 数据库系统 案例考点 ,主要包括ORM技术、关系型数据库、内存数据库、NoSQL、规范化、分布式数据库、数据仓库集成等。 文章目录 一、ORM技术二、数据库分类比较三、并发控制四、封锁协议…

【二刷hot-100】day2

目录 1.无重复字符的最长子串 2.找到字符串中所有字母异位词 3.和为 K 的子数组 4.滑动窗口最大值 1.无重复字符的最长子串 class Solution {public int lengthOfLongestSubstring(String s) {Map<Character,Integer> dict new HashMap<>();int ret0;int i-1;for…

Oracle实际需要用到但常常被忽略的函数

1、Oracle中nvl()与nvl2()函数 函数nvl(expression1,expression2)根据参数1是否为null返回参数1或参数2的值&#xff1b; 函数nvl2(expression1,expression2,expression3)根据参数1是否为null返回参数2或参数3的值 【函数格式】&#xff1a;nvl(expression1,expression2) 若…

从一致性哈希算法带来的分布式系统设计思考

引言 在分布式系统中&#xff0c;数据存储和访问的均匀性、高可用性及可扩展性至关重要。一致性哈希算法&#xff08;Consistent Hashing&#xff09;以其优秀的数据分布特性&#xff0c;广泛应用于缓存、负载均衡和数据库分片等领域&#xff0c;有效提升了系统的稳定性和灵活…

快速了解AUTOSAR CP DEM模块作用与实现工作原理

在AUTOSAR&#xff08;Automotive Open System Architecture&#xff09;Classic Platform&#xff08;CP&#xff09;中&#xff0c;DEM&#xff08;Diagnostic Event Manager&#xff09;模块的主要作用是记录和管理ECU&#xff08;Electronic Control Unit&#xff09;内的诊…

uniapp onPageScroll

子组件有onPageScroll, 首页也要引入onPageScroll, eg: 主页面 sell/detail/index 《子组件》 <script setup> 引入onPageScroll </script> 组件&#xff1a; 引入onPageScroll 别人的比较

如果使用 Iptables 配置端口转发 ?

现实生活中&#xff0c;港口转发就像在一个大型公寓大楼里告诉送货司机该去哪里。通常情况下&#xff0c;该建筑群的正门是不对外开放的。但如果里面有人想要快递&#xff0c;他们可以告诉保安让司机进来&#xff0c;并指引他们到特定的公寓。 类似地&#xff0c;在计算机网络…

jeecg3版本的vue,离线启动

jeecg的vue2版本已经停止维护&#xff0c;所以只能用vue3的版本。3版本中使用的是pnpm&#xff08;npm的增强版本&#xff09;下载依赖。使用pnpm安装的node_modules&#xff0c;不能直接复制到离线主机中&#xff08;因为在 pnpm安装过程中&#xff0c;会给依赖的配置文件写死…

2025届保研-优营率0%上岸C9

保研面试心得 以上是标题 说在前面 本人保研边缘人&#xff0c;基本不陶瓷老师&#xff08;因为没啥成果&#xff0c;但是没啥用&#xff0c;不如有offer再套&#xff0c;结果0offer&#xff09;&#xff0c;大三还在努力卷保研资格。绩点不高专业一般英语一般项目很水&#…

【C++刷题】力扣-#118-杨辉三角

题目描述 给定一个非负整数 numRows&#xff0c;生成杨辉三角的前 numRows 行。在杨辉三角中&#xff0c;每个数是它正上方两个数的和。 示例 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例 2: 输入: numRows 1 输出: [[1]]题解 这个问题…

Elasticsearch 入门

ES 概述 ES 是一个开源的高扩展的分布式全文搜索引擎。 倒排索引 环境准备 Elasticsearch 官方地址&#xff1a;https://www.elastic.co/cn/ 下载地址&#xff1a; 注意&#xff1a;9300 端口为 Elasticsearch 集群间组件的通信端口&#xff0c;9200 端口为浏览器访问的 h…

Leetcode 3321. Find X-Sum of All K-Long Subarrays II

Leetcode 3321. Find X-Sum of All K-Long Subarrays II 1. 解题思路2. 代码实现 题目链接&#xff1a;3321. Find X-Sum of All K-Long Subarrays II 1. 解题思路 这一题同样虽然是一道hard的题目&#xff0c;但也是比较常规的&#xff0c;就是通过一个滑动窗口不断地维护当…

【赵渝强老师】K8s中Deployment控制器与StatefulSet控制器的区别

一、K8s的Deployment与StatefulSets 在K8s中&#xff0c;Deployment将Pod部署成无状态的应用程序&#xff0c;它只关心Pod的数量、Pod更新方式、使用的镜像和资源限制等。由于是无状态的管理方式&#xff0c;因此Deployment中没有角色和顺序的概念&#xff0c;换句话说&#xf…

vue项目页面白边如何解决

这是出现白边的页面 原因是vue项目创建时在main.js下它引入了刚开始提供的main.css全局设置 直接把该设置注释掉即可&#xff0c; 然后在App.vue中添加如下style&#xff0c;就大功告成了

MVC与MVVM

mvp mvvm区别 ‌MVP&#xff08;‌Model-View-Presenter&#xff09;和‌MVVM&#xff08;Model-View-ViewModel&#xff09;是两种常见的软件架构设计模式&#xff0c;它们在架构和通信方式上存在明显的区别。 ‌MVP模式‌&#xff1a; MVP是从MVC&#xff08;Model-View-Co…

2025推荐选题|微信小程序实现经济新闻资讯

作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参与学生毕业答辩指导&#xff0c;…

2.stm32 GPIO输出

GPIO简介 GPIO&#xff08;General Purpose Input Output&#xff09;通用输入输出口 可配置为8种输入输出模式 引脚电平&#xff1a;0V~3.3V&#xff0c;部分引脚可容忍5V 输出模式下可控制端口输出高低电平&#xff0c;用以驱动LED、控制蜂鸣器、模拟通信协议输出时序等 …