【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码

参考资料

  • RabbitMQ官方网站
  • RabbitMQ官方文档
  • 噼咔噼咔-动力节点教程

文章目录

    • 七、延迟队列
      • 7.1 什么是延迟队列
      • 7.2 延迟队列的解决方案
        • 7.2.1 定时任务
        • 7.2.2 **被动取消**
        • 7.2.3 JDK的延迟队列
        • 7.2.3 采用消息中间件(rabbitMQ
          • 7.2.3.1 适用专门优化后的死信队列实现延迟队列
          • 7.2.3.2 :star:实例代码
          • 7.2.3.2 测试结果
        • 7.2.4 使用rabbitmq_delayed_message_exchange插件.
          • 7.2.4.1 插件下载
          • 7.2.4.2 :star:如何在docker环境下安装插件
          • 7.2.4.3 :star: 代码示例:如何使用该插件
          • 7.2.4.4 测试结果
      • 7.3 问题:多个消息的延迟时间不同该如何解决?
        • 7.3.1 解决方案一:用延迟队列区分
        • 7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

七、延迟队列

7.1 什么是延迟队列

正常的MQ应用场景中,我们希望消息可以快速稳定的传递。但是有一些场景中,希望在指定的延迟后再消费信息,比如订单支付场景(订单15部分内未支付则关闭订单)。

这类实现延迟任务的场景,就可以采用延迟队列来实现。

以下介绍一下其他的一些方法。

7.2 延迟队列的解决方案

7.2.1 定时任务

每隔n秒扫描一次数据库,查询数据库装为过期的订单进行处理。

实现方式

spring schedule、quartz、xxljob等

优点

简单,容易实现;

缺点

  1. 存在延迟(受定时器延迟时间限制
  2. 性能较差,每次扫描数据库,如果订单量交大,会给数据库造成较大压力。
7.2.2 被动取消

当用户主动查询订单时,判断订单是否超时,超时则取消

  • 优点:服务器压力小
  • 缺点:如果用户长时间不查询,则会造成统计异常;而且用户打开订单页面会变慢,严重的话会影响用户体验
7.2.3 JDK的延迟队列

DelayedQueue:无界阻塞队列,该队列只有在延迟期满后,才能从中获取元素。

优点

实现简单,任务的延迟低。

缺点

  • 服务器重启宕机,数据会丢失
  • 只适用于单机版
  • 订单量大时,可能会造成内存不足:OOM
7.2.3 采用消息中间件(rabbitMQ

RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递(前面提到的死信队列)。.

image-20231017141210411

把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从DLX的队列中取走消息。

7.2.3.1 适用专门优化后的死信队列实现延迟队列

在上面的mq方案中,存在两个不同的交换机,我们可以利用直连交换机的特性,将交换机优化成一个交换机,同时通过不同的routingKey指定普通队列和死信队列。

image-20231017141445269

思路解释

  1. 生产者发送消息到交换机X,并指定ttl的key
  2. 消息被交换机传递到ttl队列中(指定了消息过期时间的队列
  3. 同时,ttl队列还指定的死信交换机DLX为自身的交换机X,但是指定的routingKey为死信队列的key
  4. 这样,当消息在ttl队列中到期后,这条消息就会被传递到死信队列中,提供给消费者
7.2.3.2 ⭐️实例代码

为了便于测试,将发送和接收写在同一个服务中

配置信息

@Configuration
public class DelayExchangeConfig {public static String exchangeName = "order.ttl.exchange";public static String orderQ = "order.ttl.queue";public static String dlxQ = "order.dlx.queue";@Beanpublic DirectExchange delayedExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue orderQueue(){// 指定该队列的过期时间和死信队列Map<String , Object> properties = new HashMap<>();properties.put("x-message-ttl" , 15000);properties.put("x-dead-letter-exchange" , exchangeName);properties.put("x-dead-letter-routing-key" , "dead-letter");return QueueBuilder.durable(orderQ).withArguments(properties).build();}@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(dlxQ).build();}@Beanpublic Binding dlxBinding1(){return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter");}@Beanpublic Binding ttlBinding1(){return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order");}}

测试代码

@RestController
@RequestMapping("/delay")
@Slf4j
public class DelayedController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/{msg}")public void sentErrorMsg(@PathVariable("msg") String msg) {log.info("(延迟队列)准备发送的信息:{} , 路由键 :{}", msg, "order");// 发送到普通的延时列表中rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8));log.info("(延迟队列)成功发送!发送时间{}" , LocalDateTimeUtil.now());}@RabbitListener(queues = "order.dlx.queue")public void receiveDelayedMsg(Message message){log.info("(延迟队列)接受到的消息是:{}" , new String(message.getBody()));}
}
7.2.3.2 测试结果

配置正确

image-20231017144033384

控制台打印正确:15秒后接收到的了之前发送的信息

image-20231017144116843


7.2.4 使用rabbitmq_delayed_message_exchange插件.
7.2.4.1 插件下载

插件下载地址

  • https://www.rabbitmq.com/community-plugins.html
  • https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    • 根据自己的rabbit版本,我这里用的是3.9
7.2.4.2 ⭐️如何在docker环境下安装插件

参考文章:https://juejin.cn/post/7138717546894589966

  1. 将下载到的文件,移动到容器内

    docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
    

image-20231017153230781

  1. 进入容器bash指令,并启动插件

    docker exec -it rabbitmq bashroot@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 使用下面的指令查看插件列表
    rabbitmq-plugins list
    

image-20231017153257970

进入控制台新建交换机,能查看到新的交换机类型

image-20231017154024943

7.2.4.3 ⭐️ 代码示例:如何使用该插件

官方说明文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage

image-20231017153803323

理解原理:delay exchange在接受到消息后,会先存在内部数据库中,检查x-delay延迟时间(头部

image-20231017154940504

代码使用思路

  1. 要创建自定义的交换机类型,要使用CustomExchange()来创建。几个参数的解释如下:

    • name:rabbit中交换机的名称
    • type:交换机类型 (x-delayed-message)
    • durable:是否持久
    • autoDelete:是否自动删除
    • arguments:参数信息
  2. arguments:参数信息从官方文档中获取

    // ... elided code ...
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    // ... more code ...
    
  3. 交换机创建好后,只需要创建一条队列即可,并进行绑定

  4. 注意:消息发送需要在头部存放信息headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了

配置类

@Configuration
public class DelayPluginConfig {public static String exchangeName = "delay-x-plugin.x";public static String key = "demo";@Beanpublic CustomExchange customExchange(){// 参考官方文档,创建插件提供的自定义交换机Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");// public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args);}@Beanpublic Queue delayDemoQueue(){return QueueBuilder.durable("delay-x-plugin.queue.demo").build();}@Beanpublic Binding delayPluginBinding(){return BindingBuilder.bind(delayDemoQueue()).to(customExchange()).with(key).noargs();}
}

生产者

@RestController
@RequestMapping("/delay/plugin")
@Slf4j
public class DelayedPluginController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/{delay}/{msg}")public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) {log.info("(延迟插件队列)准备发送的信息:{} ,延迟时间:{} 路由键 :{}", msg, delay , "demo");// 在头部设置过期时间MessageProperties properties = new MessageProperties();properties.setHeader("x-delay", delay);Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();// 发送信息rabbitTemplate.convertAndSend(exchangeName, "demo", message);log.info("(延迟插件队列)成功发送!发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()));}@RabbitListener(queues = "delay-x-plugin.queue.demo")public void receiveDelayedMsg(Message message) {log.info("(延迟插件队列)接受到的消息是:{}", new String(message.getBody()));}
}
7.2.4.4 测试结果

生成交换机和队列

image-20231017160126659image-20231017160147125

访问路径/delay/plugin/25000/一条25秒过期的信息:查看日志打印:成功

image-20231017160422203

7.3 问题:多个消息的延迟时间不同该如何解决?

由于队列先进先出的特性,如果不同消息的延迟时间不同,一旦出现后进的消息延迟时间小于先进的队列,那么消息过期的时间就会出错。

7.3.1 解决方案一:用延迟队列区分

要解决这个问题,就需要将队列的延迟时间统一,将不同的延迟的消息发送到对应延迟的队列中。

保证队列的延迟时间和消息的延迟时间是一样的即可。

如下

image-20231017144817671

7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

由于该插件的原理并不是单纯的队列实现,而是使用rabbit内部数据库时间,所以可以很好的解决问题。

可以进行一个简单测试验证:

  • 先发送一条25秒过期的信息,再发送3条5秒过期的信息

  • 查看结果:正常消费,解决问题

    image-20231017160917110

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软考高级系统架构设计师系列之:数学与经济管理

软考高级系统架构设计师系列之:数学与经济管理 一、数学与经济管理二、图论应用-最小生成树三、图论应用-最短路径四、图论应用-网络与最大流量五、运筹方法-线性规划六、运筹方法-动态规划七、运筹方法-转移矩阵八、运筹方法-排队论九、运筹方法-决策-不确定决策十、运筹方法…

Linux常见基本指令合集及其效果展示

Linux基本命令 文章目录 Linux基本命令1. whoami2. who3. clear4. pwd5. 查看文件信息5.0 什么是文件5.1 ls5.2 ls -l5.3 ls -a5.4 ls -a -l 6. 补充知识&#xff1a;对于Linux系统目录的认知6.1 什么是路径 7. cd8. touch9. mkdir10. rmdir11. rm12. man13. cp14. mv15. nano1…

魔行观察》一款免费的品牌/商业地产数据查询平台

给大家推荐一款免费的商业数据查询平台"魔行观察"&#xff0c;可免费查询品牌&#xff0c;品牌门店&#xff0c;商场&#xff0c;全国小区&#xff0c;写字楼等相关信息&#xff0c;更多数据敬请期待 小程序搜索&#xff1a;魔行观察 即可使用

Android组件通信——广播机制:BroadcastReceiver(二十九)

1. BroadcastReceiver 1.1 知识点 &#xff08;1&#xff09;掌握广播接收器的主要作用及基本实现&#xff1b; &#xff08;2&#xff09;可以使用广播启动Service&#xff1b; &#xff08;3&#xff09;理解闹钟服务的使用&#xff1b; 1.2 具体内容 广播这个名词大家…

信创办公–基于WPS的Word最佳实践系列 (图文环绕方式)

信创办公–基于WPS的Word最佳实践系列 &#xff08;图文环绕方式&#xff09; 目录 应用背景操作步骤1、 打开布局选项中图文环绕方式的方法2、 图文环绕三大类型 应用背景 在Word中&#xff0c;对文字和图片进行排版时&#xff0c;采用各种不同的图片与文字组合效果能够使页面…

Android 自定义view 圆形进度条

Android 自定义view 圆形进度条 前言一、码前分析二、开码1.画笔2.弧度3.圆弧的位置4.暴露给外部设置进度条的方法三、使用四、完整代码 总结 前言 先来看看效果&#xff0c;大概要实现这么一个圆形的进度条 一、码前分析 要实现这么一个进度条的效果&#xff0c;实际上是要画…

cbu和无cc的shiro反序列化

前置知识 学习CommonsBeanutils之前应该知道 javaBean&#xff0c;可以看《Java简单特性》也可以看这里有关BeanComparator的介绍TemplatesImpl gadget&#xff0c;前两个方法是public TemplatesImpl#getOutputProperties() -> TemplatesImpl#newTransformer() -> Tem…

【前端学习】—ES6新增的方法有哪些(十五)

【前端学习】—ES6新增的方法有哪些&#xff08;十五&#xff09; 一 、ES6中新增的方法 &#xff08;一&#xff09;、Object.is() //用于判断两个值/数据类型是否相等/* 特点&#xff1a;不仅可以对值类型进行正常处理&#xff0c;对象类型的值也可以处理对于特殊的值NaN 也…

第七版教材下的PMP考试有多难?

考过了几次就没多难了&#xff0c;主要是看考纲&#xff0c;其中的难点就是敏捷的内容多了不少&#xff0c;包含在考纲的三大模块中&#xff0c;pmp考试没有专门的敏捷教材&#xff0c;需要自己去找资料备考。 第七版教材主要内容&#xff1a; 考纲三大模块分析&#xff1a; …

10-k8s-身份认证与鉴权

文章目录 一、ServiceAccount介绍二、ServiceAccount相关的资源对象三、dashboard空间示例 一、ServiceAccount介绍 ServiceAccount&#xff08;服务账户&#xff09;概念介绍 1&#xff09;ServiceAccount是Kubernetes集群中的一种资源对象&#xff0c;用于为Pod或其他资源提供…

【C++】多态 -- 详解

⚪前言 声明一下&#xff0c;下面的代码和解释都是在 VS2019 下的 X86 程序中进行的&#xff0c;涉及的指针都是 4 bytes。如果要其他平台下&#xff0c;部分代码需要改动。比如&#xff1a;如果是 X64 程序&#xff0c;则需要考虑指针是 8 bytes 问题等等。其它编译环境下&…

2023年中国有创呼吸机产量、需求量及行业市场规模分析[图]

有创呼吸机主要是通过气管插管或者气管切开&#xff0c;然后通过管道连接在呼吸机上&#xff0c;为患者提供呼吸支持&#xff0c;主要针对的患者是昏迷的&#xff0c;无自主呼吸或不能耐受无创呼吸机的患者。 有创呼吸机是高端医疗装备&#xff0c;设计、生产和临床验证都必须经…

机器学习笔记 - 3D 对象跟踪极简概述

一、简述 大多数对象跟踪应用程序都是 2D 的。但现实世界是 3D 的,无论您是跟踪汽车、人、直升机、导弹,还是进行增强现实,您都需要使用 3D。在 CVPR 2022(计算机视觉和模式识别)会议上,已经出现了大量3D目标检测论文。 二、什么是 3D 对象跟踪? 对象跟踪是指随着时间的…

【JVM面试】从JDK7 到 JDK8, JVM为啥用元空间替换永久代?

系列文章目录 【JVM系列】第一章 运行时数据区 【面试】第二章 从JDK7 到 JDK8, JVM为啥用元空间替换永久代&#xff1f; 大家好&#xff0c;我是青花。拥有多项发明专利&#xff08;都是关于商品、广告等推荐产品&#xff09;。对广告、Web全栈以及Java生态微服务拥有自己独到…

MySQL [基础] 学习笔记

MySQL 学习 文章目录 MySQL 学习1. 数据库三层结构2. 数据在数据库中的存储方式3. SQL 语句分类3.1 备份恢复数据库的表 4. Mysql 常用数据类型(列类型)4.1 数值型(整数)的基本使用4.2 数值型(bit)的使用4.3 数值型(小数)的基本使用4.4 字符串的基本使用(面试题)4.5 字符串使用…

2020年下半年~2022下半年下午题易错总结

2020年下半年 试题一&#xff1a; 1.组播报文对无线网络空口的影响主要有(14) &#xff0c;随着业务数据转发的方式不同, 组播报文的抑制分别在 (15)、(16) 配置。 答案&#xff1a; &#xff08;14&#xff09;无线空口拥塞 &#xff08;15&#xff09;直连AP的交换…

软件测试需要学习什么?好学吗?需要学多久?到底是报班好还是自学好?

前言&#xff1a; 上篇文章看到很多小伙伴在讨论做测试到底怎么样&#xff0c; 其中很有很多的小伙伴还踩不少的坑&#xff0c;花费了大量的精力和时间去探索&#xff0c;结果还是一无所获。这里给大家出一期关于软件测试萌新的疑惑&#xff0c;看完这篇文章你就知道软件测试…

AR智能眼镜主板设计方案_AR眼镜PCB板设计

AR智能眼镜是一种采用先进技术的创新产品&#xff0c;具备强大的功能和性能。它采用了MTK8788八核 12nm低功耗硬件平台&#xff0c;搭载IMG GE830063OMhz或以上的GPU&#xff0c;并运行Android 11.0或以上的操作系统。该眼镜支持光波导1080P显示和LVDS接口自由曲面显示&#xf…

信钰证券:新增融券交易明显降温 业内称新规将平衡多类型投资者利益

10月14日&#xff0c;中国证监会发布调整优化融券相关准则的通知&#xff0c;沪深北买卖所齐发具体安排&#xff0c;阶段性收紧融券和战略出资者配售股份出借。其间&#xff0c;融券保证金比例进步自10月30日起实施&#xff0c;战略出资者配售股份出借收紧等其他条款自10月16日…

爆肝整理,性能测试-非GUI模式执行Jemter压测,看这篇就够了...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、上传脚本 把在…