RabbitMQ 的高阶应用及可靠性保证

       

目录

一、RabbitMQ 高阶应用        

     1.1 消息何去何从

        1.2 过期时间

        1.3 死信队列

        1.4 延迟队列

        1.5 优先级队列

        1.6 消费质量保证(QOS)

二、持久化

三、生产者确认

四、消息可靠性和重复消费

        4.1 消息可靠性

        4.2 重复消费问题


上篇文章介绍了 RabbitMQ 的基本概念和使用,这篇文章就来介绍下其高阶应用和可靠性保证。

一、RabbitMQ 高阶应用        

        RabbitMQ 还提供了诸多高级特性,比如:过期时间、交换器备份、死信队列、延迟队列、优先级队列、持久化、消费端消息分发等等,下面介绍几个重要特性。

     1.1 消息何去何从

        mandatory 参数,当设置为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到未被路由到合适队列的消息呢?需要实现 listener,SpringBoot 中需要实现 ReturnCallback。

        immediate 参数,为 true 时,如果交换机将消息路由到队列时发现队列上并不存在消费者,那么这条消息将不会被存入队列中。当与路由键匹配的队列都没有消费者时,该消息会 return 给生产者。

        概括来说,mandatory 参数告诉服务器至少将消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递,如果所有匹配的队列上无消费者,则将消息返回给生产者,不用将消息存入队列等待消费者。

        RabbitMQ 3.0 版本去掉了 immediat 参数的支持,官方解释是:会影响镜像队列的性能,增加代码的复杂性,建议采用 TTL 和 DLX 的方法替换。

        1.2 过期时间

        RabbitMQ 可以对消息和队列设置 TTL。

        设置消息的TTL。方法一:通过队列的属性设置,队列中的所有消息都有相同的过期时间,一旦消息过期,就会立即从队列中抹去。方法二:对消息单独设置,每条消息的TTL可以不同,即使消息过期也不会立即从队列中抹去,在投递前判定。如果两者一同使用,则以最小的那个为准,消息的生存时间一旦超过了设置的TTL,就会变成“死信”,消费者则无法收到该消息。设置过期时间的方法如下:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
Queue queue = new Queue(vodQueue, true, false, false, args);

        1.3 死信队列

        DLX,全称为 Dead-Letter-Exchange,死信交换器。当一个消息在队列中变成死信之后,它能被发送到另一个交换器中,这个交换器即是 DLX,绑定死信交换器的队列称为死信队列。消息变为死信的情况:

  • 消息被拒绝,并且设置的 requeue 为 false
  • 消息过期
  • 队列达到最大长度

        DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将消息重新发布到设置的 DLX 上去,进而被路由到另一个队列中,即死信队列。可以监听这个队列中的消息进行相应的处理。设置死信队列的方法:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
Queue queue = new Queue(vodQueue, true, false, false, args);

        对于 RabbitMQ 来说,DLX 是一个非常有用的特性。他可以处理异常情况,消息不能被消费者正确消费而被至于死信队列中的情况后去分析程序可以通过这个死信队列中的内容来分析当时所遇到的一场情况。进而可优化改善系统。

        1.4 延迟队列

        所谓延迟队列是指当消息被发出后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到消息。

        在AMQP协议中,或者 RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过 DLX 和TTL 来实现。        

        1.5 优先级队列

         优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。设置优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
Queue queue = new Queue(vodQueue, true, false, false, args);

        在发送消息时设置当前的优先级,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个是有前提的:如果在消费者消费速度大于生产者的生产速度且broker 中没有消息堆积的情况,对发送的消息设置优先级也就没有什么实际意义了。因为生产者刚发送完一条消息就被消费了,那么就意味着 broker 中至多有一条消息,对于单条消息来说优先级是没意义的。       

        1.6 消费质量保证(QOS)

        当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的方式分发给消费者。每条消息只会发送给一个消费者。这种方式非常适合扩展,而且是专门为并发程序设计的。如果现在的负载加重,只需要创建更多的消费者即可。

        这种方式不那么优雅,分发中不管消费者的消息是否处理完了,试想一下,某些消费者的任务繁重,来不及处理消息并确认,而某些消费者由于某些原因很快处理完了所分配的消息,进而进程空闲,这样会造成总体的吞吐量下降。该如何处理这种情况呢?引入Qos,他会告诉 broker 我没消费完当前消息前,不要给我新消息了,这就保证了消费质量。Qos对于拉模式是无效的。                设置方法如下:

