MQ如何保证可靠性

       📝个人主页:五敷有你      

 🔥系列专栏:MQ

⛺️稳中求进,晒太阳

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.数据持久化

为了提高性能,默认情况下,MQ的数据都是再内存存储的临时数据,重启后就会消失,为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化

  • 队列持久化

  • 消息持久化

我们以控制台界面为例来说明。

2.1.交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

2.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

image.png

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。

2.3.消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

image.png

说明

        在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

        不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 代码层次实现:

 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "msg.queue",durable ="true"),exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),key = "msg"))public void listenMsg(String jsonStr){log.info("收到消息{}", jsonStr);Map<String,Object> map = JSONUtil.toBean(jsonStr, Map.class);JSONObject object=new JSONObject(map);String actionName =object.getString(Action.ACTION);Action action = getAction(actionName);action.doMessage(getWebSocketManager(),object);}

3.消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障

  • 消费者接收到消息后突然宕机

  • 消费者接收到消息后,因处理不当导致异常

RabbitMQ如何得知消费者的处理状态呢?

3.1消费者确认机制

        为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

        一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

        由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

测试下面代码验证: 在none情况下,异常产生后,消息队列中的消息被删除了

  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "msg.queue",durable ="true"),exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),key = "msg"))public void listenMsg(String jsonStr){log.info("收到消息{}", jsonStr);throw new RuntimeException("异常");}

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

测试下面代码验证: 在auto情况下,异常产生后,消息一直在被重复投递,

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除: 这个一直Unack的状态。当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

如果是消息转换异常,spring会返回reject

刚刚发现,当消费者出现异常后,消息会不断的requeue(重入队)到队列,再重新发送给消费者,如果消费者执行依然出错,消息会再次投递到队列,直到处理成功为止。

极端情况下,消费之一直无法执行成功,那么消息requeue就会无限循环,导致mq的处理消息飙升,带来不必要的压力。

3.2.失败重试机制

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

试了三次,失败就停下来了。

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.2失败处理策略

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
package com.aqiuo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic TopicExchange errorMessageExchange(){return new TopicExchange("error.topic",true,false);}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/7064.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Typescript语法二

继承 继承是⾯向对象编程中的重要机制&#xff0c;允许⼀个类&#xff08;⼦类或派⽣类&#xff09;继承另⼀个类&#xff08;⽗类或基类&#xff09;的属性和⽅法。⼦类可以直接使⽤⽗类的特性&#xff0c;并根据需要添加新的特性或覆盖现有的特性。这种机制赋予⾯向对象程序良…

getchar和putchar函数详解

getchar和putchar函数详解 1.getchar函数1.1函数概述1.2函数返回值1.3函数注意事项1.4函数的使用 2.putchar函数2.1函数概述2.2函数返回值2.3函数使用实例 1.getchar函数 1.1函数概述 从一个流中读取一个字符&#xff0c;或者从标准输入中获得一个字符 函数原型&#xff1a; …

Sa-Token框架入门使用

说明&#xff1a;Sa-Token是一个轻量级java权限认证框架&#xff08;官方语&#xff09;&#xff0c;所谓权限认证框架&#xff0c;就是登录框架&#xff0c;像Shiro、Spring Security。本文介绍Sa-Token框架的入门使用&#xff0c;基于Spring Boot环境。 准备工作 首先&…

滑动窗口 | 1652. 拆炸弹 |LeetCode

文章目录 题目介绍暴力(可以过力扣竟然。不愧是简单题)&#xff1a;滑动窗口 祝你天天开心 题目介绍 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每…

代码随想录第四十五天|爬楼梯、零钱兑换、完全平方数

题目链接&#xff1a;57. 爬楼梯&#xff08;第八期模拟笔试&#xff09; 代码如下&#xff1a; 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 代码如下&#xff1a; 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 代码如下&#xff1a;…

Vue阶段练习:初始化渲染、获取焦点、记账清单

阶段练习主要承接Vue 生命周期-CSDN博客 &#xff0c;学习完该部分内容后&#xff0c;进行自我检测&#xff0c;每个练习主要分为效果显示、需求分析、静态代码、完整代码、总结 四个部分&#xff0c;效果显示和准备代码已给出&#xff0c;我们需要完成“完整代码”部分。 练习…

关系型数据库MySql分库分表带来的问题以及解决方案

水平分表 水平分表是什么&#xff1f; 将一张表横向拆分为多张表&#xff0c;拆分的表&#xff0c;依然在同一个库中。 例如&#xff0c;user表有400w条记录&#xff0c;将user表拆分成4张表&#xff0c;每张表100w条记录。拆分后的表名&#xff0c;分别叫做user_0、user1、u…

基于 Linux 自建怀旧游戏之 - 80 款 H5 精品小游戏合集

