【Rabbitmq篇】RabbitMQ⾼级特性----持久性,发送⽅确认,重试机制

目录

一.持久化

1 .交换机持久化 

2 队列持久化

3.消息持久化

测试场景

二.发送⽅确认 

 1 .confirm确认模式

2 return退回模式

如何保证RabbitMQ消息的可靠传输?(面试重点)

三. 重试机制


一.持久化

我们在前⾯讲了消费端处理消息时,消息如何不丢失,但是如何保证当RabbitMQ服务停掉以后,⽣产者发送的消息不丢失呢.默认情况下, RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息,除⾮告知他不要这么做.

它确保消息在消息队列崩溃、重启或其他故障情况下仍能得以保留。

RabbitMQ 的持久化是确保消息在服务器重启或意外故障后不会丢失的重要特性。它主要涉及到交换机(Exchange)、队列(Queue)和消息(Message)三个层面的持久化。


1 .交换机持久化 

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会⾃动建⽴,相当于⼀直存在

此时都是持久化的 

设置交换机持久化为true

设置交换机持久化为false

测试持久化交换机:

 测试非持久化交换机:

 


此时我们重启我们的rabbitmq 看看结果怎么样? 

service rabbitmq-server restart

此时非持久化的交换机没了,所有消息也丢失了 

如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.  


2 队列持久化

 队列的持久化是通过在声明队列时将 durable 参数置为true实现的.
如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失.(队列没有了,消息也⽆处可存了)

持久化队列: 

点开源码可以看见该⽅法默认 durable 是true

 非持久化队列:


3.消息持久化

消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,
也就是 MessageDeliveryMode. PERSISTENT


测试场景

1)队列持久化&&消息持久化 vs  队列非持久化&&消息非持久化

    @Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");return "消息发送成功!";}@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息非持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}

队列

重启设备 更新结果

 

队列持久化&&消息持久化 消息保留

队列非持久化&&消息非持久化 丢失


2)队列持久化 &&消息非持久化 

    @RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息非持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}

重启 更新结果 

 队列持久化 &&消息非持久化  队列未丢失,消息丢失

3)队列非持久化 &&消息持久化 

    @RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}

 重启 更新结果 

 

队列非持久化 &&消息持久化 队列丢失,消息丢失


将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗

答案是否定的.
1. 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失.这种情况很好解决,将autoAck参数设置为false,并进⾏⼿动确认,详细可以参考[消息确认]章节.
2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理,可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中.如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失. 

如何解决? 

1. 引⼊RabbitMQ的仲裁队列(后⾯再讲),如果主节点(master)在此特殊时间内挂掉,可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性,除⾮整个集群都挂掉(此⽅法也不能保证100%可靠,但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多,实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中,详细参考下⼀个章节内容介绍--"发送⽅确认


二.发送⽅确认 

在使⽤RabbitMQ的时候,可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失,

但是还有⼀个问题,当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启,那么RabbitMQ重启期间⽣产者消息投递失败),持久化操作也解决不了这个问题,

因为消息根本没有到达服务器,何谈持久化 ?

RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现(消耗性能,不推荐)
b. 通过发送⽅确认(publisherconfirm)机制实现 

 1 .confirm确认模式

 Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,⽆论消息是否到达
Exchange,这个监听都会被执⾏,如果Exchange成功收到,ACK( Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false.

1)配置RabbitMQ

2)设置确认回调逻辑并发送消息 

⽆论消息确认成功还是失败,都会调⽤ConfirmCallback的confirm⽅法.如果消息成功发送到Broker,  ack为true.
如果消息发送失败,ack为false,并且cause提供失败的原因. 

package com.bite.extensions.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n", correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData == null ? null : correlationData.getId(), cause);//相应的业务处理}}});return rabbitTemplate;}
}

3)测试

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");return "消息发送成功!";}@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE+"hhh","confirm","confirm test...",correlationData);return "消息发送成功";}
}

 

 ​​

这里在prev 方法里面没有实现 confirmRabbitTemplate方法 却依然打印了接收到信息
为什么?

因为此时他们共用了一个confirmRabbitTemplate方法

此时要把他们分开 各种使用各自的

测试正常: 

 ​​​​


抛出问题 继续测试

 

此时不正确的routingKey 此时应该发送失败 为什么还发送成功?

 原来此时ack只是看看是否发送到交换机 发送成功就显示true 

但是并没有发送到对应队列 这如何解决?

引入return模式


2 return退回模式

消息到达Exchange之后,会根据路由规则匹配,把消息放⼊Queue中.Exchange到Queue的过程,如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者.消息退回给发送者时,我们可以设置⼀个返回回调⽅法,对消息进⾏处理. 

1)配置RabbitMQ 

publisher-confirm-type: correlated #消息发送确认

2) 设置返回回调逻辑并发送消息 

//rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息退回:"+returnedMessage);}});

使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false).这个属性的作⽤是告诉RabbitMQ,如果⼀条消息⽆法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时 ReturnCallback 就会被触发. 

3)测试

    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("3");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm111","returns test...",correlationData);return "消息发送成功";}

并没有撤回  原因在于 :

