RabbitMQ消息的发布确认机制详解

RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置publisher-confirm-typepublisher-returns,启用发布确认和消息返回机制。配置RabbitTemplate的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。

1. RabbitMQ发布确认机制概述

发布确认(Publisher Confirms)是RabbitMQ提供的一种机制,用于确保消息从生产者发送到RabbitMQ服务器并被成功处理。与事务机制不同,发布确认的性能开销更小,非常适合高吞吐量的场景。发布确认机制提供了两种类型的确认:

  • 消息到达交换机(Exchange)后的确认
  • 消息从交换机路由到队列(Queue)后的确认

2. 配置文件中添加发布确认相关配置

在Spring Boot项目中,通过配置文件来启用发布确认机制非常方便。以下是需要添加到application.propertiesapplication.yml中的配置:

# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调发送者
spring.rabbitmq.publisher-returns=true

配置解释:

  • publisher-confirm-type:设置为correlated表示使用CorrelationData来关联确认与发送的消息。
  • publisher-returns:设置为true表示启用消息返回机制,当消息无法路由到队列时会触发回调。

3. 发布确认类型

在Spring AMQP中,发布确认类型通过ConfirmType枚举类来定义:

public enum ConfirmType {SIMPLE,     // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()CORRELATED, // 使用 CorrelationData 关联确认与发送的消息NONE        // 不启用发布确认
}

4. 配置RabbitTemplate

为了使用发布确认机制,需要配置RabbitTemplate,包括设置确认回调和返回回调:

@Slf4j
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 设置mandatory为true,当找不到队列时,broker会调用basic.return方法将消息返还给生产者rabbitTemplate.setMandatory(true);// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息已经到达Exchange");} else {log.info("消息没有到达Exchange");}if (correlationData != null) {log.info("相关数据:" + correlationData);}if (cause != null) {log.info("原因:" + cause);}});// 设置返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息无法到达队列时触发");log.info("ReturnCallback:     " + "消息:" + message);log.info("ReturnCallback:     " + "回应码:" + replyCode);log.info("ReturnCallback:     " + "回应信息:" + replyText);log.info("ReturnCallback:     " + "交换机:" + exchange);log.info("ReturnCallback:     " + "路由键:" + routingKey);});return rabbitTemplate;}
}

5. 配置测试交换机和队列

为了测试发布确认机制,我们需要配置相应的交换机和队列:

@Slf4j
@Configuration
public class ConfirmConfig {@Beanpublic Queue confirmQueue() {return new Queue(Constant.CONFIRM_QUEUE, false);}@BeanDirectExchange confirmExchange() {DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);return directExchange;}@BeanBinding bindingConfirm() {return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);}@BeanFanoutExchange backupExchange() {return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);}@Beanpublic Queue backupQueue() {return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);}@Beanpublic Queue warningQueue() {return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);}@BeanBinding bindingConfirmBackup() {return BindingBuilder.bind(backupQueue()).to(backupExchange());}@BeanBinding bindingConfirmWarning() {return BindingBuilder.bind(warningQueue()).to(backupExchange());}
}

6. 测试场景及分析

6.1 消息无法到达交换机

测试代码:

@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";@GetMapping("/noExchange")
public void noExchange() {rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}

配置了rabbitTemplate.setMandatory(true),当消息无法到达交换机时会回调:

ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)

6.2 消息到达交换机但无法到达队列

测试代码:

@GetMapping("/toExchange")
public void toExchange() {rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}

输出:

ConfirmCallback 消息已经到达Exchange

没有收到无法到达队列的消息,是因为配置了备份队列,消息被路由到了备份队列。

6.3 注掉备份队列再试

修改配置:

@Bean
DirectExchange confirmExchange() {DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);return directExchange;
}

测试结果:

消息无法到达队列时触发
ReturnCallback:     消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:myConfirmExchange
ReturnCallback:     路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange

此时,ConfirmCallbackReturnCallback都被调用了。

6.4 成功到达队列

测试代码:

@GetMapping("/toQueue")
public void toQueue() {rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}

输出:

ConfirmCallback 消息已经到达Exchange

7. 发布确认流程

下图展示了RabbitMQ发布确认流程:

image.png

8. 深入解析RabbitMQ发布确认机制

8.1 事务机制与发布确认机制的对比

事务机制和发布确认机制都是确保消息可靠投递的手段,但它们在实现和性能方面有明显区别:

  • 事务机制:通过txSelecttxCommittxRollback实现,性能开销较大,不适合高并发场景。
  • 发布确认机制:通过异步确认消息是否成功到达交换机和队列,性能开销小,适合高并发场景。

8.2 发布确认机制的优缺点

