【RabbitMQ】可靠性策略(幂等,消息持久化)

MQ可靠性策略

  • 发送者的可靠性问题
    • 生产者的重连
    • 生产者确认
  • MQ的可靠性
    • 数据持久化
    • Lazy Queue
  • 消费者的可靠性问题
    • 消费者确认机制
    • 消息失败处理
  • 业务幂等性
  • 简答问题

发送者的可靠性问题

生产者的重连

可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled:  true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multipliermax-attempts: 3 #最大重试次数

生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  4. 其他情况都会返回NACK,告知投递失败

通过配置文件配置生产者的消息类型:

spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里的publisher-confirm-type 有三种模式可选:
none:关闭confirm机制
simple: 同步阻塞等待MQ的回执消息
correlated:MQ异步调用方式返回回执消息

异步回调方式:

我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理

我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback

@Configuration
public class CommonConfig implements ApplicationContextAware{@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException{//获取RabbitTemplateRabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{//处理操作}}
}

通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback

void test() throws InterruptedException{CorrelationData cd= new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){@Overridepublic void onFailure(Throwable ex){//Future 发生异常是的逻辑处理,基本不会触发}@Overridepublic void onSuccess(CorrelationData.confirm result){//Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执//处理逻辑}else{//异常处理}}});
}
rabbittemplate.convertAndSend("","",cd);

MQ的可靠性

在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:

  1. 一旦MQ宕机,内存的消息会丢失
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)

数据持久化

RabbitMQ的数据持久化包括:

  1. 交换机持久化(Durable 永久的,Transient临时的)
  2. 队列持久化(Durable 永久的,Transient临时的)
  3. 消息持久化

消息的持久化:

void test(){Message message =MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF-8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}

会将消息在过程中持久化到磁盘不会导致MQ阻塞

消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化

Lazy Queue

惰性队列
特征:

  1. 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持百万条数据的消息存储

在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在这里插入图片描述
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() //开启lazy模式.build();
}

通过注解也可以实现

