RabbitMQ高级篇

目录

确保发送者的可靠

为什么需要确保发送者的可靠性

RabbitMQ 的发送者重连机制配置 

springAMQP实现发送者确认 

MQ的可靠性

为什么需要实现MQ的可靠性?

数据持久化

Lazy Queue 

核心思想

总结RabbitMQ 如何保证消息的可靠性 

持久化

 Lazy Queue

消息确认机制

消费者的可靠性

消费者确认机制的核心原理

 消费者确认机制​编辑

none 模式(无确认)

manual 模式(手动确认)

auto 模式(自动确认)

失败重试机制 

 失败消息处理策略

测试不出来?如何解决?

业务幂处理

为什么会产生重复消费?

使用唯一标识符(Message ID)

业务判断 


确保发送者的可靠

为什么需要确保发送者的可靠性

避免消息丢失

  • 在分布式系统中,如果发送者未确认消息是否被 RabbitMQ 接收,可能会因为网络波动、RabbitMQ 服务异常等原因导致消息丢失。
  • 消息丢失会导致数据不一致或业务逻辑中断,尤其是在支付系统、订单处理等关键场景下

保证系统的可用性和稳定性

  • 可靠性确保了生产者和消息队列之间的通信稳定性,即使在系统压力较大时,消息仍然能够被妥善处理。

RabbitMQ 的发送者重连机制配置 

实际应用场景

临时网络波动

  • 当网络抖动时,短时间内可能会导致连接失败,通过重试机制可以自动恢复连接。

RabbitMQ 短暂不可用

  • RabbitMQ 重启或服务瞬间不可用时,通过重试可以避免消息丢失。

spring:rabbitmq:connection-timeout: 1s  # 设置连接超时时间template:retry:enabled: true       # 启用发送者重试机制initial-interval: 1000ms  # 初始重试间隔为 1000ms (1 秒)multiplier: 2       # 每次重试间隔时间加倍max-attempts: 3     # 最大重试 3 次

Spring 配置说明

  • spring.rabbitmq.connection-timeout
    配置与 RabbitMQ 建立连接时的超时时间,单位为毫秒或秒。在网络波动的情况下,如果连接超时,会触发重试机制。

  • spring.rabbitmq.template.retry.enabled
    开启 RabbitMQ 发送消息的重试功能。当发送消息失败时,系统会按照配置的重试规则重新发送。

重试参数()

multiplier为2的时候,相当于是计网当中CMSA/CD的截断二进制退避算法

  1. initial-interval
    第一次重试的间隔时间,单位为毫秒(ms)。比如配置为 1000ms,表示第一次失败后等待 1 秒开始重试。

  2. multiplier
    每次重试间隔时间的倍增系数。例如配置为 1,说明每次重试的间隔时间是固定的;如果配置为 2,那么每次间隔时间会加倍。

    计算公式

    下一次重试间隔 = initial-interval × multiplier
    

springAMQP实现发送者确认 

 


MQ的可靠性

为什么需要实现MQ的可靠性?

内存中消息会丢失

  • RabbitMQ 默认将消息存储在内存中(非持久化),以降低延迟。
  • 如果 MQ 宕机,内存中的消息会被清空,从而丢失。

内存空间有限,可能导致消息积压

  • 如果消费者处理过慢或不可用,内存中的消息无法被及时消费,消息会积压,占用 RabbitMQ 的内存。
  • 当内存使用达到限制时,RabbitMQ 可能会触发流控(Flow Control),甚至出现系统阻塞或崩溃RabbitMQ 会将这些消息被动地写入磁盘,以缓解内存压力。当队列中消息积压严重时,RabbitMQ 被迫将大量消息写入磁盘(被动持久化),磁盘的读写速度可能无法跟上队列的处理速度,从而引发 队列阻塞
  • 为了解决这个问题才有了同步持久化策略,不会导致消息队列阻塞