1&#xff09;简介 最近又找到了一款宝藏游戏资源分享给大家&#xff0c;包含 80 款 H5 精品小游戏&#xff0c;都是非常有趣味耐玩的游戏&#xff0c;比如 植物大战僵尸、捕鱼达人、贪吃蛇、俄罗斯方块、斗地主、坦克大战、双人五子棋、中国象棋 等等超级好玩的 H5 小游戏&…

CUDA调整指令级原语

在GPU上运行的运算密集型应用程序&#xff0c;处理器的计算吞吐量可以用它在一段时间内执行操作的数量来衡量。因为GPU有很多SIMT指令和计算核心&#xff0c;所以其峰值计算吞吐量通常比其他的处理器高。 对应用程序的吞吐量和正确性进行优化时&#xff0c;理解不同低级原语的…

常见通信协议

1、串口&#xff1a;&#xff08;串行异步全双工&#xff0c;先发低位&#xff09; 因为是异步的&#xff0c;所以没有时钟线&#xff0c;因为是全双工&#xff0c;所以有两条数据传输线&#xff0c;实现数据的收发。 帧格式 起始位1位&#xff0c;数据位8位&#xff0c;校验…

【教程】极简Python接入免费语音识别API

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;请不吝给个[点赞、收藏、关注]哦~ 安装库&#xff1a; pip install SpeechRecognition 使用方法&#xff1a; import speech_recognition as srr sr.Recognizer() harvard sr…

LeetCode 面试经典150题 228.汇总区间

题目&#xff1a; 给定一个 无重复元素 的 有序 整数数组 nums 。 返回 恰好覆盖数组中所有数字 的 最小有序 区间范围列表 。也就是说&#xff0c;nums 的每个元素都恰好被某个区间范围所覆盖&#xff0c;并且不存在属于某个范围但不属于 nums 的数字 x 。 列表中的每个区…

spring高级篇(九)

boot的执行流程分为构造SpringApplication对象、调用run方法两部分 1、Spring Boot 执行流程-构造 通常我们会在SpringBoot的主启动类中写以下的代码&#xff1a; 参数一是当前类的字节码&#xff0c;参数二是main的args参数。 public class StartApplication {public static…

【Linux IO基础】缓冲区

概念 缓冲区的主要作用是提高效率 --- 提高使用者的效率&#xff0c;因为有缓冲区的存在&#xff0c;我们可以积累一部分再统一发送&#xff0c;提高发送的效率。 刷新方式 缓冲区因为能够暂存数据&#xff0c;必定要有一定的刷新方式&#xff1a; 一般策略&#xff1a; 无…

Flask应用的部署和使用,以照片分割为例。

任务是本地上传一张照片&#xff0c;在服务器端处理后&#xff0c;下载到本地。 服务器端已经封装好了相关的程序通过以下语句调用 from amg_test import main from test import test main() test() 首先要在虚拟环境中安装flask pip install Flask 文件组织架构 your_pro…

基于Spring Boot的民宿管理平台设计与实现

基于Spring Boot的民宿管理平台设计与实现 开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/idea 系统部分展示 前台首页功能界面图&#xff0c;在系统首页可以查看首页…

【软件开发规范篇】JAVA后端开发编程规范

作者介绍&#xff1a;本人笔名姑苏老陈&#xff0c;从事JAVA开发工作十多年了&#xff0c;带过大学刚毕业的实习生&#xff0c;也带过技术团队。最近有个朋友的表弟&#xff0c;马上要大学毕业了&#xff0c;想从事JAVA开发工作&#xff0c;但不知道从何处入手。于是&#xff0…

视频号小店在行业内的门槛高不高?有门槛是好事还是坏事?

大家好&#xff0c;我是电商小V 现在伴随着时代的慢慢发展&#xff0c;很多人都是想找一个好一点的创业项目&#xff0c;现在找创业项目都是找一些稍微有门槛的项目&#xff0c;没有门槛的话&#xff0c;要不然刚开始去做&#xff0c;项目就泛滥了&#xff0c;项目的红利期直接…

【AI】深度学习框架的期望与现实 机器学习编译尚未兑现其早期的一些承诺……

深度学习框架的期望与现实 机器学习编译尚未兑现其早期的一些承诺…… 来自&#xff1a;Axelera AI 资深软件工程师 Matthew Barrett 原帖是linkedin帖子&#xff1a; https://linkedin.com/posts/matthew-barrett-a49929177_i-think-its-fair-to-say-that-ml-compilation-ac…

Python_4-远程连接Linux

文章目录 使用Python通过SSH自动化Linux主机管理代码执行ls结果&#xff1a;文件传输&#xff1a; 使用Python通过SSH自动化Linux主机管理 在系统管理与自动化运维中&#xff0c;SSH&#xff08;Secure Shell&#xff09;是一个常用的协议&#xff0c;用于安全地访问远程计算机…