RabbitMQ消息的应答

消息的应答机制

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了, RabbitMQ 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵,批量应答的批量范围是channel 上未应答的消息。比如说 channel 上有传送 tag 的消息5,6,7,8 当前 tag 是8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。但是实际上还是不建议开启批量应答的。

image-20220530100237882

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

Channel.basicNack(用于否定确认)

不处理该消息了直接拒绝,可以将其丢弃了。

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数。

不处理该消息了直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

代码实现

生产者
public class MyProducer {@Testpublic void test() throws Exception {// 队列名称String queue = "xw_queue";String message = "Hello World -> ";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 0; i < 20; i++) {// 发布消息channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());}}
}
消费者1

开启手动确认后,消费者1如果在处理消息的回调中不确认消息,那么队列中的消息会处于unacked的状态,如果消费者1突然挂掉,那么这些未确认的消费会重新发送给其他消费者。

public class MyConsumer1 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答// channel.basicAck(envelope.getDeliveryTag(), false);Thread.sleep(5000);System.out.println("消费者1:接收到消息: " + new String(body));}});}
}
消费者2

消费者2正常消费消息,收到消息后立刻确认。

public class MyConsumer2 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("消费者2:接收到消息: " + new String(body));}});}
}

效果展示

image-20220530102415347

image-20220530102445700

此时把消费者1停掉,那么上面这10条为确认的消费会重新入队,发送给另外的消费者。

image-20220530102642150

image-20220530102708467

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/182665.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AM@微分中值定理的证明问题举例

文章目录 微分学中出现的几个中值定理中值定理证明存在性不等式或等式问题 辅助函数的构造例例例例分析方法1方法2方法3 微分学中出现的几个中值定理 Rolle中值定理(Rolle定理)Lagrange中值定理Cauchy中值定理Taylor中值定理其中Rolle定理是最基础的,而Largrange可视为Cauchy中…

计算机视觉:使用dlib实现人脸检测

1 dlib介绍 Dlib是一个广泛使用的开源库&#xff0c;在计算机视觉和机器学习领域具有重要影响。它是由Davis King在2002年开发&#xff0c;主要用C语言编写&#xff0c;但也提供了Python接口。Dlib结合了高效的算法和易用性&#xff0c;使其成为学术界和工业界的热门选择。 1.…

SpringBoot项目启动后自动停止了?

1 现象 2023-11-22T09:05:13.36108:00 DEBUG 17521 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT 2023-11-22T09:05:13.36208:00 DEBUG 17521 --- [ main] o.s.b.a.Applicat…

一文1000字彻底搞懂Web测试与App测试的区别

总结分享一些项目需要结合Web测试和App测试的工作经验给大家&#xff1a; 从功能测试区分&#xff0c;Web测试与App测试在测试用例设计和测试流程上没什么区别。 而两者的主要区别体现在如下几个方面&#xff1a; 1 系统结构方面 Web项目&#xff0c;B/S架构&#xff0c;基…

冒泡排序(适合编程新手的体质)

冒泡排序&#xff1a;简单而高效的排序技巧 欢迎来到我们今天的博客&#xff0c;我们将一起探索计算机科学中最基本但同时也非常重要的概念之一&#xff1a;冒泡排序。无论你是编程新手还是有一些编程经验的读者&#xff0c;这篇博客都将帮助你更好地理解冒泡排序的原理和应用…

Android中实现RecyclerView,并对item及其多个子控件的点击事件监听

目录 背景 实现RecyclerView 第一步、 新建item的xml 第二步、在activity的布局中引入 RecyclerView 第三步、新建一个adapter 第四步、在activity中初始化绑定adapter即可 实现item及其多个子组件点击事件监听 第一步、 适配器中创建监听对象 第二步、适配器中绑定监听…

uniapp ios 授权弹窗 uniapp弹出框怎么实现

新版本的信息弹窗组件 可以弹出很多条信息&#xff0c;并单独控制消失时间、点击消失。 用循环来生成很多个弹窗&#xff0c;用this.$refs来传值&#xff0c;并添加数组。 1.布局 2.js 具体流程。需要一个弹窗&#xff0c;基本信息传入组件&#xff0c;处理后添加入数组&am…

Linux unset命令详解:如何删除已定义的 shell 变量(包括环境变量)和 shell 函数(附实例教程和注意事项)

