RabbitMQ如何保证消息的可靠性6000字详解

RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点,实现了异步通讯等一些优点,但是在消息的传递中引入了MQ Broker必然会带来一些其他问题,比如如何保证消息在传输过程中可靠性(即不让数据丢失,发送一次消息就会被消费一次)?这篇博客将详细从生产者,MQ Broker以及消费者的角度讲解如何保证消息的可靠性!

1,消息丢失的情况

1.1 消息传递流程图如下

 Producer -> exchange ->queue -> Consumer(其中exchange和queue属于MQ Broker的组件)

1.2 消息可能丢失的情况

  • 生产者给交换机exchange的过程中发生数据丢失;
  • 交换机exchange路由给队列queue的过程中发生数据丢失;
  • 消息到达MQ的一瞬间,MQ发生了宕机的情况造成数据丢失;
  • 消费者从队列queue中取出消息进行消费的一瞬间消费者宕机了造成数据丢失。

2,生产者确认机制

生产者确认机制主要是站在生产者的角度来保证消息的可靠性,针对的是生产者给交换机发送消息以及交换机给队列发送消息的过程中数据丢失的情况!

2.1 书写配置信息

# 配置日志信息
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debugspring:rabbitmq:host: 123.207.72.43 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /publisher-confirm-type: correlatedpublisher-returns: true#消息发送失败时执行returnCallback回调函数template:mandatory: true
  • publisher-confirm-type表示开启publisher-confirm;这个参数有两种类型,分别是correlated和simple(correlated代表异步等待回调,类似于js中发送的ajax请求的回调函数,MQ返回结果时会执行定义的confirmCallback函数;simple代表同步等待confirm结果直到超时);
  • publisher-returns表示开启publish-return功能,同样是基于callback机制,不过是定义returnCallback;
  • template.mandatory定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。

2.2 定义return回调机制

我们使用的是SpringBoot来整合的RabbitMQ,所以不论是return回调还是confim回调都是用rabbittemplate对象进行定义的。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//记录日志log.error("消息发送队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//如果需要的话进行消息的重发});}
}

注意:

  1. 一个RabbitTemplate只能配置一个ReturnCallback,所以需要在项目启动的时候进行定义,这样rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法进行定义);
  2. ApplicationContextAware是Spring创建完Bean工厂之后的通知方法,当Spring创建完Bean工厂之后就可以在Spring容器中拿到RabbitTemplate对象了;
  3. 配置ReturnCallback时可以采用匿名内部类的方法简化代码,如果消息发送失败可以根据需要进行消息重发操作。

2.3 定义confirm回调机制

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同,可以通过测试方法进行定义。

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() throws InterruptedException {//1.准备消息String message = "hello spring amqp";//2.准备CorrelationData//2.1 消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//2.2 准备准备ConfirmCallbackcorrelationData.getFuture().addCallback(confirm -> {if (confirm.isAck()) {log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId());} else {log.error("消息投递到交换机上失败!消息ID:{}", correlationData.getId());//重发消息}}, throwable -> {//记录日志log.error("发送消息失败!",throwable);//重发消息});//3.发送消息rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);//加上休眠时间 避免mq连接直接关闭Thread.sleep(1000);}

注意:

  1. 生产者给交换机发送的消息数据很多的,为了区分每个消息的归属,每个消息都要附属上一个ID信息,可以采用UUID的方式生成唯一身份标识;
  2. 在发送消息的时候需要增加一个correlation变量,这个变量记录了两个东西(1.每个消息的ID 2.定义的cinfirm回调机制);
  3. 加上线程休眠的操作是为了避免消息发送到交换机之后mq的连接直接关闭,这样会导致返回ack的错误。

3,消息持久化

消息持久化是站在MQ Broker的角度来保证消息的可靠性的,将交换机、队列以及消息设置成持久化的从而避免MQ宕机造成消息的丢失!

3.1 交换机持久化

@Beanpublic DirectExchange simpleDirect(){return new DirectExchange("simple.direct",true,false);}

第二个参数设置成true就是让就交换机是可持久化的,第三个参数是是否自动删除,一般设为false;

3.2 队列持久化

@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("simple.queue").build();}

durable的意思就是可持久化的,传入队列名称然后进行build操作,这样创建的队列就是一个可持久化的队列;

3.3 消息持久化

将交换机和队列设置为持久化的之后重启MQ服务器之后消息依然会丢失,因为发送的消息不是可持久化的,所以也需要将消息设置成可持久化的

4,消费者消息确认

消费者消息确认是站在消费者的角度来保证消息可靠性的,消息者处理完一条消息之后需要给MQ Broker返回一条ACK表示消息处理完成!

4.1 三种确认模式

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack;
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

4.2 none模式的演示

1.修改消费者工程中的配置文件

spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack

2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除

@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {log.debug("消费者接收到simple.queue的消息:【" + msg + "】");//这里模拟一个异常System.out.println(1 / 0);log.info("消费者处理消息成功!");}

3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

 抛出异常消费者并没有处理消息成功,再观察控制台是否将消息删除:

 队列中已经没有消息了,说明消息被删除了!

消费者确认机制为none的时候,只要消费者拿到消息之后MQ就会把消息删除,不关心消费者是否将消息成功处理!

4.3 auto模式的演示

1.修改消费者工程中的配置文件

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack

 2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除

@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {log.debug("消费者接收到simple.queue的消息:【" + msg + "】");//这里模拟一个异常System.out.println(1 / 0);log.info("消费者处理消息成功!");}

3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

消费者确认机制为auto的时候,消费者拿到消息之后MQ并不会立刻删除队列中的消息,只有消费者成功处理完消息之后给队列返回一个ack的时候队列才会删除消息!

5, 消费者失败重试机制

我们发现当消费者确认机制为auto时,如果代码中出现了异常,消息会进行重复入队列(requeue)的操作,重复入队的操作对于MQ来说开销会非常大,消息处理飙升,所以引入了失败重试机制:当代码中出现了异常的时候,消费者内部会进行重发的操作(可以控制重发的时间和次数),如果超过设置的重发次数消费者还未成功处理消息默认将消息丢弃!

5.1 本地重试

Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,可以在消费者工程的yml文件中添加如下配置:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 3 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

 4次重发之后消息还未成功处理spring抛出了AmqpRejectAndDontRequeueException异常,这是失败之后的默认处理方式,默认消费者给队列返回了ack,此时队列会将消息从队列中删除!

5.2 失败策略

失败达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

如果消息这个消息比较重要,达到最大重试次数之后这个消息不能被丢弃该怎么办,此时就可以使用RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

@Configuration
public class ErrorMessageConfig {//定义失败之后处理的交换机和队列@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}//将交换机和队列进行绑定@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//定义一个RepublishMessageRecoverer,替换spring默认的处理机制​@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

流程图如下:

 6, 如何保证RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列;
  • 开启持久化功能,确保消息未消费前在队列中不会丢失;
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack;
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1099.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习babylon.js --- [2] 项目工程搭建

本文讲述如何搭建babylonjs的项目工程。 一 准备 首先创建一个目录叫MyProject,然后在这个目录里再创建三个目录:dist,public和src,如下, 接着在src目录里添加一个文件叫app.ts,本文使用typescript&#…

docker数据卷权限管理--理论和验证

一、Docker容器中用户权限管理 Linux系统的权限管理是由uid和gid负责,Linux系统会检查创建进程的uid和gid,以确定它是否有足够的权限修改文件,而非是通过用户名和用户组来确认。 同样,在docker容器中主机上运行的所有容器共享同一…

【kubernetes系列】Kubernetes之配置dashboard安装使用

Kubernetes之配置dashboard 概述 Dashboard 是基于网页的 Kubernetes 用户界面。 你可以使用 Dashboard 将容器应用部署到 Kubernetes 集群中,也可以对容器应用排错,还能管理集群资源。 你可以使用 Dashboard 获取运行在集群中的应用的概览信息&#x…

【单例模式】—— 每天一点小知识

💧 单例模式 \color{#FF1493}{单例模式} 单例模式💧 🌷 仰望天空,妳我亦是行人.✨ 🦄 个人主页——微风撞见云的博客🎐 🐳 《数据结构与算法》专栏的文章图文并茂🦕生动形…

LiveGBS流媒体平台GB/T28181功能-作为上级平台对接海康大华华为宇视等下级平台监控摄像机NVR硬件执法仪等GB28181设备

LiveGBS作为上级平台对接海康大华华为宇视等下级平台监控摄像机NVR硬件执法仪等GB28181设备 1、背景说明2、部署国标平台2.1、安装使用说明2.2、服务器网络环境2.3、信令服务配置 3、监控摄像头设备接入3.1、海康GB28181接入示例3.2、大华GB28181接入示例3.3、华为IPC GB28181接…

SpringBoot整合ZooKeeper完整教程

目录 ZooKeeper简单介绍 一、安装zookeeper 二、springboot整合zookeeper ZooKeeper简单介绍 zookeeper是为分布式应用程序提供的高性能协调服务。zookeeper将命名、配置管理、同步和组服务等常用服务公开在一个简单的接口中,因此用户无需从头开始编写这些服务。可…

Android GridPager实战,从RecyclerView to ViewPager

这个简单的的案例展示了如何从RecyclerView to ViewPager,以网上的公开图片为样例。 安卓开发中从RecyclerView 到 ViewPager demo运行结果demo项目工程目录结构关键代码 MainActivity关键代码GridFragment关键代码ImageFragment关键代码ImagePagerFragment关键布局…

CSS---CSS面试题

目录 1.盒模型 2.offsetHeight /clientheight/scrollHeight 3.left与offsetLeft 4.对BFC规范的理解 5.解决元素浮动导致的父元素高度塌陷的问题 6.CSS样式的先级 7.隐藏页面元素 8.display: none 与 visibility: hidden 的区别 9.页面引入样式时,使用link与import有…

C++学习——类和对象(一)

C语言和C语言最大的区别在于在C当中引入了面向对象的编程思想,想要完全了解c当中的类和对象,就要从头开始一点一点的积累并学习。 一:什么是面向对象编程 我们之前学习的C语言属于面向过程的编程方法。举一个简单的例子来说:面向过…

使用npm和nrm查看源和切换镜像

一、使用npm查看当前源、切换淘宝镜像、切换官方源 (1)npm查看当前源: npm get registry (2)npm设置淘宝镜像源: npm config set registry http://registry.npm.taobao.org (3)n…

【运维工程师学习三】Linux中Shell脚本编写

【运维工程师学习三】shell编程 Shell程序分类1、系统中sh命令是bash的软链接2、Shell脚本标准格式之文件后缀3、Shell脚本标准格式之文件内容首行4、Shell脚本的运行方法一、作为可执行程序解释 二、作为解释器(bash)参数 5、find、grep、xargs、sort、…

网络协议与攻击模拟-17-DNS协议-报文格式

二、DNS 查询 客户机想要访问www.baidu.com,根据自己的 TCP / IP 参数,向自己的首选 DNS 服务器发送 DNS 请求 首选 DNS 收到客户机的请求后,会去查询自己的区域文件,找不到www.baidu.com的 IP 地址信息(将请求转发到…

MYSQL 5.7.17 安装版 的配置文件

解压版解压后都有 my.ini配置文件,安装版要查找这个配置文件可以查看 MYSQL Workbench --> 左侧 INSTANCE --> Options File ,然后可以看到底部 Configuration File所处的位置,即为my.ini的路径。

Jupyter notebook添加与删除kernel

目录 1 添加虚拟环境的kernel 2 删除jupyter notebook已有的kernal 3 切换内核与查看当前内核 4 添加C语言的kernel 5 添加python2的kernel 6 添加java语言的kernel 6.1 sudo apt install default-jre 6.2 下载并安装 ijava 6.3 sudo apt install openjdk-11…

TortoiseGit 入门指南05:推送和拉取

本节所讲内容均涉及到 远端版本库。 版本库 的概念在《TortoiseGit 入门指南02:创建和克隆仓库》中提到过,它是工作目录下面的一个名为 .git 的隐藏目录,我们每一次提交、每一个分支都会保存在版本库中。这个版本库就在我们电脑上的某个文件…

鸽了百万用户四年的赛博皮卡终于要来啦

作者 | Amy 编辑 | 德新 本月15号,特斯拉官方宣布,第一辆 赛博皮卡已在特斯拉得州工厂下线。 而就在本月初,马斯克还发推预热了一波,「开着赛博皮卡在奥斯汀(特斯拉得州工厂所在地)溜了一圈&#xff01…

THREE.JS镜头随鼠标晃动效果

为了让动画更灵活并且简单 借助gsap让其具有更多可能,在未来更容易扩充其他动效 gsap Dom跟随鼠标移动 gsap.quickTo() 首先要监听鼠标移动,并且将移动的值转换到 -1 和 1 之间 方便处理 private mousemove(e: MouseEvent) {const x (e.clientX / inner…

华为配置LLDP基本功能

华为配置LLDP基本功能 1.什么是lldp协议 定义 LLDP(Link Layer Discovery Protocol)是IEEE 802.1ab中定义的链路层发现协议。LLDP是一种标准的二层发现方式,可以将本端设备的管理地址、设备标识、接口标识等信息组织起来,并发布给自己的邻居设备,邻居设备收到这些信息后将…

SSH远程直连Docker容器

文章目录 1. 下载docker镜像2. 安装ssh服务3. 本地局域网测试4. 安装cpolar5. 配置公网访问地址6. SSH公网远程连接测试7.固定连接公网地址8. SSH固定地址连接测试8. SSH固定地址连接测试 转载自cpolar极点云文章:SSH远程直连Docker容器 在某些特殊需求下,我们想ssh…

45、Spring Boot自动配置原理

Spring Boot自动配置原理 lmport Configuration Spring spi 自动配置类由各个starter提供,使用Configuration Bean定义配置类,放到META-INF/spring.factories下使用Spring spi扫描META-INF/spring.factories下的配置类使用lmport导入自动配置类