RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求://声明队列channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);//定义回调队列String replyQueueName = Common.RPC_RESPONSE_QUEUE;//本次请求的唯一标识String corrId = UUID.randomUUID().toString();//生成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//通过内置交换机, 发送消息String message = "hello, rpc......";channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应://使用阻塞队列来存储回调结果, 避免了客户端反复访问队列final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));//如果唯一标识正确, 放在阻塞队列中if(properties.getCorrelationId().equals(corrId)){response.offer(new String(body));}}};channel.basicConsume(replyQueueName, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPCClient] Result: " + result);

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息channel.basicQos(1);//接收消息, 并对消息进行应答DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String message = new String(body);String response = "接收到消息 request: " + message;channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));//对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){//创建channelChannel channel = connection.createChannel();//开启信道确认模式channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));//等待确认消息, 只要消息被确认, 这个方法就会被返回//如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOExceptionchannel.waitForConfirmsOrDie();}Long end = System.currentTimeMillis(5000);System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//批量个数int batchSize = 100;int outstandingMessageCount = 0;long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount == batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if(outstandingMessageCount > 0){channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);channel.confirmSelect();//有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}//如果处理失败, 需要有消息重发的环节, 此处省略}});long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));confirmSet.add(nextPublishSeqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

模型训练数据-MinerU一款Pdf转Markdown软件

模型训练数据-MinerU一款Pdf转Markdown软件-说明 简介&#xff1a; MinerU是什么 MinerU是上海人工智能实验室OpenDataLab团队推出的开源智能数据提取工具&#xff0c;专注于复杂PDF文档的高效解析与提取。MinerU能将包含图片、公式、表格等元素的多模态PDF文档转化为易于分析…

STM32F103 PWM配置

在《STM32F103定时器配置》中我们介绍了PWM的产生原理&#xff0c;本节介绍介绍如何编码实现PWM的输出。 一、PWM相关寄存器 TIMx如果要产生PWM&#xff0c;除了我们上一节提到的如下寄存器&#xff1a; 控制寄存器(TIMx_CR1)&#xff1b;DMA/中断使能寄存器(TIMx_DIER)&#x…

Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项&#xff0c;使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…

三菱CNC数采超详细,资料全备教程,后续更新发那科数采教程

三菱数采详细教程 文章目录 三菱数采详细教程一、介绍1.背景2.需要掌握知识3.需要资料①三菱SDK包&#xff1a;A2②三菱com接口文档③C#代码&#xff1a;④VStudio⑤资料存放网盘 二、程序运行1.调试设备①条件②命令 2.运行软件①打开软件②运行程序 三、数据采集1.代码了解2.…

Atcoder ABC382

C 在回转寿司店里面有n个标为 A i A_i Ai​人和m个标为 B j B_j Bj​寿司。 每个寿司依次从1 2 3 … n人面前经过&#xff0c;当 B j ≥ A i B_j \geq A_i Bj​≥Ai​时i人就拿走寿司。 问&#xff1a;每个寿司被谁吃了&#xff1f; 对寿司进行排序&#xff0c;依次从1 2 3 ……

常见限流算法详细解析

常见限流算法详细解析 分布式系统中&#xff0c;由于接口API无法控制上游调用方的行为&#xff0c;因此当瞬时请求量突增时&#xff0c;会导致服务器占用过多资源&#xff0c;发生响应速度降低、超时、乃至宕机&#xff0c;甚至引发雪崩造成整个系统不可用。 限流&#xff0c;…

图片上传HTML

alioss sky:jwt:# 设置jwt签名加密时使用的秘钥admin-secret-key: itcast# 设置jwt过期时间admin-ttl: 7200000# 设置前端传递过来的令牌名称admin-token-name: tokenalioss:endpoint: ${sky.alioss.endpoint}access-key-id: ${sky.alioss.access-key-id}access-key-secret: $…

C# 抽奖程序winform示例

C# 抽奖程序winform示例 using System; using System.Collections.Generic; using System.Linq;public class LotterySimulator {private Random random new Random();public List<string> GenerateWinners(int numberOfWinners, int totalParticipants){List<strin…

java+ssm+mysql高校学籍管理系统

项目介绍&#xff1a; 使用javassmmysql开发的高校学籍管理系统&#xff0c;系统包含超级管理员&#xff0c;系统管理员、教师、学生角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff08;可以新增管理员&#xff09;&#xff1b;专业管理&…

