第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重新定义自动驾驶的动态视觉?谷歌提出几何优先的动态场景方法MonST3R

导读: 本文引入了Motion DUSt3R (MonST3R),这是一种几何优先的动态场景方法,它以点图的形式直接估计几何形状。相比以前的工作,MonST3R具有如下关键优势: 增强的稳健性,特别是在具有挑战性的场景中&#xf…

【二刷hot-100】day2

目录 1.无重复字符的最长子串 2.找到字符串中所有字母异位词 3.和为 K 的子数组 4.滑动窗口最大值 1.无重复字符的最长子串 class Solution {public int lengthOfLongestSubstring(String s) {Map<Character,Integer> dict new HashMap<>();int ret0;int i-1;for…

从一致性哈希算法带来的分布式系统设计思考

引言 在分布式系统中&#xff0c;数据存储和访问的均匀性、高可用性及可扩展性至关重要。一致性哈希算法&#xff08;Consistent Hashing&#xff09;以其优秀的数据分布特性&#xff0c;广泛应用于缓存、负载均衡和数据库分片等领域&#xff0c;有效提升了系统的稳定性和灵活…

uniapp onPageScroll

子组件有onPageScroll, 首页也要引入onPageScroll, eg: 主页面 sell/detail/index 《子组件》 <script setup> 引入onPageScroll </script> 组件&#xff1a; 引入onPageScroll 别人的比较

如果使用 Iptables 配置端口转发 ?

现实生活中&#xff0c;港口转发就像在一个大型公寓大楼里告诉送货司机该去哪里。通常情况下&#xff0c;该建筑群的正门是不对外开放的。但如果里面有人想要快递&#xff0c;他们可以告诉保安让司机进来&#xff0c;并指引他们到特定的公寓。 类似地&#xff0c;在计算机网络…

jeecg3版本的vue,离线启动

jeecg的vue2版本已经停止维护&#xff0c;所以只能用vue3的版本。3版本中使用的是pnpm&#xff08;npm的增强版本&#xff09;下载依赖。使用pnpm安装的node_modules&#xff0c;不能直接复制到离线主机中&#xff08;因为在 pnpm安装过程中&#xff0c;会给依赖的配置文件写死…

Elasticsearch 入门

ES 概述 ES 是一个开源的高扩展的分布式全文搜索引擎。 倒排索引 环境准备 Elasticsearch 官方地址&#xff1a;https://www.elastic.co/cn/ 下载地址&#xff1a; 注意&#xff1a;9300 端口为 Elasticsearch 集群间组件的通信端口&#xff0c;9200 端口为浏览器访问的 h…

【赵渝强老师】K8s中Deployment控制器与StatefulSet控制器的区别

一、K8s的Deployment与StatefulSets 在K8s中&#xff0c;Deployment将Pod部署成无状态的应用程序&#xff0c;它只关心Pod的数量、Pod更新方式、使用的镜像和资源限制等。由于是无状态的管理方式&#xff0c;因此Deployment中没有角色和顺序的概念&#xff0c;换句话说&#xf…

vue项目页面白边如何解决

这是出现白边的页面 原因是vue项目创建时在main.js下它引入了刚开始提供的main.css全局设置 直接把该设置注释掉即可&#xff0c; 然后在App.vue中添加如下style&#xff0c;就大功告成了

2025推荐选题|微信小程序实现经济新闻资讯

作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参与学生毕业答辩指导&#xff0c;…

2.stm32 GPIO输出

GPIO简介 GPIO&#xff08;General Purpose Input Output&#xff09;通用输入输出口 可配置为8种输入输出模式 引脚电平&#xff1a;0V~3.3V&#xff0c;部分引脚可容忍5V 输出模式下可控制端口输出高低电平&#xff0c;用以驱动LED、控制蜂鸣器、模拟通信协议输出时序等 …

tensorflow入门案例手写数字识别人工智能界的helloworld项目落地1

参考 https://tensorflow.google.cn/?hlzh-cn https://tensorflow.google.cn/tutorials/keras/classification?hlzh-cn 项目资源 https://download.csdn.net/download/AnalogElectronic/89872174 文章目录 一、案例学习1、导入测试和训练数据集&#xff0c;定义模型&#xff…

【R语言】随机森林+相关性热图组合图

数据概况文末有获取方式 随机森林部分 #调用R包 library(randomForest) library(rfPermute) library(ggplot2) library(psych) library(reshape2) library(patchwork) library(reshape2) library(RColorBrewer) ​ ​ #读取数据 df<-read.csv("F:\\EXCEL-元数据\\2020…

深度学习之残差网络ResNet

文章目录 1. 残差网络定义2. 数学基础函数类3. 残差块4.ResNet模型5.训练模型6.小结 1. 残差网络定义 随着我们设计的网络越来越深&#xff0c;深刻理解“新添加的层如何提升神经网络的性能”变得至关重要。更重要的是设计网络的能力。在这种网络中&#xff0c;添加层会使得网…

2010年国赛高教杯数学建模A题储油罐的变位识别与罐容表标定解题全过程文档及程序

2010年国赛高教杯数学建模 A题 储油罐的变位识别与罐容表标定 通常加油站都有若干个储存燃油的地下储油罐&#xff0c;并且一般都有与之配套的“油位计量管理系统”&#xff0c;采用流量计和油位计来测量进/出油量与罐内油位高度等数据&#xff0c;通过预先标定的罐容表&#…

JDBC的学习

一、JDBC DriverManager 二、JDBC connection 三、 JDBC Statement 1.DML 2.DDL 四、JDBC ResultSet 五、JDBC PreparedStatement

30.第二阶段x86游戏实战2-遍历周围-C++遍历二叉树(玩家角色基址)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 本人写的内容纯属胡编乱造&#xff0c;全都是合成造假&#xff0c;仅仅只是为了娱乐&#xff0c;请不要…

Prometheus运维监控平台之监控指标注册到consul脚本开发、自定义监控项采集配置调试(三)

系列文章目录 运维监控平台搭建 运维监控平台监控标签 golang_Consul代码实现Prometheus监控目标的注册以及动态发现与配置V1版本 文章目录 系列文章目录目的一、监控指标注册到consul的golang脚本开发1、修改settings.yaml文件2、修改config/ocnsul,go文件3、修改core/consul…

让你的MacOS剪切板变得更加强大,如何解决复制内容覆盖的问题

MacOS的日常使用过程中&#xff0c;肯定少不了复制粘贴&#xff0c;不论是文本内容还是文件&#xff0c;复制粘贴是避不开的操作&#xff0c;如果需要复制粘贴的内容不多&#xff0c;那么普通的复制粘贴就可以完成了&#xff0c;但是当有同样的内容需要输入不同的地方的时候&am…

C++的魔法世界:类和对象的终章

文章目录 一、再探构造函数二、类型转换2.1隐式类型转换2.2内置类型的类型转化2.3explicit关键字2.4多参数构造 三、static成员四、友元五、内部类内部类的特性 六、匿名对象 一、再探构造函数 类和对象(中)里介绍的构造函数&#xff0c;使用的是赋值实现成员变量的初始化。而…