同步持久化策略

  • 消息到达时立即写入磁盘,消除了被动写入导致的延迟和瓶颈。
  • 优点:可靠性高,不会因磁盘写入延迟导致队列阻塞。
  • 缺点:性能略低,需要优化磁盘性能来减小延迟。

数据持久化

Spring AMQP 中,默认的可靠性措施已经覆盖了 队列持久化自动重连,你只需要在以下两种情况下手动配置:

确保消息可靠传递和存储

  • 需要显式声明消息为持久化(PERSISTENT)。
  • 需要注意的是,在java代码中我们需要自定义构建消息来发送临时消息。
     public void sendPersistentMessage(String exchange, String routingKey, String messageContent) {// 创建持久化消息Message message = MessageBuilder.withBody(messageContent.getBytes()).setDeliveryMode(MessageProperties.DeliveryMode.PERSISTENT) // 设置为持久化.build();// 发送消息rabbitTemplate.send(exchange, routingKey, message);System.out.println("Sent persistent message: " + messageContent);}

Lazy Queue 

Lazy Queue 是 RabbitMQ 的一种队列模式,主要用于优化 消息持久化策略 的缺点和高消息积压场景下的性能问题。它将消息尽可能存储在磁盘上,而不是内存中,从而减少内存占用和避免内存不足导致的队列阻塞。

Lazy Queue 设计的目标

Lazy Queue 是为了解决上述问题的一种优化方案,其目标是最大限度地减少内存占用,优先使用磁盘存储消息,从而解决持久化策略的缺点。

核心思想

消息直接写入磁盘

  • Lazy Queue 会在消息到达时直接将其存储在磁盘上,而不是先存储在内存中。
  • 只有在需要消费时,才会将消息从磁盘加载到内存中。

延迟加载消息

  • 消息只有在消费者需要处理时才会被加载到内存中,而不是全部预加载。
  • 这减少了内存占用,同时避免了磁盘与内存之间频繁切换的问题。

总结RabbitMQ 如何保证消息的可靠性 

持久化
  • 确保消息(需要手动配置)、队列和交换机在 RabbitMQ 重启后依然存在。
  • 优点:保障消息不会因服务中断而丢失。
  • 缺点:磁盘 IO 开销增加,可能影响性能。
 Lazy Queue
  • 解决持久化策略中内存占用高的问题,消息优先存储在磁盘中
  • 优点:减少内存消耗,适合消息积压场景。
  • 缺点:磁盘 IO 性能略低于内存操作,不适合高吞吐实时场景。
消息确认机制
  • 保证消息从生产者到 RabbitMQ 的可靠传递。
  • 优点生产者可以通过 ACK 确认消息是否成功到达 RabbitMQ。一般是可以成功到达除是程序员配置错误。因此我们可以根据实际情况选择。
  • 缺点:增加网络和确认延迟。


消费者的可靠性

消费者确认机制的核心原理

消费者确认机制的目的是确保消息被成功消费后,RabbitMQ 可以安全地将消息从队列中移除。如果消息处理失败,则可以选择重新投递或丢弃,以实现可靠的消息消费。

确认机制的三种状态

ACK(Acknowledgement)

  • 描述:消息已成功处理,通知 RabbitMQ 可以安全删除消息。
  • 结果:RabbitMQ 将消息从队列中移除。

NACK(Negative Acknowledgement)

  • 描述:消息处理失败,消费者通知 RabbitMQ 需要重新投递消息。
  • 结果:消息重新进入队列,供其他消费者或当前消费者再次处理。

REJECT

  • 描述:消息处理失败,但消费者明确表示拒绝消息。例如:格式问题
  • 结果:消息从队列中删除,不会重新投递。

 消费者确认机制

在 RabbitMQ 的消费者确认机制中,消费者需要通知 RabbitMQ 消息是否被成功处理。Spring AMQP 通过 acknowledge-mode 提供了三种消息确认模式:

none 模式(无确认)

特点

  • 消息推送到消费者后,RabbitMQ 会立即将其标记为已确认,无论消息是否被成功处理。
  • 极不安全,因为如果消费者出现异常或宕机,消息可能会丢失。

适用场景:不推荐使用,除非对消息丢失完全没有影响。


manual 模式(手动确认)

特点

  • 消费者需要手动调用 RabbitMQ 提供的 API(如 basicAckbasicNackbasicReject)来确认消息是否被成功处理。
  • 优点:可以精确控制消息的确认时机,适合对可靠性要求高的场景。

实现方式

  • @RabbitListener 方法中捕获异常,根据业务逻辑决定发送 ACKNACKREJECT

auto 模式(自动确认)

特点

  • Spring AMQP 默认使用 AOP 对消息处理方法进行代理,消息处理成功后会自动发送 ACK
  • 如果方法抛出异常,Spring AMQP 会自动发送 NACKREJECT
  • 适用场景:适合消息可靠性要求不高、处理逻辑较简单的场景。

实现方式:无需手动确认,Spring 会根据方法执行结果自动处理。

spring:rabbitmq:host:  # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 控制消费者预取的消息数量,处理完一条再处理acknowledge-mode: auto

消费者处理失败时,消息被 NACK 并重新入队,RabbitMQ 会不断尝试将消息重新投递给消费者,导致消息反复处理失败,最终对 RabbitMQ 和服务产生巨大的负担。因此我们可以通过设置失败重试机制,限制消息重试次数

失败重试机制 

Spring AMQP 提供了 重试机制,可以限制消费者的最大重试次数。如果消息在多次重试后仍然失败,可以转移到死信队列或记录到日志中。

如果重试了最大次数了之后还是不能成功会直接把消息丢弃了,因此我们还需要配置重试失败处理策略。

 失败消息处理策略

Spring AMQP 提供了三种常见的失败消息处理策略:

  1. RejectAndDontRequeueRecoverer(默认策略):丢弃消息。
  2. ImmediateRequeueMessageRecoverer:消息重新入队。
  3. RepublishMessageRecoverer:将消息转移到指定的交换机。原来的队列中不会再有这个消息

选择建议

  • 非关键场景:使用默认的 RejectAndDontRequeueRecoverer
  • 临时错误场景:使用 ImmediateRequeueMessageRecoverer,重新入队。
  • 关键场景:使用 RepublishMessageRecoverer,将消息投递到死信队列或专用的交换机。

 配置消费者重试机制 

spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 5              # 最大重试次数initial-interval: 1000ms     # 初始重试间隔multiplier: 2.0              # 每次重试间隔倍增max-interval: 10000ms        # 最大重试间隔

结合 MessageRecoverer

在重试失败时,指定使用的消息恢复策略:

@Configuration
public class ErrorMessageConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct"); // 声明一个 Direct 类型的交换机,名称为 "error.direct"}@Beanpublic Queue errorQueue() {return new Queue("error.queue",true); // 声明一个队列,名称为 "error.queue"}@Beanpublic Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with ("error"); // 将队列 "error.queue" 和交换机 "error.direct" 绑定,路由键为 "error"}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {System.out.println("执行了消息恢复器!!!");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");// 配置一个消息恢复器,将消费失败的消息重试多次后投递到 "error.direct" 交换机,路由键为 "error"}
}
测试不出来?如何解决?