Couchbase 简介

Couchbase 是一款分布式 NoSQL 数据库&#xff0c;主要用于现代应用程序中高性能、高可扩展性和灵活的数据存储需求。它结合了文档存储和键值存储的特性&#xff0c;为开发者提供了一种高效的数据库解决方案。 Couchbase 的特点 高性能&#xff1a; 支持内存优先的架构&#x…

LeetCode题练习与总结:132 模式--456

一、题目描述 给你一个整数数组 nums &#xff0c;数组中共有 n 个整数。132 模式的子序列 由三个整数 nums[i]、nums[j] 和 nums[k] 组成&#xff0c;并同时满足&#xff1a;i < j < k 和 nums[i] < nums[k] < nums[j] 。 如果 nums 中存在 132 模式的子序列 &a…

(5)JS-Clipper2之PolyNode

1. 描述 PolyNodes是被封装在PolyTree的容器中&#xff0c;同时提供了一个数据结构来代表由Excute()方法返回的多边形轮廓中的父子关系。 一个PolyNode对象代表一个多边形&#xff1b;它的“IsHole”属性表明它是一个“外轮廓”还是一个“内孔”&#xff0c;PolyNodes可能包含…

为什么 JavaScript 中的回调函数未按顺序执行?

在 JavaScript 中&#xff0c;回调函数未按顺序执行的原因&#xff0c;通常是由于 JavaScript 的 异步非阻塞 执行模型。JavaScript 在执行异步操作时&#xff0c;会将回调函数推入 任务队列&#xff0c;并不会立即执行这些回调函数&#xff0c;而是要等到主线程执行完同步任务…

Java项目实战II基于微信小程序的无中介租房系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 随着城市化进程的加速&#xff0c;租房市场日益繁荣&a…

图像处理插件:让小程序焕发视觉新生的秘密武器

在小程序开发中&#xff0c;图像处理是一个重要的环节&#xff0c;它涉及到图片的加载、显示、裁剪、压缩等多个方面。为了简化这一复杂过程&#xff0c;开发者通常会使用图像处理插件。这些插件不仅提供了丰富的图像处理功能&#xff0c;还封装了底层的图像操作逻辑&#xff0…

MATLAB稀疏感知图像和体数据恢复的系统对象研究

稀疏感知图像和体数据恢复是一种用于恢复损坏、噪声或不完整的图像和体数据的技术。它利用了信号的稀疏性&#xff0c;即信号在某种基础下可以用较少的非零系数表示&#xff0c;从而实现高质量的恢复。 在进行稀疏感知图像和体数据恢复的研究时&#xff0c;需要定义一些系统对…

安卓调试环境搭建

前言 前段时间电脑重装了系统&#xff0c;最近准备调试一个apk&#xff0c;没想到装环境的过程并不顺利&#xff0c;很让人火大&#xff0c;于是记录一下。 反编译工具下载 下载apktool.bat和apktool.jar 官网地址&#xff1a;https://ibotpeaches.github.io/Apktool/install…

【工具】音频文件格式转换工具

找开源资源、下载测试不同库的效果&#xff0c;然后找音频、下载音频、编写代码、测试转换、流程通畅。写一个工具花的时间越来越多了&#xff01;这个 5 天 这个工具是一个音频文件格式转换工具&#xff0c;支持对 mp3.aac.wav.caf.flac.ircam.mp2.mpeg.oga.opus.pcm.ra.spx.…

在ARM Linux应用层下使用SPI驱动WS2812

文章目录 1、前言2、结果展示3、接线4、SPI驱动WS2812原理4.1、0码要发送的字节4.2、1码要发送的字节4.3、SPI时钟频率 5、点亮RGB5.1、亮绿灯5.2、亮红灯5.3、亮蓝灯5.4、完整程序 6、RGB呼吸灯7、总结 1、前言 事情是这样的&#xff0c;前段时间&#xff0c;写了一个基于RK3…

BERT:用于语言理解的深度双向 Transformer 的预训练。

文章目录 0. 摘要1. 介绍2. 相关工作2.1 无监督的基于特征的方法2.3 无监督微调方法2.3 从受监督数据中迁移学习 3. BERT3.1 预训练 BERT3.2 微调 BERT 4. 实验4.1 GLUE4.2 SQuAD v1.14.3 SQuAD v2.04.4 SWAG 5. 消融研究5.1 预训练任务的影响5.2 模型大小的影响5.3 使用 BERT …