【RabbitMQ】一文详解消息可靠性

目录:

1.前言

2.生产者

3.数据持久化

4.消费者

5.死信队列

1.前言

RabbitMQ 是一款高性能、高可靠性的消息中间件,广泛应用于分布式系统中。它允许系统中的各个模块进行异步通信,提供了高度的灵活性和可伸缩性。然而,这种通信模式也带来了一些挑战,其中最重要的之一是确保消息的可靠性

影响消息可靠性的因素主要有以下几点:

  • 发送消息时连接RabbitMQ失败
  • 发送时丢失:
    • 生产者发送的消息未送达交换机;
    • 消息到达交换机后未到达队列;
  • MQ 宕机,队列中的消息会丢失;
  • 消费者接收到消息后未消费就宕机了。

2.生产者

2.1.生产者重连机制

生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,RabbitMQ提供的消息发送时的重连机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重连机制

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。但是RabbitMQ提供的重试机制是阻塞式的重试。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,就需要合理配置等待时长和重试次数,或者使用异步线程来执行发送消息的代码

2.2.生产者确认机制

RabbitMQ的生产者确认机制(Publisher Confirm)是一种确保消息从生产者发送到MQ过程中不丢失的机制。当消息发送到 RabbitMQ 后,系统会返回一个结果给消息的发送者,表明消息的处理状态。这个结果有两种可能的值:

返回结果有两种方式:

  • publisher-confirm(发送者确认)
    • 消息成功投递到交换机,返回ACK。
    • 消息未投递到交换机,返回NACK。(可能是由于网络波动未能连接到RabbitMQ,可利用生产者重连机制解决)
  • publisher-return(发送者回执)
    • 消息投递到交换机了,但是没有路由到队列。返回ACK和路由失败原因。(这种问题一般是因为路由键设置错误,可以人为规避)

通过这种机制,生产者在发送消息后获取返回的回执结果,从而采取对应的策略,如消息重发或记录失败信息。

3.数据持久化

3.1.配置持久化

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

  1. RabbitMQ宕机,存在内存中的消息会丢失。
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。RabbitMQ可以通过配置数据持久化,从而将消息保存在磁盘,包括:

  • 交换机持久化(确保RabbitMQ重启后交换机仍然存在)
  • 队列持久化(确保RabbitMQ重启后队列仍然存在)
  • 消息持久化(确保RabbitMQ重启后队列中的消息仍然存在)

由于Spring会在创建队列时默认将交换机和队列设置为持久化,发送消息时也默认指定消息为持久化消息,因此不需要额外配置。

// 将消息指定为持久化消息
Message message = MessageBuilder.withBody("hello".getBytes(standardcharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 给队列发送消息
rabbitTemplate.convertAndSend("simple.queue", message);

3.2.惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

惰性队列的特点如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)

  • 消费者要消费消息时才会从磁盘中读取并加载到内存

  • 支持数百万条的消息存储

对于低于3.12版本的情况,可以使用注解的arguments来指定

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "grade.queue", durable = "true"),exchange = @Exchange(name = "intel.topic", type = ExchangeTypes.TOPIC),key = "intel.grade",arguments = @Argument(name = "x-queue-mode", value = "lazy")))

3.3.为什么需要数据持久化?

数据持久化在 RabbitMQ 中有以下重要作用:

队列和交换机的持久化:

  • 防止重启后丢失:将队列和交换机设置为持久化,可以防止 RabbitMQ 服务器重启后丢失这些队列和交换机,确保它们的存在和绑定关系保持不变。

消息的持久化:

  1. 安全性
    • 防止数据丢失:消息持久化后,可以防止 RabbitMQ 服务器重启或宕机时数据丢失,方便数据恢复,保证消息的可靠性和耐久性。
  2. 性能
    • 内存管理:未持久化的临时消息默认存储在内存中。内存空间有限,大量消息涌入时会导致内存占满,系统需要进行 page out 操作将消息写入磁盘。频繁的 page out 操作会严重影响性能。
    • 预防内存溢出:通过持久化消息,可以缓解内存压力,防止因内存溢出导致的系统性能问题和崩溃。

4.消费者

4.1.消费者确认机制

为了确认消费者是否正确处理了消息,RabbitMQ提供了消费者确认机制。当消费者处理消息后,会返回回执信息给RabbitMQ。回执有三种值:

  • ack:消息处理成功,RabbitMQ从队列中删除消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除消息。

