RabbitMQ事务和Confirm发送方消息确认——深入解读

引言

根据前面的知识(深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个重要步骤是至关重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示:

正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题?

RabbitMQ有两种方式来解决这个问题:

  1. 通过AMQP提供的事务机制实现;
  2. 使用发送者确认模式实现;

一、事务使用

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;

  2. channel.txComment()提交事务;

  3. channel.txRollback()回滚事务;

从上面的可以看出事务都是以tx开头的,tx应该是transaction extend(事务扩展模块)的缩写,如果有准确的解释欢迎在博客下留言。

我们来看具体的代码实现:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);   
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(_queueName, true, false, false, null);
String message = String.format("时间 => %s", new Date().getTime());
try {channel.txSelect(); // 声明事务// 发送消息channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));channel.txCommit(); // 提交事务
} catch (Exception e) {channel.txRollback();
} finally {channel.close();conn.close();
}

注意:用户需把config.xx配置成自己Rabbit的信息。

从上面的代码我们可以看出,在发送消息之前的代码和之前介绍的都是一样的,只是在发送消息之前,需要声明channel为事务模式,提交或者回滚事务即可。

了解了事务的实现之后,那么事务究竟是怎么执行的,让我们来使用wireshark抓个包看看,如图所示:

输入ip.addr==rabbitip && amqp查看客户端和rabbit之间的通讯,可以看到交互流程:

  • 客户端发送给服务器Tx.Select(开启事务模式)
  • 服务器端返回Tx.Select-Ok(开启事务模式ok)
  • 推送消息
  • 客户端发送给事务提交Tx.Commit
  • 服务器端返回Tx.Commit-Ok

以上就完成了事务的交互流程,如果其中任意一个环节出现问题,就会抛出IoException移除,这样用户就可以拦截异常进行事务回滚,或决定要不要重复消息。

那么,既然已经有事务了,没什么还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。事务性能测试

事务模式,结果如下:

  • 事务模式,发送1w条数据,执行花费时间:14197s
  • 事务模式,发送1w条数据,执行花费时间:13597s
  • 事务模式,发送1w条数据,执行花费时间:14216s

非事务模式,结果如下:

  • 非事务模式,发送1w条数据,执行花费时间:101s
  • 非事务模式,发送1w条数据,执行花费时间:77s
  • 非事务模式,发送1w条数据,执行花费时间:106s

从上面可以看出,非事务模式的性能是事务模式的性能高149倍,我的电脑测试是这样的结果,不同的电脑配置略有差异,但结论是一样的,事务模式的性能要差很多,那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢?那就是接下来要讲的Confirm发送方确认模式。

扩展知识

我们知道,消费者可以使用消息自动或手动发送来确认消费消息,那如果我们在消费者模式中使用事务(当然如果使用了手动确认消息,完全用不到事务的),会发生什么呢?

消费者模式使用事务

假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,那么RabbitMQ会产生什么样的变化?

结果分为两种情况:

  1. autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么已事务回滚为主,此条消息会重新放回队列;
  2. autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

二、Confirm发送方确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

Confirm的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;

方式二:channel.waitForConfirmsOrDie()批量确认模式;

方式三:channel.addConfirmListener()异步监听发送方确认模式;

方式一:普通Confirm模式

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {System.out.println("消息发送成功" );
}

看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。

