【初始RabbitMQ】发布订阅的实现

发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队 列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消 息,生产者应用程序同样可以在回调方法中处理该 nack 消息

简单的来说发布确认的作用就是防止数据丢失

发布确认的策略 

开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法confirmSelect,每当你要想使用发布确认,都需要在channel上调用该方法

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能够继续发布,waitForConfirmsOrDie(long) 这个方法发只有在消息被确认的时候才能够返回,如果在指定时间范围内这个消息没有被确认就会抛出异常

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了

public static void publicMessageIndividually() throws IOException, InterruptedException {Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化(磁盘)默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));//服务器返回false或超时内未返回,生产者可以消息重新发送boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发布成功!");}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}

批量确认发布

上面的方式发布时是很慢的,与单个等待确认相比,先发布一批消息让然后一起确认可以极大地提高吞吐量。缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布

    public static void publishMessageBatch() throws IOException, InterruptedException {Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//批量确认消息大小int batchSize = 100;//未确定消息int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount==batchSize){channel.waitForConfirms();outstandingMessageCount=0;}}if(outstandingMessageCount>0){channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");}

异步确认发布

异步确认发布通常指的是在消息发布过程中,消息的生产者(或发送方)在发送消息后,不需要等待消息被消费者(或接收方)确认接收,而是可以继续进行其他操作。这种异步确认的机制可以提高系统的并发性和效率

在具体实现上,异步确认发布通常涉及到以下几个步骤:

  1. 消息生产者将消息发送到消息队列或中间件中
  2. 消息队列或中间件在接收到消息后,会为每条消息分配一个唯一的序列号或ID
  3. 消息队列或中间件将消息投递到相应的消费者队列中
  4. 消费者从自己的消费者队列中拉取消息并进行处理
  5. 消费者在处理完消息后,会向消息队列或中间件发送一个确认消息,表示该消息已经被成功处理
  6. 消息队列或中间件在接收到消费者的确认消息后,会将该确认消息与原始消息的序列号或ID进行匹配,确认该消息已经被成功处理
  7. 消息队列或中间件会向消息生产者发送一个异步确认消息,通知消息生产者该消息已经被成功处理

 

public static void publishMessageAsync() throws IOException {Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确定channel.confirmSelect();/*** 线程安全有序的一个哈希表,适合用于高并发情况* 1、轻松将序号和消息进行关联* 2、轻松批量删除条目 只需要给到序列号* 3、支持高并发访问*/ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();//开始时间long begin = System.currentTimeMillis();ConfirmCallback ackCallback = (deliveryTag,multiple)->{//是否是批量处理if(multiple){//返回的是小于等于(如果是小于等于需要在参数中加true)当前序列号的未确认消息 是一个 mapConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag,true);confirmed.clear();}else{//只清除当前序列号的消息outstandingConfirms.remove(deliveryTag);}System.out.println("确认消息"+deliveryTag);};ConfirmCallback nackCallback = (deliveryTag,multiple)->{String message = outstandingConfirms.get(deliveryTag);System.out.println("发布的消息"+message+"未被确认,序列号"+deliveryTag);// System.out.println("未确认消息"+deliveryTag);};//准备消息的监听器 监听哪些消息成功 哪些消息失效channel.addConfirmListener(ackCallback,nackCallback);//批量发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息"+i;/*** channel.getNextPublishSeqNo()获取下一个消息的序列号* 通过序列号与消息体进行一个关联* 全部都是未确认的消息体*/outstandingConfirms.put(channel.getNextPublishSeqNo(),message);channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");}

以上 3 种发布确认速度对比

单独发布消息:同步等待确认,简单,但吞吐量非常有限

批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题

异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

花费时间对比

    public static void main(String[] args) throws IOException, InterruptedException {//单个确认发布ConfirmMessage.publicMessageIndividually();//发布1000个单独确认消息,耗时47298ms//批量确认发布ConfirmMessage.publishMessageBatch();//发布1000个批量确认消息,耗时866ms//异步确认发布ConfirmMessage.publishMessageAsync();//发布1000个批量确认消息,耗时104ms}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/700725.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux之ACL权限chmod命令

一. chmod命令 chmod命令来自英文词组change mode的缩写&#xff0c;其功能是改变文件或目录权限的命令。默认只有文件的所有者和管理员可以设置文件权限&#xff0c;普通用户只能管理自己文件的权限属性。 设置权限时可以使用数字法&#xff0c;亦可使用字母表达式&#xff0…

GO-ICP的使用(一)

一、代码下载以、修改以及使用 下载&#xff1a; 链接&#xff1a;yangjiaolong/Go-ICP: Implementation of the Go-ICP algorithm for globally optimal 3D pointset registration (github.com) 解压之后 &#xff1a; 首先visual studio项目&#xff0c;配置好PCL环境&…

更简单地介绍 CUDA

这篇文章是对 CUDA 的超级简单介绍&#xff0c;CUDA 是 NVIDIA 流行的并行计算平台和编程模型。我之前在2013年写过一篇文章《CUDA简单介绍》&#xff0c;多年来一直很受欢迎。但 CUDA 编程变得更加容易&#xff0c;GPU 也变得更快&#xff0c;所以是时候进行更新&#xff08;甚…

SQL-Labs46关order by注入姿势

君衍. 四十六关 ORDER BY数字型注入1、源码分析2、rand()盲注3、if语句盲注4、时间盲注5、报错注入6、Limit注入7、盲注脚本 四十六关 ORDER BY数字型注入 请求方式注入类型拼接方式GET报错、布尔盲注、延时盲注ORDER BY $id 我们直接可以从界面中得知传参的参数为SORT&#x…

Yolo v9 “Silence”模块结构及作用!

论文链接&#xff1a;&#x1f47f; YOLOv9: Learning What You Want to Learn Using Programmable Gradient Information 代码链接&#xff1a;&#x1f47f; https://github.com/WongKinYiu/yolov9/tree/main Silence代码 class Silence(nn.Module):def __init__(self):supe…

vue2和vue3对比(语法层面)

阅读文章你将收获&#xff1a; 1 了解不使用组件化工具时&#xff0c;vue在html是如何使用的 2 知道vue2的生命周期函数有哪些 3 知道如何在组件化开发中使用vue 4 大致了解了vue2和vue3在使用上什么不同 最后&#xff1a;vue2和vue3除了下面我列出的有差异化的地方&…

day41打卡

day41打卡 46. 携带研究材料&#xff08;第六期模拟笔试&#xff09; 状态表示 ​ 二维&#xff1a;dp[i] [j] 表示从下标为[0-i]的物品里任意取&#xff0c;放进容量为j的背包&#xff0c;价值总和最大是多少。 一维&#xff1a; ​ dp[j]表示&#xff1a;容量为j的背包&a…

模型 HBG(品牌增长)

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_总纲目录。品牌增长法。 1 HBG(品牌增长)模型的应用 1.1 江小白使用HBG模型提高品牌知名度和销售额 选择受众市场&#xff1a;江小白的目标客户是年轻人&#xff0c;他们喜欢简单、时尚的产品。因此&#xff0c;江…

数据结构D4作业

1.实现单向循环链表的功能 loop.c #include "loop.h" loop_p create_loop() { loop_p H(loop_p)malloc(sizeof(loop)); if(HNULL) { printf("创建失败\n"); return NULL; } H->len0; H->nextH; ret…

基于ElementUI封装省市区四级联动下拉选择

基于ElementUI封装的省市区下拉级联选择 效果 数据 最新省市区JSON数据获取&#xff1a;https://xiangyuecn.github.io/AreaCity-JsSpider-StatsGov/ 参数说明 参数说明inputNumShow下拉框的数量&#xff0c;最多4个defaultAddress默认显示省市区 例&#xff1a;[‘安徽’, …

【C++初阶】--类和对象(下)

目录 一.const成员 1.权限放大问题 2.权限的缩小 二.再谈构造函数 1.构造函数体赋值 2.初始化列表 (1)概念 (2)使用 ①在对象实例化过程中&#xff0c;成员变量先依次进行初始化 ②再进行函数体内二次赋值 3.explicit关键字 (1)C为什么要存在自动隐式类型转换…

算法打卡day1|数组篇|Leetcode 704.二分查找、27.移除元素

数组理论基础 数组是存放在连续内存空间上的相同类型数据的集合&#xff0c;可以方便的通过下标索引的方式获取到下标下对应的数据。 1.数组下标都是从0开始的。 2.数组内存空间的地址是连续的。 正是因为数组的在内存空间的地址是连续的&#xff0c;所以我们在删除或者增添…

【深度学习笔记】3_5 图像分类数据集fashion-mnist

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 3.5 图像分类数据集&#xff08;Fashion-MNIST&#xff09; 在介绍softmax回归的实现前我们先引入一个多类图像分类数据集。它将在后面的章节中被多次使用&#xff0c…

《Docker 简易速速上手小册》第1章 Docker 基础入门(2024 最新版)

文章目录 1.1 Docker 简介与历史1.1.1 Docker 基础知识1.1.2 重点案例&#xff1a;Python Web 应用的 Docker 化1.1.3 拓展案例 1&#xff1a;使用 Docker 进行 Python 数据分析1.1.4 拓展案例 2&#xff1a;Docker 中的 Python 机器学习环境 1.2 安装与配置 Docker1.2.1 重点基…

消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

九、发布确认 1、发布确认逻辑 生产者将信道设置成 confirm 模式&#xff0c;一旦信道进入 confirm 模式&#xff0c;所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始)&#xff0c;一旦消息被投递到所有匹配的队列之后&#xff0c;broker 就会发送一个确认给…

Python基础教程——17个工作必备的Python自动化代码

您是否厌倦了在日常工作中做那些重复性的任务&#xff1f;简单但多功能的Python脚本可以解决您的问题。 引言 Python是一种流行的编程语言&#xff0c;以其简单性和可读性而闻名。因其能够提供大量的库和模块&#xff0c;它成为了自动化各种任务的绝佳选择。让我们进入自动化…

K8s环境搭建

一、基础环境准备 VMware虚拟机&#xff0c;安装三台CentOS&#xff0c;网络环境选择NAT模式&#xff0c;推荐配置如下&#xff08;具体安装步骤省略&#xff0c;网上很多虚拟机安装CentOS7的教程&#xff09; 二、网络环境说明 使用NAT模式&#xff0c;我的IP分别是&#xf…

Promise相关理解记录

一、Promise基础定义相关 Promise是一个构造函数&#xff0c;调用时需要使用new关键字 Promise是解决回调地狱的一种异步解决方式 Promise有三个状态&#xff1a;pending(进行中)、fulfilled(成功)、rejected(失败) Promise的状态只会从 pending→fulfilled 或者 pending→…

300分钟吃透分布式缓存-13讲:如何完整学习MC协议及优化client访问?

协议分析 异常错误响应 接下来&#xff0c;我们来完整学习 Mc 协议。在学习 Mc 协议之前&#xff0c;首先来看看 Mc 处理协议指令&#xff0c;如果发现异常&#xff0c;如何进行异常错误响应的。Mc 在处理所有 client 端指令时&#xff0c;如果遇到错误&#xff0c;就会返回 …

信号系统之线性图像处理

1 卷积 图像卷积的工作原理与一维卷积相同。例如&#xff0c;图像可以被视为脉冲的总和&#xff0c;即缩放和移位的delta函数。同样&#xff0c;线性系统的特征在于它们如何响应脉冲。也就是说&#xff0c;通过它们的脉冲响应。系统的输出图像等于输入图像与系统脉冲响应的卷积…