在SpringBoot项目中,我们可以通过配置文件选择回执信息的处理方式,一共有三种处理方式:

  • none:不处理。RabbitMQ 假定消费者获取消息后会一定会成功处理,因此消息投递后立即返回ack,将消息从队列中删除。

  • manual:手动模式。需要在业务代码结束后,调用SpringAMQP提供的API发送ackreject,存在代码侵入问题,但比较灵活。

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑进行了环绕增强,返回结果如下:

    • 如果消费者正常处理消息,自动返回ack并删除队列的消息。

    • 如果消费者消息处理失败,自动返回nack并重新向消费者投递消息。

    • 如果消息校验异常,自动返回reject并删除队列中的消息。

注意: 手动模式返回回执消息时通常需要显式指定requeue参数,当requeue=true时,表明消息需要重新入队;当requeue=false时,RabbitMQ将从队列删除消息。

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto,自动 ack

4.2.消息失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。

可以通过设置yml文件开启失败重试机制,在消息异常时利用本地重试,而不是无限制的进行requeue操作。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false 有状态。如果业务中包含事务,这里改为 false

4.3.消息失败处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息,这是默认采取的方式;
  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

5.死信队列

尽管通过以上设置可以确保消息在生产者、消息队列和消费者之间的传递过程中不会丢失,但在某些情况下,消费者仍可能无法成功处理消息(如消息重试次数耗尽后仍无法被消费)。这时候,我们需要一个机制来妥善处理这些无法被正常消费的消息。死信队列便是用于解决这一问题的兜底机制。

5.1.死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消息被拒绝: 当消费者明确拒绝一个消息并且设置不再重新入队(requeue=false)时,这个消息会被标记为死信。
  • 消息过期: 每个消息或队列可以设置一个TTL(Time-To-Live),即消息的存活时间。如果消息在队列中停留的时间超过了这个TTL,消息会被认为过期,并被转移到死信队列。
  • 队列达到最大长度: 如果队列设置了最大长度并且达到了这个限制,那么新进入的消息会被转移到死信队列中。

5.2.创建死信队列

5.2.1.创建死信交换机和死信队列

正常使用注解,创建交换机和队列即可

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dead.queue", durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "dead.exchange", type = ExchangeTypes.TOPIC),key = "dead.key"
))
public void deadLetterQueue(String msg) {System.out.println("您的消息已经死亡:" + msg);
}
5.2.2.绑定死信交换机

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

可以通过@Argument注解指定死信交互机和路由键,如下。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "simple.queue", durable = "true",arguments = {@Argument(name = "x-queue-mode", value = "lazy"),@Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "dead.key")}),exchange = @Exchange(name = "simple.topic",type = ExchangeTypes.TOPIC),key = "simple.key"))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/45985.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

.NET MAUI开源架构_1.学习资源分享

最近需要开发Android的App,想预研下使用.NET开源架构.NET MAUI来开发App程序。因此网上搜索了下相关资料,现在把我查询的结果记录下,方便后面学习。 1.官方文档 1.1MAUI官方学习网站 .NET Multi-Platform App UI 文档 - .NET MAUI | Micro…

Open-TeleVision——通过VR沉浸式感受人形机器人视野:兼备远程控制和深度感知能力

前言 7.3日,我司七月在线(集AI大模型职教、应用开发、机器人解决方案为一体的科技公司)的「大模型机器人(具身智能)线下营」群里的一学员发了《Open-TeleVision: Teleoperation with Immersive Active Visual Feedback》这篇论文的链接,我当时快速看了一…

shell脚本之if/case语句

一、条件测试 1、1 返回码 $? $? :返回码,用来判断命令或者脚本是否执行成功。 0 :表示true ,成功;非0 则表示flase ,失败。 1、2 test命令 可以进行条件测试,然后根据返回值来判断条件是否成立 -e…

RISC-V异常处理流程概述(2):异常处理机制

RISC-V异常处理流程概述(2):异常处理机制 一、异常处理流程和异常委托1.1 异常处理流程1.2 异常委托二、RISC-V异常处理中软件相关内容2.1 异常处理准备工作2.2 异常处理函数2.3 Opensbi系统调用的注册一、异常处理流程和异常委托 1.1 异常处理流程 发生异常时,首先需要执…

4、linux相关基础知识

1、gcc编译过程 .c通过编译生成.o文件,.out目标文件进过链接生成.so库文件。 2、在C中可以使用system(("mkdir -p "path).c_str())创建目录。c_str()把string转化为c字符串,便于system命令识别,system命令会新启动一个进程来创建文…

移动硬盘有盘符打不开:深度解析与高效恢复指南

在数字化信息爆炸的今天,移动硬盘作为便捷的数据存储与传输工具,其重要性不言而喻。然而,当您遇到移动硬盘有盘符却无法正常打开的情况时,无疑会给您的工作和生活带来不小的困扰。本文将深入探讨移动硬盘有盘符打不开的原因&#…

东软“引战”国家队 通用技术“补链”大国重器

