如何保证消息不丢之MQ重试机制消息队列

1. 简介

死信队列,简称:DLXDead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另外一个交换机,这个交换机就是DLX

在这里插入图片描述

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

死信队列有什么用呢?

  1. 取消订单(比如下单30分钟后未付款,则取消订单,回滚库存),或者新用户注册,隔段时间进行短信问候等。
  2. 将消费者拒绝的消息发送到死信队列,然后将消息进行持久化,后续可以做业务分析或者处理。

2. TTL

因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示

TTL :Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列。
如果两者都进行了设置,以时间短的为准。

3. 利用死信队列的机制, 带有重试队列机制的消费队列的流程图:

在这里插入图片描述

动态绑定队列和重试队列:

/*** 遍历所有的 枚举队列 手动注册队列等相关bean到spring容器中*/
@Component
public class QueueAutoRegisterAware implements BeanFactoryAware {@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;for (QueueDeclareEnum value : QueueDeclareEnum.values()) {// 正常消息 队列 交换器 路由键绑定Queue queue = new Queue(value.getQueueName());DirectExchange directExchange = new DirectExchange(value.getExchangeName());Binding binding = BindingBuilder.bind(queue).to(directExchange).with(value.getRoutingKey());// 注册beandefaultListableBeanFactory.registerSingleton(value.getQueueName(), queue);defaultListableBeanFactory.registerSingleton(value.getExchangeName(), directExchange);defaultListableBeanFactory.registerSingleton(value.getRoutingKey(), binding);// TODO 重试队列 交换器 路由键(暂时没有处理, 后续需要用到mq机制则可以进行加入AOP切面重试机制)if (StringUtil.isNotBlank(value.getRetryQueueName()) && StringUtil.isNotBlank(value.getRetryRoutingKey())) {Map<String, Object> dlqParamMap = new HashMap<>(2);dlqParamMap.put("x-dead-letter-exchange", value.getExchangeName());dlqParamMap.put("x-dead-letter-routing-key", value.getRoutingKey());Queue retryQueue = new Queue(value.getRetryQueueName(), true, false, false, dlqParamMap);Binding retryBindIng = BindingBuilder.bind(retryQueue).to(directExchange).with(value.getRetryRoutingKey());defaultListableBeanFactory.registerSingleton(value.getRetryQueueName(), retryQueue);defaultListableBeanFactory.registerSingleton(value.getRetryRoutingKey(), retryBindIng);}}}
}

Aop异常切面,拦截异常把设置消息的重试次数和ttl过期时间,发送重试队列中;

/*** RabbitMQ监听器 切面* 异常消息发送到重试队列*/
@Aspect
@Component
@Slf4j
@AllArgsConstructor
public class RabbitListenerAspect {private final RabbitTemplate rabbitTemplate;private final IFailureService failureService;private final TmsProperties tmsProperties;private final IOrderRequestService orderRequestService;@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")public void pointCut() {}@Around("pointCut()")public Object around(ProceedingJoinPoint joinPoint) {log.debug("MQ切面接收到消息");AtomicReference<Message> messageReference = new AtomicReference<>();Arrays.stream(joinPoint.getArgs()).forEach(arg -> {if (arg instanceof Message) {messageReference.set((Message) arg);}});Message message = messageReference.get();if (message == null) {log.debug("mq消息内容为空,不执行相关操作");return null;}Object proceed;// 消费出现异常时 会根据配置发送到重试队列try {//绑定租户String tenantId = message.getMessageProperties().getHeader(ForecastConstant.TENANT_ID);if (tmsProperties.isTenantApp()) {TmsTenantUtil.bindId(tenantId);}proceed = joinPoint.proceed();if (tmsProperties.isTenantApp()) {TmsTenantUtil.unbind();}} catch (ForecastException e) {log.info("预报失败,直接回传失败信息", e);this.forecastMqExceptionDeal(message, e);return null;} catch (Throwable throwable) {log.error("消费失败,异常信息:{}", throwable);this.sendToRetryQueue(message, throwable);return null;}return proceed;}/*** 发送到重试队列** @param message 消息内容*/private void sendToRetryQueue(Message message, Throwable throwable) {sendToRetryQueue(message, throwable, true);}/*** 发送到重试队列** @param message 消息内容*/private void sendToRetryQueue(Message message, Throwable throwable, Boolean addRetryCount) {String consumerQueue = message.getMessageProperties().getConsumerQueue();// 重试次数 默认0Optional<QueueDeclareEnum> first = Arrays.stream(QueueDeclareEnum.values()).filter(value ->value.getQueueName().equalsIgnoreCase(consumerQueue)).findFirst();if (!first.isPresent()) {return;}QueueDeclareEnum queueDeclareEnum = first.get();String retryQueueName = queueDeclareEnum.getRetryQueueName();String retryRoutingKey = queueDeclareEnum.getRetryRoutingKey();if (StringUtil.isEmpty(retryQueueName) || StringUtil.isEmpty(retryRoutingKey)) {log.info("当前队列没有配置重试队列 不进行重试,队列名:{}", queueDeclareEnum.getQueueName());return;}Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);//有效的操作 重试次数才+1if (addRetryCount) {retryCount++;}if (retryCount == 1) {try {failureService.deal(message, throwable);} catch (Exception e) {log.error("失败最大次数处理异常", e);}}if (retryCount > queueDeclareEnum.getMaxRetryCount()) {log.info("当前消息超过队列配置最大重试次数,不进行重试,队列名:{}", queueDeclareEnum.getQueueName());return;}//如果是手动重试的 则不进入重试队列if (message.getMessageProperties().getHeader(ForecastConstant.IS_HAND)) {return;}Message retryMessage = MessageBuilder.fromMessage(message).setHeader(ForecastConstant.RETRY_COUNT, retryCount).build();this.convertAndRetry(retryMessage, queueDeclareEnum);}/*** 预报失败处理*/private void forecastMqExceptionDeal(Message message, ForecastException mqException) {switch (mqException.getErrorCode()) {case ERROR_CODE_1:OrderEntityBO orderEntityBO = (OrderEntityBO) mqException.getData();//1.回传失败failureService.deal(message, mqException);//2.塞到重试任务中去 第二天0时执行orderRequestService.saveEntity(new OrderTaskEntity(orderEntityBO.getOrder().getCode(),OrderRequestTypeEnum.RETRY_FORECAST, getOperateTime()));//3.记录到redis中redisDeal(orderEntityBO);break;case ERROR_CODE_2://不增加重试次数sendToRetryQueue(message, mqException, false);break;case ERROR_CODE_3:Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);if (retryCount == 3) {WXCallUtil.call(mqException.getCallMessage());}this.sendToRetryQueue(message, mqException);default:}}private void redisDeal(OrderEntityBO orderEntityBO) {String channelCode = orderEntityBO.getOrder().getChannelCode();String country = orderEntityBO.getReceiver().getCountryCode();String ruleName = country + "_" + channelCode;List<String> list = TmsRedisUtil.get(ForecastRedisConstant.CHANNEL_LIMIT_DATA);if (ObjectUtil.isEmpty(list)) {list = new ArrayList<>();}if (list.contains(ruleName)) {return;}list.add(ruleName);TmsRedisUtil.set(ForecastRedisConstant.CHANNEL_LIMIT_DATA, list);TmsRedisUtil.expireAt(ForecastRedisConstant.CHANNEL_LIMIT_DATA, getSecondDayZero());}/*** 获取第二天0时 date** @return*/private Date getSecondDayZero() {Instant instant = new Date().toInstant();return DateUtil.beginOfDay(Date.from(instant.plus(Duration.ofDays(1))));}/*** 避免服务器时间不同步 向后兼容5分钟** @return*/private Date getOperateTime() {//如果当前时间小于0时5分 则下次重试时间设为当天零时Date date = DateUtil.beginOfDay(new Date());if (System.currentTimeMillis() - date.getTime() < 5 * 60 * 1000) {return date;}return getSecondDayZero();}/*** 发送消息到重试队列** @param message          重试消息* @param queueDeclareEnum 队列枚举*/public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum) {String ttlTime = String.valueOf(queueDeclareEnum.getTtlTime() * (Integer) message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT));message.getMessageProperties().setExpiration(ttlTime);rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);}/*** 发送消息到重试队列** @param message          重试消息* @param queueDeclareEnum 队列枚举*/public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum, String ttlTime) {message.getMessageProperties().setExpiration(ttlTime);rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/731335.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

波卡 Alpha 计划启动,呼唤先锋创新者重新定义 Web3 开发

原文&#xff1a;https://polkadot.network/blog/the-polkadot-alpha-program-a-new-era-for-decentralized-building-collaboration/ 编译&#xff1a;OneBlock 区块链领域不断发展&#xff0c;随之而来的是发展和创新机会的增加。而最新里程碑是令人振奋的 Polkadot Alpha …

深入理解nginx一致性哈希负载均衡模块[上]

1. 引言 在现代的网络应用中&#xff0c;负载均衡是一个至关重要的组件。它能够分配流量到多个服务器上&#xff0c;实现高可用性和性能扩展。Nginx是一个广泛使用的高性能Web服务器和反向代理服务器&#xff0c;其负载均衡模块提供了多种算法来实现流量的分发。其中&#xff0…

【刷题记录】详谈设计循环队列

下题目为个人的刷题记录&#xff0c;在本节博客中我将详细谈论设计循环队列的思路&#xff0c;并给出代码&#xff0c;有需要借鉴即可。 题目&#xff1a;LINK 循环队列是线性表吗&#xff1f;或者说循环队列是线性结构吗&#xff1f; 对于这个问题&#xff0c;我们来看一下线…

【Linux进阶之路】网络 —— “?“ (下)

文章目录 前言一、概念铺垫1.TCP2.全双工 二、网络版本计算器1. 原理简要2. 实现框架&&代码2.1 封装socket2.2 客户端与服务端2.3 封装与解包2.4 请求与响应2.5 对数据进行处理2.6 主程序逻辑 3.Json的简单使用 总结尾序 前言 在上文我们学习使用套接字的相关接口进行了…

面试经典150题(108-110)

leetcode 150道题 计划花两个月时候刷完之未完成后转&#xff0c;今天&#xff08;第3天&#xff09;完成了3道(108-110)150 108.(201. 数字范围按位与) 题目描述&#xff1a; 给你两个整数 left 和 right &#xff0c;表示区间 [left, right] &#xff0c;返回此区间内所有数…

Tomcat的安装

下载Tomcat&#xff08;这里以Tomcat8.5为例&#xff09; 直接进入官网进行下载&#xff0c;Tomcat官网 选择需要下载的版本&#xff0c;点击下载这里一定要注意&#xff1a;下载路径一定要记住&#xff0c;并且路径中尽量不要有中文&#xff01;&#xff01;&#xff01;&…

怎么把视频变成gif动图?一招在线生成gif动画

MP4是一种常见的视频文件格式&#xff0c;它是一种数字多媒体容器格式&#xff0c;可以用于存储视频、音频和字幕等多种媒体数据。MP4格式通常用于在计算机、移动设备和互联网上播放和共享视频内容。要将MP4视频转换为GIF格式&#xff0c;您可以使用专门的视频转gif工具。这个工…

三款顶级开源RAG (检索增强生成)工具:Verba、Unstructured 和 Neum

三款顶级开源RAG (检索增强生成)工具&#xff1a;Verba、Unstructured 和 Neum 概述 随着企业对话式数据处理需求的提升&#xff0c;面临的挑战是数据隐私性和缺乏企业级解决方案。虽然类似LangChain能在短时间内构建RAG应用&#xff0c;但忽视了文档解析、多来源数据ETL、批量…

Python 对Excel工作表中的数据进行排序

在Excel中&#xff0c;排序是整理数据的一种重要方式&#xff0c;它可以让你更好地理解数据&#xff0c;并为进一步的分析和报告做好准备。本文将介绍如何使用第三方库Spire.XLS for Python通过Python来对Excel中的数据进行排序。包含以下三种排序方法示例&#xff1a; 按数值…

【洛谷 P8723】[蓝桥杯 2020 省 AB3] 乘法表 题解(数学+进制转换+字符串)

[蓝桥杯 2020 省 AB3] 乘法表 题目描述 九九乘法表是学习乘法时必须要掌握的。在不同进制数下&#xff0c;需要不同的乘法表。 例如, 四进制下的乘法表如下所示&#xff1a; 1*11 2*12 2*210 3*13 3*212 3*321请注意&#xff0c;乘法表中两个数相乘的顺序必须为样例中所示的…

从0开始的 Vue 生活

Vue 一、配置环境1.1 安装node.js1.1.1 node.js 下载1.1.2 node.js 安装1.1.3 node.js 配置 1.2 安装VSCode1.2.1 VSCode 下载1.2.2 VSCode 安装1.2.3 VSCode 配置 二、创建Vue项目2.1 使用命令行创建Vue项目2.2 使用VSCode运行Vue项目2.3 尝试编写Vue项目2.3.1 准备工作2.3.2 …

线性代数笔记14--投影

1. 一维空间投影 p X A e B − p B − X A A ⊤ e 0 A ⊤ ( B − X A ) 0 X A ⊤ A A ⊤ B X A ⊤ B A ⊤ A p X A A A ⊤ B A ⊤ A pXA\\ eB-pB-XA\\ A^{\top}e0\\ A^{\top}(B-XA)0\\ XA^{\top}AA^{\top}B\\ X\frac{A^{\top}B}{A^{\top}A}\\ pXAA\frac{A^{\top}B}{A^…

Java开发与配置用到的各类中间件官网

开发配置时用到了一些官网地址&#xff0c;记录一下。 activemq 官网&#xff1a;ActiveMQ elk 官网&#xff1a;Elasticsearch 平台 — 大规模查找实时答案 | Elastic nginx 官网&#xff1a;nginx maven 官网&#xff1a;Maven – Welcome to Apache Maven nexus 官网&a…

Zoom软件怎么购买?zoom付费订阅教程

首先&#xff0c;让我们来了解一下Zoom的各个版本以及它们的价格。简单来说&#xff0c;Zoom分为免费版和收费版&#xff0c;收费版又包括专业版、商业版和企业版。 一、免费版 Zoom的免费版功能已经非常实用了&#xff0c;适合个人用户和小团队使用。免费版提供以下功能: 最多…

centos7 python3.12.1 报错 No module named _ssl

https://blog.csdn.net/Amio_/article/details/126716818 安装python cd /usr/local/src wget https://www.python.org/ftp/python/3.12.1/Python-3.12.1.tgz tar -zxvf Python-3.12.1.tgz cd Python-3.12.1/ ./configure -C --enable-shared --with-openssl/usr/local/opens…

小程序学习

一、第一天 1、小程序体验 2、注册账号 小程序 (qq.com) 3、开发工具下载 下载 / 稳定版更新日志 (qq.com) 4、目录结构 "navigationBarBackgroundColor": "#00b26a" 配置头部背景色 4、wxml模板介绍 5、wxss 6、js文件 7、宿主环境 1、通信主体 2…

spring boot 2.4.x 之前版本(对应spring-cloud-openfeign 3.0.0之前版本)feign请求异常逻辑

目录 feign SynchronousMethodHandler 第一部分 第二部分 第三部分 spring-cloud-openfeign LoadBalancerFeignClient ribbon AbstractLoadBalancerAwareClient 在之前写的文章配置基础上 https://blog.csdn.net/zlpzlpzyd/article/details/136060312 因为从 spring …

Java --- springcloud之consul

目录 一、consul的使用 1.1、主要功能 1.2、安装及运行 1.3、添加微服务到consul 1.3.1、8001微服务添加相关pom、配置文件、注解 1.3.2、80微服务添加相关pom、配置文件、注解 1.4、三个注册中心异同 1.5、consul进行分布式配置 1.5.1、修改8001的yml配置文件 1.5.2…

Ubuntu中如何卸载软件

在Ubuntu系统中要干净地卸载软件&#xff0c;意味着不仅移除软件本身&#xff0c;还包括它的配置文件以及可能存在的依赖关系。以下是几种确保干净卸载的方法&#xff1a; 方法1&#xff1a;使用apt-get 通过命令行进行卸载&#xff0c;这是最常用且能处理依赖关系的方式&…

运维知识点-Apache HTTP Server

Apache 介绍 介绍 Apache是一个开源的Web服务器软件&#xff0c;全称为Apache HTTP Server&#xff0c;由Apache软件基金会开发和维护。它是目前全球使用最广泛的Web服务器软件之一&#xff0c;占全球所有网络服务器的很大比例。Apache服务器具有跨平台的特性&#xff0c;可以…