RabbitMQ 高级特性——发送方确认

在这里插入图片描述

文章目录

  • 前言
  • 发送方确认
    • confirm 确认模式
    • return 退回模式
  • 常见面试题

前言

前面我们学习了 RabbitMQ 中交换机、队列和消息的持久化,这样能够保证存储在 RabbitMQ Broker 中的交换机和队列中的消息实现持久化,就算 RabbitMQ 服务发生了重启或者是宕机,也不会导致交换机和消息的丢失。那么这个机制是保证存储在 RabbitMQ Broker 中的可靠性,但是对于生产者发送的消息如果都到达不了 RabbitMQ 的话,那么这些持久化操作也就没有意义了,那么对于生产者发送的消息,生产者如何知道消息是否已经成功到达 RabbitMQ Broker 了呢?这里就需要用到 RabbitMQ 发送发确认这个特性了,前面我们大概的讲了一下 RabbitMQ Java Client 中的 Publisher/confirm 发送方确认,那么这篇文章我们将学习在 SpringBoot 中如何实现发送方确认。

发送方确认

其实对于上面的问题,RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认机制实现

因为使用事务机制的话比较消耗性能,在实际工作中使用的不多,所以我们就主要介绍发送方确认的机制来实现发送方的确认。并且对于发送方确认的机制 RabbitMQ 也为我们提供了两个方式来控制消息的可靠性。

  1. confirm 确认模式
  2. return 退回模式

confirm 模式是确认消息是否到达指定的 Exchange 交换机的,而 return 退回模式则是确认消息是否到达指定队列的。

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果 Exchange 成功收到,ACK (Ackonwledge character 确认字符)为 true,如果没有收到消息,ACK 就为 false。

那么下面我们就来看看在 SpringBoot 中如何实现 confirm 确认模式:

首先在配置文件中配置信息:

spring:rabbitmq:publisher-confirm-type: correlated #消息发送确认

然后设置确认回调函数的内容并且发送消息:

无论消息是否成功送到,都会执行这个回调函数,确认消息是否成功送达的判断依据就是 ACK 的值:

public class Constants {public static final String CONFIRM_EXCHANGE = "confirm.exchange";public static final String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("confirm");}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {//创建新的RabbitTemplate对象,并且设置confirm回调函数RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.printf("消息接收成功,id:%s\n",correlationData.getId());}else {System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);}}});return rabbitTemplate;}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","rabbitmq confirm",correlationData);return "消息确认成功";}
}

在这里插入图片描述
然后我们指定交换机的时候,如果指定一个不存在的交换机,也就是消息无法到达指定的交换机,那么看看时候会执行确认回调函数:

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + 1,"confirm","rabbitmq confirm",correlationData);

2024-08-13 14:47:52.646 ERROR 11252 — [3.57.1.114:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
消息接受失败,id:1,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)

可以看到,如果消息没有到达指定的交换机,那么也是会执行相应的回调函数的。

public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 的区别:

  • RabbitTemplate.ConfirmCallback:这是Spring AMQP库提供的一个回调接口,主要用于在使用RabbitTemplate发送消息时,接收来自RabbitMQ服务器的确认信息。这些确认信息表明消息是否已成功发送到RabbitMQ的交换机(Exchange)。
  • ConfirmListener:这个接口或功能更多是直接与RabbitMQ的Channel相关,而不是直接通过Spring AMQP的RabbitTemplate来使用的。它用于监听RabbitMQ Channel上的消息确认事件,包括消息的ACK(确认)和NACK(不确认)。这种方式通常需要更底层的操作,直接处理RabbitMQ的Channel和连接。

return 退回模式

当消息成功到达 Exchange 交换机的时候,交换机会根据路由规则匹配对应的队列,将消息路由到指定的队列,在消息从 Exchange 到 Queue 的过程中,如果一条消息无法被任何队列消费(即没有队列与消息的 Routing Key 匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。

那么使用 SpringBoot 如何实现 return 退回模式呢?

首先还是需要进行配置,配置和上面的 confirm 模式是一样的:

spring:rabbitmq:publisher-returns: true #设置回退

设置返回回调逻辑并发送消息:

@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.printf("消息接收成功,id:%s\n",correlationData.getId());}else {System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);}}});rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.printf("消息被退回:%s",returnedMessage);}});return rabbitTemplate;
}

setConfirmCallback() 和 setReturnCallback() 方法可以同时存在也可以单独设置。

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm1","rabbitmq confirm",correlationData);

发送消息的时候,我们的 Routing Key 设置为没有 Binding Key 与之匹配的,然后来看看这个 returnCallback 是否会被执行:

消息被退回:ReturnedMessage [message=(Body:‘rabbitmq confirm’ MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]
消息接收成功,id:1

消息成功到达了 Exhcange,但是没有到达指定的队列,所以执行了 returnCallback 方法。

public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;
}

常见面试题

如何保证 RabbitMQ 消息的可靠传输:

在这里插入图片描述

从这个图中可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到RabbitMQ失败
    a. 可能原因: 网络问题等
    b. 解决办法: 参考本章节[发送方确认-confirm确认模式]

  2. 消息在交换机中无法路由到指定队列:
    a. 可能原因: 代码或者配置层面错误,导致消息路由失败
    b. 解决办法: 参考本章节[发送方确认-return模式]

  3. 消息队列自身数据丢失
    a. 可能原因: 消息到达RabbitMQ之后,RabbitMQ Server宕机导致消息丢失。
    b. 解决办法: 参考本章节[持久性]。开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)

  4. 消费者异常,导致消息丢失
    a. 可能原因: 消息到达消费者,还没来得及消费,消费者宕机。消费者逻辑有问题。
    b. 解决办法: 参考本章节[消息确认]。RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制(参考下一章节),当消息消费异常时,通过消息重试确保消息的可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/54508.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx实用篇:实现负载均衡、限流与动静分离

Nginx实用篇&#xff1a;实现负载均衡、限流与动静分离 | 原创作者/编辑&#xff1a;凯哥Java | 分类&#xff1a;Nginx学习系列教程 Nginx 作为一款高性能的 HTTP 服务器及反向代理解决方案&#xff0c;在互联网架构中扮演着至关重要的角色。它…

Acwing DFS

DFS&#xff1a;深度优先搜索 DFS与BFS的对比 DFS使用栈来实现&#xff0c;BFS使用队列来实现 DFS所需要的空间是 O ( h ) O(h) O(h),而BFS需要的空间是 O ( 2 h ) O(2^h) O(2h),其中h是树的高度&#xff1b; DFS不具有最短路的特性&#xff0c;BFS有最短路的特性 DFS回溯…

102.SAPUI5 sap.ndc.BarcodeScannerButton调用摄像头时,localhost访问正常,使用IP访问失败

目录 原因 解决办法 1.修改谷歌浏览器的setting 2.在tomcat中配置https访问 参考 使用SAPUI5的sap.ndc.BarcodeScannerButton调用摄像头时&#xff0c;localhost访问正常&#xff0c;使用IP访问时&#xff0c;一直打不开摄像头&#xff0c;提示getUserMedia()问题。 原因…

2024 “华为杯” 中国研究生数学建模竞赛(D题)深度剖析|大数据驱动的地理综合问题|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题&#xff01; CS团队倾注了大量时间和心血&#xff0c;深入挖掘解…

elasticsearch同步mysql方案

文章目录 1、1. 使用数据库触发器2. 使用定时任务3. 监听MySQL二进制日志&#xff08;binlog&#xff09;4. 使用数据管道5. 使用第三方工具或服务6. 编写自定义脚本注意事项 2、1. 使用Logstash步骤&#xff1a;示例配置&#xff1a; 2. 使用Debezium步骤&#xff1a; 3. 自定…

828华为云征文 | 解锁企业级邮件服务,在华为云fFlexus x实例上部署Mailcow开源方案

前言 华为云Flexus X实例携手Mailcow开源邮件方案&#xff0c;为企业打造了一个既高效又安全的邮件服务解决方案。Flexus X实例的柔性算力与高性能&#xff0c;是这一方案的坚实基石。它提供CPU内存的灵活定义&#xff0c;以经济型价格实现旗舰级性能&#xff0c;确保邮件服务的…

云计算课程作业1

作业1 Xmanager连接 rhel连接 作业2 首先确认你的虚拟机设置的是NAT 1-3 然后打开这篇blog&#xff0c;并完成第一步和第二步 因为我们是NAT&#xff0c;所以不需要连接网桥&#xff0c;即跳过第三步&#xff0c;但是这里ping一下测试网络连接 2- 如果到这里你发现提示yum…

Stylized Smooth Clouds 卡通风格化云朵包

下载:​​Unity资源商店链接资源下载链接 效果图:

828华为云征文|Flexus X实例Docker+Jenkins+gitee实现CI/CD自动化部署-解放你的双手~

目录 前言 实验步骤 环境准备 安装Portainer 拉取镜像 更换镜像源 启动容器 安装jenkins 拉取镜像 获取管理员密码 新建流水线项目 Portainer配置 gitee配置WebHooks 构建 修改代码&#xff0c;自动部署 前言 &#x1f680; 828 B2B企业节特惠来袭&#xff0c;…

