RabbitMQ 之 延迟队列

目录

​编辑一、延迟队列概念

二、延迟队列使用场景

三、整合 SpringBoot

1、创建项目

2、添加依赖

3、修改配置文件

4、添加 Swagger 配置类

四、队列 TTL

1、代码架构图

2、配置文件代码类

3、生产者

4、消费者

5、结果展示

五、延时队列优化

1、代码架构图

2、配置文件

3、生产者​编辑

4、结果展示

六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

 3、配置文件类代码

4、生产者

5、消费者

6、结果展示


一、延迟队列概念

延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


三、整合 SpringBoot

1、创建项目

2、添加依赖

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

3、修改配置文件

spring.rabbitmq.host=111.229.153.16
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

4、添加 Swagger 配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("馒头警告", "www.baidu.com","2714858327@qq.com")).build();}
}

四、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:


2、配置文件代码类

// TTL 队列,配置文件类代码
@Configuration
public class TTLQueueConfig {// 普通交换机的名称public static final String X_EXCHANGE = "X";// 死信交换机名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";// 普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";// 死信队列名称public static final String DEAD_LETTER_QUEUE_D = "QD";// 声明 X 交换机@Bean(value = "xExchange") // 相当于别名public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 Y 交换机@Bean(value = "yExchange") // 相当于别名public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明普通队列 TTL 为 10s@Bean(value = "queueA") // 相当于别名public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明普通队列 TTL 为 10s@Bean(value = "queueB") // 相当于别名public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明死信队列@Bean(value = "queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();}// 绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA ,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB ,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 绑定@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD ,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

3、生产者

// 发送延迟消息
@Slf4j
@Configuration
@RequestMapping("/ttl")
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 开始发消息@GetMapping("/sendmsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自 TTL 为 10s 的队列" + message);rabbitTemplate.convertAndSend("X","XB","消息来自 TTL 为 40s 的队列" + message);}
}

4、消费者

// 队列 TTL
@Slf4j
@Component
public class DeadLetterQueueConsumer {// 接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel){String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

5、结果展示


五、延时队列优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?


1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间


2、配置文件

 


3、生产者


4、结果展示

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

大家可以参考网上的教程自行去下载安装,这里就不再进行描述了

如果安装好了,就会有一个叫做  x-delayed-message 的类型

一旦装完插件之后,消息延迟的位置就换到交换机了,相当于生产者发消息,这个消息停留在交换机这个位置,再将消息发给队列


2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

一个生产者,要走延迟的交换机,延迟的交换机根据 RoutingKey ,路由到延迟的队列,而延迟的位置是在交换机的位置,队列被消费者所消费