// prefetchSize和prefetchCount设置为0,说明无限制
// prefetchSize: 指定消费者可以接收的最大内容量(单位通常是字节)。如果设置了非零值,RabbitMQ 会阻止发布者发送更多的消息,直到消费者发送了足够多的确认来释放足够的容量。默认情况下,RabbitMQ 并不实现 prefetchSize 参数,所以通常设置为0,表示不对此做限制。
// prefetchCount: 更常用的一个参数,表示消费者最多可以接收多少个未确认的消息。当达到这个数量后,RabbitMQ 将暂停向该消费者推送更多消息,直到消费者确认了部分消息,腾出了“槽位”。例如,设置为1意味着消费者每次处理完一个消息并发送确认之后,才能接收下一个消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

二、持久化

        持久化可以提高 RabbitMQ 的可靠性,防止在异常情况下消息的丢失。RabbitMQ 的持久化分成三部分:交换器的持久化、队列的持久化和消息的持久化。

        交换器的持久化可以在声明交换器的时候设置,如果交换器不设置持久化,RabbitMQ 重启后,交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送给这个交换器了。

        队列的持久化在声明队列的时候设置,如果不设置队列的持久化,RabbitMQ 服务重启后,队列的元数据会丢失,此时数据也会丢失。

        将交换器、队列、消息都设置成持久化后能保证数据不丢失吗?答案是否定的。

  1. 从消费者的角度来看,将 autoAck 设置成 true,那么当消费者接受到相关消息后,还没来得及处理就宕机了,这样也算数据丢失。
  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要一段时间才能存入磁盘。RabbitMQ 不会为每条消息都进行同步存盘(调用内核的fsync)的处理,可能仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内,RabbitMQ 发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

        上面的问题可以通过镜像队列机制来解决。相当于配置了副本,如果主节点在此特殊时期挂掉了,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证不丢失,但这样已经好很多。

三、生产者确认

        生产者将消息发送出去后,消息到底有没有到达服务器呢?如果不进行配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下,生产者不知道消息有没有正确的到达服务器。如果到达服务器之前就丢失了,持久化操作也解决不了问题,因为还没到达服务器,何谈持久化?针对这个问题提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送确认机制实现

        开启事务多了几个环节,只有消息成功被 RabbitMQ 接收,事务才能提交,否则便可在捕获异常之后进行处理。但事务会严重影响 RabbitMQ 的性能,大大降低吞吐量。

        发送方确认是一种轻量级的机制,生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包括消息的唯一id),这就使得生产者知晓消息已经正确到达目的地。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。

        事务机制在一条消息发出后会使发送端阻塞,以等待rabbitmq的回应,之后才能发送下一条消息。相比消息确认机制,发送方确认机制是异步的。事务机制和确认机制是互斥的不能共存。

四、消息可靠性和重复消费

        只要涉及到消息中间件,消息可靠性和重复消费就是无可避免的话题,那 RabbitMQ 是如何设计的呢?

        4.1 消息可靠性

  1. 持久化设置,这在上文已经介绍,通过持久化队列、交换器和消息来保存消息。
  2. 事务和确认机制:上文已经介绍了生产者的确认机制,通过这个机制来保证生产者发送的消息不回丢失。
  3. 消费者消息确认:可以通过消息的手动ack来保证消息能消费完成
  4. 消息镜像队列:设置队列为镜像队列,可以将消息复制到多个节点,即使某个节点宕机,消息仍可以从其他节点获取。

        通过以上措施的组合使用,可以大大提高 RabbitMQ 消息传递的可靠性,尽可能减少消息丢失的风险。然而,即使采取了所有措施,也不能完全保证100%的消息不丢失,因为消息在传输过程中可能还受到网络、硬件故障等因素的影响。在实际应用中,需要根据业务场景权衡消息的可靠性、性能和成本。

        4.2 重复消费问题

        在 RabbitMQ 中,重复消费指的是同一个消息被多个消费者或者同一个消费者消费多次的现象。这种问题可能会导致数据不一致或者业务逻辑错误。造成重复消费的原因可能有:

  1. 消费者ACK确认失败:消费者接收到消息并开始处理,但是在处理完毕并发送 ACK 确认之前断开了连接,比如网络抖动或消费者进程异常退出,导致 RabbitMQ 未收到ACK确认,于是消息重新入队等待再次被消费。
  2. 消息重回队列:在有死信交换机(Dead Letter Exchange, DLX)或者消息TTL(Time To Live)到期后重新投递的情况下,消息可能被重新发送到原来的队列或另一个队列,从而被再次消费。
  3. 消费者超时设置不当:如果消费者的超时设置过短,可能会在消息处理未完成时就已经被认为超时,消息会被重新放回队列。

        那如何解决重复消费问题呢?

  1. 消息确认机制:确保消费者正确使用手动确认模式(Manual Acknowledgments),只有当消息处理成功后才发送 ACK 确认给 RabbitMQ,否则在遇到异常时可以重新消费。
  2. 幂等性设计:消费者的业务逻辑应当设计为幂等的,即使同一条消息被消费多次,处理结果也是相同的,不影响业务状态。例如,通过消息ID或业务流水号来判断消息是否已经处理过。
  3. 防重ID:在消息体中携带一个全局唯一的ID,消费者在处理消息前,先检查这个ID是否已经被处理过,如果已经处理过,则直接丢弃消息。

        总之,避免重复消费的关键在于消息确认机制、幂等性设计以及合理的重试和补偿策略。同时,完善的日志记录和监控也是非常重要的,以便在出现问题时能够快速定位和修复。