如果你和代码写的一样或者代码没有明显的错误,但是就是测试不出来。

可以选择将simple.queue删除之后,在重新新建并且重新发送消息。

在 RabbitMQ 中,Purge Messages 只会清空处于 Ready 状态(也就是未被消费者接收或未投递中的消息)。如果有消息已经分配给消费者,处于 Unacked 状态,RabbitMQ 并不会通过 Purge 操作将它们从队列里移除。也就是说,Purge 并不能清除还在进行中的消费或处于 Unacked 状态的消息

因此,当你点击 Purge Messages 时,如果队列里大部分消息都处于 Unacked 状态(可能由于消费者一直没 ACK),Purge 操作其实没有把这些 Unacked 消息移除。它们依然存在,消费者再次重连或者容器重启时,RabbitMQ 还会继续投递。

反之,当你删除并重新创建这个队列时,RabbitMQ 会彻底销毁所有该队列关联的消息(不管它们是 Ready 还是 Unacked),然后新建一个空队列,之前的消息自然就不复存在,手动删队列再建”可以成功清空消息,而 Purge 不行的现象。

业务幂处理

幂等性 是指在分布式系统中,某一操作无论被执行一次还是多次,其产生的效果是相同的。在使用 RabbitMQ 或其他消息队列时,消费者可能会因重试、重复投递等原因处理相同的消息多次。为了保证系统数据的一致性和可靠性,需要解决幂等性问题。

