RabbitMQ--延迟队列

(一)延迟队列

1.概念

 延迟队列是一种特殊的队列,消息被发送后,消费者并不会立刻拿到消息,而是等待一段时间后,消费者才可以从这个队列中拿到消息进行消费

2.应用场景

 延迟队列的应用场景很多,就比如大部分定时的场景,我们都可以利用延迟队列例如:闹钟定时,预约会议,空调定时开关等

 但是RabbitMQ是没有直接给我们提供延迟队列的,但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果,具体操作如下

 首先我们有一个交换机和一个队列,然后此队列又指定一个死信交换机,死信交换机绑定一个死信队列,然后我们消费者并不是从正常队列中获取消息,而是从死信队列中获取消息,通过给消息/队列设置过期时间来影响消息到达死信队列的时间,消费者拿到消息就会延迟,这样就可以模拟出延迟的效果。

那接下来就是我们的代码实现

首先我们通过设置队列ttl来实现

 @Bean("ttlExchange")public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey("dead").build();}@Bean("ttlBind")public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();}@Bean("deadExchange")public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueue")public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadBind")public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();}

然后生产者代码没什么变化

 @RequestMapping("ttl")public String TTLPro(){String s1="ttl test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
//        message.getMessageProperties().setExpiration("10000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);return "发送成功";}

只不过消费者订阅的是死信队列

@RabbitListener(queues = Constants.DEAD_QUEUE)public void ListenerQueue2(Message message,Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0;     //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicReject(Tag,false);}}

这样过了5s后我们就可以从死信队列中获取到延迟消息了

那我们再来通过设置消息的ttl来看一下

首先我们要把队列的ttl给取消掉,记得要删队列

 @Bean("ttlExchange")public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey("dead").build();}@Bean("ttlBind")public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();}@Bean("deadExchange")public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueue")public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadBind")public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();}

  然后我们发送一个ttl时间为10s的,再发送一个5s的,我们知道这样两条数据是会发生错误的,因为我们设置消息过期时间,我们RabbitMQ(性能问题)并不会遍历整个消息队列看看谁过没过期,如果过期的消息不在队头,那么只有当使用的时候,才会真正的进行一些过期处理,比如传给死信交换机 

