消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

九、发布确认

在这里插入图片描述

1、发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。(批量)

(异步)confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

2、发布确认的策略

开启发布确认的方法发布确认默认是没有开启的,如果要开启,需要调用方法 confirmSelect每当你要想使用发布确认,都需要在 channel 上调用该方法(反复确认)。

1)单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

/*** 单个发送*/
public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服务端返回 false 或超时时间内未返回,生产者可以消息重发boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}

测试效果:

在这里插入图片描述

在这里插入图片描述

2)批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

在这里插入图片描述

在这里插入图片描述

/*** 批量*/
public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();//批量确认消息大小int batchSize = 100;//未确认消息个数int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;}}//为了确保还有剩余没有确认消息 再次确认if (outstandingMessageCount > 0) {channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

测试效果:

在这里插入图片描述

在这里插入图片描述

3)异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。

异步确认发布

1、RabbitMQ使用map来记录消息序号和内容

2、代码实现异步确认

(1)选择有两个参数的addConfirmListener函数

在这里插入图片描述

(2)查看源码:知道需要配置的参数
在这里插入图片描述

都是需要实例化接口函数的

在这里插入图片描述

(3)使用匿名函数的方式

在这里插入图片描述

(4)打印消息序号

在这里插入图片描述

在这里插入图片描述

(5)测试效果:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

如何处理异步未确认消息?

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

(1)代码实现:

在这里插入图片描述

①记录下所有要发送的消息,消息的总和

在这里插入图片描述

②删除到已经确认的消息,剩下的就是未确认的消息

在这里插入图片描述

③打印一下未确认的消息都有哪些

在这里插入图片描述

//    异步确认发布private static void publishMessageAsync() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();/**线程安全有序的一个哈希表,适用于高并发的情况下1.轻松的将序号与消息进行关联2.轻松批量删除条目,只要给到序号3.支持高并发(多线程)**///消息确认成功,回调函数ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>();ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个if (multiple){  //批量的//2.轻松批量删除条目,只要给到序号ConcurrentNavigableMap<Long,String> confirmed =outstandingConfirms.headMap(deliveryTag); //消息的序号confirmed.clear();}else {  //单个的outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败,回调函数/*** 1.消息的标记* 2.是否为批量确认*/ConfirmCallback nackCallback = (deliveryTag , multiple)->{//3、打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);};//准备消息的监听器 ,监听哪些消息成功了, 哪些失败了channel.addConfirmListener( ackCallback, nackCallback);//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}//结束时间long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}

测试效果:

在这里插入图片描述

以上 3 种发布确认速度对比 :

  • 单独发布消息:同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。

  • 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。

消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/700697.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python基础教程——17个工作必备的Python自动化代码

您是否厌倦了在日常工作中做那些重复性的任务&#xff1f;简单但多功能的Python脚本可以解决您的问题。 引言 Python是一种流行的编程语言&#xff0c;以其简单性和可读性而闻名。因其能够提供大量的库和模块&#xff0c;它成为了自动化各种任务的绝佳选择。让我们进入自动化…

K8s环境搭建

一、基础环境准备 VMware虚拟机&#xff0c;安装三台CentOS&#xff0c;网络环境选择NAT模式&#xff0c;推荐配置如下&#xff08;具体安装步骤省略&#xff0c;网上很多虚拟机安装CentOS7的教程&#xff09; 二、网络环境说明 使用NAT模式&#xff0c;我的IP分别是&#xf…

Promise相关理解记录

一、Promise基础定义相关 Promise是一个构造函数&#xff0c;调用时需要使用new关键字 Promise是解决回调地狱的一种异步解决方式 Promise有三个状态&#xff1a;pending(进行中)、fulfilled(成功)、rejected(失败) Promise的状态只会从 pending→fulfilled 或者 pending→…

300分钟吃透分布式缓存-13讲:如何完整学习MC协议及优化client访问?

协议分析 异常错误响应 接下来&#xff0c;我们来完整学习 Mc 协议。在学习 Mc 协议之前&#xff0c;首先来看看 Mc 处理协议指令&#xff0c;如果发现异常&#xff0c;如何进行异常错误响应的。Mc 在处理所有 client 端指令时&#xff0c;如果遇到错误&#xff0c;就会返回 …

信号系统之线性图像处理

1 卷积 图像卷积的工作原理与一维卷积相同。例如&#xff0c;图像可以被视为脉冲的总和&#xff0c;即缩放和移位的delta函数。同样&#xff0c;线性系统的特征在于它们如何响应脉冲。也就是说&#xff0c;通过它们的脉冲响应。系统的输出图像等于输入图像与系统脉冲响应的卷积…

pclpy 半径滤波实现

pclpy 半径滤波实现 一、算法原理背景 二、代码1.pclpy 官方给与RadiusOutlierRemoval2.手写的半径滤波&#xff08;速度太慢了&#xff0c;用官方的吧&#xff09; 三、结果1.左边为原始点云&#xff0c;右边为半径滤波后点云 四、相关数据 一、算法原理 背景 RadiusOutlier…

Linux——进程概念

目录 冯诺依曼体系结构 操作系统 管理 系统调用和库函数 进程的概念 进程控制块——PCB 查看进程 通过系统调用获取进程标示符 通过系统调用创建进程 进程状态 运行状态-R ​编辑 浅度睡眠状态-S 深度睡眠状态-D 暂停状态-T 死亡状态-X 僵尸状态-Z 僵尸进程…

