RabbitMQ 的经典问题

文章目录

  • 前言
  • 一、防止消息丢失
    • 1.1 ConfirmCallback/ReturnCallback
    • 1.2 持久化
    • 1.3 消费者确认消息
  • 二、防止重复消费
  • 三、处理消息堆积
  • 四、有序消费消息
  • 五、实现延时队列
  • 六、小结
  • 推荐阅读

前言

当设计和运维消息队列系统时,如 RabbitMQ,有几个关键问题需要特别关注:消息丢失、重复消费、消息堆积、有序消费和延时队列。这些问题直接影响系统的可靠性、性能和数据完整性。本文将深入探讨如何在使用 RabbitMQ 时有效地解决这些问题。

一、防止消息丢失

在 RabbitMQ 中,消息从生产者到消费者需要经历多个阶段。以下是消息传递的流程:

  1. 生产者创建消息:生产者(Producer)创建要发送的消息。
  2. 消息发送到交换机:生产者将消息发送到 RabbitMQ 服务器上的交换机(Exchange)。
  3. 消息进入队列:交换机将消息路由到一个或多个队列(Queue)。
  4. 消息从队列投递到消费者:RabbitMQ 将消息从队列中取出并投递给订阅的消费者。消费者接收到消息后,可以开始处理消息。

在这里插入图片描述

在这个流程中,可能会发生以下几个问题:

  1. 生产者无法确认消息是否发送到 RabbitMQ 服务器,若中途出现意外,则可能丢失消息
  2. 消息在 RabbitMQ 服务器内存中,若服务器出现异常,会丢失消息
  3. RabbitMQ 服务器无法确定消费者是否正常消费消息,中途出现异常,会丢失消息

1.1 ConfirmCallback/ReturnCallback

为了解决第一个问题,生产者无法确认消息是否正确发送到 RabbitMQ 服务器,RabbitMQ 给出的解决方案是:

  1. ConfirmCallback: 用来确认消息是否被 RabbitMQ 服务器成功接收的回调接口。
    • 当消息成功发送到交换机时,会调用 ConfirmCallback 的 confirm 方法。
    • 如果消息发送到交换机失败(比如交换机不存在),也会调用 ConfirmCallback 的 confirm 方法,此时 ack 参数为 false。
    • 通常情况下,ConfirmCallback 用来确认消息是否成功发送到交换机
  2. ReturnCallback: 用来处理未被路由到合适 Queue 的消息的回调接口。
    • 当消息从交换机发送到队列失败时,如果设置了 mandatory 为 true,则会调用 ReturnCallback。
    • 如果消息成功路由到队列,则不会调用 ReturnCallback。
    • 在消息无法被路由到队列时,会调用 ReturnCallback 的 returnedMessage 方法,可以在该方法中处理未路由消息的相关逻辑,比如重新发送或记录日志等。

1.2 持久化

针对第二个问题:RabbitMQ 服务器异常,内存中的内容无法持久化导致消息丢失。RabbitMQ 提供了相应的持久化方案:

  1. 交换机持久化:当声明一个交换机时,可以选择将其标记为持久化。持久化的交换机将会存储在磁盘上,即使 RabbitMQ 服务器重启,也不会丢失。
  2. 队列持久化:类似于交换机,可以声明一个队列为持久化。持久化的队列会在 RabbitMQ 服务器重启后保留,并且其中的消息也会被保留。这确保了即使发生了服务中断或者重启,消息也不会丢失。
  3. 消息持久化:当发布一条消息到 RabbitMQ 时,可以选择使消息持久化。持久化的消息将会写入磁盘,这样即使 RabbitMQ 服务器在消息到达消费者之前崩溃,消息也不会丢失。持久化消息比非持久化消息更加安全,但是也会有性能开销,因为需要写入磁盘。

1.3 消费者确认消息

RabbitMQ 是阅后即焚机制,即 RabbitMQ 将消息发送到消费者后会立刻删除。如果 RabbitMQ 将消息投递给消费者后,RabbitMQ 将消息进行了删除。然而,消费者出现了异常,并没有正常处理消息。就会导致消息丢失。

