RabbitMQ消息确认机制

文章目录

      • 1. 事务机制
      • 2. Confirm模式
        • 2.1 生产者
          • 2.1.1 普通Confirm模式
          • 2.1.2 批量Confirm模式
          • 2.1.3 异步Confirm模式
        • 2.2 消费者
      • 3. 其他

消费者如何确保消息一定能够消费成功呢?

由于在前面工作队列模式里面我们了解了应答模式,所以我们可以很自信的回答如上题目。

通过应答形式,默认自动应答,可以修改为手动应答来保证消息消费成功。

其实应答形式就是 RabbitMQ 消息确认机制的一种体现,我们再来看看问题的产生背景:

生产者发送消息出去之后,不知道到底有没有发送到 RabbitMQ 服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

两种解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

1. 事务机制

事务机制分为三部分,开启事务,提交事务,事务回滚,如下:

  1. txSelect 将当前 channel 通道设置为 transaction 模式(开启事务)
  2. txCommit 提交当前事务
  3. txRollback 事务回滚

我们通过一个例子模拟消息生产者发送消息过程发生异常,进行事务回滚的过程。

public class Producer {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.发送消息 */try {/** 4.1 开启事务 */channel.txSelect();String msg = "我是生产者生成的消息";System.out.println("生产者发送消息:"+msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());/** 4.2 提交事务 - 模拟异常 */int i = 1/0;channel.txCommit();}catch (Exception e){e.printStackTrace();System.out.println("发生异常,我要进行事务回滚了!");/** 4.3 事务回滚 */channel.txRollback();}finally {channel.close();newConnection.close();}}}

打印结果:
生产者发送消息:我是生产者生成的消息
java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37)
发生异常,我要进行事务回滚了!

2. Confirm模式

像上方这种采用 AMQP 事务机制来保证消息的准确到达,在一定程度上是消耗了性能的,所以我们再来看看 Confirm 模式。

Confirm 模式分为两块,一是生产者的 Confirm 模式,再就是消费者的 Confirm 模式。

2.1 生产者

通过生产者的确认模式我们是要保证消息准确达到客户端,而与 AMQP 事务不同的是 Confirm 是针对一条消息的,而事务是可以针对多条消息的。

Confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

Confirm 的三种实现方式:

  1. channel.waitForConfirms() 普通发送方确认模式;
  2. channel.waitForConfirmsOrDie() 批量确认模式;
  3. channel.addConfirmListener() 异步监听发送方确认模式
2.1.1 普通Confirm模式
public class Producer11 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());try {if (channel.waitForConfirms()) {System.out.println("发送成功");}else{System.out.println("进行消息重发");}} catch (InterruptedException e) {e.printStackTrace();}}/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

在推送消息之前,channel.confirmSelect() 声明开启发送方确认模式,再使用channel.waitForConfirms() 等待消息被服务器确认即可。

2.1.2 批量Confirm模式
public class Producer22 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, null, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());}/** 6.直到所有信息都发布,只要有一个未确认就会IOException */channel.waitForConfirmsOrDie();System.out.println("全部执行完成");/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

channel.waitForConfirmsOrDie() 使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出 IOException 异常。

2.1.3 异步Confirm模式
public class Producer33 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();for (int i = 0; i < 10; i++) {String message = "我是生产者生成的消息:" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));}/** 5.发送消息 异步监听确认和未确认的消息 */channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));}});/** 6.关闭通道、连接 *//** channel.close();*//** newConnection.close();*/}}

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的 multiple 的参数,此参数为 bool 值,如果 true 表示批量执行了 deliveryTag 这个值以前的所有消息,如果为 false 的话表示单条确认。

维持异步调用要求我们不能断掉连接,因此注释掉第6步。

2.2 消费者

为了保证消息从队列可靠地到达消费者,RabbitMQ 提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false 时, RabbitMQ 会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。

在消费者中 Confirm 模式又分为手动确认和自动确认。

关于两者的介绍:

自动确认: 在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

手动确认: 使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

综上:尽量选择手动确认方式。

主要实现代码:

// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);// 关闭自动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3. 其他

1、如果 RabbitMQ 服务器宕机了,消息会丢失吗?

不会丢失,RabbitMQ 服务器支持消息持久化机制,会把消息持久化到硬盘上。

2、如何确保消息正确地发送至RabbitMQ?

RabbitMQ 使用发送方确认模式,确保消息正确地发送到 RabbitMQ。


发送方确认模式:将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。


发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。