为什么会产生重复消费?

RabbitMQ 和其他消息队列可能会因为以下原因导致消息重复消费:

消息重试机制:

  • 消费者处理失败时,消息会被重新投递。
  • 例如:消息超时、消费者抛出异常。

网络问题:

  • 如果 RabbitMQ 没有收到消费者的 ACK,会认为消息未被成功处理,并重新投递。

消息重复发送:

  • 生产者在发送消息时,如果未正确处理确认机制,可能会发送相同的消息多次。

使用唯一标识符(Message ID)

生产端设置唯一消息 ID

  • 在生产消息时,为每条消息生成一个唯一的 ID(通常是 UUID)。
  • Message ID 会作为消息的属性,和消息一起发送到 RabbitMQ。

消费端接收唯一消息 ID

  • 消费者在处理消息时,从消息的属性中提取 Message ID
  • 利用这个 Message ID 在数据库或缓存(Redis)中查询是否已处理过该消息。

去重逻辑

  • 如果 Message ID 已存在,说明消息是重复的,直接忽略不处理。
  • 如果 Message ID 不存在,说明消息是新的,正常处理,并将 Message ID 标记为已处理。
  • 如果消息只需短期去重(如几天内不重复):使用 Redis,并设置合理的过期时间。
  • 如果需要长期保存处理状态:使用 MySQL 或其他关系型数据库,结合唯一约束实现幂等性。

 配置消息转换器

通过 Jackson2JsonMessageConverter 设置消息的 Message ID

 作用:

  • Jackson2JsonMessageConverter 会在消息体序列化为 JSON 时自动生成一个唯一的 Message ID,并附加到消息属性中。
  • 如果消息体已经包含一个业务 ID(如订单号),也可以使用该 ID 作为去重标识。
@Bean
public MessageConverter messageConverter() {Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true); // 自动生成消息 IDreturn jjmc;
}

消费者代码

在消费者中,通过 MessageProperties 获取消息的唯一 ID,并处理去重逻辑:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message) {// 获取消息的唯一 IDString messageId = message.getMessageProperties().getMessageId();String body = new String(message.getBody());log.info("监听simple.queue的消息. ID: [{}]", messageId);log.info("监听simple.queue的消息: [{}]", body);// 模拟处理逻辑throw new RuntimeException("我要故意出错!");// 存入数据库或者Redis当中
}


业务判断 

核心思路

根据业务的唯一标识符(如 orderId)查询业务状态:

  • 消费者处理消息时,通过 orderId 查询订单信息。

判断订单状态是否已被处理:

  • 如果订单状态表明该订单已经处理过(如状态为已支付),则直接忽略当前消息。
  • 如果订单状态表明该订单尚未处理,则正常处理消息。

更新业务状态:

  • 在消息成功处理后,将订单状态标记为已处理(如更新为“已支付”状态)。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.direct"),key = "pay.success"
))
public void listenPaySuccess(Long orderId) {// 1. 查询订单Order order = orderService.getById(orderId);// 2. 判断订单状态,是否为未支付if (order == null || order.getStatus() != 1) {// 如果订单不存在,或者状态不为“待支付”(1),则直接忽略return;}// 3. 标记订单状态为已支付orderService.markOrderPaySuccess(orderId);
}

