RabbitMQ之消费者可靠性

文章目录

  • 前言
  • 一、消费者确认机制
  • 二、失败重试机制
  • 三、失败处理策略
  • 四、业务幂等性
    • 唯一消息ID
    • 业务判断
  • 五、兜底方案
  • 总结


前言

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
但问题来了:RabbitMQ如何得知消费者的处理状态呢?


一、消费者确认机制

  • 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息
    • nack:消息处理失败,RabbitMQ需要再次投递消息
    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
  • 一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack。

  • 由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

    • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
    • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
    • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
      • 如果是业务异常,会自动返回nack;
      • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");if (true) {throw new MessageConversionException("故意的");}log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

我们再次把确认机制修改为auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述

放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除:

在这里插入图片描述
我们将异常改为RuntimeException类型(此时配置还是auto):

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");
}

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述
异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:

在这里插入图片描述
所以当我们把配置改为auto时,如果是业务异常,会自动返回nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

二、失败重试机制

  • 当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
  • 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。
  • 当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次。
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject。

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃。

三、失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

定义处理失败消息的交换机和队列:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

四、业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  • 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  • 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  • 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  • 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

这个思路非常简单:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

业务判断

  • 业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
  • 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
  • 相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例:

@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

五、兜底方案

  • 虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
  • 假设我们要做一个支付项目,有没有其它兜底方案,能够确保订单的支付状态一致呢?
  • 其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
  • 通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

总结

以上就是实现消息可靠性的详细讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3DMAX平滑布尔插件超级布尔工具使用教程

3dMax平滑布尔插件SmoothBoolean用于在ProBoolean网格之间创建平滑过渡的3dMax插件。再也不需要花几个小时来清理用布尔切割创建的混乱拓扑了。如果你只是想让你的三维模型在渲染图上看起来很棒,或者你需要为游戏烘焙一些东西,那么你就不必浪费时间来建模…

Kotlin学习——kt中的类,数据类 枚举类 密封类,以及对象

Kotlin 是一门现代但已成熟的编程语言,旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作,并提供了多种方式在多个平台间复用代码,以实现高效编程。 https://play.kotlinlang.org/byExample/01_introduction/02_Functio…

个人硬件测试用例入门设计

📑打牌 : da pai ge的个人主页 🌤️个人专栏 : da pai ge的博客专栏 ☁️宝剑锋从磨砺出,梅花香自苦寒来 🌤️功能测试 进行新增、…

Java中绕过SSL/TLS验证:开发与风险透视

警告: 本文提供的方法绕过SSL/TLS证书验证,这在某些开发场景下可能是有用的,但使用这些方法会导致严重的安全隐患。在生产环境中,你应该始终验证SSL/TLS证书以确保数据的安全传输。 引言 在日常的软件开发中,我们经常需要与其他服…

FFmpeg命令分隔视频

有一个视频如a.mp4,此视频采用帧率为30生成,共有299帧,这里通过FFmpeg命令分隔成1秒一个个的小视频,即每个小视频帧数为30帧。 用到的FFmpeg参数如下所示: (1).-i:指定输入视频文件的名称; (2).-c:指…

name 属性:提高 Vue 应用可维护性的关键

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

对称加密与非对称加密的区别是什么?

对称加密与非对称加密的区别是什么? 对称加密概念:好处和坏处:基本原理 非对称加密概念:工作原理: 两者区别安全性处理速度密钥管理通信双方数量 对称加密 概念: 同一个密钥可以同时用来对信息进行加密和…

Sublime Text 3运行 Python文件出现中文打印乱码的解决方式

很多小伙伴在下载安装好sublime这个编辑器后发现,它虽然能够用来打开python脚本和创建文件编写代码,但是却不能够来运行python代码和程序。所以下面这一篇文章就是会来分享一下,sublime编辑器无法运行python的解决方法,感兴趣的话…

Linux环境下自动化创建大量的账号

参考《鸟哥的Linux私房菜基础篇第四版》13.7.2节微调而成: 下面脚本的目的是为服务器的管理员自动化创建大量的账号,节省生命。 #!/bin/bash # This shell script will create amount of Linux login accounts for you. # 1. check the "accounta…

