Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录

  • 一、序言
  • 二、死信交换机和消息TTL实现延迟消息
    • 1、死信队列介绍
    • 2、代码示例
      • (1) 死信交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 三、延迟消息交换机实现延迟消息
    • 1、安装延时消息插件
    • 2、代码示例
      • (1) 延时消息交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 四、两种实现方式优缺点
  • 1、延时消息插件
  • 2、TLL&死信交换机

一、序言

业务开发中有很多延时操作的场景,比如最常见的超时订单自动关闭延时异步处理,我们常用的实现方式有:

  • 定时任务轮询(有延时)。
  • 借助Redission的延时队列
  • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
  • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
  • RabbitMQ中的死信队列和延迟消息交换机

其中用的最多的也是借助Redisson实现的数据结构延迟队列RabbitMQ中的死信队列来实现,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


二、死信交换机和消息TTL实现延迟消息

1、死信队列介绍

这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

  • 被消费者nack(negatively acknowleged)的消息。
  • TTL过期后未被消费的消息。
  • 超过队列长度限制后被丢弃的消息。

备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

2、代码示例

(1) 死信交换机配置

@Configuration
protected static class DeadLetterExchangeConfig {@Beanpublic Queue deadLetterQueue(){return QueueBuilder.durable("dead-letter-queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead-letter-exchange").build();}@Beanpublic Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead-letter-routing-key").build();}}

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("normal-queue", message);}}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "dead-letter-queue")public void handleMsgFromDeadLetterQueue(String msg) {log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/dead-letter")public ResponseEntity<String> sendMsgToDeadLetterExchange(String body, int timeout) {rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);return ResponseEntity.ok("消息发送到死信交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 11:50:33.041  INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.041
2023-11-26 11:50:38.054  INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054

三、延迟消息交换机实现延迟消息

上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫James Carr的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

1、安装延时消息插件

该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

我安装的RabbitMQ版本为3.9.9,3.9.0版本的插件对所有3.9.x版本的RabbitMQ都支持。

在这里插入图片描述
下载完后把.ez结尾的插件复制RabbitMQ的插件目录下,插件目录为/usr/lib/rabbitmq/plugins

在这里插入图片描述
通过命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装该插件,通过命令rabbitmq-plugins list查看插件列表,可以看到该延时消息插件已经成功安装。

在这里插入图片描述

2、代码示例

(1) 延时消息交换机配置

@Configuration
protected static class DelayedMsgExchangePluginConfig {@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed-queue").build();}@Beanpublic DirectExchange delayedExchange() {return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();}@Beanpublic Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");}
}

备注:延时交换机的类型可以为DirectExchage、TopicExcahgeFanoutExchange,这些都支持。

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendDelayedMsg(String body, int timeoutInMillSeconds) {log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(timeoutInMillSeconds);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);}
}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "delayed-queue")public void handleMsgFromDelayedQueue(String msg) {log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/delayed")public ResponseEntity<String> sendMsgToHeadersExchange(String body, int timeout) {rabbitMqProducer.sendDelayedMsg(body, timeout);return ResponseEntity.ok("消息发送到延迟交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 13:02:07.816  INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello, 消息延迟:5000ms, 当前时间:2023-11-26T13:02:07.816
2023-11-26 13:02:12.830  INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829

四、两种实现方式优缺点

1、延时消息插件

  • 优点:配置更加简单,少配置1个队列,且语义更明确,容易定位消息出入口。
  • 缺点:延时消息插件对RabbitMQ版本有要求,延时消息交换机

2、TLL&死信交换机

  • 优点:基本适用于所有RabbitMQ版本。
  • 缺点:配置相对来说复杂一些,还有就是我们最开始提到的,不只是TTL过期的消息才会进入死信队列,还有超出队列限制nack的消息也会进入死信队列,触发的条件没那么纯粹。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171262.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Linux]进程创建➕进程终止

文章目录 1.再谈fork()函数1.1fork()创建子进程 OS都做了哪些工作?1.2对上述问题的理解1.3写时拷贝进行父子进程分离的优势1.4了解eip寄存器和pc1.5了解进程的上下文数据1.6对计算机组成的理解1.7fork常规用法1.8fork调用失败的原因 2.进程终止2.1进程终止时操作系统要做的工作…

人工智能-注意力机制之Transformer

Transformer 比较了卷积神经网络&#xff08;CNN&#xff09;、循环神经网络&#xff08;RNN&#xff09;和自注意力&#xff08;self-attention&#xff09;。值得注意的是&#xff0c;自注意力同时具有并行计算和最短的最大路径长度这两个优势。因此&#xff0c;使用自注意力…

13年老鸟总结,性能测试方法汇总+性能响应很慢排查方法(详全)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、性能测试包含哪…

windows的bat文件(学习笔记)

简介 通过windows的cmd执行的批处理&#xff0c;扩展名可以是.bat或.cmd&#xff08;类似linux的shell脚本&#xff09; 所有语句符号不区分大小写 帮助提示信息&#xff1a;命令 /? 1 基本语法 (1) 注释&#xff1a;rem 注释文本不执行 (2) 关闭盘符输出&#xff1a;e…

Java实现-数据结构 2.时间和空间复杂度

.如何衡量一个算法的好坏&#xff1a;时间复杂度和空间复杂度 算法效率分为时间效率和空间效率&#xff0c;时间效率称为时间复杂度&#xff0c;空间效率称为空间复杂度 时间复杂度 算法的时间复杂度是一个数学函数&#xff0c;它描述了算法的运行时间&#xff0c;一个算法执…

vim+xxd编辑十六进制的一个大坑:自动添加0x0a

