RabbitMQ消息确认机制

文章目录

      • 1. 事务机制
      • 2. Confirm模式
        • 2.1 生产者
          • 2.1.1 普通Confirm模式
          • 2.1.2 批量Confirm模式
          • 2.1.3 异步Confirm模式
        • 2.2 消费者
      • 3. 其他

消费者如何确保消息一定能够消费成功呢?

由于在前面工作队列模式里面我们了解了应答模式,所以我们可以很自信的回答如上题目。

通过应答形式,默认自动应答,可以修改为手动应答来保证消息消费成功。

其实应答形式就是 RabbitMQ 消息确认机制的一种体现,我们再来看看问题的产生背景:

生产者发送消息出去之后,不知道到底有没有发送到 RabbitMQ 服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

两种解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

1. 事务机制

事务机制分为三部分,开启事务,提交事务,事务回滚,如下:

  1. txSelect 将当前 channel 通道设置为 transaction 模式(开启事务)
  2. txCommit 提交当前事务
  3. txRollback 事务回滚

我们通过一个例子模拟消息生产者发送消息过程发生异常,进行事务回滚的过程。

public class Producer {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.发送消息 */try {/** 4.1 开启事务 */channel.txSelect();String msg = "我是生产者生成的消息";System.out.println("生产者发送消息:"+msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());/** 4.2 提交事务 - 模拟异常 */int i = 1/0;channel.txCommit();}catch (Exception e){e.printStackTrace();System.out.println("发生异常,我要进行事务回滚了!");/** 4.3 事务回滚 */channel.txRollback();}finally {channel.close();newConnection.close();}}}

打印结果:
生产者发送消息:我是生产者生成的消息
java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37)
发生异常,我要进行事务回滚了!

2. Confirm模式

像上方这种采用 AMQP 事务机制来保证消息的准确到达,在一定程度上是消耗了性能的,所以我们再来看看 Confirm 模式。

Confirm 模式分为两块,一是生产者的 Confirm 模式,再就是消费者的 Confirm 模式。

2.1 生产者

通过生产者的确认模式我们是要保证消息准确达到客户端,而与 AMQP 事务不同的是 Confirm 是针对一条消息的,而事务是可以针对多条消息的。

Confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

Confirm 的三种实现方式:

  1. channel.waitForConfirms() 普通发送方确认模式;
  2. channel.waitForConfirmsOrDie() 批量确认模式;
  3. channel.addConfirmListener() 异步监听发送方确认模式
2.1.1 普通Confirm模式
public class Producer11 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());try {if (channel.waitForConfirms()) {System.out.println("发送成功");}else{System.out.println("进行消息重发");}} catch (InterruptedException e) {e.printStackTrace();}}/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

在推送消息之前,channel.confirmSelect() 声明开启发送方确认模式,再使用channel.waitForConfirms() 等待消息被服务器确认即可。

2.1.2 批量Confirm模式
public class Producer22 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, null, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());}/** 6.直到所有信息都发布,只要有一个未确认就会IOException */channel.waitForConfirmsOrDie();System.out.println("全部执行完成");/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

channel.waitForConfirmsOrDie() 使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出 IOException 异常。

2.1.3 异步Confirm模式
public class Producer33 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();for (int i = 0; i < 10; i++) {String message = "我是生产者生成的消息:" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));}/** 5.发送消息 异步监听确认和未确认的消息 */channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));}});/** 6.关闭通道、连接 *//** channel.close();*//** newConnection.close();*/}}

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的 multiple 的参数,此参数为 bool 值,如果 true 表示批量执行了 deliveryTag 这个值以前的所有消息,如果为 false 的话表示单条确认。

维持异步调用要求我们不能断掉连接,因此注释掉第6步。

2.2 消费者

为了保证消息从队列可靠地到达消费者,RabbitMQ 提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false 时, RabbitMQ 会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。