基于业务判断实现幂等性的主要局限性是强依赖于业务状态字段的设计,并且在动态数据、高并发场景下可能难以准确实现幂等性控制。例如,怎么保证库存的幂等性?这种业务判断不能保证库存这种动态数据的变化。因此,在实际项目中,业务判断通常作为一种基础手段,结合唯一消息 ID、分布式锁、缓存或事务机制共同实现幂等性,才能保证系统的可靠性和高效性。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/67883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序用的SSL证书有什么要求吗?

微信小程序主要建立在手机端使用,然而手机又涉及到各种系统及版本,所以对SSL证书也有要求,如果要小程序可以安全有效的访问需要满足以下要求: 1、原厂SSL证书(原厂封)。 2、DV单域名或者DV通配符。 3、兼…

OpenCV计算机视觉 07 图像的模块匹配

在做目标检测、图像识别时,我们经常用到模板匹配,以确定模板在输入图像中的可能位置 API函数 cv2.matchTemplate(image, templ, method, resultNone, maskNone) 参数含义: image:待搜索图像 templ:模板图像 method&…

uniapp开发u-icon图标不显示问题

uniapp开发图标用u-icon不显示,换成uv-icon就可以了 插件市场从这里下载:uv-ui 破釜沉舟之兼容vue32、app、h5、小程序等多端,灵活导入,利剑出击 - DCloud 插件市场 组件库看这个:介绍 | 我的资料管理-uv-ui 是全面兼…

ELK的搭建

ELK elk:elasticsearch logstatsh kibana统一日志收集系统 elasticsearch:分布式的全文索引引擎点非关系型数据库,存储所有的日志信息,主和从,最少需要2台 logstatsh:动态的从各种指定的数据源,获取数据…

【git】-2 分支管理

目录 一、分支的概念 二、查看、创建、切换分支 1、查看分支-git branch 2、创建分支- git branch 分支名 3、切换分支- git checkout 分支名 三、git指针 -实现分支和版本间的切换 四、普通合并分支 git merge 文件名 五、冲突分支合并 ​​​​​​【git】-初始gi…

【项目】修改远程仓库地址、报错jdk

一、修改远程仓库地址 进入你刚刚克隆到本地的仓库目录&#xff0c;执行以下命令来修改远程仓库的 URL&#xff0c;将其指向你自己的新仓库&#xff1a; cd 原仓库名 git remote set-url origin <你自己的新仓库的 Git 地址>补充&#xff1a; 错误分析&#xff1a; wa…

实训云上搭建集群

文章目录 1. 登录实训云1.1 实训云网址1.2 登录实训云 2. 创建网络2.1 网络概述2.2 创建步骤 3. 创建路由器3.1 路由器名称3.1 创建路由器3.3 查看网络拓扑 4. 连接子网5. 创建虚拟网卡5.1 创建原因5.2 查看端口5.3 创建虚拟网卡 6. 管理安全组规则6.1 为什么要管理安全组规则6…

vue3+ts+element-plus 输入框el-input设置背景颜色

普通情况&#xff1a; 组件内容&#xff1a; <el-input v-model"applyBasicInfo.outerApplyId"/> 样式设置&#xff1a; ::v-deep .el-input__wrapper {background-color: pink; }// 也可以这样设置 ::v-deep(.el-input__wrapper) {background-color: pink…

直线模组中导轨和滑块松动如何处理?

直线模组抖动是直线模组在日常运用中比较常见的一种异常行为&#xff0c;直线模组的抖动对精度和寿命都会产生严重影响。直线模组出现抖动通常是由于导轨和滑块之间的摩擦、松动或不平衡所引起的。那么&#xff0c;针对直线模组中导轨和滑块存在的松动问题&#xff0c;可以采取…

Python 写的 智慧记 进销存 辅助 程序 导入导出 excel 可打印 Pyside6版

图 这图是第2版, 现在发布原型版 代码: order_system_pyside6.py from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,QHBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox,QTableWidget, QTableWidgetItem, QComboBox, QFrame,QH…

