RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker.  Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29822.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R语言APSIM模型高级应用及批量模拟

随着数字农业和智慧农业的发展&#xff0c;基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。APSIM (Agricultural Production Systems sIMulator)模型是世界知名的作物生…

云服务器SVN仓库搭建(以阿里云为例)

远程连接阿里云服务器 安装svn(注意需要root权限使用命令sudo su) yum install subversion 安装成功后查看svn版本 svnserve --version 创建版本库的根目录 mkdir /var/svn 创建代码仓库 svnadmin create /var/svn/test 当前生成的目录结构 此处为svn的配置文件 创建用户名…

信创环境下 FTP如何进行国产化替代?

一、政策驱动&#xff0c;倡导自主、创新、协同 信创&#xff0c;即信息技术应用创新产业&#xff0c;其是数据安全、网络安全的基础&#xff0c;也是新基建的重要组成部分。为打破国外巨头的垄断&#xff0c;解决核心技术关键环节“卡脖子”问题&#xff0c;在核心芯片、基础…

C++项目:在线五子棋对战(网页版)

项目介绍 本项⽬主要实现⼀个⽹⻚版的五⼦棋对战游戏&#xff0c;其主要⽀持以下核⼼功能&#xff1a; • 用户管理:实现用户注册&#xff0c;用户登录、获取用户信息、用户天梯分数记录、用户比赛场次记录等。 • 匹配对战:实现两个玩家在网页端根据天梯分数匹配游戏对⼿&…

Java版企业电子招标采购系统源码Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis tbms

​ 功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;查…

HarmonyOS应用开发者基础认证考试题库

此博文为HarmonyOS应用开发者基础认证考试的最后的大考&#xff0c;要求100分取得90分方可获取证书、现将考试的题库进行分享&#xff0c;希望能帮到大家。但是需要注意的是&#xff0c;题库会不定时的进行题目删减&#xff0c;但是大概的内容是不会进行改变的。真心希望这篇博…

第126天:内网安全-隧道技术SSHDNSICMPSMB上线通讯LinuxMac

知识点 #知识点&#xff1a; 1、入站规则不出网上线方案 2、出站规则不出网上线方案 3、隧道技术-SMB&ICMP&DNS&SSH 4、控制上线-Linux&Mac&IOS&Android-连接方向&#xff1a;正向&反向&#xff08;基础课程有讲过&#xff09; -内网穿透&#xf…

centos linux 安装RDMA Soft-RoCE|虚拟机安装Soft-RoCE

什么是Soft-RoCE softRoCE的目标是在所有支持以太网的设备上都可以部署RDMA传输&#xff0c;可以使不具备RoCE能力的硬件和支持RoCE的硬件间进行基于IB语义的交流。 大白话就是模拟RDMA的软件栈&#xff0c;使得在没有RDMA网卡的环境上&#xff0c;也可以运行基于RDMA写的传输…

leetcode870. 优势洗牌(java)

优势洗牌 leetcode870. 优势洗牌题目描述双指针 排序代码 滑动窗口 leetcode870. 优势洗牌 难度 - 中等 leetcode870. 优势洗牌 题目描述 给定两个长度相等的数组 nums1 和 nums2&#xff0c;nums1 相对于 nums2 的优势可以用满足 nums1[i] > nums2[i] 的索引 i 的数目来描…

博客网站添加复制转载提醒弹窗Html代码

网站如果是完全禁止右键&#xff08;复制、另存为等&#xff09;操作&#xff0c;对用户来说体验感会降低&#xff0c;但是又不希望自己的原创内容直接被copy&#xff0c;今天飞飞和你们分享几行复制转载提醒弹窗Html代码。 效果展示&#xff1a; 复制以下代码&#xff0c;将其…

matplotlib 笔记:基本用法

1 axis 1.0 对比原始图像 import numpy as np import matplotlib.pyplot as plt xrange(5) yrange(10,20,2) plt.plot(x,y) 1.1 plt.axis(equal) x轴和y轴单位长度相同 import numpy as np import matplotlib.pyplot as plt plt.axis(equal) xrange(5) yrange(10,20,2) pl…

【iOS】autoreleasepool

来说一下最近在了解的autoreleasepool吧&#xff0c;我们可能平时书写过许多脑残代码&#xff0c;其有很多的缺陷但是我们可能当时学的比较浅就也不太了解&#xff0c;就像下面这样的&#xff1a; for (int i 0; i < 1000000; i) {NSNumber *num [NSNumber numberWithInt…

SAP MIGO 移动原因维护

在OMJJ中维护 在OMJJ中&#xff0c;选择你要维护的移动类型&#xff0c;在“对话结构”中选择“移动原因”&#xff0c;可以修改和添加了。

Java用方法实现登录名和密码的校验

Java用方法实现登录名和密码的校验 需求分析代码实现小结Time 需求分析 系统正确的登录名和密码是:学习/123&#xff0c;请在控制台开发一个登录界面&#xff0c;接收用户输入的登录名和密码&#xff0c;判断用户是否登录成功&#xff0c;登录成功后展示:“欢迎进入系统!”&…

C# OpenCvSharp 去水印 图像修复

效果 项目 VS2022.net4.8OpenCvSharp4 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Security.Cryptography; using System.Text; usi…

Improved Deep Metric Learning with Multi-class N-pair Loss Objective

Improved Deep Metric Learning with Multi-class N-pair Loss Objective 来源&#xff1a; NIPS’2016NEC Laboratories America 文章目录 Improved Deep Metric Learning with Multi-class N-pair Loss ObjectiveDistance Metric LearningDeep Metric Learning with Multip…

时间复杂度空间复杂度相关练习题

1.消失的数字 【题目】&#xff1a;题目链接 思路1&#xff1a;排序——》qsort快排——》时间复杂度O&#xff08;n*log2n&#xff09; 不符合要求 思路2&#xff1a;&#xff08;0123...n)-(a[0]a[1][2]...a[n-2]) ——》 时间复杂度O&#xff08;N&#xff09;空间复杂度…

spring cloud智慧工地源码(项目端+监管端+数据大屏+APP)

spring cloud智慧工地源码&#xff08;项目端监管端数据大屏APP&#xff09; 系统功能介绍 【智慧工地PC项目端功能总览】 一.项目人员管理 包括&#xff1a;信息管理、信息采集、证件管理、考勤管理、考勤明细、工资管理、现场统计、WIFI教育、工种管理、分包商管理、班组管…

STM32 HAL 驱动PM2.5传感器(GP2Y10AU气体检测模块)

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 ADC外设配置 2.3 串口外设配置 2.4 项目生成 3、KEIL端程序整合 3.1 串口重映射 3.2 ADC数据采集 3.3 主函数代 3.4 效果展示 1、简介 本文通过STM32F103C8T6单片机通过HAL库方式对G…

Qt 使用QLabel的派生类实现QLabel的双击响应

1 介绍 在QLabel中没有双击等事件响应&#xff0c;需要构建其派生类&#xff0c;自定义信号(signals)、重载事件函数(event)&#xff0c;最后在Qwidget中使用connect链接即可&#xff0c;进而实现响应功能。 对于其余没有需求事件响应的QObject同样适用。 此外&#xff0c;该功…