我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/414179.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工作292:数据绑定逻辑处理

},confirmAssociation() {if(this.selected!"") {putAction(this.url.put / this.task_id /bound, {content_id: this.selected}).then(res > {this.$message.success("绑定成功");this.$emit("ok")this.$refs["dialog"].close(…

工作293:调节删除顺序删除

}this.$confirm(您确定删除吗?, 提示, {confirmButtonText: 确定,cancelButtonText: 取消,type: warning}).then(res>{this.loadingtruedeleteAction(path.join(this.url.delete, record.id))/* this.list()*/this.reload()this.$message.success("删除成功");th…

RabbitMQ消息幂等性问题

文章目录1. 什么是幂等性&#xff1f;1.1 消息队列的幂等性1.2 模拟重试机制1.2.1 生产者代码1.2.2 消费者代码1.2.3 消费者 application.yml 配置2. 如何保证消息幂等性&#xff0c;不被重复消费&#xff1f;解决方法1. 什么是幂等性&#xff1f; 在编程中一个幂等操作的特点是…

JAVA面向对象明星类

public class _01Celebrity{//属性public String name;public int age;public double height;public char gender;//构造器public _01Celebrity(String name,int age,double height,char gender){this.name name;this.age age;this.height height;this.gender gender;}//方…

工作287:命名报错

return:{data:{account_id: ,BindData: [],RomoteData:[],dialogVisible: false,ff_account_index: ,form:{},}},这种命名报错

Centos安装JDK(java环境)

王小私下问我 centos 中 jdk 怎么安装呀&#xff0c;所以再次整理了这篇基础环境搭建的文章。 1、创建java目录2、下载上传jdk3、解压jdk4、配置环境变量 1、创建java目录 首先我们创建java的安装目录 cd /usrmkdir javacd java 2、下载上传jdk 我们如上在 usr 目录下创建了 ja…

iOS用workspace和cocoapods管理多个项目

工作空间下多工程共享cocoapods第三方库的方法 引自 https://www.jianshu.com/p/e3cfae830985转载于:https://www.cnblogs.com/-WML-/p/8946370.html

工作288:根据时间戳处理接口

<template><div class"table-list-page"><div class"query-area"><el-date-pickerv-model"value1"type"daterange"range-separator"至"start-placeholder"开始日期"end-placeholder"结…

Centos7安装MySQL(多图)

文章目录一、在线安装1、替换网易yum源2、清理缓存3、下载rpm文件4、安装MySQL数据库二、本地安装1、上传MySQL安装包2、安装依赖的程序包3、卸载mariadb程序包4、安装MySQL程序包5、修改MySQL目录权限6、初始化MySQL三、启动MySQL1.1、在线安装方式启动MySQL1.2、本地安装方式…

ROS与Arduino学习(六)Logging日志

ROS与Arduino学习&#xff08;六&#xff09;Logging日志 Tutorial Level:客户端与服务器 Next Tutorial&#xff1a;小案例节点通信 本节较为简单告诉大家如何向系统发布日志信息。 Tips 1 日志信息发布 节点提供了五种日志消息&#xff0c;分别是debug、information、warn、…

工作289:js取整

只保留整数部分&#xff08;丢弃小数部分&#xff09; parseInt(5.1234); // 5 向下取整&#xff08;< 该数值的最大整数&#xff0c;和parseInt()一样) Math.floor(5.1234); // 5 向上取整&#xff08;有小数&#xff0c;整数部分就1&#xff09; Math.ceil(5…

读书笔记8-浪潮之巅(part3)

浪潮之巅 ——风险投资 《浪潮之巅》的前半部分列举了在现代史上举足轻重的几家大型科技公司的历史&#xff0c;虽说成功的公司各有各的绝招&#xff0c;但是读多之后又略显重复、无聊&#xff08;这不是说原书的内容、描述是无聊的&#xff0c;相反其中的每一篇都值得多次欣赏…

工作290:js日期操作

Js获取当前日期时间及其它操作 var myDate new Date(); myDate.getYear(); //获取当前年份(2位) myDate.getFullYear(); //获取完整的年份(4位,1970-????) myDate.getMonth(); //获取当前月份(0-11,0代表1月) myDate.getDate(); //获取当前日(1-31…

lower_case_table_names=1 启动报错 mysql8.0

本文为采集文章&#xff0c;主博客地址&#xff1a;https://www.cnblogs.com/niceyoo 我们知道在 Linux 环境下默认是区分大小写的&#xff0c;所以我们需要改变这种默认方式&#xff0c;经过网上各种搜索后&#xff0c;基本就是清一色的修改 lower_case_table_names&#xff0…

微信小程序 weui 使用方法

https://github.com/Tencent/weui-wxss/ 下载地址用于小程序的https://github.com/Tencent/weui 下载地址用于H5 运用示例在下载文件的文件夹 weui-wxss-master\dist\example目录下小程序全局用法在app.wxss用import "weui.wxss"转载于:https://www.cnblogs.com/ch…

工作291:当前账号是否绑定操作

<template><el-dialogopen"open"title"绑定分发平台账号":visible.sync"dialogVisible"width"40%":before-close"handleClose"><el-form style"display: flex;justify-content: center;align-items: ce…

Redis之客户端连接方式

Reis的客户端连接方式有如下几种&#xff1a; 1.基本方式 /*** 简单基本方式调用*/Testpublic void test1JedisStandardClient() {Jedis jedis new Jedis("192.168.56.101", 6379);jedis.set("123", "first line is null");String valueString …

工作292:修改父子组件传值错误

[Vue warn]: Missing required prop: “title” 在写vue项目中&#xff0c;在子组件中通过props传值的时候&#xff0c;在父组件中没有定义的话就会看到类似的报错&#xff0c; 这个意思是calendar这个组件中通过props传递一个title属性给父组件&#xff0c;并且title属性是必…

MacOS下IDEA设置智能提示不区分大小写

本文只针对&#xff0c;IDEA-2019.2.3版本 目录地址&#xff1a; Edit -> General -> Code Completion -> Match case -> 勾选去掉 截图如下&#xff1a;

工作293:新的打印操作

getAction("/task/" id "/release").then(res > {console.log(res, 8888)if (res.code 404) {this.$message({message: res.msg,type: error});this.dialogVisible false;}