RabbitMQ 之 延迟队列

目录

​编辑一、延迟队列概念

二、延迟队列使用场景

三、整合 SpringBoot

1、创建项目

2、添加依赖

3、修改配置文件

4、添加 Swagger 配置类

四、队列 TTL

1、代码架构图

2、配置文件代码类

3、生产者

4、消费者

5、结果展示

五、延时队列优化

1、代码架构图

2、配置文件

3、生产者​编辑

4、结果展示

六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

 3、配置文件类代码

4、生产者

5、消费者

6、结果展示


一、延迟队列概念

延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


三、整合 SpringBoot

1、创建项目

2、添加依赖

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

3、修改配置文件

spring.rabbitmq.host=111.229.153.16
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

4、添加 Swagger 配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("馒头警告", "www.baidu.com","2714858327@qq.com")).build();}
}

四、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:


2、配置文件代码类

// TTL 队列,配置文件类代码
@Configuration
public class TTLQueueConfig {// 普通交换机的名称public static final String X_EXCHANGE = "X";// 死信交换机名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";// 普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";// 死信队列名称public static final String DEAD_LETTER_QUEUE_D = "QD";// 声明 X 交换机@Bean(value = "xExchange") // 相当于别名public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 Y 交换机@Bean(value = "yExchange") // 相当于别名public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明普通队列 TTL 为 10s@Bean(value = "queueA") // 相当于别名public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明普通队列 TTL 为 10s@Bean(value = "queueB") // 相当于别名public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明死信队列@Bean(value = "queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();}// 绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA ,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB ,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 绑定@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD ,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

3、生产者

// 发送延迟消息
@Slf4j
@Configuration
@RequestMapping("/ttl")
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 开始发消息@GetMapping("/sendmsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自 TTL 为 10s 的队列" + message);rabbitTemplate.convertAndSend("X","XB","消息来自 TTL 为 40s 的队列" + message);}
}

4、消费者

// 队列 TTL
@Slf4j
@Component
public class DeadLetterQueueConsumer {// 接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel){String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

5、结果展示


五、延时队列优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?


1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间


2、配置文件

 


3、生产者


4、结果展示

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

大家可以参考网上的教程自行去下载安装,这里就不再进行描述了

如果安装好了,就会有一个叫做  x-delayed-message 的类型

一旦装完插件之后,消息延迟的位置就换到交换机了,相当于生产者发消息,这个消息停留在交换机这个位置,再将消息发给队列


2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

一个生产者,要走延迟的交换机,延迟的交换机根据 RoutingKey ,路由到延迟的队列,而延迟的位置是在交换机的位置,队列被消费者所消费