@RabbitListener(queuesToDeclare= @Queue(name="lazy.queue",durable="true",//持久化arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){//消费处理
}

消费者的可靠性问题

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该信息
nack:消息处理失败,RabbitMQ需要再次投递信息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none #none,关闭

消息失败处理

如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true  #开启消费者失败重试initial-interval: 1000ms #初始的失败等待时间multiplier: 1 #下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在这里插入图片描述

将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息

将失败策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机,队列绑定
  2. 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景:防止某一数据被重复进行修改
幂等业务:根据id的查询业务,根据id的删除业务等
非幂等:用户下单,扣减库存等

如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){//定义消息转换器Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();//配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断

简答问题

如何保证支付服务与交易服务之间的订单状态一致性:

  1. 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  2. 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常

如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/5267.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE+TS使用elementUI的el-checkbox双重v-for循环做勾选

html部分 <template><div class"hello"><el-form :model"elForm"> <!-- cities对象数组形式 --><el-form-item v-for"(item, topIndex) in cities" :key"topIndex"> <!--item.checked 是每一个item…

最新游戏陪玩语音聊天系统3.0商业升级独立版本源码+搭建教程

首发价值29800元的最新商业版游戏陪玩语音聊天系统3.0商业升级独立版本源码。 下 载 地 址 &#xff1a; runruncode.com/php/19748.html 1. 新增人气店员轮播功能。 2. UI界面优化&#xff0c;包括游戏图标展示和分类展示的改进。 3. 增加动态礼物打赏功能。 4. 新增礼…

SQL底层执行过程

MySQL 的查询流程 客户端请求连接器 负责与客户端的通信,是半双工模式&#xff08;半双工(Half Duplex)数据传输指数据可以在一个信号载体的两个方向上传输,但是不能同时传输。&#xff09;&#xff0c;验证请求用户的账户和密码是否正确&#xff0c;③如果用户的账户和密码验…

Codigger数据篇(下):数据安全的全方位保障

在数字化浪潮中&#xff0c;数据已成为现代企业的核心财富。Codigger作为领先的数据服务平台&#xff0c;深知数据安全对于用户的重要性&#xff0c;因此在深挖数据价值的同时&#xff0c;我们始终坚守数据安全防线。 一、双重加密技术保障 Codigger平台运用先进的加密通信和…

vue2.7与vue2.6、vue3的区别

官网链接&#xff1a;https://v2.cn.vuejs.org/v2/guide/migration-vue-2-7.html -组合式与选项式 选项式&#xff1a;export default { 各种选项关键字名&#xff0c;都定好了&#xff0c;无需引入&#xff0c;配对放置即可}

stm32单片机开发一、中断之外部中断实验

stm32单片机的外部中断和定时器中断、ADC中断等都由stm32的内核中的NVIC模块控制&#xff0c;stm32的中断有很多中&#xff0c;比如供电不足中断&#xff0c;当供电不足时&#xff0c;会产生的一种中断&#xff0c;这么多中断如果都接在CPU上&#xff0c;或者说CPU去处理&#…

Leetcode—2639. 查询网格图中每一列的宽度【简单】

2024每日刷题&#xff08;121&#xff09; Leetcode—2639. 查询网格图中每一列的宽度 实现代码 class Solution { public:int func(int num) {if(num 0) {return 1;}int len 0;while(num ! 0) {len;num / 10;}return len;}vector<int> findColumnWidth(vector<ve…

如何测试响应式网站

我们每天通过多种设备访问互联网。移动电话&#xff0c;台式机/笔记本电脑&#xff0c;平板电脑&#xff0c;平板电脑…我们所掌握的设备数量已经增长为天文数字。作为消费者&#xff0c;体验很棒。我们可以随时随地在任何设备上自由访问互联网。但对于Web开发人员&#xff0c;…

类加载子系统之类的生命周期(待完善)

0、前言 文中大量图片来源于 B站 黑马程序员 0.1、类加载子系统在 JVM 中的位置 类加载器负责的事情是&#xff1a;加载、链接、解析 0.2、与类的生命周期相关的虚拟机参数 参数描述-XX:TraceClassLoading打印出加载且初始化的类 1、类的生命周期 堆上的变量在分配空间的时…

【C++STL详解(三)】------vector的介绍与使用

目录 前言 一、关于数组 二、vector的介绍 三、vector的使用 Ⅰ、默认成员函数 1.构造函数 2.赋值重载 3.析构函数 Ⅱ、容量 1.size(&#xff09; 2.capacity() 3.empty() 4.resize() 5.reserve() Ⅲ、遍历操作 1.迭代器 begin() end()&#xff08;正向迭代器…

element 分页切换时:current-page无效 页数不会跟着一起切换

问题回溯&#xff1a;使用el-pagination组件 选择切换当前分页 页数为2 问题结果&#xff1a;el-pagination组件 当前页切换失败 一直都是 1&#xff0c;接口传参分页数据是2&#xff0c;打印当前分页也是2 解决方案1&#xff1a;使用 current-page参数 .sync 修饰符 解决方案2…

rust将json字符串直接转为map对象或者hashmap对象

有些时候我们还真的不清楚返回的json数据里面到底有哪些数据&#xff0c;数据类型是什么等&#xff0c;这个时候就可以使用批处理的方式将json字符串转为一个对象&#xff0c;然后通过这个对象的get方法来获取json里面的数据。 pub async fn test_json(&self) {let json_st…

STM32的TIM输入捕获和PWMI详解

系列文章目录 STM32单片机系列专栏 C语言术语和结构总结专栏 文章目录 1. IC输入捕获 2. 频率测量 3. 主模式、从模式、触发源选择 4. 输入捕获基本结构 5. PWMI模式 6. 代码示例 6.1 PWM.c 6.2 PWM.h 6.3 IC.c 6.4 IC.h 6.5 完整工程文件 输出比较可以看下面这篇…

numpy+matplotlib绘制阿基米德螺线

【第10次课]实验十一数据可视化及应用】 声明&#xff1a;著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 绘制阿基米德螺线&#xff0c;运行效果如图所示。 参数方程为: x icosi yisini 其中半径i和圆心角i变化一致&#xff0c;取值范围为…

#你觉得未来的智慧工地是什么样子的#

#你觉得未来的智慧工地是什么样子的# 有人说“现阶段的智慧工地都是噱头&#xff0c;实际用处不大”&#xff1b;也有人将智慧工地吹嘘上天。那么&#xff0c;随着技术的发展&#xff0c;你觉得未来的智慧工地会是什么样子呢&#xff1f; 随着大数据时代的到来&#xff0c;未来…

Three.js杂记(十五)—— 汽车展览(下)

在上一篇文章Three.js杂记&#xff08;十四&#xff09;—— 汽车展览上 - 掘金 (juejin.cn)中主要对切换相机不同位置和鼠标拖拽移动相机焦点做了简单的应用。 那么现在聊聊该如何实现汽车模型自带的三种动画展示了&#xff0c;实际上可以是两种汽车前后盖打开和汽车4车门打开…

抑郁后的症状表现——XWX-QP大小鼠强迫游泳桶硬件

简单介绍&#xff1a; 大小鼠强迫游泳桶硬件主要用于抗抑郁的研究。适用于大鼠、小鼠或其他实验室动物&#xff0c;通过将实验动物置于一个局限的环境中&#xff0c;动物在该环境中拼命挣扎试图逃跑又无法逃脱&#xff0c;从而提供了一个无可回避的压迫环境&#xff0c;动物的…

如何提取二维码文本信息?文本二维码提取内容的方法

如何分解出二维码中的文本信息呢&#xff1f;很多商家在做活动时会给每个用户生成一个单独的二维码&#xff0c;每一个二维码中有单独的编号信息&#xff0c;那么当我们收集到用户的二维码时&#xff0c;如何操作才能够提取二维码中的编号信息呢&#xff1f;想要解决这个问题可…

双目深度估计原理立体视觉

双目深度估计原理&立体视觉 0. 写在前面1. 双目估计的大致步骤2. 理想双目系统的深度估计公式推导3. 双目标定公式推导4. 极线校正理论推导 0. 写在前面 双目深度估计是通过两个相机的对同一个点的视差来得到给该点的深度。 标准系统的双目深度估计的公式推导需要满足:1)两…

Vue3+ts(day04:watch、watchEffect)

学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/frontlearningNotes 觉得有帮助的同学&#xff0c;可以点心心支持一下哈&#xff08;笔记是根据b站上学习的尚硅谷的前端视频【张天禹老师】&#xff0c;记录一下学习笔记&#xff0c;用于自己复盘&#xff0c;有需要学…