免费学建筑知识网站/网站收录优化

免费学建筑知识网站,网站收录优化,网站建设公司客户开发手册,idc机房托管费用RabbitMQ 核心功能 RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客 RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客 前言 最近再看 RabbitMQ,看了看自己之前写…

RabbitMQ 核心功能

RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客


前言

最近再看 RabbitMQ,看了看自己之前写的博客,诶,一言难尽,当时学的懵懵懂懂的。这里重新整理 RabbitMQ 的核心功能。

在分布式系统中,消息队列是实现异步通信、解耦服务的关键组件。RabbitMQ 作为一款功能强大的消息队列,其消息可靠性是确保系统稳定运行的重要因素。这里将深入探讨 RabbitMQ 的消息确认机制、持久化策略、发送方确认机制以及重试机制!!


一、消息确认机制

1.1 消息确认机制概述

生产者发送消息到消费端后,可能出现消息处理成功或异常的情况。如果 RabbitMQ 在发送消息后就将其删除,当消息处理异常时,就会造成消息丢失。为了确保消费端成功接收并正确处理消息,RabbitMQ 提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据该参数设置,消息确认机制分为自动确认和手动确认两种:

  • 自动确认(autoAck=true):RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正消费到了这些消息。这种模式适合对消息可靠性要求不高的场景。
  • 手动确认(autoAck=false):RabbitMQ 会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景。

以下是basicConsume方法的定义:

/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

1.2 手动确认方法

消费者在收到消息之后,可以选择确认、直接拒绝或者跳过,RabbitMQ 提供了三种不同的确认应答方式:

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple):RabbitMQ 已知道该消息并且成功处理消息,可以将其丢弃。
    • deliveryTag:消息的唯一标识,是一个单调递增的 64 位长整型值,每个通道(Channel)独立维护,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道进行确认。
    • multiple:是否批量确认。值为true则会一次性 ack 所有小于或等于指定deliveryTag的消息;值为false,则只确认当前指定deliveryTag的消息。
  • 否定确认:Channel.basicReject(long deliveryTag, boolean requeue):消费者客户端可以调用channel.basicReject方法来告诉 RabbitMQ 拒绝这个消息。
    • deliveryTag:参考channel.basicAck
    • requeue:表示拒绝后,这条消息如何处理。如果requeue参数设置为true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue参数设置为false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。
  • 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):如果想要批量拒绝消息,可以使用Basic.Nack命令。
    • 参数介绍参考前面两个方法。multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

1.3 代码示例

我们基于 Spring Boot 来演示消息的确认机制,Spring - AMQP 对消息确认机制提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
1.3.1 AcknowledgeMode.NONE
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
  • 发送消息
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}@Configuration
public class RabbitmqConfig {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
}@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");return "发送成功!";}
}
  • 消费端逻辑
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");}
}

运行结果:消息处理失败,但消息已从 RabbitMQ 中移除,因为NONE模式下消息一旦投递就会被自动确认。

1.3.2 AcknowledgeMode.AUTO
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto

重新运行程序,当消费者出现异常时,RabbitMQ 会不断重发消息,由于异常多次重试还是失败,消息没被确认,也无法 nack,就一直是unacked状态,导致消息积压。

1.3.3 AcknowledgeMode.MANUAL
  • 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
  • 消费端手动确认逻辑
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//手动设置一个异常, 来测试异常拒绝机制// int num = 3/0;//3. 手动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送, 若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}
}

运行结果:消息正常处理时会被签收;异常时会不断重试。


二、持久性

2.1 交换机持久化

交换器的持久化是通过在声明交换机时将durable参数置为true实现的。这样当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机,交换机会自动建立。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失。

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

2.2 队列持久化

队列的持久化是通过在声明队列时将durable参数置为true实现的。如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列就会被删掉,数据也会丢失。但队列持久化不能保证内部所存储的消息不丢失,要确保消息不丢失,需要将消息设置为持久化。

QueueBuilder.durable(Constant.ACK_QUEUE).build();

创建非持久化队列:

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); 

2.3 消息持久化

消息实现持久化,需要把消息的投递模式(MessageProperties中的deliveryMode)设置为 2,也就是MessageDeliveryMode.PERSISTENT

// 要发送的消息内容
String message = "This is a persistent message";
// 创建一个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);

需要注意的是,将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,因为写入磁盘的速度比写入内存的速度慢很多。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

即使将交换器、队列、消息都设置了持久化,也不能百分之百保证数据不丢失。例如,消费者订阅队列时autoAck参数设置为true,消费者接收到消息后还没来得及处理就宕机,会导致数据丢失;持久化的消息存入 RabbitMQ 后,还需要一段时间才能存入磁盘,如果在这段时间内 RabbitMQ 服务节点发生异常,消息可能会丢失。可以通过引入 RabbitMQ 的仲裁队列或在发送端引入事务机制、发送方确认机制来提高可靠性。(后续都会讲到)


