Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置,自动将对象转换为json字符串** @return*/@Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}@Beanpublic Queue reliableQueue() {return QueueBuilder.durable("reliable-queue").build();}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData = new CorrelationData();rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();future.whenComplete((confirm, throwable) -> {if (confirm.isAck()) {log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));return;}log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {@RabbitListener(queues = "reliable-queue")public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/643970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

外包干了3年,跑路了。。。

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号【互联网杂货铺】&#xff0c;回复 1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 故事的开始 最近这几天有点忧伤&#xff0c;因为带我的师傅要跑…

Vue3 Suspense 优雅地处理异步组件加载

✨ 专栏介绍 在当今Web开发领域中&#xff0c;构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架&#xff0c;正是为了满足这些需求而诞生。它采用了MVVM架构模式&#xff0c;并通过数据驱动和组件化的方式&#xff0c;使…

力扣 | 73. 矩阵置零

public class Problem_73_MatrixToZero {public void setZeros(int[][] matrix) {int m matrix.length;int n matrix[0].length;boolean flagRow false;boolean flagCol false;//先记录第一行本身是否包含0for (int i 0; i < n; i) {if (matrix[0][i] 0) flagRow tru…

使用Burp Collaborator验证无回显的RCE漏洞

使用Burp Collaborator验证无回显的RCE漏洞 1.概述2.Collaborator演示3.通过DNS查找外带命令执行结果1.概述 当应用程序容易受到命令注入攻击,但命令是异步执行时,就会发生异步操作系统命令注入漏洞。这意味着它对应用程序的响应没有明显影响 Burp Collaborator 可以帮助您…

Jenkins如何重置build number?

Jenkins如何重置build number&#xff1f; Jenkins调试过程中, 难免会产生很多无用的编译号&#xff0c;但需要清除无用build数据的时候&#xff0c;可以使用Script Consle来达到目的. 解决方案 直接说答案。 在脚本中填写如下语句&#xff1a; def jobName "xxx-Build&…

leetcode—— 腐烂的橘子

腐烂的橘子 在给定的 m x n 网格 grid 中&#xff0c;每个单元格可以有以下三个值之一&#xff1a; 值 0 代表空单元格&#xff1b;值 1 代表新鲜橘子&#xff1b;值 2 代表腐烂的橘子。 每分钟&#xff0c;腐烂的橘子 周围 4 个方向上相邻 的新鲜橘子都会腐烂。 返回 直到…

【Linux C | 进程】Linux 进程间通信的10种方式(1)

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

log4j2配置文件命名及优先级

log4j 2.x版本不再支持像1.x中的.properties后缀的文件配置方式&#xff0c;2.x版本配置文件后缀名只能为".xml",“.json"或者”.jsn"。 命名规则 默认配置文件名&#xff1a; log4j2.xml 或 log4j2.json 测试或特定环境配置文件名&#xff1a;可以以 -t…

Kafka(二)原理详解

一 、kafka核心总控制器&#xff08;Controller&#xff09; 在Kafka集群中会有一个或者多个broker&#xff0c;其中有一个broker会被选举为控制器&#xff08;Kafka Controller&#xff09;&#xff0c;它负责管理整个集群中所有分区和副本的状态。 作用&#xff1a;leader副…

【wayn商城】本地开发指南

这篇文章给大家带来我自己写的开源项目【wayn商城】的本地开发指南&#xff0c;帮助各位朋友在本地快速运行【wayn商城】&#xff0c;避免踩坑&#xff0c;减少不必要的精力在软件下载安装上。 &#x1f525;waynboot-mall 是一套全部开源的 H5 商城项目&#xff0c;实现了一套…

Jedis(一)与Redis的关系

一、Jedis介绍&#xff1a; 1、背景&#xff1a; Jedis是基于Java语言的Redis的客户端&#xff0c;Jedis Java Redis。Redis不仅可以使用命令来操作&#xff0c;现在基本上主流的语言都有API支持&#xff0c;比如Java、C#、C、PHP、Node.js、Go等。在官方网站里有一些Java的…

【系统DFX】如何诊断占用过多 CPU、内存、IO 等的神秘进程?

热门面试问题&#xff1a;如何诊断占用过多 CPU、内存、IO 等的神秘进程&#xff1f; 下图展示了 Linux 系统中有用的工具。 &#x1f539;’vmstat’ - 报告有关进程、内存、分页、块 IO、陷阱和 CPU 活动的信息。&#x1f539;’iostat’ - 报告系统的 CPU 和输入/输出统计信…

【制作100个unity游戏之23】实现类似七日杀、森林一样的生存游戏2(附项目源码)

本节最终效果演示 文章目录 本节最终效果演示系列目录前言添加小动物模型动画动物AI脚本效果 添加石头石头模型拾取物品效果 源码完结 系列目录 【制作100个unity游戏之23】实现类似七日杀、森林一样的生存游戏1&#xff08;附项目源码&#xff09; 【制作100个unity游戏之23】…

ubuntu22.04安装filebeat报错解决

1、查看报错 journalctl -u filebeat 或者 filebeat -c /etc/filebeat/filebeat.yml找到报错信息 runtime/cgo: pthread_create failed: Operation not permitted 2、解决报错 在filebeat.yml配置文件添加如下配置&#xff0c;重启filebeat seccomp:default_action: allow…

Prometheus 薪资翻倍的监控系统?

1. 介绍与架构 Prometheus是一个开源的系统监控和警报工具包&#xff0c;用于收集和存储时间序列数据&#xff0c;包括指标信息、记录时间戳以及可选的键值对标签。许多公司使用Prometheus监控K8s集群。 2. 合适与不合适场景 合适场景 Prometheus适用于记录各种数字时间序列…

构筑双品牌矩阵背后,广汽埃安讲出能源生态闭环的“新故事”

“一路繁花”用来形容广汽埃安的2023年并不为过。 2023年12月28日&#xff0c;埃安达成累计产销百万辆的目标&#xff0c;成为全球最快破百万的纯电品牌、新能源品牌以及汽车品牌&#xff1b;全年累计销量超48万辆&#xff0c;同比增长77%。 值得一提的是&#xff0c;2023年以…

C#中ArrayList运行机制及其涉及的装箱拆箱

C#中ArrayList运行机制及其涉及的装箱拆箱 1.1 基本用法1.1.1 属性1.1.2 方法 1.2 内部实现1.3 装箱1.4 拆箱1.5 object对象的相等性比较1.6 总结1.7 其他简单结构类 1.1 基本用法 命名空间&#xff1a; using System.Collections; 1.1.1 属性 Capacity&#xff1a;获取或设…

江苏省力学学会副理事长钱向东、邬萱一行来访天洑软件

近日&#xff0c;江苏省力学学会副理事长钱向东、邬萱率调研组一行来访天洑软件。 会上&#xff0c;双方就平台建设、成果转化、产品研发、品牌宣传、人才培养等方面开展了广泛深入的交流。江苏省力学学会副理事长钱向东、邬萱&#xff0c;分享了学会的平台优势和资源优势以及…

IP地址和端口

1. IP地址&#xff1a; 简介&#xff1a; IP 协议是为计算机网络相互连接进行通信而设计的协议。在因特网中&#xff0c;它是能使连接到网上的所 有计算机网络实现相互通信的一套规则&#xff0c;规定了计算机在因特网上进行通信时应当遵守的规则。任 何厂家生产的计算机系统…

ubuntu20安装网络调试助手遇到缺少qt4相关库的问题

最近需要做套接字通讯的工作&#xff0c;最好是有一个网络调试软件能够接受或者发送套接字&#xff0c;测试代码能够正常通讯。windows下有很多&#xff0c;但是linux下比较少&#xff0c;使用广泛的是下面这一款。 1、安装 首先从网盘&#xff08;链接: https://pan.baidu.c…