在 RabbitMQ 中,消费者可以设置不同的确认模式(acknowledgement mode),以确定当消费者收到消息时如何向 RabbitMQ 确认消息已经被处理。这些确认模式通常称为手动确认(manual acknowledgment)、自动确认(automatic acknowledgment)和无确认(no acknowledgment),也可以简写为 manual、auto 和 none。

  1. Manual Acknowledgment (manual):
    • 在手动确认模式下,消费者收到消息后,必须显式地向 RabbitMQ 发送一个确认(ack)来告知它已经处理了该消息。
    • 这种方式可以确保消息只有在消费者成功处理后才被标记为已传递(delivered),避免消息在处理过程中丢失。
  2. Automatic Acknowledgment (auto):
    • 在自动确认模式下,当消费者收到消息并且 RabbitMQ 将其成功传递给消费者时,它会自动向 RabbitMQ 发送一个确认。这意味着一旦消息传递给消费者,RabbitMQ 就会将其标记为已传递,而无需消费者显式确认。
    • 这种模式适合于那些允许偶尔丢失一些消息的应用场景,因为在消息传递给消费者后,就无法确保消费者是否成功处理了消息。
  3. No Acknowledgment (none):
    • 在无确认模式下,消费者在接收到消息后,RabbitMQ 不会等待消费者的确认,也不会尝试重新传递消息,而是将消息传递给消费者并将其标记为已传递,然后立即将其视为已处理。
    • 这种模式通常用于那些不需要保证每条消息都被处理的场景,例如日志处理等。

二、防止重复消费

RabbitMQ 的重复消费问题指的是在消息队列中,消费者可能会多次处理相同的消息,从而导致业务逻辑出现问题或者数据处理不一致的情况。这种问题通常发生在以下几种情况下:

  1. 消息重新投递
    • 当消费者处理消息时发生了异常或者消费者未能确认消息的处理完成(Ack),RabbitMQ 可能会将消息重新投递到同一个消费者或者其他消费者,导致消息被重复消费。
    • 这种情况下,如果消费者处理消息的过程中出现了异常或者无法确认消息处理结果,RabbitMQ 会将消息重新发送给消费者,以确保消息不会丢失。
  2. 消费者处理时间过长
    • 如果消息处理需要较长时间,而消费者在处理过程中并未确认消息的完成(Ack),则 RabbitMQ 可能会误以为消息未能成功处理,再次将消息投递给消费者。这样可能导致消息被重复处理。

在 RabbitMQ 中,防止重复消费的方法通常涉及以下几种策略和技术:

  1. 消息去重(Message Deduplication)
    • 使用唯一标识符(如消息的 ID 或者业务相关的唯一键)来标记每条消息。在消费者端,在处理消息之前,可以通过维护一个已处理消息的列表或者使用缓存(如 Redis)来检查这个唯一标识符是否已经被处理过。如果已经处理过,则可以选择性地丢弃这条消息或者执行相应的处理逻辑。
    • 如果消息具有全局唯一性要求,可以在生产者端确保生成的消息 ID 或者唯一键的唯一性,以确保在消费者端能够准确判断消息是否已经处理过。
  2. 幂等性操作(Idempotent Consumers)
    • 在设计消费者应用程序时,尽量确保消费者的处理逻辑具有幂等性。即使同一条消息被消费多次,也不会引起意外的副作用或者数据不一致的情况。例如,数据库操作中的幂等性可以通过使用唯一键约束、UPSERT(如果支持)、乐观锁等技术来实现。
    • 这种方式适用于那些无法避免消息重复投递的场景,或者确保消费者的处理逻辑具备强大的鲁棒性和数据一致性。
  3. 消息确认和消息预取(Message Acknowledgment and Prefetching)
    • RabbitMQ 提供了消息确认机制,即消费者在处理完消息后需要显式地确认消息。确保消费者确认消息后再进行后续的处理操作,可以防止因为消费者未能成功处理消息而导致消息重复消费的问题。
    • 同时,通过合理设置消费者的预取数(Prefetch Count),可以控制消费者从队列中预先获取的消息数量。这可以帮助在一定程度上减少因消息处理过长或者消费者出现故障而导致的消息重复消费。

三、处理消息堆积