三、发送方确认

3.1 confirm 确认模式

Producer 在发送消息时,对发送端设置一个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被执行。如果Exchange成功收到消息,ACKtrue;如果没收到消息,ACKfalse

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认

设置确认回调逻辑并发送消息

@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());}else {System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);return "确认成功";}
}
  • 测试
    运行程序,调用接口http://127.0.0.1:8080/product/confirm,观察控制台,消息确认成功。修改交换机名称,重新运行,会触发消息发送失败的结果。

3.2 return 退回模式

消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中。如果一条消息无法被任何队列消费,可以选择把消息退还给发送者。

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认
  • 设置返回回调逻辑并发送消息
@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/msgReturn")public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);return "消息发送成功";}
}

测试
运行程序,调用接口http://127.0.0.1:8080/product/msgReturn,观察控制台,消息被退回。


四、重试机制

在消息传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题可能导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送。但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数。

4.1 重试配置

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数(包括自身消费的一次)

4.2 配置交换机 & 队列

//重试机制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重试机制 发布订阅模式
//1. 交换机
@Bean("retryExchange")
public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();

五:如何保证 RabbitMQ 消息的可靠传输?

消息可能丢失的场景以及解决方案如下:

生产者将消息发送到 RabbitMQ 失败

  • 可能原因是网络问题等,
  • 解决办法是参考发送方确认 - confirm确认模式

消息在交换机中无法路由到指定队列

  • 可能原因是代码或者配置层面错误,导致消息路由失败,
  • 解决办法是参考发送方确认 - return模式

消息队列自身数据丢失

  • 可能原因是消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失,
  • 解决办法是参考持久化开启 RabbitMQ 持久化,也可以通过集群的方式提高可靠性。

消费者异常,导致消息丢失

  • 可能原因是消息到达消费者,还没来得及消费,消费者宕机或消费者逻辑有问题,
  • 解决办法是参考消息确认,开启手动确认,配置重试机制

以上就是四个RabbitMQ保证消息可靠性的四个机制,后续有更多核心机制的更新,感谢阅览!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用DeepSeek-R1-Distill-data-110k蒸馏中文数据集 微调Qwen2.5-7B-Instruct!

下载模型与数据 模型下载: huggingface: Qwen/Qwen2.5-7B-Instruct HF MirrorWe’re on a journey to advance and democratize artificial intelligence through open source and open science.https://hf-mirror.com/Qwen/Qwen2.5-7B-Instruct 魔搭&a…

在IDEA中进行git回滚操作:Reset current branch to here‌或Reset HEAD

问题描述 1)在本地修改好的代码,commit到本地仓库,突然发觉有问题不想push推到远程仓库了,但它一直在push的列表中存在,那该怎么去掉push列表中的内容呢? 2)合并别的分支到当前分支&#xff0…

六十天前端强化训练之第十一天之事件机制超详解析

欢迎来到编程星辰海的博客讲解 目录 一、事件模型演进史 1.1 原始事件模型(DOM Level 0) 1.2 DOM Level 2事件模型 1.3 DOM Level 3事件模型 二、事件流深度剖析 2.1 捕获与冒泡对比实验 2.2 事件终止方法对比 三、事件委托高级应用 3.1 动态元…

匿名GitHub链接使用教程(Anonymous GitHub)2025

Anonymous GitHub 1. 引言2. 准备3. 进入Anonymous GitHub官网4. 用GitHub登录匿名GitHub并授权5. 进入个人中心,然后点击• Anonymize Repo实例化6. 输入你的GitHub链接7. 填写匿名链接的基础信息8. 提交9. 实例化对应匿名GitHub链接10. 进入个人中心管理项目11. 查…

Git系列之git tag和ReleaseMilestone

