rabbitmq的消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息应答的方法 

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

自动应答 

消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做权
,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢
失了,当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终
使得内存耗尽,最终这些消费者线程被操作系统杀死, 所以这种模式仅适用在消费者可以高效并
以某种速率能够处理这些消息的情况下使用。默认消息采用的是自动应答.

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵multiple 的 true 和 false 代表不同意思

true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消费者1 

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息处理时间较短");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠1秒Thread.sleep(1000*1);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

 消费者2


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2 等待接收消息处理时间较长");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠30秒Thread.sleep(1000*30);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

生产者 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/for (int i = 0; i < 10; i++) {channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息发送完毕");}}
}

结果

在发送者发送消息 ,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是
由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,
此时会看到消息被 C1 接收到了,说明消息  被重新入队,然后分配给能处理消息的 C1 处理了 

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者
可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/37145.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析两件套ClickHouse+Metabase(一)

ClickHouse篇 安装ClickHouse ClickHouse有中文文档, 安装简单 -> 文档 官方提供了四种包的安装方式, deb/rpm/tgz/docker, 自行选择适合自己操作系统的安装方式 这里我们选deb的方式, 其他方式看文档 sudo apt-get install -y apt-transport-https ca-certificates dirm…

魔改 axuanup 的 aardio和python 猜拳游戏 代码

根据 axuanup 的 aardio和python 猜拳游戏 代码&#xff0c;魔改了一个风格不一样的代码。 争取做到代码尽量“简”&#xff0c;但还没到“变态简”的程度&#xff0c;因为还能看懂。 原文&#xff1a;aardio和python 猜拳游戏-自由交流乐园-Aardio资源网 代码如下&#xff…

【Flutter】【基础】CustomPaint 绘画功能(一)

功能&#xff1a;CustomPaint 相当于在一个画布上面画画&#xff0c;可以自己绘制不同的颜色形状等 在各种widget 或者是插件不能满足到需求的时候&#xff0c;可以自己定义一些形状 使用实例和代码&#xff1a; CustomPaint&#xff1a; 能使你绘制的东西显示在你的ui 上面&a…

竞赛项目 酒店评价的情感倾向分析

前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 酒店评价的情感倾向分析 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-senior/post…

用Python做一个滑雪小游戏

游戏是让人娱乐和放松的好方式&#xff0c;而编写和玩自己的游戏则是一种特别有趣的体验。在本文中&#xff0c;我们将使用Python和pygame库来创建一个简单的滑雪小游戏。通过这个小游戏项目&#xff0c;我们将学习如何使用Python编程语言来制作自己的游戏&#xff0c;并且享受…

IT运维:使用数据分析平台监控深信服防火墙

概述 深信服防火墙自身监控可以满足绝大部分需求&#xff0c;比如哪个应用占了最大带宽&#xff0c;哪个用户访问了哪些网站&#xff1f;这里我们为什么使用鸿鹄呢&#xff1f;因为我们要的是数据的处理和分析&#xff0c;比如某个用户在某个事件都做了哪些行为&#xff0c;这个…

【设计模式】前端控制器模式

前端控制器模式&#xff08;Front Controller Pattern&#xff09;是用来提供一个集中的请求处理机制&#xff0c;所有的请求都将由一个单一的处理程序处理。该处理程序可以做认证/授权/记录日志&#xff0c;或者跟踪请求&#xff0c;然后把请求传给相应的处理程序。以下是这种…

基于鲲鹏平台Ceph深度性能调优

刘亮奇 架构师技术联盟 2021-04-12 07:50 摘自&#xff1a; https://mp.weixin.qq.com/s/o9HH-8TF0DbMqHrvsFh1NA 随着 IOT、大数据、移动互联等应用的暴涨&#xff0c;产生的数据也越来越多&#xff0c;整个存储市场总量也逐年增长&#xff0c;预计到 2021 年分布式存储会占到…

UNIX基础知识:UNIX体系结构、登录、文件和目录、输入和输出、程序和进程、出错处理、用户标识、信号、时间值、系统调用和库函数