Linux unset命令介绍 unset是一个内建于Linux的命令&#xff0c;用于在程序执行过程中删除变量&#xff08;包括环境变量&#xff09;或函数。unset命令可以删除函数和shell变量。如果指定了"varName"&#xff0c;它将指向一个变量名&#xff0c;shell将取消设置它并…

智能优化算法应用:基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于闪电搜索算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.闪电搜索算法4.实验参数设定5.算法结果6.参考…

【C++】异常处理 ① ( 异常概念引入 | 抛出异常语法 | 捕获异常语法 | 异常捕获流程 | 异常处理代码示例 )

文章目录 一、异常处理1、异常概念引入2、抛出异常语法3、捕获异常语法4、异常捕获流程 二、异常处理代码示例1、错误代码示例 - 抛出异常 / 不捕获异常2、正确代码示例 - 抛出异常 / 捕获异常3、正确代码示例 - 抛出异常 / 捕获异常不处理继续抛出异常 一、异常处理 1、异常概…

Java面向对象第8天

精华笔记&#xff1a; 接口&#xff1a; 是一种引用数据类型 由interface定义 只能包含常量和抽象方法 不能被实例化 接口是需要被实现/继承的&#xff0c;实现类/派生类&#xff1a;必须重写接口中的所有抽象方法 一个类可以实现多个接口&#xff0c;用逗号分隔。若又继承…

接口测试入门8问(含答案+文档)

Q1&#xff1a;什么是接口测试&#xff0c;基础知识什么的讲讲吧&#xff01; A&#xff1a;你好&#xff0c;接口可以分下面几种 1、系统与系统之间的调用&#xff0c;比如银行会提供接口供电子商务网站调用&#xff0c;或者说&#xff0c;支付宝会提供接口给淘宝调用 2、上…

Table和HashBasedTable的使用案例

------------------- 1.普通使用 package org.example.testhashbasedtable;import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table;import java.util.Map;public class TestHashBasedTable {public static void main(String[] args) {Ta…

【方法】PowerPoint如何删除“限制编辑”?

如果PPT文件设置成“只读模式”&#xff0c;就会被限制编辑&#xff0c;也就是无法对PPT进行编辑或更改&#xff0c;那要如何删除这个“限制”呢&#xff1f; 下面小编会按照“无密码的只读方式”、“有密码的只读方式”以及“忘记了密码的只读方式”这3种情况&#xff0c;来说…

enote笔记法之附录2——5w1h2k关联词(ver0.22)

enote笔记法之附录2——5w1h2k关联词&#xff08;ver0.22&#xff09; 最上面的是截屏的完整版&#xff0c;分割线下面的是纯文字版本&#xff1a; 作者姓名&#xff08;本人的真实姓名&#xff09;&#xff1a;胡佳吉 居住地&#xff1a;上海 作者网名&#xff1a;EverSt…

【从删库到跑路 | MySQL总结篇】表的增删查改(进阶下)

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】&#x1f388; 本专栏旨在分享学习MySQL的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 目录 一、联合…

【接口技术】实验3:可编程并行接口8255

实验3 可编程并行接口8255实验 一、实验目的 1&#xff1a;了解8255芯片结构及编程方法。 2&#xff1a;了解8255输入/输出实验方法。 3&#xff1a;掌握8255控制键盘及显示电路的基本功能及编程方法。 4&#xff1a;掌握一般键盘和显示电路的工作原理。 二、实验内容 1&…

WS2812灯条基于WLED开源项目无门槛使用简介

WS2812灯条基于WLED开源项目无门槛使用简介 &#x1f4cc;项目github地址&#xff1a;https://github.com/Aircoookie/WLED&#x1f4cd;WLED详情地址&#xff1a;https://kno.wled.ge/&#x1f388;网页在线烧录固件地址&#xff1a;https://install.wled.me/ ✨ 仅作为使用的…

linux查询某个进程使用的内存量

linux查询某个进程使用的内存量 查进程用的内存&#xff0c;查看进程占用的内存量&#xff0c;centos查询内存使用 查某个进程id使用的内存量 ps -p 24450 -o rss | awk {print int($1/1024)"MB"} 该命令的含义是&#xff1a; ps -p 24450: 查找进程ID为24450的进…

Nginx漏洞复现与分析

Nginx如何处理PHP请求 Nginx本身不支持直接解析和执行PHP代码,但可以通过与PHP解释器的集成来处理PHP请求。一种常见的方法是使用PHP-FPM(FastCGI Process Manager)作为PHP解释器。 原理图: Step 1 Step 2 +---------------------+ …