RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求://声明队列channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);//定义回调队列String replyQueueName = Common.RPC_RESPONSE_QUEUE;//本次请求的唯一标识String corrId = UUID.randomUUID().toString();//生成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//通过内置交换机, 发送消息String message = "hello, rpc......";channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应://使用阻塞队列来存储回调结果, 避免了客户端反复访问队列final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));//如果唯一标识正确, 放在阻塞队列中if(properties.getCorrelationId().equals(corrId)){response.offer(new String(body));}}};channel.basicConsume(replyQueueName, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPCClient] Result: " + result);

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息channel.basicQos(1);//接收消息, 并对消息进行应答DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String message = new String(body);String response = "接收到消息 request: " + message;channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));//对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){//创建channelChannel channel = connection.createChannel();//开启信道确认模式channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));//等待确认消息, 只要消息被确认, 这个方法就会被返回//如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOExceptionchannel.waitForConfirmsOrDie();}Long end = System.currentTimeMillis(5000);System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//批量个数int batchSize = 100;int outstandingMessageCount = 0;long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount == batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if(outstandingMessageCount > 0){channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);channel.confirmSelect();//有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}//如果处理失败, 需要有消息重发的环节, 此处省略}});long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));confirmSet.add(nextPublishSeqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

模型训练数据-MinerU一款Pdf转Markdown软件

模型训练数据-MinerU一款Pdf转Markdown软件-说明 简介&#xff1a; MinerU是什么 MinerU是上海人工智能实验室OpenDataLab团队推出的开源智能数据提取工具&#xff0c;专注于复杂PDF文档的高效解析与提取。MinerU能将包含图片、公式、表格等元素的多模态PDF文档转化为易于分析…

STM32F103 PWM配置

在《STM32F103定时器配置》中我们介绍了PWM的产生原理&#xff0c;本节介绍介绍如何编码实现PWM的输出。 一、PWM相关寄存器 TIMx如果要产生PWM&#xff0c;除了我们上一节提到的如下寄存器&#xff1a; 控制寄存器(TIMx_CR1)&#xff1b;DMA/中断使能寄存器(TIMx_DIER)&#x…

Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项&#xff0c;使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…

三菱CNC数采超详细,资料全备教程,后续更新发那科数采教程

三菱数采详细教程 文章目录 三菱数采详细教程一、介绍1.背景2.需要掌握知识3.需要资料①三菱SDK包&#xff1a;A2②三菱com接口文档③C#代码&#xff1a;④VStudio⑤资料存放网盘 二、程序运行1.调试设备①条件②命令 2.运行软件①打开软件②运行程序 三、数据采集1.代码了解2.…

常见限流算法详细解析

常见限流算法详细解析 分布式系统中&#xff0c;由于接口API无法控制上游调用方的行为&#xff0c;因此当瞬时请求量突增时&#xff0c;会导致服务器占用过多资源&#xff0c;发生响应速度降低、超时、乃至宕机&#xff0c;甚至引发雪崩造成整个系统不可用。 限流&#xff0c;…

java+ssm+mysql高校学籍管理系统

项目介绍&#xff1a; 使用javassmmysql开发的高校学籍管理系统&#xff0c;系统包含超级管理员&#xff0c;系统管理员、教师、学生角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff08;可以新增管理员&#xff09;&#xff1b;专业管理&…

(5)JS-Clipper2之PolyNode

1. 描述 PolyNodes是被封装在PolyTree的容器中&#xff0c;同时提供了一个数据结构来代表由Excute()方法返回的多边形轮廓中的父子关系。 一个PolyNode对象代表一个多边形&#xff1b;它的“IsHole”属性表明它是一个“外轮廓”还是一个“内孔”&#xff0c;PolyNodes可能包含…

Java项目实战II基于微信小程序的无中介租房系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 随着城市化进程的加速&#xff0c;租房市场日益繁荣&a…

MATLAB稀疏感知图像和体数据恢复的系统对象研究

稀疏感知图像和体数据恢复是一种用于恢复损坏、噪声或不完整的图像和体数据的技术。它利用了信号的稀疏性&#xff0c;即信号在某种基础下可以用较少的非零系数表示&#xff0c;从而实现高质量的恢复。 在进行稀疏感知图像和体数据恢复的研究时&#xff0c;需要定义一些系统对…

安卓调试环境搭建

前言 前段时间电脑重装了系统&#xff0c;最近准备调试一个apk&#xff0c;没想到装环境的过程并不顺利&#xff0c;很让人火大&#xff0c;于是记录一下。 反编译工具下载 下载apktool.bat和apktool.jar 官网地址&#xff1a;https://ibotpeaches.github.io/Apktool/install…