以下是关于 Git Tag、Release 和 Milestone 的深度融合内容,并补充了关于 Git Tag 的所有命令、详细解释和指令实例,条理清晰,结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交(commit&#xf…

开源项目介绍:Native-LLM-for-Android

项目地址:Native-LLM-for-Android 创作活动时间:2025年 支持在 Android 设备上运行大型语言模型 (LLM) ,具体支持的模型包括: DeepSeek-R1-Distill-Qwen: 1.5B Qwen2.5-Instruct: 0.5B, 1.5B Qwen2/2.5VL:…

深入理解 Java 虚拟机内存区域

Java 虚拟机(JVM)是 Java 程序运行的核心环境,它通过内存管理为程序提供高效的执行支持。JVM 在运行时将内存划分为多个区域,每个区域都有特定的作用和生命周期。本文将详细介绍 JVM 的运行时数据区域及其功能,并探讨与…

PDF转JPG(并去除多余的白边)

首先,手动下载一个软件(poppler for Windows),下载地址:https://github.com/oschwartz10612/poppler-windows/releases/tag/v24.08.0-0 否则会出现以下错误: PDFInfoNotInstalledError: Unable to get pag…

VanillaVueSvelteReactSolidAngularPreact前端框架/库的简要介绍及其优势

VanillaVueSvelteReactSolidAngularPreact前端框架/库的简要介绍及其优势。以下是这些前端框架/库的简要介绍及其优势: 1. Vanilla 定义:Vanilla 并不是一个框架,而是指 原生 JavaScript(即不使用任何框架或库)。优势…

Jmeter接口测试详解

今天笔者呢,想给大家聊聊Jmeter接口测试流程详解,废话不多说直接进入正题。 一、jmeter简介 Jmeter是由Apache公司开发的java开源项目,所以想要使用它必须基于java环境才可以; Jmeter采用多线程,允许通过多个线程并…

DeepSeek开启AI办公新模式,WPS/Office集成DeepSeek-R1本地大模型!

从央视到地方媒体,已有多家媒体机构推出AI主播,最近杭州文化广播电视集团的《杭州新闻联播》节目,使用AI主持人进行新闻播报,且做到了0失误率,可见AI正在逐渐取代部分行业和一些重复性的工作,这一现象引发很…

通过Golang的container/list实现LRU缓存算法

文章目录 力扣:146. LRU 缓存主要结构 List 和 Element常用方法1. 初始化链表2. 插入元素3. 删除元素4. 遍历链表5. 获取链表长度使用场景注意事项 源代码阅读 在 Go 语言中,container/list 包提供了一个双向链表的实现。链表是一种常见的数据结构&#…

【大学生体质】智能 AI 旅游推荐平台(Vue+SpringBoot3)-完整部署教程

智能 AI 旅游推荐平台开源文档 项目前端地址 ☀️项目介绍 智能 AI 旅游推荐平台(Intelligent AI Travel Recommendation Platform)是一个利用 AI 模型和数据分析为用户提供个性化旅游路线推荐、景点评分、旅游攻略分享等功能的综合性系统。该系统融合…

DeepSeek R1-32B医疗大模型的完整微调实战分析(全码版)

DeepSeek R1-32B微调实战指南 ├── 1. 环境准备 │ ├── 1.1 硬件配置 │ │ ├─ 全参数微调:4*A100 80GB │ │ └─ LoRA微调:单卡24GB │ ├── 1.2 软件依赖 │ │ ├─ PyTorch 2.1.2+CUDA │ │ └─ Unsloth/ColossalAI │ └── 1.3 模…

npm install -g @vue/cli 方式已经无法创建VUE3项目

采用该方式,启动VUE3项目,运行命令,出现报错: npm install -g vue/cli PS D:\> npm install -g vue/cli npm warn deprecated inflight1.0.6: This module is not supported, and leaks memory. Do not use it. Check out lr…

3.8[a]cv

函数核心目标 实现屏幕空间内三角形的光栅化,将三角形覆盖的像素点颜色填充到帧缓冲区,同时处理深度测试(Z-Buffer)。这是渲染管线中几何阶段到像素阶段的关键步骤 包围盒计算(Bounding Box)​** ​功能&…

导入 Excel 规则批量修改或删除 Excel 表格内容

我们前面介绍过按照规则批量修改 Excel 文档内容的操作,可以对大量的 Excel 文档按照一定的规则进行统一的修改,可以很好的解决我们批量修改 Excel 文档内容的需求。但是某些场景下,我们批量修改 Excel 文档内容的场景比较复杂,比…

在人工智能软件的帮助下学习编程实例

1 引言 本文记录在人工智能软件的帮助下学习一种全新的编程环境的实例,之所以提人工智能软件而不是单指DeepSeek,一方面DeepSeek太火了,经常服务器繁忙,用本机本地部署的最多运行70b模型,又似乎稍差。另一方面也作为一…

Selenium遇到Exception自动截图

# 随手小记 场景:测试百度: 点击新闻,跳转到新的窗口,找到输入框,输入“hello,world" 等到输入框的内容是hello,world, 这里有个错误,少了一个] 后来就实现了错误截图的功能,可以参考 …

【神经网络】python实现神经网络(一)——数据集获取

一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中,我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程,现在我们正式开始进行代码的实现,首先我们来实现第一步的运算过程模拟讲解:正向传导。本次代…