向来低调温和的东软创始人刘积仁,这一次抛出了“王炸”级的资产交易。 7月3日,《多肽链》获得一则足以引爆国内医疗设备行业的投资信息:被东软集团视为核心资产、掌上明珠的东软医疗,成功引入通用技术集团资本有限公司与中国国有…

BI佐罗,居然抄袭洗稿我的文章

必须曝光此博主不当行径。 4月2日这天发表的原创文章:BI报表系统建设10大坑,因为都是切身的实际项目经验总结,获得了很多人的关注。 我觉得写文章要写的是亲身、真的做过的专业的项目经验,而不是信口开河随口忽悠。 如果有些博…

Fancybox: 号称世界上最流行的灯箱脚本!这款“花盒“为什么与众不同?

今天要分享的是一个灯箱脚本库:Fancybox。最近了不起刚好用到它。这里就和大家分享下。 简介 Fancybox 是终极的 JavaScript 灯箱替代品,为多媒体显示中的优质用户体验设定了标准。支持图像、视频、地图、内联内容、iframe 和任何其他 HTML 内容。 此…

如何在SpringCloud中使用Kafka Streams实现实时数据处理

使用Kafka Streams在Spring Cloud中实现实时数据处理可以帮助我们构建可扩展、高性能的实时数据处理应用。Kafka Streams是一个基于Kafka的流处理库,它可以用来处理流式数据,进行流式计算和转换操作。 下面将介绍如何在Spring Cloud中使用Kafka Streams实…

Pytorch中nn.Sequential()函数创建网络的几种方法

1. 创作灵感 在创建大型网络的时候,如果使用nn.Sequential()将几个有紧密联系的运算组成一个序列,可以使网络的结构更加清晰。 2.应用举例 为了记录nn.Sequential()的用法,搭建以下测试网络&…

数字电路-建立时间和保持时间详解

对于数字系统而言,建立时间(setup time)和保持时间(hold time)是数字电路时序的基础。数字电路系统的稳定性,基本取决于时序是否满足建立时间和保持时间。我自己在初学时一度很难理解清楚他们的概念&#x…

基于JAVA+SpringBoot+Vue+uniApp小程序的心理健康测试平台

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取项目下载方式🍅 一、项目背景介绍: 该系统由三个核心角色…

【PVE】新增2.5G网卡作为主网卡暨iperf测速流程

【PVE】新增2.5G网卡作为主网卡暨iperf测速流程 新增网卡 新增网卡的首先当然需要关闭PVE母机,把新网卡插上,我用淘宝遥现金搞了个红包,花了26元买了块SSU的2.5G网卡。说实话这个价位连散热片都没有,确实挺丐的。稍后测下速度看…

移动硬盘有盘符打不开的全方位解决方案

一、现象描述:移动硬盘有盘符却无法访问 在日常的数据存储与传输中,移动硬盘无疑扮演着举足轻重的角色。然而,不少用户可能会遇到这样一个令人头疼的问题:移动硬盘在连接电脑后,虽然能正常显示盘符,但双击…

【算法】单调队列

一、什么是单调队列 单调队列是一种数据结构,其特点是队列中的元素始终保持单调递增或递减,主要用于维护队列中的最小值或最大值。 不同于普通队列只能从队头出队、队尾入队,单调队列为了维护其特征,还允许从队尾出队 不管怎么…

深入Linux:权限管理与常用命令详解

文章目录 ❤️Linux常用指令🩷zip/unzip指令🩷tar指令🩷bc指令🩷uname指令🩷shutdown指令 ❤️shell命令以及原理❤️什么是 Shell 命令❤️Linux权限管理的概念❤️Linux权限管理🩷文件访问者的分类&#…

【微信小程序知识点】getApp()全局数据共享,页面间通信,组件间通信

getApp()-全局数据共享 在小程序中,可以通过getApp()方法获取到小程序全局唯一的App实例。因此在App()方法中添加全局共享的数据,方法,从而实现页面,组件的数据传值。 // app.js App({//全局共享的数据globalData: {token: &qu…

力扣每日一题:3011. 判断一个数组是否可以变为有序

力扣官网:前往作答!!!! 今日份每日一题: 题目要求: 给你一个下标从 0 开始且全是 正 整数的数组 nums 。 一次 操作 中,如果两个 相邻 元素在二进制下数位为 1 的数目 相同 &…

Cxx Primer-CP-2

开篇第一句话足见作者的高屋建瓴:类型决定程序中数据和操作的意义。随后列举了简单语句i i j;的意义取决于i和j的类型。若它们都是整形,则为通常的算术意义。若它们都为字符串型,则为进行拼接操作。若为用户自定义的class类型,则…