 3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {// 交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";// 队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";// RoutingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明交换机 基于插件的@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}// 声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}// 绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

4、生产者

    // 开始发消息  基于插件的  消息及延迟的时间@GetMapping("/senddelaymsg/{message}/{delaytime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delaytime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delaytime,message);rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",message, msg ->{// 设置发送消息时候的延迟时长msg.getMessageProperties().setDelay(delaytime);return msg;});}

5、消费者

// 消费者 基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {// 监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiverDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

6、结果展示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864499.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python番外篇之责任转移:有关于虚拟机编程语言的往事

编程之痛 如果&#xff0c;你像笔者一样&#xff0c;有过学习或者使用汇编语言与C、C等语言的经历&#xff0c;一定对下面所说的痛苦感同身受。 汇编语言 将以二进制表示的一条条CPU的机器指令&#xff0c;以人类可读的方式进行表示。虽然&#xff0c;人类可读了&#xff0c…

使用deep修改前端框架中的样式

目录 1.deep的作用 2.使用方式 3.特别说明 scoped 的实现原理&#xff1a; !important 1.deep的作用 /deep/、::v-deep 和 :deep 都是用于穿透组件作用域的选择器。它们的主要目的是允许开发者在父组件中直接选择并样式化子组件内部的元素&#xff0c;即使这些元素被封装在…

JVM原理(九):JVM虚拟机工具之可视化故障处理工具

1. JHSDB:基于服务性代理的调试工具 JHSDB是一款基于服务性代理实现的进程外调试工具。 服务性代理是HotSpot虚拟机中一组用于映射Java虚拟机运行信息的、主要基于Java语言实现的API集合。 2. JConsole:Java监视与管理控制台 JConsole是一款基于JMX的可视化监视、管理工具。…

千益畅行,旅游卡,如何赚钱?

​ 赚钱这件事情&#xff0c;只有自己努力执行才会有结果。生活中没有幸运二字&#xff0c;每个光鲜亮丽的背后&#xff0c;都是不为人知的付出&#xff01; #旅游卡服务#

springboot拦截器,ThreadLocal(每个线程的公共区域)

拦截器 配置信息&#xff08;拦截所有请求&#xff09; 其实这种可以作为springAOP作日志记录

SpringCloud_Eureka注册中心

概述 Eureka是SpringCloud的注册中心。 是一款基于REST的服务治理框架&#xff0c;用于实现微服务架构中的服务发现和负载均衡。 在Eureka体系中&#xff0c;有两种角色: 服务提供者和服务消费者。 服务提供者将自己注册到Eureka服务器&#xff0c;服务消费者从Eureka服务器中…

软件著作权申请:保障开发者权益,促进软件创新

一、引言 在数字化时代&#xff0c;软件作为信息技术的核心&#xff0c;已成为推动社会进步和经济发展的重要力量。然而&#xff0c;随着软件产业的蓬勃发展&#xff0c;软件侵权和抄袭现象也日益严重。为了保护软件开发者的合法权益&#xff0c;促进软件产业的健康发展&#…

摄影后期色彩管理流程(Lightroom篇)

在摄影后期处理中&#xff0c;色彩管理是确保图像从捕捉到输出的一致性和准确性的关键。Lightroom 和 Photoshop 其实已经将这套色彩管理流程作为默认选项&#xff0c;如果实质操作时仍存在色彩偏差的问题&#xff0c;可参考以下内容。 ProPhoto RGB > Adobe RGB > sRGB …

linux 控制台非常好用的 PS1 设置

直接上代码 IP$(/sbin/ifconfig eth0 | awk /inet / {print $2}) export PS1"\[\e[35m\]^o^\[\e[0m\]$ \[\e[31m\]\t\[\e[0m\] [\[\e[36m\]\w\[\e[0m\]] \[\e[32m\]\u\[\e[0m\]\[\e[33m\]\[\e[0m\]\[\e[34m\]\h(\[\e[31m\]$IP\[\e[m\])\[\e[0m\]\n\[\e[35m\].O.\[\e[0m\]…

Golang内存分配

Go内存分配语雀笔记整理 Golang内存模型设计理念思考核心代码阅读mspanmcachemcentral中心缓存mheap分配过程 Golang内存模型设计理念思考 golang内存分配基于TCmalloc模型&#xff0c;它核心在于&#xff1a;空间换时间&#xff0c;一次缓存&#xff0c;多次复用&#xff1b;…

HarmonyOS开发探索:父子组件手势绑定问题处理

场景一&#xff1a;父子组件同时绑定手势的冲突处理 效果图 方案 在默认情况下&#xff0c;手势事件为非冒泡事件&#xff0c;当父子组件绑定相同的手势时&#xff0c;父子组件绑定的手势事件会发生竞争&#xff0c;最多只有一个组件的手势事件能够获得响应&#xff0c;默认子…

二、基础—常用数据结构:列表、元祖、集合、字典、函数等(爬虫及数据可视化)

二、基础—常用数据结构&#xff1a;列表、元祖、集合、字典、函数等&#xff08;爬虫及数据可视化&#xff09; 1&#xff0c;字符串2&#xff0c;最常用的是列表&#xff08;重点掌握&#xff09;3&#xff0c;元组4&#xff0c;字典&#xff08;重要&#xff09;5&#xff0…

【CSS in Depth 2 精译】2.3 告别像素思维

当前内容所在位置 第一章 层叠、优先级与继承第二章 相对单位 2.1 相对单位的威力 2.1.1 响应式设计的兴起 2.2 em 与 rem 2.2.1 使用 em 定义字号2.2.2 使用 rem 设置字号 2.3 告别像素思维 ✔️2.4 视口的相对单位2.5 无单位的数值与行高2.6 自定义属性2.7 本章小结 2.3 告别…

3D交互可视化编辑器求推荐,最好是针对企业级使用的?

企业级使用的3D交互可视化编辑器&#xff0c;支持编辑和调整2D、3D渲染及交互设置&#xff0c;以下几款可以关注了解一下&#xff1a; 1、Unity&#xff1a;一个广泛使用的跨平台游戏引擎&#xff0c;由Unity Technologies开发。支持开发者创建2D和3D游戏、交互式应用以及虚拟…

pdf压缩,pdf压缩在线网页版,在线压缩pdf网站

在数字化时代&#xff0c;pdf文件已经成为我们工作、学习和生活中不可或缺的一部分。然而&#xff0c;pdf文件往往体积庞大&#xff0c;传输效率低下&#xff0c;还占用大量存储空间。如何在不影响文件质量的前提下&#xff0c;减小pdf文件的大小呢&#xff1f;今天&#xff0c…

74HC595芯片验证

目录 0x00 74595芯片简介0x01 实现原理 0x00 74595芯片简介 74595芯片有很多种封装&#xff0c;不管是贴片的还是直插式的&#xff0c;它们的引脚定义都如下图所示&#xff1a; 其中 &#xff1a; Q0-Q7为并行数据输出口&#xff0c;可以输出高低电平。OE 为使能引脚&#…

MySQL之高可用性和应用层优化(一)

高可用性 故障转移和故障恢复 在应用中处理故障转移 有时候让应用来处理故障转移会更加简单或者更加灵活。例如&#xff0c;如果应用遇到一个错误&#xff0c;这个错误外部观察者正常情况下是无法察觉的&#xff0c;例如关于数据库损坏的错误日志信息&#xff0c;那么应用可…

Hugging Face发布重量级版本:Transformer 4.42

Hugging Face 宣布发布Transformer 4.42&#xff0c;该版本为流行的机器学习库带来了许多新功能和增强功能。此版本引入了几个高级模型&#xff0c;支持新工具和检索增强生成 &#xff08;RAG&#xff09;&#xff0c;提供 GGUF 微调&#xff0c;并整合了量化的 KV 缓存&#x…

基于协同过滤的航空票务推荐系统的设计与实现(飞机票推荐系统)

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

Android Studio 解决AAPT: error: file failed to compile

1.找到项目下的build.gradle 2.在android语块中添加下面代码 aaptOptions.cruncherEnabled false aaptOptions.useNewCruncher false 12