给他恢复设置成true   rabbitTemplate.setMandatory(true);

 继续测试

此时消息已被撤回 剩下的还是刚才的未处理的信息 


如何保证RabbitMQ消息的可靠传输?(面试重点)

 从这个图中,可以看出,消息可能丢失的场景以及解决⽅案:

1. ⽣产者将消息发送到RabbitMQ失败

        a. 可能原因:⽹络问题等

        b. 解决办法:参考本章节[发送⽅确认-confirm确认模式]

2. 消息在交换机中⽆法路由到指定队列:

        a. 可能原因:代码或者配置层⾯错误,导致消息路由失败

        b. 解决办法:参考本章节[发送⽅确认-return模式]

3. 消息队列⾃⾝数据丢失

        a. 可能原因:消息到达RabbitMQ之后,RabbitMQServer宕机导致消息丢失.

        b. 解决办法:参考本章节[持久性].开启RabbitMQ持久化,就是消息写⼊之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会⾃动读取之前存储的数据.(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的⽅式提⾼可靠性)

4. 消费者异常,导致消息丢失

        a. 可能原因:消息到达消费者,还没来得及消费,消费者宕机.消费者逻辑有问题.

        b. 解决办法:参考本章节[消息确认].RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息.默认情况下消费者应答机制是⾃动应答的,可以开启⼿动确认,当消费者确认消费成功后才会删除消息,从⽽避免消息丢失.除此之外,也可以配置重试机制(参考下⼀章节),当消息消费异常时,通过消息重试确保消息的可靠性 


三. 重试机制

 在消息传递过程中,可能会遇到各种问题,如⽹络故障,服务不可⽤,资源不⾜等,这些问题可能导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误,那么多次重试也是没有⽤的,可以设置重试次数

1 )重试配置 

2) 配置交换机&队列

    //重试机制public static final String RETRY_QUEUE = "retry.queue";public static final String RETRY_EXCHANGE = "retry.exchange";
    //重试机制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}

3 )发送消息

    @RequestMapping("/retry")public String retry() {confirmRabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息发送成功";}

 4)消费消息

    @RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3/0;System.out.println("业务处理完成");}

5)auto(自动)测试 

 

重发5次 因为代码错误 所以没有效果