问题描述 今天在做一个ctf题&#xff0c;它给了一个elf文件&#xff0c;我要做的事情是修复这个elf文件&#xff0c;最后执行它&#xff0c;这个可执行文件会计算它自身的md5作为这道题的flag。我把所有需要修复的地方都修复了&#xff0c;程序也能成功运行&#xff0c;但是fl…

【电路笔记】-快速了电阻

电阻类型 文章目录 电阻类型1、概述2、电阻器的组成类型2.1 碳电阻器2.2 薄膜电阻器2.3 绕线电阻器 3、总结 电阻器是所有电子元件中最基本、最常用的元件&#xff0c;人们几乎认为电阻器是理所当然的&#xff0c;但它们在电路中起着至关重要的作用。 1、概述 有许多不同类型的…

SpringCloud之Gateway(统一网关)

文章目录 前言一、搭建网关服务1、导入依赖2、在application.yml中写配置 二、路由断言工厂Route Predicate Factory三、路由过滤器 GatewayFilter案例1给所有进入userservice的请求添加一个请求头总结 四、全局过滤器 GlobalFilter定义全局过滤器&#xff0c;拦截并判断用户身…

深入解析:如何开发抖音票务小程序

当下&#xff0c;开发抖音票务小程序成为了吸引年轻用户群体的一种创新方式。本文将深入解析如何开发抖音票务小程序&#xff0c;探讨关键步骤和技术要点。 1.确定需求和功能 考虑到抖音的用户特点&#xff0c;可以加入与短视频相关的票务功能&#xff0c;如在线购票、观影记录…

IDEA中的Postman?完全免费!

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

Jenkins用126邮箱发邮件为什么发不出去

1、检查 Jenkins Location中的邮件地址配置与发邮件的地址配置是否一致 Manage Jenkins -》 system 2、检查地址和端口号 3、检查邮箱的登录配置是否正确&#xff08;这个地方的配置方式网上一抓一大把&#xff0c;自己搜一下就好&#xff09; 4、126邮箱发邮件不需要勾选ssl协…

Blender学习--模型贴图傻瓜级教程

Blender 官方文档 1. Blender快捷键&#xff1a; 快捷键说明 按住鼠标滚轮&#xff1a;移动视角Tab&#xff1a;切换编辑模式和物体模式鼠标右键&#xff1a; 编辑模式&#xff1a; 物体模式&#xff1a; 其他&#xff1a; 2. 下面做一个球体贴一张纹理的操作 2.1 效果如下…

VM虚拟机中Ubuntu14.04安装VM tools后仍不能全屏显示

1、查看Ubuntu所支持的分辨率大小。 在终端处输入&#xff1a; xrandr&#xff0c;回车 2、输入你想设置的分辨率参数。 我设置的为1360x768&#xff0c;大家可以根据自己的具体设备设置。 在终端输入&#xff1a;xrandr -s 1360x768 注意&#xff1a;这里1360后边是字母 x 且…

Selenium-介绍下其他骚操作

Chrome DevTools 简介 Chrome DevTools 是一组直接内置在基于 Chromium 的浏览器&#xff08;如 Chrome、Opera 和 Microsoft Edge&#xff09;中的工具&#xff0c;用于帮助开发人员调试和研究网站。 借助 Chrome DevTools&#xff0c;开发人员可以更深入地访问网站&#xf…

交叉编译 和 软硬链接 的初识(面试重点)

目录 交叉编译的初认识Q&A Q1: 编译是什么&#xff1f; Q2: 交叉编译是什么&#xff1f; Q3: 为什么要交叉编译 Q3.1&#xff1a;树莓派相对于C51大得多&#xff0c;可以集成编译器比如gcc&#xff0c;那么树莓派就不需要交叉编译了吗&#xff1f; Q4: 什么是宿主机和…

鸿蒙HarmonyOS 编辑器 下载 安装

好 各位 之前的文章 注册并实名认证华为开发者账号 我们基实名注册了华为的开发者账号 我们可以访问官网 https://developer.harmonyos.com/cn/develop/deveco-studio 在这里 直接就有我们编辑器的下载按钮 我们直接点击立即下载 这里 我们根据自己的系统选择要下载的系统 例…

Unsupervised MVS论文笔记(2019年)

Unsupervised MVS论文笔记&#xff08;2019年&#xff09; 摘要1 引言2 相关工作3 实现方法3.1 网络架构3.2 通过光度一致性学习3.3 MVS的鲁棒光度一致性3.4 学习设置和实施的细节3.5.预测每幅图像的深度图 4 实验4.1 在DTU上的结果4.2 消融实验4.3 在ETH3D数据集上的微调4.4 在…

电力感知边缘计算网关产品设计方案-网关软件设计方案

网关采用网络协议和软件技术在通信网络中针对工业协议、互联网通用协议进行分析和记录,提升工业控制系统环境的安全防护能力。A类和B类网关采用容器技术的软件架构,采用C/S架构软件客户端提供应用软件平台,为管理员提供功能丰富的图形管理控制界面。 因A类和B类网关在产品定…

按需引入 ElMessage,没有样式且类型检查失败

文章目录 ElMessage 弹框没有样式问题描述解决方案 ts 类型检查失败问题描述解决办法 eslint 检查失败问题描述解决办法 ElMessage 弹框没有样式 问题描述 Element-plus 在使用 ElMessage 消息弹框的时候没有样式&#xff0c;按照官方的按需加载的方式引入的 import { ElMes…

AIGC ChatGPT4总结Linux Shell命令集合

在Linux中,Shell命令的数量非常庞大,因为Linux提供了各种各样的命令来处理系统任务。这些命令包括GNU核心工具集、系统命令、shell内置命令以及通过安装获得的第三方应用程序命令。以下是一些常见的Linux命令分类及其示例,但请注意,这不是一个全面的列表,因为列出所有命令…