 3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {// 交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";// 队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";// RoutingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明交换机 基于插件的@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}// 声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}// 绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

4、生产者

    // 开始发消息  基于插件的  消息及延迟的时间@GetMapping("/senddelaymsg/{message}/{delaytime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delaytime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delaytime,message);rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",message, msg ->{// 设置发送消息时候的延迟时长msg.getMessageProperties().setDelay(delaytime);return msg;});}

5、消费者

// 消费者 基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {// 监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiverDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

6、结果展示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864499.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

白骑士的Python教学高级篇 3.4 Web开发

系列目录 上一篇&#xff1a;白骑士的Python教学高级篇 3.3 数据库编程 在现代软件开发中&#xff0c;Web开发占据了重要的一席之地。通过Web开发&#xff0c;我们可以创建从简单的个人博客到复杂的电子商务网站等各种应用。在Python的生态系统中&#xff0c;Flask和Django是两…

Python番外篇之责任转移:有关于虚拟机编程语言的往事

编程之痛 如果&#xff0c;你像笔者一样&#xff0c;有过学习或者使用汇编语言与C、C等语言的经历&#xff0c;一定对下面所说的痛苦感同身受。 汇编语言 将以二进制表示的一条条CPU的机器指令&#xff0c;以人类可读的方式进行表示。虽然&#xff0c;人类可读了&#xff0c…

Java后端开发的最佳工程实践与规范

Java后端开发的最佳工程实践与规范 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨Java后端开发中的最佳工程实践与规范&#xff0c;这些实践可以…

使用deep修改前端框架中的样式

目录 1.deep的作用 2.使用方式 3.特别说明 scoped 的实现原理&#xff1a; !important 1.deep的作用 /deep/、::v-deep 和 :deep 都是用于穿透组件作用域的选择器。它们的主要目的是允许开发者在父组件中直接选择并样式化子组件内部的元素&#xff0c;即使这些元素被封装在…

拓扑学习系列(1)拓扑球与拓扑环面与同胚

拓扑球与拓扑环面 在拓扑学中&#xff0c;"Topological Sphere" 和 "Topological Torus" 是两种不同的拓扑空间&#xff0c;它们具有特定的拓扑性质和几何特征。 Topological Sphere&#xff08;拓扑球&#xff09;&#xff1a; 拓扑球是一个二维曲面&…

JVM原理(九):JVM虚拟机工具之可视化故障处理工具

1. JHSDB:基于服务性代理的调试工具 JHSDB是一款基于服务性代理实现的进程外调试工具。 服务性代理是HotSpot虚拟机中一组用于映射Java虚拟机运行信息的、主要基于Java语言实现的API集合。 2. JConsole:Java监视与管理控制台 JConsole是一款基于JMX的可视化监视、管理工具。…

电脑有线网卡和无线网卡的MAC地址

电脑上的无线网卡和有线网卡是两种不同类型的网络接口卡&#xff0c;它们各自有不同的功能和连接方式。 无线网卡&#xff1a; 功能&#xff1a;无线网卡允许计算机通过无线信号连接到网络&#xff0c;通常是Wi-Fi网络。连接方式&#xff1a;无需物理电缆&#xff0c;通过无线…

编程与教学的奇妙交织:探索未来的教育模式

编程与教学的奇妙交织&#xff1a;探索未来的教育模式 在当今快速发展的科技时代&#xff0c;编程与教学之间的融合已经不再是遥不可及的梦想&#xff0c;而是逐渐成为教育领域的一股新浪潮。本文将从四个方面、五个方面、六个方面和七个方面深入探讨编程与教学的奇妙交织&…

Mac批量替换文件夹下所有文件内容

今天接了一个小需求&#xff0c;将文件夹下字段为% XXDM %替换成#{XXDM} 在苹果系统&#xff08;macOS&#xff09;下&#xff0c;可以使用命令行工具find结合sed来实现对A文件夹下所有文件内容的批量替换。具体操作步骤如下&#xff1a; 1、打开终端&#xff08;Terminal&am…

千益畅行,旅游卡,如何赚钱?

​ 赚钱这件事情&#xff0c;只有自己努力执行才会有结果。生活中没有幸运二字&#xff0c;每个光鲜亮丽的背后&#xff0c;都是不为人知的付出&#xff01; #旅游卡服务#

springboot拦截器,ThreadLocal(每个线程的公共区域)

拦截器 配置信息&#xff08;拦截所有请求&#xff09; 其实这种可以作为springAOP作日志记录

Spring 框架中都用到了哪些设计模式:单例模式、策略模式、代理模式

Spring 框架是一个功能强大的企业级应用开发框架&#xff0c;它使用了多种设计模式来提高代码的可维护性、可扩展性和可重用性。以下是 Spring 框架中常见的几个设计模式&#xff0c;并简要说明它们的应用场景&#xff1a; 1. 单例模式&#xff08;Singleton Pattern&#xff…

SpringCloud_Eureka注册中心

概述 Eureka是SpringCloud的注册中心。 是一款基于REST的服务治理框架&#xff0c;用于实现微服务架构中的服务发现和负载均衡。 在Eureka体系中&#xff0c;有两种角色: 服务提供者和服务消费者。 服务提供者将自己注册到Eureka服务器&#xff0c;服务消费者从Eureka服务器中…

白骑士的Python教学实战项目篇 4.1 数据分析与可视化

系列目录 上一篇&#xff1a;白骑士的Python教学高级篇 3.4 Web开发​​​​​​​ 在本篇内容中&#xff0c;我们将介绍如何使用Python进行数据分析与可视化。数据分析与可视化是数据科学的重要组成部分&#xff0c;能够帮助我们从数据中提取有价值的信息和洞察。我们将使用P…

软件著作权申请:保障开发者权益,促进软件创新

一、引言 在数字化时代&#xff0c;软件作为信息技术的核心&#xff0c;已成为推动社会进步和经济发展的重要力量。然而&#xff0c;随着软件产业的蓬勃发展&#xff0c;软件侵权和抄袭现象也日益严重。为了保护软件开发者的合法权益&#xff0c;促进软件产业的健康发展&#…

MPP和Hadoop的架构与场景分析

1、架构&#xff1a; MPP&#xff1a; MPP (Massively Parallel Processing)大规模并行处理&#xff0c;最开始的设计目的是为了消除共享资源的使用。 MPP数据库是&#xff1a;采用非共享架构&#xff08;Shared Nothing&#xff09;架构的分布式并行结构化数据库集群&#xff…

PyTorch从零开始实现LSTM

文章目录 LSTM基础理论从零开始实现LSTM简洁版LSTM实现参考资料 LSTM基础理论 关于LSTM的基础理论不再赘述&#xff0c;可以参考资料&#xff1a; RNN神经网络-LSTM模型结构https://github.com/ShusenTang/Dive-into-DL-PyTorch/blob/master/docs/chapter06_RNN/6.8_lstm.md …

摄影后期色彩管理流程(Lightroom篇)

在摄影后期处理中&#xff0c;色彩管理是确保图像从捕捉到输出的一致性和准确性的关键。Lightroom 和 Photoshop 其实已经将这套色彩管理流程作为默认选项&#xff0c;如果实质操作时仍存在色彩偏差的问题&#xff0c;可参考以下内容。 ProPhoto RGB > Adobe RGB > sRGB …

【Android面试八股文】性能优化相关面试题:如何查找CPU占用?

文章目录 一、 如何查找CPU的占用问题二、TraceView的使用关于TraceView和Android Studio的Profiler第一步、通过Android studio 打开`Android profiler`第二步、使用步骤第三步、技术说明第四步、CPU占用相关指标说明扩展阅读一、 如何查找CPU的占用问题 在Android开发中,如…

linux 控制台非常好用的 PS1 设置

直接上代码 IP$(/sbin/ifconfig eth0 | awk /inet / {print $2}) export PS1"\[\e[35m\]^o^\[\e[0m\]$ \[\e[31m\]\t\[\e[0m\] [\[\e[36m\]\w\[\e[0m\]] \[\e[32m\]\u\[\e[0m\]\[\e[33m\]\[\e[0m\]\[\e[34m\]\h(\[\e[31m\]$IP\[\e[m\])\[\e[0m\]\n\[\e[35m\].O.\[\e[0m\]…