6)manual 测试 

    @RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;System.out.println("业务处理完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}

 结果:

无效 达不到我们想要的目的

 使⽤重试机制时需要注意:
1. ⾃动确认模式下:程序逻辑异常,多次重试还是失败,消息就会被⾃动确认,那么消息就丢失

2. ⼿动确认模式下:程序逻辑异常,多次重试消息依然处理失败,⽆法被确认,就⼀直是
unacked的状态,导致消息积压


结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/64863.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之目标检测——RCNN

Selective Search 背景:事先不知道需要检测哪个类别,且候选目标存在层级关系与尺度关系 常规解决方法:穷举法,在原始图片上进行不同尺度不同大小的滑窗,获取每个可能的位置 弊端:计算量大,且尺度不能兼顾 Selective …

Flutter环境搭建

1.Flutter 简介 1.1 Flutter 是什么 ? Flutter 是一个 UI SDK(Software Development Kit)跨平台解决方案:可以实现一套代码发布移动端(iOS、Android、HarmonyOS)、Web端、桌面端目前很多公司都在用它&…

安全算法基础(一)

安全算法是算法的分支之一,还的依靠大量的数学基础进行计算,本文参照兜哥的AI安全样本对抗,做一个简单的算法安全概括,从零学习。 最新的安全算法对于我们常规的攻击样本检测,效果是不理想的,为了探究其原…

单元测试-Unittest框架实践

文章目录 1.Unittest简介1.1 自动化测试用例编写步骤1.2 相关概念1.3 用例编写规则1.4 断言方法 2.示例2.1 业务代码2.2 编写测试用例2.3 生成报告2.3.1 方法12.3.2 方法2 1.Unittest简介 Unittest是Python自带的单元测试框架,适用于:单元测试、Web自动…

QtCreator配置github copilot实现AI辅助编程

文章目录 1、概述2、配置环境3、演示 1、概述 新时代的浪潮早就已经来临,上不了船的人终将被抛弃,合理使用AI辅助开发、提升效率是大趋势,注意也不要过于依赖。 2024年12月18日,GitHub 官方宣布了一个激动人心的重大消息&#xf…

数字经济下的 AR 眼镜

目录 1. 📂 AR 眼镜发展历史 1.1 AR 眼镜相关概念 1.2 市面主流 XR 眼镜 1.3 AR 眼镜大事记 1.4 国内外 XR 眼镜 1.5 国内 AR 眼镜四小龙 2. 🔱 关键技术 2.1 AR 眼镜近眼显示原理 2.2 AR 眼镜关键技术 2.3 AR 眼镜技术难点 3. &#x1f4a…

LabVIEW深海气密采水器测控系统

LabVIEW的深海气密采水器测控系统通过高性价比的硬件选择与自主开发的软件,实现了高精度的温度、盐度和深度测量,并在实际海上试验中得到了有效验证。 项目背景 深海气密采水器是进行海底科学研究的关键工具,用LabVIEW开发了一套测控系统&am…

RocketMQ的集群架构是怎样的?

大家好,我是锋哥。今天分享关于【RocketMQ的集群架构是怎样的?】面试题。希望对大家有帮助; RocketMQ的集群架构是怎样的? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RocketMQ 是阿里巴巴开源的分布式消息中间件,广泛用于处…

【Rust自学】4.5. 切片(Slice)

4.5.0. 写在正文之前 这是第四章的最后一篇文章了,在这里也顺便对这章做一个总结: 所有权、借用和切片的概念确保 Rust 程序在编译时的内存安全。 Rust语言让程序员能够以与其他系统编程语言相同的方式控制内存使用情况,但是当数据所有者超…

AI的进阶之路:从机器学习到深度学习的演变(一)

AI的进阶之路:从机器学习到深度学习的演变 在当今科技迅猛发展的时代,人工智能(AI)、机器学习(ML)和深度学习(DL)已成为推动创新的核心力量。这三个领域虽然紧密相连,却…

《算法》题目

多项选择题 2023年2月,美国国家标准与技术研究院(NIST)将 Ascon算法确立为轻量级加密(LWC)标准,关于该算法和标准的说法,正确的是( )。 A.该标准属于国际标准 B.该标准旨在保护物联网(IoT)创建和传输的信息 C.通过法律法规规范标准化机构的职责与权限,可以起到推动技…

Git配置公钥步骤

GIt公钥的配置去除了git push输入账号密码的过程,简化了push流程。 1.生成SSH公钥和私钥 ssh-keygen -t rsa -b 4096 -C “your_emailexample.com” 遇到的所有选项都按回车按默认处理。获得的公钥私钥路径如下: 公钥路径 : ~/.ssh/id_rsa.pub 私钥路径…

Mysql的多表查询及表的连接

Mysql的多表查询及表连接 目录 Mysql的多表查询及表连接连接查询条件有关联的表的连接natural joinusingon等值连接非等值连接 表与表的外连接左外连接右外连接 表的自连接表的子连接表的伪表查询 连接查询条件 查询的两张表如果出现同名的列,我们需要将表名标注到列…

移动魔百盒中的 OpenWrt作为旁路由 安装Tailscale并配置子网路由实现在外面通过家里的局域网ip访问内网设备

移动魔百盒中的 OpenWrt作为旁路由 安装Tailscale并配置子网路由实现在外面通过家里的局域网ip访问内网设备 一、前提条件 确保路由器硬件支持: OpenWrt 路由器需要足够的存储空间和 CPU 性能来运行 Tailscale。确保设备架构支持 Tailscale 二进制文件,例…

每天40分玩转Django:Django部署

Django部署 一、今日学习内容概述 学习模块重要程度主要内容生产环境配置⭐⭐⭐⭐⭐settings配置、环境变量WSGI服务器⭐⭐⭐⭐⭐Gunicorn配置、性能优化Nginx配置⭐⭐⭐⭐反向代理、静态文件安全设置⭐⭐⭐⭐⭐SSL证书、安全选项 二、生产环境配置 2.1 项目结构调整 mypr…

CVPR2024 | 通过集成渐近正态分布学习实现强可迁移对抗攻击

Strong Transferable Adversarial Attacks via Ensembled Asymptotically Normal Distribution Learning 摘要-Abstract引言-Introduction相关工作及前期准备-Related Work and Preliminaries1. 黑盒对抗攻击2. SGD的渐近正态性 提出的方法-Proposed Method随机 BIM 的渐近正态…

华为IPD流程6大阶段370个流程活动详解_第一阶段:概念阶段 — 81个活动

华为IPD流程涵盖了产品从概念到上市的完整过程,各阶段活动明确且相互衔接。在概念启动阶段,产品经理和项目经理分析可行性,PAC评审后成立PDT。概念阶段则包括产品描述、市场定位、投资期望等内容的确定,同时组建PDT核心组并准备项目环境。团队培训涵盖团队建设、流程、业务…

《LangChain大模型应用开发》书籍分享

前言 ChatGPT和OpenAI开发的GPT模型不仅改变了我们的写作和研究方式,还改变了我们处理信息的方式。《LangChain大模型应用开发》讨论了聊天模式下的LLM的运作、能力和局限性,包括ChatGPT和Gemini。书中通过一系列实际例子演示了如何使用LangChain框架构…

Win10将WindowsTerminal设置默认终端并添加到右键(无法使用微软商店)

由于公司内网限制,无法通过微软商店安装 Windows Terminal,本指南提供手动安装和配置新版 Windows Terminal 的步骤,并添加右键菜单快捷方式。 1. 下载新版终端安装包: 访问 Windows Terminal 的 GitHub 发布页面:https://githu…

Oracle中间件 SOA之 OSB 12C服务器环境搭建

环境信息 服务器基本信息 如下表,本次安装总共使用1台服务器,具体信息如下: App1服务器 归类 APP服务器 Ip Address 172.xx.30.xx HostName appdev01. xxxxx.com Alias appdev01 OSB1服务器 归类 OSB服务器 Ip Address 172.xx3…