RabbitMQ消息的应答

消息的应答机制

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了, RabbitMQ 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵,批量应答的批量范围是channel 上未应答的消息。比如说 channel 上有传送 tag 的消息5,6,7,8 当前 tag 是8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。但是实际上还是不建议开启批量应答的。

image-20220530100237882

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

Channel.basicNack(用于否定确认)

不处理该消息了直接拒绝,可以将其丢弃了。

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数。

不处理该消息了直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

代码实现

生产者
public class MyProducer {@Testpublic void test() throws Exception {// 队列名称String queue = "xw_queue";String message = "Hello World -> ";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 0; i < 20; i++) {// 发布消息channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());}}
}
消费者1

开启手动确认后,消费者1如果在处理消息的回调中不确认消息,那么队列中的消息会处于unacked的状态,如果消费者1突然挂掉,那么这些未确认的消费会重新发送给其他消费者。

public class MyConsumer1 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答// channel.basicAck(envelope.getDeliveryTag(), false);Thread.sleep(5000);System.out.println("消费者1:接收到消息: " + new String(body));}});}
}
消费者2

消费者2正常消费消息,收到消息后立刻确认。

public class MyConsumer2 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("消费者2:接收到消息: " + new String(body));}});}
}

效果展示

image-20220530102415347

image-20220530102445700

此时把消费者1停掉,那么上面这10条为确认的消费会重新入队,发送给另外的消费者。

image-20220530102642150

image-20220530102708467

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/182665.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机视觉:使用dlib实现人脸检测

1 dlib介绍 Dlib是一个广泛使用的开源库&#xff0c;在计算机视觉和机器学习领域具有重要影响。它是由Davis King在2002年开发&#xff0c;主要用C语言编写&#xff0c;但也提供了Python接口。Dlib结合了高效的算法和易用性&#xff0c;使其成为学术界和工业界的热门选择。 1.…

SpringBoot项目启动后自动停止了?

1 现象 2023-11-22T09:05:13.36108:00 DEBUG 17521 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT 2023-11-22T09:05:13.36208:00 DEBUG 17521 --- [ main] o.s.b.a.Applicat…

一文1000字彻底搞懂Web测试与App测试的区别

总结分享一些项目需要结合Web测试和App测试的工作经验给大家&#xff1a; 从功能测试区分&#xff0c;Web测试与App测试在测试用例设计和测试流程上没什么区别。 而两者的主要区别体现在如下几个方面&#xff1a; 1 系统结构方面 Web项目&#xff0c;B/S架构&#xff0c;基…

Android中实现RecyclerView,并对item及其多个子控件的点击事件监听

目录 背景 实现RecyclerView 第一步、 新建item的xml 第二步、在activity的布局中引入 RecyclerView 第三步、新建一个adapter 第四步、在activity中初始化绑定adapter即可 实现item及其多个子组件点击事件监听 第一步、 适配器中创建监听对象 第二步、适配器中绑定监听…

uniapp ios 授权弹窗 uniapp弹出框怎么实现

新版本的信息弹窗组件 可以弹出很多条信息&#xff0c;并单独控制消失时间、点击消失。 用循环来生成很多个弹窗&#xff0c;用this.$refs来传值&#xff0c;并添加数组。 1.布局 2.js 具体流程。需要一个弹窗&#xff0c;基本信息传入组件&#xff0c;处理后添加入数组&am…

智能优化算法应用:基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.闪电搜索算法4.实验参数设定5.算法结果6.参考…

【C++】异常处理 ① ( 异常概念引入 | 抛出异常语法 | 捕获异常语法 | 异常捕获流程 | 异常处理代码示例 )

文章目录 一、异常处理1、异常概念引入2、抛出异常语法3、捕获异常语法4、异常捕获流程 二、异常处理代码示例1、错误代码示例 - 抛出异常 / 不捕获异常2、正确代码示例 - 抛出异常 / 捕获异常3、正确代码示例 - 抛出异常 / 捕获异常不处理继续抛出异常 一、异常处理 1、异常概…

接口测试入门8问(含答案+文档)

Q1&#xff1a;什么是接口测试&#xff0c;基础知识什么的讲讲吧&#xff01; A&#xff1a;你好&#xff0c;接口可以分下面几种 1、系统与系统之间的调用&#xff0c;比如银行会提供接口供电子商务网站调用&#xff0c;或者说&#xff0c;支付宝会提供接口给淘宝调用 2、上…

Table和HashBasedTable的使用案例