RabbitMQ 消息堆积问题指的是在消息队列中积累大量未被消费的消息,导致队列中的消息数量急剧增加,可能会对系统的性能和稳定性产生负面影响。这种问题通常由以下几个原因引起:

  1. 生产者速度快于消费者速度
    • 如果生产者生产消息的速度远快于消费者处理消息的速度,未被消费的消息会不断积累在队列中,最终导致队列中消息数量急剧增加,形成消息堆积。
    • 这种情况可能发生在消费者处理能力不足、消费者出现故障或者网络延迟等情况下。
  2. 消费者处理能力不足
    • 当消费者处理消息的速度跟不上生产者发送消息的速度时,未处理的消息会在队列中积累,最终导致消息堆积问题。
    • 消费者处理能力不足可能是因为消费者数量不足、消费者处理逻辑复杂或者消费者出现瓶颈等原因引起的。
  3. 队列设置不当
    • 队列的容量设置不当,例如队列的最大长度设置过小,无法应对高峰期的消息积压。

解决 RabbitMQ 消息堆积问题需要综合考虑生产者和消费者之间的消息流量控制、队列的监控与管理以及系统的容错处理。以下是一些常见的解决方法和策略:

  1. 增加消费者数量
    • 最直接的方法是增加消费者的数量,以提升消息处理的能力。通过增加消费者,可以分担队列中消息的处理压力,减少消息堆积的可能性。
    • 可以动态地根据系统负载情况或者队列中消息数量来调整消费者的数量。
  2. 优化消费者处理能力
    • 优化消费者的处理逻辑,确保消费者能够高效地处理消息。这包括优化数据库访问、提升算法效率、避免长时间阻塞操作等。
    • 使用并发处理和异步处理技术,充分利用多线程或者异步框架来提高消费者的并发处理能力。
  3. 调整队列的配置参数
    • 根据实际情况调整队列的容量限制、消息过期时间等参数,避免队列过于拥挤或者消息长时间积压。

四、有序消费消息

在消息队列系统中,通常情况下,消息在生产者发送到队列后,会按照 FIFO(先进先出)的原则被消费者处理。但是,当存在多个消费者同时消费同一个队列时,RabbitMQ 无法保证消息的严格顺序性,因为不同的消费者可能以不同的速度处理消息,导致消息的处理顺序可能被打乱。

在 RabbitMQ 中实现有序消费消息通常涉及以下几种方法和技术:

  1. 单队列单消费者
    • 最简单的方式是使用单队列和单消费者。RabbitMQ 会确保对于同一个队列,消息的投递顺序与其被消费的顺序一致。这意味着当消费者从队列中接收消息时,它们会按照它们被发送的顺序进行处理。
    • 在这种情况下,RabbitMQ 的默认行为会保证消息的有序性,只要消费者处理消息的速度足够快,就能保证消息的有序消费。
  2. 使用消息的顺序属性
    • 在生产者端,可以为每条消息添加一个标识其顺序的属性,例如序号或者时间戳。然后在消费者端,通过这些属性来排序和处理消息。
    • 消费者可以在消费消息之前先对消息进行排序,或者根据属性来判断是否需要延迟处理某些消息,以保证消息的有序性。
  3. 使用全局顺序化插件(RabbitMQ Global Ordered Queue)
    • RabbitMQ 提供了全局顺序化插件,它可以确保所有队列中的消息都按照一定的顺序被投递和消费。这个插件适用于一些需要强有序性的场景,但需要注意它可能会引入一些性能上的限制和开销。

五、实现延时队列

在常规的消息队列中,消息一旦发送到队列中,就会尽快被消费者获取和处理。然而,某些业务场景可能需要延迟发送消息,或者延迟消费消息,这时就需要使用延时队列来实现这一需求。

在 RabbitMQ 中实现延时队列通常涉及使用以下几种方法:

  1. 使用 TTL(Time-To-Live)和死信队列(Dead Letter Exchange)
    • RabbitMQ 支持设置消息的 TTL,即消息的存活时间。通过设置消息的 TTL,可以让消息在指定的时间段后过期,然后被发送到死信队列(Dead Letter Queue)。
    • 死信队列是一个特殊的队列,它接收所有因某些原因未能成功消费的消息,包括过期的消息。可以通过设置队列的 x-dead-letter-exchangex-dead-letter-routing-key 参数,将消息发送到指定的交换器和路由键,实现延时消息的转发和消费。
  2. 利用插件实现延时队列
    • RabbitMQ 社区提供了一些插件,例如 rabbitmq_delayed_message_exchange 插件,它能够在 RabbitMQ 中实现更精确的延时消息发送和消费。
    • 这种插件通过引入一个特殊的交换器类型(Delayed Message Exchange),可以让生产者在消息发送时指定一个延迟时间,消息将会在指定的延迟时间之后被交换器路由到相应的队列,然后被消费者处理。
  3. 使用定时器和消息轮询
    • 在消费者端,可以使用定时器或者消息轮询的方式,定期检查队列中的消息是否已经到达了处理时间。一旦消息到达处理时间,消费者就可以将其从队列中取出并处理。
    • 这种方法虽然没有直接利用 RabbitMQ 的内置功能,但适用于一些简单的延时消息处理需求,可以通过编码实现。