往期经典推荐

探秘 RabbitMQ 的设计理念与核心技术要点-CSDN博客

走进 Mybatis 内核世界:理解原理,释放更多生产力-CSDN博客

深入浅出 Kafka 消费者:解密分布式消息流的幕后英雄_kafka消费-CSDN博客

深入剖析Kafka生产者:揭秘消息从发送到落地的全过程-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/769016.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

流畅的 Python 第二版(GPT 重译)(三)

第五章&#xff1a;数据类构建器 数据类就像孩子一样。它们作为一个起点是可以的&#xff0c;但要作为一个成熟的对象参与&#xff0c;它们需要承担一些责任。 马丁福勒和肯特贝克 Python 提供了几种构建简单类的方法&#xff0c;这些类只是一组字段&#xff0c;几乎没有额外功…

Linux 安装 JDK、MySQL、Tomcat(图文并茂)

所需资料 下载 1.1 软件安装方式 在Linux系统中&#xff0c;安装软件的方式主要有四种&#xff0c;这四种安装方式的特点如下&#xff1a; 安装方式特点二进制发布包安装软件已经针对具体平台编译打包发布&#xff0c;只要解压&#xff0c;修改配置即可rpm安装软件已经按照re…

美易官方:科技巨头涨势好标普指数年底前有望升至6000点

高盛&#xff0c;作为全球领先的金融机构之一&#xff0c;近日发布了一份报告&#xff0c;预测在科技巨头的涨势推动下&#xff0c;标普500指数年底前有望升至6000点。这一预测引起了市场的广泛关注&#xff0c;投资者们纷纷开始重新评估自己的投资策略。 David Kostin等策略师…

超过 1200 个能够拦截在野外检测到的 2FA 的网络钓鱼工具包

超过 1200 个能够拦截在野外检测到的 2FA 的网络钓鱼工具包。 #################### 免责声明&#xff1a;工具本身并无好坏&#xff0c;希望大家以遵守《网络安全法》相关法律为前提来使用该工具&#xff0c;支持研究学习&#xff0c;切勿用于非法犯罪活动&#xff0c;对于恶…

202基于matlab的曲柄滑块机构的运动学仿真分析

基于matlab的曲柄滑块机构的运动学仿真分析&#xff0c;分析各个杆的速度、位移、加速度曲线&#xff0c;以及曲柄滑块机构的动画。程序已调通&#xff0c;可直接运行。 202 matlab 曲柄滑块机构 运动学仿真分析 - 小红书 (xiaohongshu.com)

第九篇【传奇开心果系列】Python自动化办公库技术点案例示例:深度解读Python处理PDF文件

传奇开心果博文系列 系列博文目录Python自动化办公库技术点案例示例系列 博文目录前言一、重要作用介绍二、Python库处理PDF文件基础操作和高级操作介绍&#xff08;一&#xff09;基础操作介绍&#xff08;二&#xff09;高级操作介绍 三、Python库处理PDF文件基础操作示例代码…

H5实现Web ECharts教程:轻松创建动态数据图表

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

【OpenBayes 官方教程】快速部署通义千问 72B 大模型

本教程主要为大家介绍怎样在 OpenBayes 上快速部署通义千文 72B 大模型&#xff0c;新朋友点击下方链接注册后&#xff0c;即可获得 4 小时 RTX 4090 5 小时 CPU 的免费使用时长哦&#xff01; 注册链接 https://openbayes.com/console/signup?ryuudi_nBBThttps://openbaye…

算法|数学与数论|素数筛

数学与数论|素数筛 1.判断素数 2.朴素筛 3.埃氏筛 4.欧拉筛(线性筛) 心有猛虎&#xff0c;细嗅蔷薇。你好朋友&#xff0c;这里是锅巴的C\C学习笔记&#xff0c;常言道&#xff0c;不积跬步无以至千里&#xff0c;希望有朝一日我们积累的滴水可以击穿顽石。 质数(素数)&…