@RequestMapping("ttl")public String TTLPro(){String s1="ttl test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration("10000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);message.getMessageProperties().setExpiration("5000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);return "发送成功";}

如果正常,会在5s后接收到第一个消息,在10s后接收到第二个消息,但是此时我们会同一时间(10s)接收到两条消息

  那这个问题在上一篇ttl的时候就说过了,这里依然是个问题,虽然设置队列的ttl不会有这个问题,但是设置队列ttl我们针对不同延迟时间就需要创建多个队列,这是不太合理的,所以针对这个问题,我们有一个延迟队列的插件可以使用

 3.延迟队列插件

延迟队列插件,会给我们提供一个特殊的交换机,来完成我们的延迟功能

这是我们插件的下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  我们需要找到ez文件并下载,但是注意这里的版本要与你的RabbitMQ版本可以匹配,否则之后会出现问题

那插件下完后,我们要找到对应目录,下载插件

上面两个目录,我们可以任选一个下载即可,没有这个目录,我们手动创建

然后把下载的ez文件,copy到这个目录中即可,然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表,看看我们有没有成功放进去,但是注意,即使我们成功放进去并成功显示了,也可能会出错,这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配,重新下载其他版本即可

然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启服务service rabbitmq-server restart

在没有发生错误的情况下,我们就发现我们会多了一个默认的交换机

此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可

我们生产者代码是需要改一下的,我们需要调用一个方法来设置延迟时间

@RequestMapping("/delay2")
public String delay2() {//发送带ttl的消息 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 return messagePostProcessor;});return "发送成功!";
}

此时我们就可以在10s正确接收一个消息,在20s正确接收另一个消息 

  注意我们使用TTL+死信时消息传递给交换机后映射之后一直在正常队列中的,等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息,我们使用插件的时候,消息是在RabbitMQ给我们提供的那个特殊的交换机中的,等待时间到了,再映射给队列,然后从队列中拿消息

4.常见面试题

介绍下延迟队列

我们可以这样回答:

 延迟队列是一个特殊的队列,消息发送后,消费者并不会立刻拿到,而是等待一定延迟时间后才发送给消费者进行消费

 并且延迟队列的应用场景很多,比如订单支付,智能家电,以及定时邮箱

 但是延迟队列在RabbitMQ中并没有直接给我们提供,我们可以通过TTL+死信的方式或者使用延迟插件的方式来实现延迟功能

 两者的区别:

1.通过TTL+死信

 优点:比较灵活,不需要我们额外引入插件

 缺点:我们设置消息TTL的时候可能会出现顺序的问题,而且我们需要多创建死信队列和死信交换机,完成一些绑定,增加了系统的复杂性

2.基于插件实现的延迟队列

 优点:通过插件能够简化延迟消息的实现,并且避免了时序问题

 缺点:需要依赖插件,不同版本RabbitMQ需要不同版本插件,有运维工作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/67464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flutter开发-figma交互设计图可以转换为flutter源代码-如何将设计图转换为flutter源代码-优雅草央千澈

flutter开发-figma交互设计图可以转换为flutter源代码-如何将设计图转换为flutter源代码-优雅草央千澈 开发背景 可能大家听过过蓝湖可以转ui设计图为vue.js,react native代码,那么请问听说过将figma的设计图转换为flutter源代码吗?本文优雅草央千澈带…

当设置dialog中有el-table时,并设置el-table区域的滚动,看到el-table中多了一条横线

问题:当设置dialog中有el-table时,并设置el-table区域的滚动,看到el-table中多了一条横线; 原因:el-table有一个before的伪元素作为表格的下边框下,初始的时候已设置,在滚动的时候并没有重新设置…

代理模式实现

一、概念:代理模式属于结构型设计模式。客户端不能直接访问一个对象,可以通过代理的第三者来间接访问该对象,代理对象控制着对于原对象的访问,并允许在客户端访问对象的前后进行一些扩展和处理;这种设置模式称为代理模…

windows 搭建flutter环境,开发windows程序

环境安装配置: 下载flutter sdk https://docs.flutter.dev/get-started/install/windows 下载到本地后,随便找个地方解压,然后配置下系统环境变量 编译windows程序本地需要安装vs2019或更新的开发环境 主要就这2步安装后就可以了&#xff0…

Redis系列之底层数据结构字典Dict

Redis系列之底层数据结构字典Dict Dict数据结构 Dict是Redis数据结构中使用最为频繁的复合型数据结构,本质上是一个哈希表 查看redis6.0版本的源码,链接:https://github.com/redis/redis/blob/6.0/src/dict.h 哈希表的结构定义&#xff1…

【Azure 架构师学习笔记】- Azure Function (2) --实操1

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Function 】系列。 接上文【Azure 架构师学习笔记】- Azure Function (1) --环境搭建和背景介绍 前言 上一文介绍了环境搭建,接下来就在本地环境下使用一下。 环境准备 这里我下载了最新的VS studio&…

【NextJS】PostgreSQL 遇上 Prisma ORM

NextJS 数据库 之 遇上Prisma ORM 前言一、环境要求二、概念介绍1、Prisma Schema Language(PSL) 结构描述语言1.1 概念1.2 组成1.2.1 Data Source 数据源1.2.2 Generators 生成器1.2.3 Data Model Definition 数据模型定义字段(数据)类型和约束关系&…

左神算法基础提升--3

文章目录 Manacher 算法经典算法Manacher算法原理 单调栈或单调队列 Manacher 算法 经典算法 在每学习Manacher算法之前我们可能会使用一种比较经典暴力的算法:遍历str字符串,将字符串中的每个字符作为对称点,向两边扩散找到回文字段&#x…

浅谈操作系统与初识Linux

一、Linux操作系统的出现 1.1操作系统的出现以及相关的四个要素 1.2最早出现的操作系统及其创始人 起初,IBM为了让计算机可以以更低技术成本进行使用,以此来售卖计算机; 为计算机搭载上了Unix操作系统,Unix由肯汤普森用汇编语…

ElasticSearch下

DSL查询 叶子查询:在特定字段里查询特定值,属于简单查询,很少单独使用复合查询:以逻辑方式组合多个叶子查询或更改叶子查询的行为方式 在查询后还可以对查询结果做处理: 排序:按照1个或多个字段做排序分页…

java根据模板导出word,并在word中插入echarts相关统计图片以及表格

引入依赖创建word模板创建ftl模板文件保存的ftl可能会出现占位符分割的问题,需要处理将ftl文件中的图片的Base64删除,并使用占位符代替插入表格,并指定表格的位置在图片下方 Echarts转图片根据模板生成word文档DocUtil导出word文档 生成的wor…

链式前向星的写法

【图论02】动画说图的三种保存方式 降低理解门槛 邻接表 链式前向星 邻接矩阵_哔哩哔哩_bilibili 杭电ACM刘老师-算法入门培训-第12讲-拓扑排序及链式前向星_哔哩哔哩_bilibili 图论003链式前向星_哔哩哔哩_bilibili(链式前向星的遍历) head数组的下标…

想品客老师的第一天:值类型使用

前面两章的摘要 ECMAscript(也就是ES)是JavaScript的一个标准,就像c的c11和c99一样,几把的一年出一套标准 freeze()是一个对象方法,表示锁定、固定一个对象不可改变(因为const对于标量不可变,…

贪心算法(题1)区间选点

输出 2 #include <iostream> #include<algorithm>using namespace std;const int N 100010 ;int n; struct Range {int l,r;bool operator <(const Range &W)const{return r<W.r;} }range[N];int main() {scanf("%d",&n);for(int i0;i&l…

解决本地运行MR程序访问权限问题

文章目录 1. 提出问题2. 解决问题2.1 临时解决方案2.2 永久解决方案 3. 小结 1. 提出问题 运行DeduplicateIPsDriver类&#xff0c;抛出如下异常&#xff1a; 该错误信息表明在尝试运行 DeduplicateIPsDriver 类时&#xff0c;遇到了 HDFS&#xff08;Hadoop 分布式文件系统&a…

html全局遮罩,通过websocket来实现实时发布公告

1.index.html代码示例 <div id"websocket" style"display:none;position: absolute;color:red;background-color: black;width: 100%;height: 100%;z-index: 100; opacity: 0.9; padding-top: 30%;padding-left: 30%; padding-border:1px; "onclick&q…

高通8255 Android STR 启动失败要因分析调查

目录 背景&#xff1a; 调查过程&#xff1a; 步骤1&#xff1a; slog2info | grep vmm_service 步骤2&#xff1a; slog2info | grep qvm 总结&#xff1a; 解决方案 背景&#xff1a; 调试高通8255 STR的STR过程中发现Android和QNX进入STR状态后&#xff0c;脱出STR时…

Linux UDP 编程详解

一、引言 在网络编程领域&#xff0c;UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;作为一种轻量级的传输层协议&#xff0c;具有独特的优势和适用场景。与 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff0…

可解释性机器学习

一、引言 随着机器学习&#xff08;ML&#xff09;在各个领域的广泛应用&#xff0c;模型的复杂度不断增加&#xff0c;如深度神经网络等黑盒模型逐渐成为主流。这些模型虽然具有很高的预测性能&#xff0c;但其内部的决策机制往往难以理解&#xff0c;导致模型的透明度和可解释…

PyTorch使用教程(8)-一文了解torchvision

一、什么是torchvision torchvision提供了丰富的功能&#xff0c;主要包括数据集、模型、转换工具和实用方法四大模块。数据集模块内置了多种广泛使用的图像和视频数据集&#xff0c;如ImageNet、CIFAR-10、MNIST等&#xff0c;方便开发者进行训练和评估。模型模块封装了大量经…