2006-2023年2月地级市城投债数据

2006-2023年2月地级市城投债数据 1、时间:2006-2023年2月 2、指标:省份、城市、证券代码、证券简称、债券简称、证券全称、债券初始面值单位元、债券最新面值交易日期20221231、发行总额单位亿元、债券余额日期20221231单位亿、起息日期、计息截止日、…

React中通过children prop或者React.memo来优化子组件渲染【react性能优化】

文章目录 前言未优化之前的代码问题解决方案一,通过children prop解决方案二,通过React.memo后言 前言 hello world欢迎来到前端的新世界 😜当前文章系列专栏:react.js 🐱‍👓博主在前端领域还有很多知识和…

基于PyQT5的图像分类网络训练平台

1.主界面 2.选择数据集路径 里面包含两个文件夹 train和val 3.选择类别标签 以txt为结尾 4.训练基本设置 包括输入图像大小、batch size、轮次、学习率等 5.训练高级设置 是否进行标签平滑、图像增强操作 6.选择训练日志输出地址 为一个文件夹 7.选择训练好的模…

1.如何修改seruat对象的行名 2.FeaturePlot如何把所有阳性表达的spot放到图的前面

本有解决标题中的两个问题 1.答案是修改不了,不如重新制作一个seurat对象。 试图使用rownames(obj)featurenames是不成功的 记录 客户需求遇到一个问题:作者提供的rds文件行名为ensemble id,如何改成gene symbol。 …

【Vue】图片切换

上一篇&#xff1a; vue的指令 https://blog.csdn.net/m0_67930426/article/details/134599378?spm1001.2014.3001.5502 本篇所需要的指令有&#xff1a; v-on v-bind v-show <!DOCTYPE html> <html lang"en"> <head><meta charset"…

人力资源管理后台 === 角色管理

目录 1.组织架构-编辑部门-弹出层获取数据 2.组织架构-编辑部门-编辑表单校验 3.组织架构-编辑部门-确认取消 4.组织架构-删除部门 5.角色管理-搭建页面结构 6.角色管理-获取数据 7.角色管理-表格自定义结构 8.角色管理-分页功能 9.角色管理-新增功能弹层 10.角色管理…

2024年天津天狮学院市场营销专业《管理学》考试大纲

2024年天津天狮学院专升本市场营销专业高职升本入学考试《管理学》考试大纲 一、考试性质 《管理学》专业课程考试是天津天狮学院市场营销专业高职升本入学考试的必考科 目之一&#xff0c;其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《管理学》考…

5、Hydra与Crunch基本使用

文章目录 一、关于Hydra与Crunch二、在操作机上使用Crunch生成用户名和密码字典三、在操作机上使用Hydra对靶机FTP登录密码进行字典攻击 一、关于Hydra与Crunch Hydra&#xff08;九头蛇&#xff09;是一个相当强大的暴力密码破解工具。该工具支持几乎所有协议的在线密码破解&…

目录树自动生成器 golang+fyne

go tree 代码实现请看 gitee 仓库链接 有很多生成目录树的工具&#xff0c;比如windows自带的tree命令&#xff0c;nodejs的treer&#xff0c;tree-cli等等。这些工具都很成熟、很好用&#xff0c;有较完善的功能。 但是&#xff0c;这些工具全部是命令式的&#xff0c;如果…

解密 sqli靶场第一关:一步一步学习 SQL 注入技术

目录 一、判断是否存在注入点 二、构造类似?id1 --的语句 三、判断数据表中的列数 四、使用union联合查询 五、使用group_concat()函数 六、爆出数据库中的表名 七、爆出users表中的列名 八、爆出users表中的数据 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很…

Matlab数学建模算法详解之混合整数线性规划 (MILP) 算法(附完整实现代码)

&#x1f517; 运行环境&#xff1a;Matlab &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 #### 防伪水印——左手の明天 #### &#x1f497; 大家好&#x1f917;&#x1f91…