六、小结

在使用 RabbitMQ 构建消息队列系统时,关键要点包括确保消息的可靠性和完整性、实现消费者的幂等性、有效管理消息堆积、处理有序消息以及实现延时队列功能。通过合理配置和实施这些策略,可以提升系统的性能和可靠性,确保消息系统稳定运行。

推荐阅读

  1. Spring 三级缓存
  2. 深入了解 MyBatis 插件:定制化你的持久层框架
  3. Zookeeper 注册中心:单机部署
  4. 【JavaScript】探索 JavaScript 中的解构赋值
  5. 深入理解 JavaScript 中的 Promise、async 和 await

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/37393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第100+13步 ChatGPT学习:R实现决策树分类

基于R 4.2.2版本演示 一、写在前面 有不少大佬问做机器学习分类能不能用R语言,不想学Python咯。 答曰:可!用GPT或者Kimi转一下就得了呗。 加上最近也没啥内容写了,就帮各位搬运一下吧。 二、R代码实现决策树分类 (…

【漏洞复现】宏景HCM人力资源信息管理系统——任意文件读取漏洞

声明:本文档或演示材料仅供教育和教学目的使用,任何个人或组织使用本文档中的信息进行非法活动,均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 宏景HCM人力资源信息管理系统是一款全面覆盖人力资源管理各模块…

docker pull 镜像的时候遇到Pulling fs layer问题

最近遇到一个很奇怪的问题,docker pull 镜像的时候,总是出现Pulling fs layer问题,导致镜像拉取不成功,以前是安装好docker,正常拉取镜像都是没什么问题的,在这里记录一下这个问题的解决方法,当然,可能并不通用。 1、进入阿里云容器服务 地址:https://cr.console.aliy…

【Django】网上蛋糕项目商城-热销和新品

概念 本文将完成实现项目的热销和新品两个分类的商品列表进行分页展示。 热销和新品功能实现步骤 在head.html头部页面中点击这两个超链接向服务器发送请求。 在urls.py文件中定义该请求地址 path(goodsrecommend_list/,views.goodsrecommend_list) 在views.py文件中定义g…

AV Foundation学习笔记二 - 播放器

ASSets AVFoundation框架的最核心的类是AVAsset,该类是整个AVFoundation框架设计的中心。AVAsset是一个抽象的(意味着你不能调用AVAsset的alloc或者new方法来创建一个AVAsset实例对象,而是通过该类的静态方法来创建实例对象)、不…

DevOps CMDB平台整合Jira工单

背景 在DevOps CMDB平台建设的过程中,我们可以很容易的将业务应用所涉及的云资源(WAF、K8S、虚拟机等)、CICD工具链(Jenkins、ArgoCD)、监控、日志等一次性的维护到CMDB平台,但随着时间的推移,…

PHP爬虫类的并发与多线程处理技巧

PHP爬虫类的并发与多线程处理技巧 引言: 随着互联网的快速发展,大量的数据信息存储在各种网站上,获取这些数据已经成为很多业务场景下的需求。而爬虫作为一种自动化获取网络信息的工具,被广泛应用于数据采集、搜索引擎、舆情分析…

关于组织赴俄罗斯(莫斯科)第 28 届国际汽车零部件、汽车维修设备和商品展览会商务考察的通知

关于组织赴俄罗斯(莫斯科) 第 28 届国际汽车零部件、汽车维修设备和商品展览会商务考察的通知 展会名称:俄罗斯(莫斯科)第 28 届国际汽车零部件、汽车零部件、汽车维修设备和商品展览会 时间:2024 年 8 月…

Python | Leetcode Python题解之第204题计数质数

题目: 题解: MX5000000 is_prime [1] * MX is_prime[0]is_prime[1]0 for i in range(2, MX):if is_prime[i]:for j in range(i * i, MX, i):#循环每次增加iis_prime[j] 0 class Solution:def countPrimes(self, n: int) -> int:return sum(is_prim…

【MongoDB】分布式数据库入门级学习

SueWakeup 个人主页:SueWakeup 系列专栏:为祖国的科技进步添砖Java 个性签名:保留赤子之心也许是种幸运吧 本文封面由 凯楠📸友情提供 凯楠📸 - 不夜长安 目录 MongoDB 相关 数据库排行榜单 MongoDB 中文官网 菜鸟…

如何把mkv转成mp4?介绍一下将mkv转成MP4的几种方法

如何把mkv转成mp4?如果你有一个MKV格式的视频文件,但是需要将其转换为MP4格式以便更广泛地在各种设备和平台上播放和共享,你可以通过进行简单的文件格式转换来实现。转换MKV到MP4格式可以提供更好的兼容性,并确保你的视频文件能够…

在预训练语言模型主流架构

文章目录 编码器-解码器架构因果解码器架构前缀解码器架构在预训练语言模型时代,自然语言处理领域广泛采用了预训练 + 微调的范式,并诞生了以 BERT 为代表的编码器(Encoder-only)架构、以 GPT 为代表的解码器(Decoder-only)架构和以 T5 为代表的编码器-解码器(Encoder-d…

React:tabs或标签页自定义右击菜单内容,支持内嵌iframe关闭菜单方案

React:tabs或标签页自定义右击菜单内容,支持内嵌iframe关闭菜单方案 不管是react、vue还是原生js,原理是一样的。 注意如果内嵌iframe情况下,iframe无法使用事件监听,但是可以使用iframe的任何点击行为都会往父级wind…

入门Java爬虫:认识其基本概念和应用方法

Java爬虫初探:了解它的基本概念与用途,需要具体代码示例 随着互联网的快速发展,获取并处理大量的数据成为企业和个人不可或缺的一项任务。而爬虫(Web Scraping)作为一种自动化的数据获取方法,不仅能够快速…

vue2(vue-cli3x[vue.config.js])使用cesium新版(1.117.0)配置过程

看来很多解决方法都没有办法,最后终于。呜呜呜呜 这里我用的是vue-cli去搭建的项目的vue2 项目,其实不建议用vue2搭配cesium。因为目前cesium停止了对vue2的版本更新,现在默认安装都是vue3版本,因此需要控制版本,否则…

Node.js简介

一:Node.js简介 Node.js是一个跨平台的JavaScript运行环境,使开发者可以搭建服务器端的JavaScript应用程序 作用:使用Node.js编写服务器端程序 编写数据接口,提供网页资源浏览功能有利于前端工程化,可以集成各种开发…

【力扣高频题】011. 盛最多水的容器

前面的算法文章,更新了许多 专题系列 。包括:滑动窗口、动态规划、加强堆、二叉树递归套路 等。 还没读过的小伙伴可以关注一下,在主页中点击对应链接查看哦~ 接下来的一段时间,将持续 「力扣高频题」 系列文章,想刷 …

idea2024使用springboot3.x系列新建java项目,使用jdk17,启动项目报错

身为一名开发人员,敲代码无数,竟被一个小小启动给我卡了大半天,太丢脸了 报错一:Field infoSysRepository in com.erectile.Impl.PersonalInfoServiceImpl required a bean of type ‘com.erectile.jpa.repository.InfoSysReposit…

IP地址与在线教育平台资源分配优化

IP地址的资源分配与优化策略可以帮助在线教育平台提供更高质量、稳定且个性化的教育服务。 IP地址作为网络设备的标识符,能够为在线教育平台提供有关学生地理位置和网络环境信息。通过对学生IP地址的分析,平台可以初步了解学生所在的地区、网络服务提供商…

回收站的照片删除了怎么找回?

大家在日常使用电脑的过程中,难免会遇到不小心删除重要文件的情况,尤其是珍贵的照片。当我们意识到误删照片时,第一反应通常是去回收站找回。然而,如果连回收站的照片都被删除了,该如何恢复呢?本文将详细探…