方式二:批量Confirm模式

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {String message = String.format("时间 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");

以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。

方式三:异步Confirm模式

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {String message = String.format("时间 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));}
});

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。

Confirm性能测试

测试前提:与事务一样,我们发送1w条消息。

方式一:Confirm普通模式

  • 执行花费时间:2253s
  • 执行花费时间:2018s
  • 执行花费时间:2043s

方式二:Confirm批量模式

  • 执行花费时间:1576s
  • 执行花费时间:1400s
  • 执行花费时间:1374s

方式三:Confirm异步监听方式

  • 执行花费时间:1498s
  • 执行花费时间:1368s
  • 执行花费时间:1363s

总结

综合总体测试情况来看:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。

长按二维码关注我的技术公众号

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546912.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

十四、PyCharm开发Python利用WMI修改电脑IP、DNS

1.在PyCharm开发中安装WMI插件 搜索需要添加的WM插件,并安装,安装成功后会有提示!

Python 3深度置信网络(DBN)在Tensorflow中的实现MNIST手写数字识别

任何程序错误&#xff0c;以及技术疑问或需要解答的&#xff0c;请扫码添加作者VX&#xff1a;1755337994 使用DBN识别手写体 传统的多层感知机或者神经网络的一个问题&#xff1a; 反向传播可能总是导致局部最小值。 当误差表面(error surface)包含了多个凹槽&#xff0c;当你…

PHP安装ZIP扩展

2019独角兽企业重金招聘Python工程师标准>>> 下载ZIP扩展包 wget http://pecl.php.net/get/zip-1.10.2.tgztar zxvf zip-1.10.2.tgz 进入解压后的目录&#xff0c;执行 /usr/local/php/bin/phpize 编译 ./configure --with-php-config/usr/local/php/bin/php-config…

使用Docker部署RabbitMQ集群

使用Docker部署RabbitMQ集群 概述 本文重点介绍的Docker的使用&#xff0c;以及如何部署RabbitMQ集群&#xff0c;最基础的Docker安装&#xff0c;本文不做过多的描述&#xff0c;读者可以自行度娘。 Windows10上Docker的安装 因为本人用的是Windows系统&#xff0c;所有推…

Python官方中文开发文档

Python官方中文开发文档&#xff1a; https://docs.python.org/zh-cn/3/

perl anyevent socket监控web日志server

上篇已经讲过client端的CODE这部分code主要用来接收client端发送来的日志,从数据库中读取reglar然后去匹配.如果出现匹配则判断为XSS***.server端的SOCKET接收用了coro相关的模块.配置文件仿照前一篇博客读取即可.#!/usr/bin/perl use warnings; use strict; use AnyEvent; use…

Tensorflow No module named ‘tensorflow.examples.tutorials‘解决办法,有用

任何程序错误&#xff0c;以及技术疑问或需要解答的&#xff0c;请扫码添加作者VX&#xff1a;&#xff1a;1755337994 1 .利用TensorFlow代码下载MNIS丁 TensorFlow 提供了一个库&#xff0c; 可以直接用来自动下载与安装MNIST &#xff0c; 见如下代码&#xff1a; 代码5-1 M…

你不知道的RabbitMQ集群架构全解

你不知道的RabbitMQ集群架构全解 前言 本文将系统的介绍一下RabbitMQ集群架构的特点、异常处理、搭建和使用中要注意的一些细节。 知识点 一、为什么使用集群&#xff1f; 二、集群的特点 三、集群异常处理 四、集群节点类型 五、集群搭建方法 六、镜像队列 一、为什…

IPFS搭建HTTPS去中心化网站,真实可用

、 首先&#xff0c;我们需要知道IPFS是什么&#xff1f; 其实IPFS是一种协议&#xff0c;全称为Inter-Planetary File System&#xff0c;是一种点对点超媒体协议&#xff0c;旨在取代旧的HTTP&#xff0c;使网络更快&#xff0c;更安全&#xff0c;更开放。 我们平常都通过…

Ubuntu 18.04.1 搭建Java环境和HelloWorld

一、搭建Java环境 系统环境 Ubuntu 18.04.1JDK 8IDEA 2018.2 1.下载JDK 官网地址&#xff1a;http://www.oracle.com/technetwork/java/javase/downloads/index.html 选择相应的版本&#xff0c;点击jdk&#xff0c;进入下载页面&#xff0c;选择“Linux x64”版本的后缀为…

Python openpyxl打开有公式的excel表取值错误的解决办法,Python openpyxl获取excel有公式的单元格的数值错误,Python操作excel(.xlsx)封装类

Python openpyxl打开有公式的表格&#xff0c;如果直接读取&#xff0c;会出现有公式的单元格为空或零的情况。 参见&#xff1a; https://blog.csdn.net/weixin_45903952/article/details/105073611?utm_mediumdistribute.wap_relevant.none-task-blog-title-3 wb openpyxl…

Python实现GCS bucket断点续传功能,分块上传文件

Python实现GCS bucket断点续传功能&#xff0c;分块上传文件 环境&#xff1a;Python 3.6 我有一个关于使用断点续传到Google Cloud Storage的上传速度的问题。我已经编写了一个Python客户端&#xff0c;用于将大文件上传到GCS&#xff08;它具有一些特殊功能&#xff0c;这…

Spring Boot 最佳实践(一)快速入门

一、关于Spring Boot 在开始了解Spring Boot之前&#xff0c;我们需要先了解一下Spring&#xff0c;因为Spring Boot的诞生和Spring是息息相关的&#xff0c;Spring Boot是Spring发展到一定程度的一个产物&#xff0c;但并不是Spring的替代品&#xff0c;Spring Boot是为了让程…

Wo Cloud CentOS 挂载磁盘小计

为什么80%的码农都做不了架构师&#xff1f;>>> 涉及到的命令&#xff1a;fdisk/mkfs/mount 列出当前磁盘[rootvity ~]# fdisk -lDisk /dev/vda: 21.5 GB, 21474836480 bytes 16 heads, 63 sectors/track, 41610 cylinders Units cylinders of 1008 * 512 516096…

PC通过IE浏览器对华为S5700交换机进行WEB管理

1.PC和交换机通过网线连接,通过CONSOLE线缆连接华为S5700交换机,使用如下命令查看是否有web.7z文件 <Quidway>dir2.新建VLAN和配置VLAN的IP <Quidway>system-view [Quidway]<

最邻近插值、双线性插值、三次卷积插值最通俗入门理论解析,论文材料

如有任何问题&#xff0c;请联系VX&#xff1a;1755337994 前言 图像处理中有三种常用的插值算法&#xff1a; 最邻近插值 双线性插值 双立方&#xff08;三次卷积&#xff09;插值 其中效果最好的是双立方&#xff08;三次卷积&#xff09;插值&#xff0c;本文介绍它的原…

Spring Boot 最佳实践(二)集成Jsp与生产环境部署

一、简介 提起Java不得不说的一个开发场景就是Web开发&#xff0c;也是Java最热门的开发场景之一&#xff0c;说到Web开发绕不开的一个技术就是JSP&#xff0c;因为目前市面上仍有很多的公司在使用JSP&#xff0c;所以本文就来介绍一下Spring Boot 怎么集成JSP开发&#xff0c…

全网最详细 Python如何读取NIFTI格式图像(.nii文件)和 .npy格式文件和pkl标签文件内容

在医学图像处理中&#xff0c;我们经常使用一种NIFTI格式图像&#xff08;.nii文件&#xff09;&#xff0c;现在我们来看看 什么是.nii文件&#xff1f;该如何读取.nii文件&#xff1f; 1. NIFTI格式图像 什么是NIFTI&#xff08;Neuroimaging Informatics Technology Initia…

Spring Boot 最佳实践(三)模板引擎FreeMarker集成

一、FreeMaker介绍 FreeMarker是一款免费的Java模板引擎&#xff0c;是一种基于模板和数据生成文本&#xff08;HMLT、电子邮件、配置文件、源代码等&#xff09;的工具&#xff0c;它不是面向最终用户的&#xff0c;而是一款程序员使用的组件。 FreeMarker最初设计是用来在M…

Android开发之通过浏览器链接打开任意app页面

老套路先上图&#xff1a; 先说下上面的流程&#xff0c;第一张图是模拟浏览器的网页点击链接打开app,第二张图系统弹框提示是否打开app,第三张图已打开APP&#xff0c;弹出的吐司是打开APP携带的数据 具体实现分为两步&#xff0c;第一步配置你要打开的activity页面如下&…