【工具】音频文件格式转换工具

找开源资源、下载测试不同库的效果&#xff0c;然后找音频、下载音频、编写代码、测试转换、流程通畅。写一个工具花的时间越来越多了&#xff01;这个 5 天 这个工具是一个音频文件格式转换工具&#xff0c;支持对 mp3.aac.wav.caf.flac.ircam.mp2.mpeg.oga.opus.pcm.ra.spx.…

在ARM Linux应用层下使用SPI驱动WS2812

文章目录 1、前言2、结果展示3、接线4、SPI驱动WS2812原理4.1、0码要发送的字节4.2、1码要发送的字节4.3、SPI时钟频率 5、点亮RGB5.1、亮绿灯5.2、亮红灯5.3、亮蓝灯5.4、完整程序 6、RGB呼吸灯7、总结 1、前言 事情是这样的&#xff0c;前段时间&#xff0c;写了一个基于RK3…

BERT:用于语言理解的深度双向 Transformer 的预训练。

文章目录 0. 摘要1. 介绍2. 相关工作2.1 无监督的基于特征的方法2.3 无监督微调方法2.3 从受监督数据中迁移学习 3. BERT3.1 预训练 BERT3.2 微调 BERT 4. 实验4.1 GLUE4.2 SQuAD v1.14.3 SQuAD v2.04.4 SWAG 5. 消融研究5.1 预训练任务的影响5.2 模型大小的影响5.3 使用 BERT …

在算网云平台云端在线部署stable diffusion (0基础小白超详细教程)

Stable Diffusion无疑是AIGC领域中的AI绘画利器&#xff0c;具有以下显著优势&#xff1a; 1、开源性质&#xff0c;支持本地部署 2、能够实现对图像生成过程的精确控制 虽然SD在使用上有很多的有点&#xff0c;但缺点也是不言而喻的&#xff0c;由于AI绘画的整个过程以及现…

设计模式——Chain(责任链)设计模式

摘要 责任链设计模式是一种行为设计模式&#xff0c;通过链式调用将请求逐一传递给一系列处理器&#xff0c;直到某个处理器处理了请求或所有处理器都未能处理。它解耦了请求的发送者和接收者&#xff0c;允许动态地将请求处理职责分配给多个对象&#xff0c;支持请求的灵活传…

macOS 15.1.1 (24B2091) 系统中快捷键符号及其代表的按键的对照表

以下是 macOS 15.1.1 (24B2091) 系统中快捷键符号及其代表的按键的对照表&#xff1a; 符号按键名称描述⌘Command (Cmd)常用的功能键&#xff0c;用于执行大多数快捷操作。⌥Option (Alt)Option 键&#xff0c;常用于辅助操作和特殊字符输入。⇧ShiftShift 键&#xff0c;常用…

el-table一键选择全部行,切换分页后无法勾选

el-table一键全选&#xff0c;分页的完美支持 问题背景尝试解决存在问题问题分析 解决方案改进思路如下具体代码实现如下 问题背景 现在有个需求&#xff0c;一个表格有若干条数据(假设数量大于20&#xff0c;每页10条&#xff0c;保证有2个以上分页即可)。 现在需要在表格上方…

【55 Pandas+Pyecharts | 实习僧网Python岗位招聘数据分析可视化】

文章目录 &#x1f3f3;️‍&#x1f308; 1. 导入模块&#x1f3f3;️‍&#x1f308; 2. Pandas数据处理2.1 读取数据2.2 查看数据信息2.3 去除重复数据2.4 调整部分城市名称 &#x1f3f3;️‍&#x1f308; 3. Pyecharts数据可视化3.1 招聘数量前20岗位3.2 各城市招聘数量3…

【赵渝强老师】PostgreSQL的控制文件

PostgreSQL数据库的物理存储结构主要是指硬盘上存储的文件&#xff0c;包括&#xff1a;数据文件、日志文件、参数文件、控制文件、WAL预写日志文件等等。 下面重点讨论一下PostgreSQL的控制文件。 视频讲解如下 【赵渝强老师】PostgreSQL的控制文件 控制文件记录了数据库运行…

在做题中学习(79):最小K个数

解法&#xff1a;快速选择算法 说明&#xff1a;堆排序也是经典解决问题的算法&#xff0c;但时间复杂度为&#xff1a;O(NlogK)&#xff0c;K为k个元素 而将要介绍的快速选择算法的时间复杂度为: O(N) 先看我的前两篇文章&#xff0c;分别学习&#xff1a;数组分三块&#…