RabbitMQ 是如何做延迟消息的 ?——Java全栈知识(15)

RabbitMQ 是如何做延迟消息的 ?

1、什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
    死信交换机有什么作用呢?
  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

2、死信队列

架构:
image.png
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。

3、延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒: image.png注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: image.png 死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blueimage.png 由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: image.png 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息

[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。

死信队列消息堆积问题

[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。

基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度

3、DelayExchange 插件

前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。

插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:
image.png|600px
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]

4.2.2. 安装

因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[  {  "CreatedAt": "2024-06-19T09:22:59+08:00",  "Driver": "local",  "Labels": null,  "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",  "Name": "mq-plugins",  "Options": null,  "Scope": "local"  }  
]  

插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data 这个目录,我们上传插件到该目录下

注意上传插件

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

4.2.3. 声明延迟交换机

image.png

根据

1、创建交换机:
image.png
2、创建队列
image.png
3、根据 bandingKey 绑定队列:
image.png|500

基于注解方式:

@RabbitListener(bindings = @QueueBinding(  value = @Queue(name = "delay.queue", durable = "true"),  exchange = @Exchange(name = "delay.direct", delayed = "true"),  key = "delay"  
))  
public void listenDelayMessage(String msg){  log.info("接收到delay.queue的延迟消息:{}", msg);  
}

基于 @Bean 的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@Slf4j  
@Configuration  
public class DelayExchangeConfig {@Bean  public DirectExchange delayExchange(){  return ExchangeBuilder  .directExchange("delay.direct") // 指定交换机类型和名称  .delayed() // 设置delay的属性为true  .durable(true) // 持久化  .build();  }@Bean  public Queue delayedQueue(){  return new Queue("delay.queue");  }  @Bean  public Binding delayQueueBinding(){  return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");  }  
}  

4.2.4. 发送延迟消息

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test  
void testPublisherDelayMessage() {  // 1.创建消息  String message = "hello, delayed message";  // 2.发送消息,利用消息后置处理器添加消息头  rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {  @Override  public Message postProcessMessage(Message message) throws AmqpException {  // 添加延迟消息属性  message.getMessageProperties().setDelay(5000);  return message;  }  });  
}

warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/7097.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2.4Java全栈开发前端+后端(全栈工程师进阶之路)-前端框架VUE3-基础-Vue组件

初识Vue组件 Vue中的组件是页面中的一部分,通过层层拼装,最终形成了一个完整的组件。这也是目前前端最流行的开发方 式。下面是Vue3官方给出的一张图,通过图片能清楚的了解到什么是Vue中的组件。 图的左边是一个网页,网页分为了…

ECS弹性云服务器居然这么好用。

引言 在过去的十年里,云计算从一个前沿概念发展为企业和开发者的必备工具。传统的计算模型通常局限于单一的、物理的位置和有限的资源,而云计算则通过分布式的资源和服务,为计算能力带来了前所未有的"弹性"。 云弹性服务器——为什…

AAA、RADIUS、TACACS、Diameter协议介绍

准备软考高级时碰到的一个概念,于是搜集网络资源整理得出此文。 概述 AAA是Authentication、Authorization、Accounting的缩写简称,即认证、授权、记帐。Cisco开发的一个提供网络安全的系统。AAA协议决定哪些用户能够访问服务,以及用户能够…

精品干货 | 数据中台与数据仓库建设(免费下载)

【1】关注本公众号,转发当前文章到微信朋友圈 【2】私信发送 数据中台与数据仓库建设 【3】获取本方案PDF下载链接,直接下载即可。 如需下载本方案PPT/WORD原格式,请加入微信扫描以下方案驿站知识星球,获取上万份PPT/WORD解决方…

PPO 学习笔记

用PPO算法求解整个神经网络在迭代过程中的梯度问题 每走一步就会得到一个新的状态,把这个状态传到网络里面,会得到一个 action,执行这个 action 又会到达一个新状态 policy 中由状态 st 生成动作 at,生成的这个 at 是由整个网络的…

什么是X电容和Y电容?

先补充个知识: 一、什么是差模信号和共模信号 差模信号:大小相等,方向相反的交流信号;双端输入时,两个信号的相位相差180度 共模信号:大小相等。方向相同。双端输入时,两个信号相同。 二、安规…

Redis探索之旅(基础)

目录 今日良言:满怀憧憬,阔步向前 一、基础命令 1.1 通用命令 1.2 五大基本类型的命令 1.2.1 String 1.2.2 Hash 1.2.3 List 1.2.4 Set 1.2.5 Zset 二、过期策略以及单线程模型 2.1 过期策略 2.2 单线程模型 2.3 Redis 效率为什么这么高 三…

cmake进阶:文件操作

一. 简介 前面几篇文章学习了 cmake的文件操作,写文件,读文件。文章如下: cmake进阶:文件操作之写文件-CSDN博客 cmake进阶:文件操作之读文件-CSDN博客 本文继续学习文件操作。主要学习 文件重命名,删…

python爬虫(一)之 抓取极氪网站汽车文章

极氪汽车文章爬虫 闲来没事,将极氪网站的汽车文章吃干抹尽,全部抓取到本地,还是有点小小的难度。不能抓取太快,太快容易被封禁IP,不过就算被封了问题也不大,大不了重启路由器,然后你的IP里面又…

文件夹加密软件哪个好?文件夹加密软件排行榜

许多人给小编说,我们公司想实现文件私自发出呈乱码状态,这说明公司逐渐认识到文件加密的重要性。 目前,加密软件已经广泛应用于企业办公、商业贸易、个人应用等多个领域,成为保护数据安全和隐私的重要手段。 为了保护企业机密&am…

OpenNJet评测,探寻云原生之美

在信息时代的大海上,云原生应用引擎如一艘航行于波涛之间的帆船,承载着创新的梦想和数字化的未来。本文将带领您登上这艘船,聚焦其中之一的OpenNJet,一同探寻其中的奥秘和精妙,领略其独特之美。 OpenNJet 内容浅析 O…

智慧工地)智慧工地标准化方案(107页)

2.2 设计思路 对于某某智慧工地管理系统的建设,绝不是对各个子系统进行简单堆砌,而是在满足各子系统功能的基础上,寻求内部各子系统之间、与外部其它智能化系统之间的完美结合。系统主要依托于智慧工地管理平台,来实现对众多子系统…

OpenNJet应用引擎——云原生时代的Web服务新选择

文章目录 OpenNJet应用引擎——云原生时代的Web服务新选择引言:数字化转型的推动力:OpenNJet应用引擎为什么选择OpenNJet? OpenNJet的核心优势1. 云原生功能增强2. 安全加固3. 代码重构与性能优化4. 动态加载机制5. 多样化的产品形态6. 易于集…

Python测试框架Pytest的参数化详解

上篇博文介绍过,Pytest是目前比较成熟功能齐全的测试框架,使用率肯定也不断攀升。 在实际工作中,许多测试用例都是类似的重复,一个个写最后代码会显得很冗余。这里,我们来了解一下pytest.mark.parametrize装饰器&…

后端接口返回二进制数据流,前端如何将其转换成对应的excel、csv和json文件格式并下载

本文主要是介绍在工作中遇到的后端接口返回一个二进制数据流,前端在界面上创建下载按钮并下载成对应格式的文件导出。 downloadData({start: startTime,end: endTime,exportType: 0, // 0-excel, 1-csv, 2-json }).then((res) > {download(res, startTime, endTi…

毕业设计:《基于 Prometheus 和 ELK 的基础平台监控系统设计与实现》

前言 《基于 Prometheus 和 ELK 的基础平台监控系统设计与实现》,这是我在本科阶段的毕业设计,通过引入 Prometheus 和 ELK 架构实现企业对指标与日志的全方位监控。并且基于云原生,使用容器化持续集成部署的开发方式,通过 Sprin…

通信系列:通信中如何度量消息中所包含的信息量?如何评估通信系统的性能?

微信公众号上线,搜索公众号小灰灰的FPGA,关注可获取相关源码,定期更新有关FPGA的项目以及开源项目源码,包括但不限于各类检测芯片驱动、低速接口驱动、高速接口驱动、数据信号处理、图像处理以及AXI总线等 本节目录 一、通信中如何度量消息…

小吉/希亦/鲸立内衣洗衣机怎么样?深度测评谁更好用!

内衣洗衣机是近几年新兴的家电产品,以清洁效果好、除菌能力强,被很多人种草入手了!但网上有不少人虽感兴趣,但不清楚如何选。担心买到质量差,清洗不干净的产品。作为一名家电测评博主,我今天特意围绕被问最…

神奇的Vue3 - 组件探索

神奇的Vue3 第一章 神奇的Vue3—基础篇 第二章 神奇的Vue3—Pinia 文章目录 神奇的Vue3了解组件一、注册组件1. 全局注册​2. 局部注册3. 组件命名 二、属性详解1. Props(1)基础使用方法(2)数据流向:单项绑定原则&…

5-在Linux上部署各类软件

1. MySQL 数据库安装部署 1.1 MySQL 5.7 版本在 CentOS 系统安装 注意:安装操作需要 root 权限 MySQL 的安装我们可以通过前面学习的 yum 命令进行。 1.1.1 安装 配置 yum 仓库 # 更新密钥 rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022# 安装Mysql…