在消费者中 Confirm 模式又分为手动确认和自动确认。

关于两者的介绍:

自动确认: 在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

手动确认: 使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

综上:尽量选择手动确认方式。

主要实现代码:

// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);// 关闭自动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3. 其他

1、如果 RabbitMQ 服务器宕机了,消息会丢失吗?

不会丢失,RabbitMQ 服务器支持消息持久化机制,会把消息持久化到硬盘上。

2、如何确保消息正确地发送至RabbitMQ?

RabbitMQ 使用发送方确认模式,确保消息正确地发送到 RabbitMQ。


发送方确认模式:将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。


发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。


我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/414179.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ消息幂等性问题

文章目录1. 什么是幂等性&#xff1f;1.1 消息队列的幂等性1.2 模拟重试机制1.2.1 生产者代码1.2.2 消费者代码1.2.3 消费者 application.yml 配置2. 如何保证消息幂等性&#xff0c;不被重复消费&#xff1f;解决方法1. 什么是幂等性&#xff1f; 在编程中一个幂等操作的特点是…

Centos安装JDK(java环境)

王小私下问我 centos 中 jdk 怎么安装呀&#xff0c;所以再次整理了这篇基础环境搭建的文章。 1、创建java目录2、下载上传jdk3、解压jdk4、配置环境变量 1、创建java目录 首先我们创建java的安装目录 cd /usrmkdir javacd java 2、下载上传jdk 我们如上在 usr 目录下创建了 ja…

工作288:根据时间戳处理接口

<template><div class"table-list-page"><div class"query-area"><el-date-pickerv-model"value1"type"daterange"range-separator"至"start-placeholder"开始日期"end-placeholder"结…

Centos7安装MySQL(多图)

文章目录一、在线安装1、替换网易yum源2、清理缓存3、下载rpm文件4、安装MySQL数据库二、本地安装1、上传MySQL安装包2、安装依赖的程序包3、卸载mariadb程序包4、安装MySQL程序包5、修改MySQL目录权限6、初始化MySQL三、启动MySQL1.1、在线安装方式启动MySQL1.2、本地安装方式…

lower_case_table_names=1 启动报错 mysql8.0

本文为采集文章&#xff0c;主博客地址&#xff1a;https://www.cnblogs.com/niceyoo 我们知道在 Linux 环境下默认是区分大小写的&#xff0c;所以我们需要改变这种默认方式&#xff0c;经过网上各种搜索后&#xff0c;基本就是清一色的修改 lower_case_table_names&#xff0…

工作292:修改父子组件传值错误

[Vue warn]: Missing required prop: “title” 在写vue项目中&#xff0c;在子组件中通过props传值的时候&#xff0c;在父组件中没有定义的话就会看到类似的报错&#xff0c; 这个意思是calendar这个组件中通过props传递一个title属性给父组件&#xff0c;并且title属性是必…

MacOS下IDEA设置智能提示不区分大小写

本文只针对&#xff0c;IDEA-2019.2.3版本 目录地址&#xff1a; Edit -> General -> Code Completion -> Match case -> 勾选去掉 截图如下&#xff1a;

博客园文章方块背景格式

有小伙伴问到方格背景的问题&#xff0c;所以写一篇文章记录我的博客园文章背景是如何制作的。 一、辅助网站1. 一键排版2. 代码主题3. 复制二、 图床设置 一、辅助网站 辅助网址&#xff1a;Md2All 作者提供了一篇帮助文章&#xff1a;玩转公众号Markdown 其实大致看完辅助网址…

day02 pycharm 安装

pycharm 是一款现在比较主流的辅助开发软件 不选择虚拟 所以选择Existing现有的 安装后只需打开当前窗口 默认的 不需要大家新的窗口 使用鼠标滚轮来实现放大缩小 使用debug模式测试代码 转载于:https://www.cnblogs.com/zhaohongyu6688/p/8962253.html