element upload上传图片,上传完成隐藏组件或者禁用上传

背景&#xff1a; 在项目开发&#xff0c;需要上传图片&#xff0c;一张或者多张。当上传1张图片时&#xff0c;upload组件有一张图片时&#xff0c;组件自带的disabletrue设置为true禁用上传&#xff0c;就不会触发上传接口了&#xff0c;但是还是可以点开图片进行选择&#x…

【2024年华为OD机试】 (A卷,100分)- 二元组个数(Java JS PythonC/C++)

一、问题描述 以下是题目描述的 Markdown 格式&#xff1a; 题目描述 给定两个数组 a 和 b&#xff0c;若 a[i] b[j]&#xff0c;则称 [i, j] 为一个二元组。求在给定的两个数组中&#xff0c;二元组的个数。 输入描述 第一行输入 m&#xff0c;表示第一个数组的长度。第二…

Homebrew 【MAC安装软件利器】

1、brew介绍 Homebrew游来: Homebrew 的诞生源于一个年轻程序员的不满和创新。2009 年&#xff0c;Max Howell 当时是一名在苹果公司工作的程序员。他觉得在 Mac 上安装和管理开源软件特别麻烦&#xff0c;常常需要手动下载源代码、解决依赖关系、编译安装&#xff0c;过程繁琐…

AOP实现操作日志记录

文章目录 1.common-log4j2-starter1.目录2.pom.xml 引入依赖3.LogAspect.java4.Log4j2AutoConfiguration.java Log4j2自动配置类条件注入切面类 2.common-log4j2-starter-demo 测试1.目录2.application.yml 启用日志切面3.TraceController.java4.结果 1.common-log4j2-starter …

JavaEE之线程池

前面我们了解了多个任务可以通过创建多个线程去处理&#xff0c;达到节约时间的效果&#xff0c;但是每一次的线程创建和销毁也是会消耗计算机资源的&#xff0c;那么我们是否可以将线程进阶一下&#xff0c;让消耗计算机的资源尽可能缩小呢&#xff1f;线程池可以达到此效果&a…

YOLOv11改进,YOLOv11添加HAttention注意机制用于图像修复的混合注意力转换器,CVPR2023,超分辨率重建

摘要 基于Transformer的方法在低层视觉任务中表现出色,例如图像超分辨率。然而,作者通过归因分析发现,这些网络只能利用有限的空间范围的输入信息。这意味着现有网络尚未充分发挥Transformer的潜力。为了激活更多的输入像素以获得更好的重建效果,作者提出了一种新型的混合…

Shader -> SweepGradient扫描渐变着色器详解

XML文件 <com.example.myapplication.MyViewxmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_gravity"center"android:layout_height"400dp"/>自定义View代码 c…

LabVIEW调用不定长数组 DLL数组

在使用 LabVIEW 调用 DLL 库函数时&#xff0c;如果函数中的结构体包含不定长数组&#xff0c;直接通过 调用库函数节点&#xff08;Call Library Function Node&#xff09; 调用通常会遇到问题。这是因为 LabVIEW 需要与 DLL 中的数据结构完全匹配&#xff0c;而包含不定长数…

IOS开发如何从入门进阶到高级

针对iOS开发的学习&#xff0c;不同阶段应采取不同的学习方式&#xff0c;以实现高效提升.本文将iOS开发的学习分为入门、实战、进阶三个阶段&#xff0c;下面分别详细介绍. 一、学习社区 iOS开源中国社区 这个社区专注于iOS开发的开源项目分享与协作&#xff0c;汇集了大量开…

Next.js 实战 (七):浅谈 Layout 布局的嵌套设计模式

业务场景 在目前常见的中后台管理系统中&#xff0c;比较常见的是固定的布局方式包裹页面&#xff0c;但一些特殊页面&#xff0c;比如&#xff1a;登录页面、注册页面、忘记密码页面这些页面是不需要布局包裹的。 但在 Next.js AppRouter 中&#xff0c;必须包含一个根布局文…