优点
  1. 性能高:相比事务机制,发布确认机制对性能的影响较小。
  2. 异步处理:使用回调函数处理确认结果,不阻塞消息发送。
  3. 可靠性高:确保消息成功到达交换机和队列,提高系统可靠性。
缺点
  1. 实现复杂:需要配置和处理回调函数,增加了代码复杂度。
  2. 延迟高:确认机制引入了额外的网络延迟。

8.3 发布确认机制的应用场景

  1. 金融支付系统:确保支付消息的可靠传输,避免重复支付或支付丢失。
  2. 电商系统:确保订单消息的可靠传输,避免订单丢失或重复处理。
  3. 日志系统:确保日志消息的可靠传输,避免日志丢

失。

8.4 发布确认机制的最佳实践

  1. 合理设置超时时间:在高并发场景下,设置合理的超时时间,避免消息发送阻塞。
  2. 优化回调函数:回调函数中避免复杂逻辑,确保回调处理快速完成。
  3. 监控和报警:建立监控机制,及时发现和处理消息投递失败问题。

9. 总结

本文详细介绍了RabbitMQ消息的发布确认机制,包括配置、实现及其在不同场景下的表现。通过合理配置和使用发布确认机制,可以有效提高消息传输的可靠性,确保消息在高并发环境下的可靠投递。希望本文能够帮助读者深入理解并应用RabbitMQ的发布确认机制,提高系统的可靠性和性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/849685.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里 Qwen2 模型开源,教你如何将 Qwen2 扩展到百万级上下文

本次开源的 Qwen2 模型包括 5 个尺寸&#xff0c;分别是 0.5B、1.5B、7B、72B、57B&#xff0c;其中 57B 的属于 MoE 模型&#xff08;激活参数 14B&#xff09;&#xff0c;其余为 Dense 模型&#xff0c;本篇文章会快速介绍下各个尺寸模型的情况&#xff0c;然后重点介绍下如…

20212416 2023-2024-2 《网络与系统攻防技术》实验八实验报告

Web安全实践 1.实验内容2.实验过程2.1 Web前端HTML2.1.1 正常安装、启停Apache2.1.2 编写一个含有表单的HTML2.1.2.1 基础知识2.1.2.2 实践 2.2 Web前端javascipt2.2.1 基础知识2.2.2 实践 2.3 Web后端&#xff1a;MySQL基础2.3.1 正常安装、启动MySQL2.3.2 创建用户、修改密码…

torch.cat 与 torch.concat函数

文章目录 区别torch.cat介绍作用参数使用实例关于参数dim为None的使用 区别 先说结论&#xff1a;没有区别在功能、用法以及作用上&#xff0c;concat函数就是cat函数的别名&#xff08;官方就是这样说的&#xff09;。下面截图为证&#xff1a;   因此接下来就主要是介绍 to…

[NOVATEK] NT96580行车记录仪功能学习笔记

一、u-Boot升级灯 运行u-Boot程序时LED灯闪烁,找到运行过程中一直在运行的函数在里面进行LED引脚电平的翻转 宏定义 Z:\SunFan\AHD580\pip\na51055_PIP\BSP\u-boot\include\configs\nvt-na51055-evb.h Z:\SunFan\AHD580\pip\na51055_PIP\BSP\u-boot\drivers\mtd\nvt_flash_…

MATLAB format

在MATLAB中&#xff0c;format 是一个函数&#xff0c;用于控制命令窗口中数值的显示格式。这个函数可以设置数值的精度、显示的位数等。以下是一些常用的 format 命令&#xff1a; format long&#xff1a;以默认的长格式显示数值&#xff0c;通常显示15位有效数字。format s…

【Linux】深入解析动静态库:原理、制作、使用与动态链接机制

文章目录 前言&#xff1a;1. 什么是动静态库2. 动静态库的制作和使用3. 动态库的查找问题4. 理解动态库的加载4.1. 站在系统的角度理解4.2. 编址、可执行程序4.3. 动态库动态链接和加载问题 总结&#xff1a; 前言&#xff1a; 在软件开发中&#xff0c;动静态库是两种重要的…

11.盛水最多的容器

给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能倾斜容器。 示例 1&a…

编写程序提示用户输入一个数目(例如:100)、年利率(例如:5)以及月份数(例如:6),然后显示给定月份后账户上的钱数。

(财务应用程序:复利值)假设你每月向银行账户存 100美元&#xff0c;年利率为5%&#xff0c;那么每 月利率是 0.05/12-0.00417。 第一个月之后&#xff0c;账户上的值就变成:100*(10.00417)100.417 第二个月之后&#xff0c;账户上的值就变成(100100.417)*(10.00417)-201.252 第…