eclipse启动项目

今天做的任务不多&#xff0c;没有自己写代码&#xff0c;上午看了些文章&#xff0c;下午我司后台给配了配项目环境&#xff0c;全装C盘了。。以后有我好受的。。 看着后台操作&#xff0c;修改了N多配置&#xff0c;tomcat、redis、zkServer.、Nginx&#xff0c;navcat、ecli…

如何写一份优秀的java程序员简历

背景&#xff1a;进入第一家公司已经工作将近两年了&#xff0c;其中闲了一年&#xff0c;在准备自己的简历的时候&#xff0c;有种江郎才尽的感觉&#xff0c;不知道怎么写&#xff0c;看来平时还是要多积累多熟悉。 PS&#xff1a;这里面的分享看完还是很受用的。 简历看得比…

macos -bash: yarn: command not found/-bash: cnpm: command not found

博客主要更新地址&#xff1a;?https://www.cnblogs.com/niceyoo -bash: cnpm: command not found -bash: yarn: command not found -bash: xxxx: command not found 如上yarn/cnpm皆通用&#xff0c;前提是安装成功后报这个错误哈&#xff01; Error: EACCES: permission den…

部署项目到jetty

一、打包项目 1、在pom.xml中添加以下依赖 <dependency><groupId>org.mortbay.jetty</groupId><artifactId>jetty-plus</artifactId><version>7.0.0.pre5</version><scope>provided</scope> </dependency> <de…

maven jar包冲突的发现与解决[工具篇]

本文是我的第177篇文章。 关于jar冲突排查解决的问题&#xff0c;相信很多小伙伴也都知道有一些&#xff0c;无非就是两类&#xff1a;命令 or 工具。 命令方式比如&#xff1a; mvn dependency:tree 工具方式比如&#xff1a; Maven Helper 而今天的主角就是 Maven Helper 了。…

@Path注解

最近用到的一个项目&#xff0c;看到Controller控制层、Method方法都是通篇的Path注解&#xff0c;由于之前并没有使用过该注解&#xff0c;故记此篇。 首先看一下项目中的使用方式&#xff1a; Path("clientWeb")public class ClientWeb { POST Path("/g…

iOS的SVN

1、cornerstone2、smart svn mac &#xff08;比较好用&#xff09;3、还xcode自带的。转载于:https://www.cnblogs.com/YangBinChina/p/8971148.html

导入数据任务(id:373985)异常, 错误信息:解析导入文件错误,请检查导入文件内容,仅支持导入json格式数据及excel文件...

小程序导入&#xff0c;别人导出的数据库json文件&#xff0c;错误信息如下&#xff1a; 导入数据库失败, Error: Poll error, 导入数据任务&#xff08;id:373985&#xff09;异常&#xff0c;错误信息&#xff1a;解析导入文件错误&#xff0c;请检查导入文件内容&#xff0c…

ArrayList与String[]

不逼自己一把&#xff0c;你永远不知道什么是绝望。 今天被初学java的朋友问到了String[]跟ArrayList是不是有关系呢&#xff1f; 猜测是名称之间的联想&#xff0c;记此篇解惑。 Array英语单词里是数组、阵列的意思&#xff0c;在java中数组是这样定义的&#xff1a;一组相关类…

WebStorm取消默认style样式折叠

WebStorm取消默认style样式折叠&#xff1a; File--->Settings打开一个窗口 Edit--->CodeFolding 把HTML style attribute的前面的钩去掉[取消勾选] 右下角点击Apply--->ok

Vue中的computed属性

1、前言 本篇是Vue中最常用到的API之一computed属性&#xff0c;转载信息如下&#xff1a; 作者&#xff1a;gunelark链接&#xff1a;https://www.cnblogs.com/gunelark/p/8492468.html 2、正文 看了网上很多资料&#xff0c;对vue的computed讲解自己看的都不是很清晰&#xf…