AD24-PCB的DRC电气性能检查

1、 2、如果报错器件选中&#xff0c;不能跳转时&#xff0c;按下图设置 3、开始出现以下提示时处理 4、到后期&#xff0c;错误改得差不多的时候&#xff1b;出现以下的处理步骤 ①将顶层和底层铜皮选中&#xff0c;移动200mm ②执行以下操作 ③将铜皮在移动回来&#xff0c;进…

STM32_IIC_AT24C02_1_芯片简介即管脚配置

STM32的IIC总线是存在bug&#xff0c;感兴趣的可以上网搜一搜。我们可以使用两个I/O口和软件的方式来模拟stm32的iic总线的控制&#xff0c;所以就不需要使用stm32的硬件控制器了&#xff0c;同理数据手册中的I2C库函数也没有用了。 ROM&#xff08;只读存储器&#xff09;和…

黄仁勋最新专访:机器人基础模型可能即将出现,新一代GPU性能超乎想象

最近&#xff0c;《连线》的记者采访了英伟达CEO黄仁勋。 记者表示&#xff0c;与Jensen Huang交流应该带有警告标签&#xff0c;因为这位Nvidia首席执行官对人工智能的发展方向如此投入&#xff0c;以至于在经过近 90 分钟的热烈交谈后&#xff0c;我&#xff08;指代本采访的…

杰发科技AC7801——SRAM 错误检测纠正

0.概述 7801暂时无错误注入&#xff0c;无法直接进中断看错误情况&#xff0c;具体效果后续看7840的带错误注入的测试情况。 1.简介 2.特性 3.功能 4.调试 可以看到在库文件里面有ecc_sram的库。 在官方GPIO代码里面写了点测试代码 成功打开2bit中断 因为没有错误注入&#x…

Netdata:实时高分辨率监控工具 | 开源日报 No.173

netdata/netdata Stars: 63.9k License: GPL-3.0 Netdata 是一个监控工具&#xff0c;可以实时高分辨率地监视服务器、容器和应用程序。 以下是该项目的主要功能&#xff1a; 收集来自 800 多个整合方案的指标&#xff1a;操作系统指标、容器指标、虚拟机、硬件传感器等。实…

软件常见设计模式

设计模式 设计模式是为了解决在软件开发过程中遇到的某些问题而形成的思想。同一场景有多种设计模式可以应用&#xff0c;不同的模式有各自的优缺点&#xff0c;开发者可以基于自身需求选择合适的设计模式&#xff0c;去解决相应的工程难题。 良好的软件设计和架构&#xff0…

k8s的svc流量通过iptables和ipvs转发到pod的流程解析

文章目录 1. k8s的svc流量转发1.1 service 说明1.2 endpoints说明1.3 pod 说明1.4 svc流量转发的主要工作 2. iptables规则解析2.1 svc涉及的iptables链流程说明2.2 svc涉及的iptables规则实例2.2.1 KUBE-SERVICES规则链2.2.2 KUBE-SVC-EFPSQH5654KMWHJ5规则链2.2.3 KUBE-SEP-L…

css复习

盒模型相关&#xff1a; border&#xff1a;1px solid red (没有顺序) 单元格的border会发生重叠&#xff0c;如果不想要重叠设置 border-collapse:collapse (表示相邻边框合并在一起) padding padding影响盒子大小的好处使用 margin应用&#xff1a; 行内或行内块元素水…

windows Server下Let‘s Encrypt的SSL证书续期

一、手动续期方法&#xff1a; 暂停IIS服务器 --> 暂时关闭防火墙 --> 执行certbot renew --> 打开防火墙 --> 用OpenSSL将证书转换为PFX格式-->pfx文件导入到IIS --> IIS对应网站中绑定新证书 --> 重新启动IIS -->完成 1、暂停IIS服务器 2、暂时关闭…

【LeetCode每日一题】 单调栈的案例 42. 接雨水

这道题是困难&#xff0c;但是可以使用单调栈&#xff0c;非常简洁通俗。 关于单调栈可以参考单调栈总结以及Leetcode案例解读与复盘 42. 接雨水 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 …

浅析SpringBoot框架常见未授权访问漏洞

文章目录 前言Swagger未授权访问RESTful API 设计风格swagger-ui 未授权访问swagger 接口批量探测 Springboot Actuator未授权访问数据利用未授权访问防御手段漏洞自动化检测工具 CVE-2022-22947 RCE漏洞原理分析与复现漏洞自动化利用工具 其他常见未授权访问Druid未授权访问漏…

私域运营-需要认清的事实

一、私域不能单纯依靠微信渠道 误区&#xff1a;很多企业仍停留在如何让用户在微信去分享裂变&#xff0c;然后带动新用户的阶段。 私域的核心在于“开源节流”&#xff0c;就是如何通过更多渠道获取更多客户&#xff0c;并且避免客户的批量流失。 私域讲究的是如何从公域的“…

【读博杂记】:近期日常240223

近期日常 最近莫名其妙&#xff0c;小导悄悄卷起来&#xff0c;说要早上八点半开始打卡&#xff0c;我感觉这是要针对我们在学校住的&#xff0c;想让我们自己妥协来这边租房子住&#xff0c;但我感觉这是在逼我养成规律作息啊&#xff01;现在基本上就是6~7点撤退&#xff0c;…