学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}

我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28148.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶——函数重载

前言&#xff1a;C中除了可以在不同的命名空间中使用同名函数&#xff0c;还有一种支持在同一个作用域中同名函数的方式——函数重载。 函数重载 一.什么是函数重载&#xff1f;二.函数重载的3种规则三.特殊情况 一.什么是函数重载&#xff1f; C允许同样同一作用域中声明几个功…

Ubuntu 20.04 安装 Stable Diffusionn

步骤 1&#xff1a;安装 wget、git、Python3 和 Python3虚拟环境&#xff08;如果已安装可忽略这步骤&#xff09; sudo apt install wget git python3 python3-venv步骤 2&#xff1a;克隆 SD 项目到本地 git clone https://github.com/AUTOMATIC1111/stable-diffusion-webu…

【网络编程】实现一个简单多线程版本TCP服务器(附源码)

TCP多线程 &#x1f335;预备知识&#x1f384; Accept函数&#x1f332;字节序转换函数&#x1f333;listen函数 &#x1f334;代码&#x1f331;Log.hpp&#x1f33f;Makefile☘️TCPClient.cc&#x1f340;TCPServer.cc&#x1f38d; util.hpp &#x1f335;预备知识 &…

RabbitMQ - 简单案例

目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…

7.1 动手实现AlexNet

AlexNet引入了dropput层 代码 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(# 样本数为1,通道数为96,11x11的卷积核,步幅为4&#xff0c;减少输出的高度和深度。 LeNet的通道数才6&#xff0c;此处96&#xff0c;为什么要增加这么多通…

MIT 6.830数据库系统 -- lab six

MIT 6.830数据库系统 -- lab six 项目拉取引言steal/no-force策略redo log与undo log日志格式和检查点 开始回滚练习1&#xff1a;LogFile.rollback() 恢复练习2&#xff1a;LogFile.recover() 测试结果疑问点分析 项目拉取 原项目使用ant进行项目构建&#xff0c;我已经更改为…

微服务技术栈(1.0)

微服务技术栈 认识微服务 单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 优点&#xff1a; 架构简单部署成本低 缺点&#xff1a; 耦合度高 分布式架构 分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c…

如何在 Spring Boot 中集成日志框架 SLF4J、Log4j

文章目录 具体步骤附录 笔者的操作环境&#xff1a; Spring Cloud Alibaba&#xff1a;2022.0.0.0-RC2 Spring Cloud&#xff1a;2022.0.0 Spring Boot&#xff1a;3.0.2 Nacos 2.2.3 Maven 3.8.3 JDK 17.0.7 IntelliJ IDEA 2022.3.1 (Ultimate Edition) 具体步骤 因为 …

Java课题笔记~ 使用 Spring 的事务注解管理事务(掌握)

通过Transactional 注解方式&#xff0c;可将事务织入到相应 public 方法中&#xff0c;实现事务管理。 Transactional 的所有可选属性如下所示&#xff1a; propagation&#xff1a;用于设置事务传播属性。该属性类型为 Propagation 枚举&#xff0c; 默认值为 Propagation.R…

ESP32 Max30102 (3)修复心率误差

1. 运行效果 2. 新建修复心率误差.py 代码如下: from machine import sleep, SoftI2C, Pin, Timer from utime import ticks_diff, ticks_us from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM from hrcalc import calc_hr_and_spo2BEATS = 0 # 存储心率 FINGER_F…

如何识别手机是否有灵动岛(dynamic island)

如何识别手机是否有灵动岛&#xff08;dynamic island&#xff09; 灵动岛是苹果2022年9月推出的iPhone 14 Pro、iPhone 14 Pro Max首次出现&#xff0c;操作系统最低是iOS16.0。带灵动岛的手机在竖屏时顶部工具栏大于等于51像素。 #define isHaveDynamicIsland ({ BOOL isH…

微信小程序的项目解构

视频链接 黑马程序员前端微信小程序开发教程&#xff0c;微信小程序从基础到发布全流程_企业级商城实战(含uni-app项目多端部署)_哔哩哔哩_bilibili 接口文档 https://www.escook.cn/docs-uni-shop/mds/1.start.html 1&#xff1a;微信小程序宿主环境 1&#xff1a;常见的宿…

安达发制造工业迈向智能化:APS高级计划排程助力提升生产效率

随着市场竞争的加剧&#xff0c;制造企业纷纷寻求提高生产效率和降低成本的方法。近年来&#xff0c;越来越多的制造企业开始采用APS(高级计划与排程)系统&#xff0c;以优化生产计划和排程&#xff0c;提高生产效率&#xff0c;并在竞争中取得优势。 现代制造业通常面临复杂的…

【第一阶段】kotlin的range表达式

range:范围&#xff1a;从哪里到哪里的意思 in:表示在 !in&#xff1a;表示不在 … :表示range表达式 代码示例&#xff1a; fun main() {var num:Int20if(num in 0..9){println("差劲")}else if(num in 10..59){println("不及格")}else if(num in 60..89…

c语言每日一练(3)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;暑假时三天之内必有一更&#xff0c;到了开学之后&#xff0c;将看学业情…

MySQL数据库的操作

MySQL 连接服务器 库的操作创建数据库数据库删除查看数据库进入数据库查看所在的数据库修改数据库显示创建语句查看连接情况 表的操作创建表查看数据库所有的表查看表的详细信息查看创建表时的详细信息删除表修改表名向表中插入数据在表结构中新增一列对表结构数据的修改删除表…

std::string 的append方法 存放文本和非文本数据

今天在用std::string来拼接数据 有文本数据 也有 非文本数据 如果是文本数据那么append方法参数为 ( char *data, int len&#xff09; 将data的前len个字节附加到 string中 如果是非文本数据 则参数为&#xff08;int size, char data&#xff09;; 重复size个data 附加…

【技巧】如何保护PowerPoint不被改动?

PPT&#xff0c;也就是PowerPoint&#xff0c;是很多小伙伴在工作生活中经常用到的图形演示文稿软件。 做好PPT后&#xff0c;担心自己不小心改动了或者不想他人随意更改&#xff0c;我们可以如何保护PPT呢&#xff1f;下面小编就来分享两个常用的方法&#xff1a; 1. 将PPT改…

STM32 4G学习(二)

特性参数 ATK-IDM750C是正点原子开发的一款高性能4G Cat1 DTU产品&#xff0c;支持移动4G、联通4G和电信4G手机卡。 它以高速率、低延迟和无线数传作为核心功能&#xff0c;可快速解决应用场景下的无线数传方案。 它支持TCP/UDP/HTTP/MQTT/DNS/RNDIS/NTP协议&#xff0c;支持…

ASCP系列电气防火限流式保护器在养老院的应用-安科瑞黄安南

摘要&#xff1a;2020年&#xff0c;我国65岁及以上老年人口数量为1.91亿&#xff0c;老龄化率达到13.5%。总体来看&#xff0c;大部分省市的养老机构数量还较少。养老设施的建设与民生息息相关&#xff0c;养老院的电气安全也非常重要。如果发生电气火灾&#xff0c;对于行动不…