如何通过蜂巢(容器安全)管理内部部署数据安全产品与云数据安全产品?

本文将探讨内部部署和云数据安全产品之间的主要区别。在思考这个问题之前&#xff0c;首先了解内部部署和云数据安全产品之间的主要区别。 内部部署数据安全产品意味着管理控制台位于企业客户的内部部署&#xff0c;而德迅云安全则在云中托管云数据安全产品。德迅云安全供应商通…

gin集成jaeger中间件实现链路追踪

1. 背景 新业务线带来新项目启动&#xff0c;需要改进原有项目的基础框架和组件能力&#xff0c;以提升后续开发和维护效率。项目搭建主要包括技术选型、框架搭建、基础服务搭建等。这其中就涉及到链路追踪的内容&#xff0c;结合其中的踩坑情况&#xff0c;用一篇文章来说明完…

【第十三章:Sentosa_DSML社区版-机器学习聚类】

目录 【第十三章&#xff1a;Sentosa_DSML社区版-机器学习聚类】 13.1 KMeans聚类 13.2 二分KMeans聚类 13.3 高斯混合聚类 13.4 模糊C均值聚类 13.5 Canopy聚类 13.6 Canopy-KMeans聚类 13.7 文档主题生成模型聚类 13.8 谱聚类 【第十三章&#xff1a;Sentosa_DSML社…

54.【C语言】 字符函数和字符串函数(strncpy,strncat,strncmp函数)

和strcpy,strcat,strcmp函数对应的是strncpy,strncat,strncmp函数 8.strncpy函数 *简单使用 cplusplus的介绍 点我跳转 翻译: 函数 strncpy char * strncpy ( char * destination, const char * source, size_t num ); 从字符串中复制一些字符 复制源(source)字符串的前num个…

MySQL高阶1890-2020年最后一次登录

目录 题目 准备数据 分析数据 题目 编写解决方案以获取在 2020 年登录过的所有用户的本年度 最后一次 登录时间。结果集 不 包含 2020 年没有登录过的用户。 返回的结果集可以按 任意顺序 排列。 准备数据 Create table If Not Exists Logins (user_id int, time_stamp …

JavaSE--集合总览02:单列集合Collection的体系之一:List

Collection体系的特点 分为 list 和set集合&#xff0c;这篇文章主要讲述List&#xff0c;下篇讲述Set。 简单认识单列集合collection集合的特点 : list集合的特点&#xff1a; 有序 可重复 有索引 set集合的特点&#xff1a;无序 不重复 无索引 其中LinkedHashSet有序 TreeS…

【delphi】正则判断windows完整合法文件名,包括路径

在 Delphi 中&#xff0c;可以使用正则表达式来检查 Windows 文件名称或路径是否合法。合法的文件名和路径要求符合以下几点&#xff1a; 禁止的字符&#xff1a;文件名和路径不能包含以下字符&#xff1a;<, >, :, ", /, \, |, ?, *。文件名不能以空格或点结束。…

关于Spring Cloud Gateway中 Filters的理解

Spring Cloud Gateway中 Filters的理解 Filters Filters拦截器的作用是&#xff0c;对请求进行处理 可以进行流量染色 ⭐增加请求头 例子 spring:cloud:gateway:routes:- id: add_request_header_routeuri: http://localhost:8123predicates:- Path/api/**filters:- AddR…

【图像压缩与重构】基于标准+改进BP神经网络

课题名称&#xff1a;基于标准改进BP神经网络的图像压缩与重构&#xff08;带GUI) 代码获取方式(付费&#xff09;&#xff1a; 相关资料&#xff1a; 1. 代码注释 2.BP神经网络原理文档资料 3.图像压缩原理文档资料 程序实例截图&#xff1a; 1. 基于标准BP神经网络的图…

windows下,用docker部署xinference,为什么老是提示localhost无法访问?

部署xinference有两种方式&#xff1a; 一、本地部署 &#xff08;略&#xff09; 二、使用Docker部署&#xff08;与运行&#xff09; 其中又包括&#xff1a; 1&#xff09;使用CPU的方式&#xff1a;&#xff08;略&#xff09; 1&#xff09;使用GPU的方式&#xff1…

计算机的错误计算(九十九)

摘要 讨论 的计算精度问题。 计算机的错误计算&#xff08;五十五&#xff09;、&#xff08;七十八&#xff09;以及&#xff08;九十六&#xff09;分别列出了 IEEE 754-2019[1]中的一些函数与运算。下面再截图给出其另外几个运算。 另外&#xff0c;计算机的错误计算&…