算法金 | 不愧是腾讯,问基础巨细节 。。。

大侠幸会&#xff0c;在下全网同名「算法金」 0 基础转 AI 上岸&#xff0c;多个算法赛 Top 「日更万日&#xff0c;让更多人享受智能乐趣」 最近&#xff0c;有读者参加了腾讯算法岗位的面试&#xff0c;面试着重考察了基础知识&#xff0c;并且提问非常详细。 特别是关于Ada…

[UE 虚幻引擎] DTLoadFbx 运行时加载FBX本地模型插件说明

本插件可以在打包后运行时动态加载FBX模型。 新建一个Actor 并添加一个 DT Runtime Fbx Component。 然后直接调用组件的函数 LoadFile 加载显示模型&#xff08;注&#xff1a;不支持模型动画&#xff09; FilePath : 加载模型的绝对路径。 Create Collision : 是否创建碰撞…

使用python绘制桑基图

使用python绘制桑基图 桑基图效果代码 桑基图 桑基图&#xff08;Sankey Diagram&#xff09;是一种用来表示流动&#xff08;如能源、资金、材料等&#xff09;在不同实体之间转移的图表。 每个流的宽度与流量成正比&#xff0c;通常用于显示能量或成本流动的分布情况。 桑基…

C++的重载

重载关系 同一作用域中&#xff0c;函数名相同&#xff0c;参数表不同的函数只有同一作用域中的同名函数才涉及重载问题&#xff0c;不 同作用域中同名函数遵循标识符隐藏原则 #include <QtCore/QCoreApplication> #include <QList> #include <QDebug> #in…

Cloudpods 强大的多云管理平台部署

简介 Cloudpods 是一款简单、可靠的企业IaaS资源管理软件。帮助未云化企业全面云化IDC物理资源&#xff0c;提升企业IT管理效率。 Cloudpods 帮助客户在一个地方管理所有云计算资源。统一管理异构IT基础设施资源&#xff0c;极大简化多云架构复杂度和难度&#xff0c;帮助企业…

AI绘画教程分享:Stable Diffusion使用指南,12000+AI关键词大合集

01 首先下载好SD的安装包&#xff08;百度、B站、小红书等都可以找到资源&#xff09;&#xff0c;用启动器开始运行 02 从这里下载别人的模型套用&#xff0c;可以多多探索一下&#xff01;以下是各个模型的具体介绍&#xff1a; 03 这就是我们打开的初始界面&#xff0c;常…

CondaSSLError: OpenSSL appears to be unavailable on this machine.

conda create -n x1 python3.7报错 PS C:\Users\Richardo.M.Song\Desktop\lele_seg\x1> conda create -n x1 python3.7 Collecting package metadata (current_repodata.json): failed CondaSSLError: OpenSSL appears to be unavailable on this machine. OpenSSL is requ…

Allure在jenkins中无法显示的问题

jenkins中使用allure生成报告需要注意工作环境和路径的配置 前提条件&#xff1a; jenkins容器中已安装jdk和allure jenkins中配置全局工具环境&#xff1a; 项目中配置allure路径&#xff1a; 路径来源&#xff1a; Path需要选择相对路径的allure-report、allure-results

第八篇——矢量化:象形文字和拼音文字是如何演化的?

目录 一、背景介绍二、思路&方案三、过程1.思维导图2.文章中经典的句子理解3.学习之后对于投资市场的理解4.通过这篇文章结合我知道的东西我能想到什么&#xff1f; 四、总结五、升华 一、背景介绍 通过这篇看似在讲文字的演化过程&#xff0c;实际是在说人生应该如何走&a…

天才简史——Tamim Asfour与他的H²T实验室

一、Tamim Asfour介绍 Tamim Asfour为KIT人类学和机器人学研究所&#xff08;Institute for Anthropomatics and Robotics&#xff09;的全职教授&#xff0c;并担任高性能人形技术实验室 (High Performance Humanoid Technologies Lab&#xff0c;HT) 负责人。他目前的研究兴…

使用LabVIEW进行大数据数组操作的优化方法

针对大数据量数组操作&#xff0c;传统的内存处理方法可能导致内存不足。通过LabVIEW的图像批处理技术&#xff0c;可以有效地进行大数据数组操作&#xff0c;包括分块处理、并行处理和内存优化等。这种方法能显著提高处理效率和系统稳定性。 图像批处理的优势 内存优化&#…

vs2017中C2440错误:“初始化”:无法从const char[6]转换为char*问题解决

本文摘要&#xff1a;本文已解决 Python FileNotFoundError 的相关报错问题&#xff0c;并总结提出了几种可用解决方案。同时结合人工智能GPT排除可能得隐患及错误。 &#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领…