引言&#xff1a; 所有的操作系统都为运行在其上的程序提供服务&#xff0c;比如&#xff1a;执行新程序、打开文件、读写文件、分配存储区、获得系统当前时间等等 1. UNIX体系结构 从严格意义上来说&#xff0c;操作系统可被定义为一种软件&#xff0c;它控制计算机硬件资源&…

CTFshow 限时活动 红包挑战7、红包挑战8

CTFshow红包挑战7 写不出来一点&#xff0c;还是等了官方wp之后才复现。 直接给了源码 <?php highlight_file(__FILE__); error_reporting(2);extract($_GET); ini_set($name,$value);system("ls ".filter($_GET[1])."" );function filter($cmd){$cmd…

【图像分类】理论篇(2)经典卷积神经网络 Lenet~Densenet

1、卷积运算 在二维卷积运算中&#xff0c;卷积窗口从输入张量的左上角开始&#xff0c;从左到右、从上到下滑动。 当卷积窗口滑动到新一个位置时&#xff0c;包含在该窗口中的部分张量与卷积核张量进行按元素相乘&#xff0c;得到的张量再求和得到一个单一的标量值&#xff0c…

SQL- 每日一题【1327. 列出指定时间段内所有的下单产品】

题目 表: Products 表: Orders 写一个解决方案&#xff0c;要求获取在 2020 年 2 月份下单的数量不少于 100 的产品的名字和数目。 返回结果表单的 顺序无要求 。 查询结果的格式如下。 示例 1: 解题思路 1.题目要求我们获取在 2020 年 2 月份下单的数量不少于 100 的产品的…

IO多路复用

常见的网络IO模型 网络 IO 模型分为四种&#xff1a;同步阻塞 IO(Blocking IO, BIO)、同步非阻塞IO(NIO, NewIO)、IO 多路复用、异步非阻塞 IO(Async IO, AIO)&#xff0c;其中AIO为异步IO&#xff0c;其他都是同步IO 同步阻塞IO 同步阻塞IO&#xff1a;在线程处理过程中&am…

Redis_事务操作

13. redis事务操作 13.1事务简介 原子性(Atomicity) 一致性(Consistency) 隔离性(isolation) 持久性(durabiliby) ACID 13.2 Redis事务 提供了multi、exec命令来完成 第一步&#xff0c;客户端使用multi命令显式地开启事务第二步&#xff0c;客户端把事务中要执行的指令发…

前沿分享-通过经皮神经刺激来治疗糖尿病神经性疼痛

经皮神经电刺激&#xff08;PENS&#xff09;设备用于对糖尿病周围神经病变引起的慢性、顽固性疼痛进行多次治疗。 放在耳朵上的这种可穿戴设备在几天内持续提供低水平的脉冲电流。 这是一种安全有效的非麻醉性替代治疗慢性疼痛的方法。还有一张设备放在糖足上的照片&#xff0…

向量数据库 Milvus Cloud Partition Key:租户数量多,单个租户数据少的三种解决方案

三种解决方案 这个问题提出的时候,Milvus 的最新版本是 2.2.8,我们做个角色互换,在当时站在这个用户的角度,留在我们面前的选择有这么几个: 为每个租户创建一个 collection 为每个租户创建一个 partition 创建一个租户名称的标量字段 接下来,我们依次分析下这三种方案的可…

《零基础实践深度学习》(第2版)学习笔记,(五)深度学习与计算机视觉

文章目录 1. 计算机视觉概述2. 图像分类3. 目标检测 1. 计算机视觉概述 图像分类 目标检测 2. 图像分类 3. 目标检测

01-C++数据类型

3、基础类型 3.1、简单变量 变量的命名 carDrip和cardRip 或boat_sport和boats_port 此外&#xff0c;还有有前缀的命名&#xff0c;使用前缀表示数据类型。常见的前缀有:str&#xff08;表示字符串&#xff09;、n&#xff08;表示整数值&#xff09;、b&#xff08;表示…

深入探究QCheckBox的三种状态及其用法

文章目录 引言&#xff1a;三种状态一、未选中状态&#xff08;0&#xff09;&#xff1a;二、选中状态&#xff08;2&#xff09;&#xff1a;三、部分选中状态&#xff08;1&#xff09;&#xff1a; 判断方法结论&#xff1a; 引言&#xff1a; QCheckBox是Qt框架中常用的复…