------------------- 1.普通使用 package org.example.testhashbasedtable;import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table;import java.util.Map;public class TestHashBasedTable {public static void main(String[] args) {Ta…

【方法】PowerPoint如何删除“限制编辑”?

如果PPT文件设置成“只读模式”&#xff0c;就会被限制编辑&#xff0c;也就是无法对PPT进行编辑或更改&#xff0c;那要如何删除这个“限制”呢&#xff1f; 下面小编会按照“无密码的只读方式”、“有密码的只读方式”以及“忘记了密码的只读方式”这3种情况&#xff0c;来说…

enote笔记法之附录2——5w1h2k关联词(ver0.22)

enote笔记法之附录2——5w1h2k关联词&#xff08;ver0.22&#xff09; 最上面的是截屏的完整版&#xff0c;分割线下面的是纯文字版本&#xff1a; 作者姓名&#xff08;本人的真实姓名&#xff09;&#xff1a;胡佳吉 居住地&#xff1a;上海 作者网名&#xff1a;EverSt…

【从删库到跑路 | MySQL总结篇】表的增删查改(进阶下)

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】&#x1f388; 本专栏旨在分享学习MySQL的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 目录 一、联合…

【接口技术】实验3:可编程并行接口8255

实验3 可编程并行接口8255实验 一、实验目的 1&#xff1a;了解8255芯片结构及编程方法。 2&#xff1a;了解8255输入/输出实验方法。 3&#xff1a;掌握8255控制键盘及显示电路的基本功能及编程方法。 4&#xff1a;掌握一般键盘和显示电路的工作原理。 二、实验内容 1&…

WS2812灯条基于WLED开源项目无门槛使用简介

WS2812灯条基于WLED开源项目无门槛使用简介 &#x1f4cc;项目github地址&#xff1a;https://github.com/Aircoookie/WLED&#x1f4cd;WLED详情地址&#xff1a;https://kno.wled.ge/&#x1f388;网页在线烧录固件地址&#xff1a;https://install.wled.me/ ✨ 仅作为使用的…

安全技术与防火墙

目录 一、安全技术 1、安全技术 2、防火墙的分类 二、netfilter 1、netfilter简述 2、防火墙工具 1.iptables工具 2.netfilter的四表五链 3.内核中数据包的传输过程 4.三种报文流向 5.实操 总结&#xff1a;本章主要介绍了安全技术与防火墙 一、安全技术 1、安全技…

解决Unable to preventDefault inside passive event listener invocation.报错

报错信息&#xff1a; 这个报错大致说的是&#xff1a;无法在被动事件侦听器调用中防止Default 查了其他博主的解决办法&#xff1a;比如&#xff1a; 1、声明事件监听的时候设置为主动事件监听&#xff1a; window.addEventListener(‘touchmove’, handler, { passive: fal…

【软件测试学习】—软件测试模型(二)

【软件测试学习】—软件测试模型&#xff08;二&#xff09; 我 | 在这里 &#x1f469;‍&#x1f9b0;&#x1f469;‍&#x1f9b0; 读书 | 长沙 ⭐计算机科学与技术 ⭐ 本科 【2024届】 &#x1f383;&#x1f383; 爱好 | 旅游、跑步、网易云、美食、摄影 &#x1f396;️…

VR特警野外武装仿真虚拟训练实操教学保证训练效果

特警VR模拟仿真训练软件的优势主要体现在以下几个方面&#xff1a; 真实感和沉浸感&#xff1a;通过VR技术&#xff0c;特警可以在虚拟环境中体验真实的训练场景&#xff0c;如人质解救、反恐行动等。这种真实感和沉浸感可以帮助特警更好地理解和适应实际情况&#xff0c;提高训…

GoLang切片

一、切片基础 1、切片的定义 切片&#xff08;Slice&#xff09;是一个拥有相同类型元素的可变长度的序列它是基于数组类型做的一层封装它非常灵活&#xff0c;支持自动扩容切片是一个引用类型&#xff0c;它的内部结构包含地址、长度和容量声明切片类型的基本语法如下&#…

阿里巴巴矢量图标库的使用

iconfont-阿里巴巴矢量图标库iconfont-国内功能很强大且图标内容很丰富的矢量图标库&#xff0c;提供矢量图标下载、在线存储、格式转换等功能。阿里巴巴体验团队倾力打造&#xff0c;设计和前端开发的便捷工具https://www.iconfont.cn/ 今天来介绍一下阿里巴巴矢量图标库的使用…