【教程】高效数据加密混淆方法及实现简介

背景 在需要对数据进行传输或者表达时&#xff0c;通常要求数据加密的安全级别不高&#xff0c;但希望加解密时间复杂度尽可能低。这时使用传统的对称加密&#xff08;如3DES、AES&#xff09;或非对称加密&#xff08;如RSA、ECC&#xff09;显然不太适合。因为加密的安全级别…

【MySQL】10. 复合查询(重点)

复合查询&#xff08;重点&#xff09; 前面我们讲解的mysql表的查询都是对一张表进行查询&#xff0c;在实际开发中这远远不够。 1. 基本查询回顾 数据还是使用之前的雇员信息表 在标题7的位置&#xff01; mysql> select * from emp where sal > 500 or job MANAG…

【数据结构取经之路】队列循环队列

目录 引言 队列的性质 队列的基本操作 初始化 判空 销毁 队列的长度 插入 删除 返回队头元素 循环队列 假溢出 空与满的判定 实现 初始化 插入 判空 销毁 删除 返回队列长度 返回队列头元素 判满 引言 队列和栈一样&#xff0c;也是数据结构的一种&…

特征工程 | 数据清洗、异常值处理、归一化、标准化、特征提取

目录 一. 数据清洗1. 数据清洗&#xff1a;格式内容错误数据清洗2. 数据清洗&#xff1a;逻辑错误清洗3. 数据清洗&#xff1a;去除不需要的数据4. 数据清洗&#xff1a;关联性验证 二. 异常值的处理1. 删除2. 填充 三. 归一化和标准化1. 归一化2. 标准化 四. 特征提取1. One-H…

MyBatis是纸老虎吗?(六)

经过前面一些列文章的梳理&#xff0c;我们已将MyBatis框架所需要的资源都准备好了&#xff1a;数据库连接信息储存在Configuration对象中的Environment属性中&#xff08;该对象中有这样几个属性String类型的id&#xff0c;TransactionFactory类型的transactionFactory、DataS…

如何优雅的爬取公众号文章

目录 相关函数库介绍 代码例子 IP池免费送 相关函数库介绍 在合法合规的前提下&#xff0c;爬取微信公众号文章可以使用以下几个Python库&#xff1a; requests&#xff1a;这是一个非常流行的HTTP库&#xff0c;用于发送各种HTTP请求。它简单易用&#xff0c;能够高效地处…

关于序列化和反序列化

什么是序列化&#xff0c;什么是反序列化 简单来说&#xff1a; 序列化&#xff1a;将数据结构或对象转换成二进制字节流的过程反序列化&#xff1a;将在序列化过程中所生成的二进制字节流转换成数据结构或者对象的过程 为什么要进行序列化 我们要将java对象进行网络传输&a…

TorchAcc:基于 TorchXLA 的分布式训练框架

演讲人&#xff1a;林伟&#xff0c;阿里云研究员&#xff0c;阿里云人工智能平台 PAI 技术负责人 本文旨在探讨阿里云 TorchAcc&#xff0c;这是一个基于 PyTorch/XLA 的大模型分布式训练框架。 过去十年 AI 领域的显著进步&#xff0c;关键在于训练技术的革新和模型规模的快…

详细剖析多线程2----线程安全问题(面试高频考点)

文章目录 一、概念二、线程不安全的原因三、解决线程不安全问题--加锁&#xff08;synchronized&#xff09;synchronized的特性 四、死锁问题五、内存可见性导致的线程安全问题 一、概念 想给出⼀个线程安全的确切定义是复杂的&#xff0c;但我们可以这样认为&#xff1a; 在多…

立体统计图表绘制方法(凸显式环图)

立体统计图表绘制方法&#xff08;凸显式环图&#xff09; 记得我学统计学的时候&#xff0c;那些统计图表大都是平面的框框图&#xff0c;很呆板&#xff0c;就只是表现出统计的意义就好了。在网络科技发展进步的当下&#xff0c;原来一些传统的统计图表都有了进一步的创新。在…

RDGCN翻译

RDGCN翻译 Relation-Aware Entity Alignment for Heterogeneous Knowledge Graphs 面向异质知识图谱的关系感知实体对齐 阅读时间&#xff1a;2024.03.24 领域&#xff1a;知识图谱&#xff0c;知识对齐 作者&#xff1a;Yuting Wu等人 PKU 出处&#xff1a;IJCAI Abstract…