延迟队列的理解与使用

目录

一、场景引入

二、延迟队列的三种场景

 1、死信队列+TTL对队列进行延迟

 2、创建通用延时消息+死信队列 对消息延迟

 3、使用rabbitmq的延时队列插件

x-delayed-message使用

父pom文件

pom文件

配置文件

config

生产者

消费者

结果


一、场景引入

我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;

 那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。

比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。

比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;

因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;

/*** 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚* 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)* 那这样此方式的不足就出现了!* 场景:*      A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景*      我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此*      场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件*//*** 注意:*      TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;*      MessagePostProcessor是对消息进行延迟;**      如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!*   理解:*      如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟*      比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)*/

 二、延迟队列的三种场景

1、死信队列+TTL对队列进行延迟

给队列设置TTL过期时间,此队列可不用绑定消费者,时间到后把消息投递到死信队列中,由死信队列的消费者进行消费,这样就能达到延迟消费的作用

   @Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("业务队列名称").deadLetterExchange("死信交换机名称").deadLetterRoutingKey("死信队列 RoutingKey").ttl(20000) // 消息停留时间//.maxLength(500).build();}

监听死信队列,即可处理超时的消息队列

缺点:

上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

 2、创建通用延时消息+死信队列 对消息延迟

这样的方式可对每一个消息设置指定的过期时间,不用像TTL那样给队列设置过期时间(若队列设置的过期时间达到后,其队列中的消息均会被删除或别的处理),但是由于队列是先进先出,若先投递的消息设置的过期时间是40s,后投递的消息过期时间是30s,那么设置过期时间为30s的并不会到30s时就投递到死信队列中,而是等40s到期后才会一起被投递。

rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
message => {message.getMessageProperties().setExpiration(String.valueOf(5000))// 设置消息的持久化属性//这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});

缺点:

该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

 3、使用rabbitmq的延时队列插件

使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载延迟插件

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、安装插件并启用

下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)

docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

 进入 Docker 容器

docker exec -it rabbit /bin/bash

 在plugins内启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 退出容器

exit

重启 RabbitMQ

docker restart my-rabbit

 貌似不重启也能生效!

 结果:

 就多了一个x-delayed-message交换机

x-delayed-message使用

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>--><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq</name><properties><java.version>8</java.version><hutool.version>5.8.3</hutool.version><lombok.version>1.18.24</lombok.version></properties><description>spring-boot-rabbitmq</description><packaging>pom</packaging><modules><module>direct-exchange</module><module>fanout-exchange</module><module>topic-exchange</module><module>game-exchange</module><module>dead-letter-queue</module><module>delay-queue</module><module>delay-queue2</module></modules><dependencyManagement><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies></dependencyManagement></project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../pom.xml </relativePath></parent><artifactId>delay-queue2</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

logging.level.com.chensir = debug
server.port=8086
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}@Beanpublic CustomExchange customExchange(){Map<String,Object> args = new HashMap<>();//延迟时以direct直连方式绑定args.put("x-delayed-type","direct");// name:交换机名称 type:类型 固定值x-delayed-messagereturn new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);}@Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("chen-DirectQueue").build();}@Beanpublic Binding binding(){return  BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();}
}

生产者

package com.chensir.provider;import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class DirectProvider {@Resourceprivate RabbitTemplate rabbitTemplate;public  void  send(){OrderIngOk orderIngOk = new OrderIngOk();orderIngOk.setOrderNo("202207149687-1");orderIngOk.setId(1);orderIngOk.setUserName("倪海杉前-延迟40秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效// m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效return m;});OrderIngOk orderIngOk2 = new OrderIngOk();orderIngOk2.setOrderNo("202207149687-2");orderIngOk2.setId(2);orderIngOk2.setUserName("倪海杉后-延迟30秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{m.getMessageProperties().setDelay(30*1000);return m;});log.debug("消息生产完成");}}

消费者

package com.chensir.consumer;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class DirectConsumer {@RabbitHandler@RabbitListener(queues = "chen-DirectQueue" )public void  process(OrderIngOk orderIngOk) throws IOException {try {// 处理业务开始log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));// 处理业务结束} catch (Exception ex){throw ex;}}
}

结果

2023-08-29 18:27:45.983 DEBUG 15568 --- [           main] com.chensir.provider.DirectProvider      : 消息生产完成
2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}

可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66191.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mybatis学习|多对一、一对多

有多个学生&#xff0c;没个学生都对应&#xff08;关联&#xff09;了一个老师&#xff0c;这叫&#xff08;多对一&#xff09; 对于每个老师而言&#xff0c;每个老师都有N个学生&#xff08;学生集合&#xff09;&#xff0c;这叫&#xff08;一对多&#xff09; 测试环境…

[杂谈]-快速了解Modbus协议

快速了解Modbus协议 文章目录 快速了解Modbus协议1、为何 Modbus 如此受欢迎2、范围和数据速率3、逻辑电平4、层数5、网络与通讯6、数据帧格式7、数据类型8、服务器如何存储数据9、总结 ​ Modbus 是一种流行的低速串行通信协议&#xff0c;广泛应用于自动化行业。 该协议由 Mo…

力扣2. 两数相加

2. 两数相加 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这两个…

16 个前端安全知识

16 个前端安全知识 去年 security course 上的是 React&#xff0c;然后学了一些 一些 React 项目中可能存在的安全隐患&#xff0c;今年看了一下列表&#xff0c;正好看到了前端也有更新&#xff0c;所以就把这个补上了。 一个非常好学习各种安全隐患的机构是 https://owasp…

机器学习基础16-建立预测模型项目模板

机器学习是一项经验技能&#xff0c;经验越多越好。在项目建立的过程中&#xff0c;实 践是掌握机器学习的最佳手段。在实践过程中&#xff0c;通过实际操作加深对分类和回归问题的每一个步骤的理解&#xff0c;达到学习机器学习的目的 预测模型项目模板 不能只通过阅读来掌握…

机器学习的第一节基本概念的相关学习

目录 1.1 决策树的概念 1.2 KNN的概念 1.2.1KNN的基本原理 1.2.2 流程&#xff1a; 1.2.3 优缺点 1.3 深度学习 1.4 梯度下降 损失函数 1.5 特征与特征选择 特征选择的目的 1.6 python中dot函数总结 一维数组的点积&#xff1a; 二维数组&#xff08;矩阵&#xff09;的乘法&am…

深入了解Kubernetes(k8s):安装、使用和Java部署指南(持续更新中)

目录 Docker 和 k8s 简介1、kubernetes 组件及其联系1.1 Node1.2 Pod1.3 Service 2、安装docker3、单节点 kubernetes 和 KubeSphere 安装3.1 安装KubeKey3.2 安装 kubernetes 和 KubeSphere3.3 验证安装结果 4、集群版 kubernetes 和 KubeSphere 安装5、kubectl 常用命令6、资…

浅谈下cdn以及防盗链问题

目录 一、什么是cdn 二、使用cdn带来的好处 三、CDN工作原理 四、cdn使用场景 五、流媒体CDN之防盗链问题 一、什么是cdn CDN&#xff08;Content Delivery Network&#xff09;是一种分布式网络架构&#xff0c;用于提供高效的内容分发服务。CDN通过将内容缓存在离用户最…

Postgresql JSON对象和数组查询

文章目录 一. Postgresql 9.5以下版本1.1 简单查询(缺陷&#xff1a;数组必须指定下标&#xff0c;不推荐)1.1.1 模糊查询1.1.2 等值匹配1.1.3 时间搜索1.1.4 在列表1.1.5 包含 1.2 多层级JSONArray&#xff08;推荐&#xff09;1.2.1 模糊查询1.2.2 模糊查询 NOT1.2.3 等值匹配…

恢复数据的利器:易我数据恢复终身技术版v16.2.0.0

EaseUS Data Recovery Wizard为全球提供数据恢复方案,用于误删数据数据,电脑误删文件恢复,格式化硬盘数据恢复,手机U盘数据恢复等,RAID磁盘阵列数据恢复,分区丢失及其它未知原因丢失的数据恢复,简单易用轻松的搞定数据恢复。 特点描述 - 易我数据恢复中文便携版&#xff0c;无…

STM32f103入门(10)ADC模数转换器

ADC模数转换器 ADC简介AD单通道初始化代码编写第一步开启时钟第二步 RCCCLK分频 6分频 72M/612M第三步 配置GPIO 配置为AIN状态第四步&#xff0c;选择规则组的输入通道第五步 用结构体 初始化ADC第六步 对ADC进行校准编写获取电压函数初始化代码如下 Main函数编写 ADC简介 ADC…

植物根系基因组与数据分析

1.背景 这段内容主要是关于植物对干旱胁迫的反应&#xff0c;并介绍了生活在植物体内外以及根际的真菌和细菌的作用。然而&#xff0c;目前对这些真菌和细菌的稳定性了解甚少。作者通过调查微生物群落组成和微生物相关性的方法&#xff0c;对农业系统中真菌和细菌对干旱的抗性…

windows主机和Ubuntu虚拟机共享设置

参考文章 Ubuntu Linux 与主机共享文件夹 vim 修改文件出现错误 “ E45: ‘readonly’ option is set (add to override)“ vim退出时报错“E212: Cant open file for writing”的解决办法 VMware 安装后&#xff0c;安装Ubuntu 20.04一路顺利。 1&#xff0c;在VMware设置…

Qt应用开发(基础篇)——输入对话框 QInputDialog

一、前言 QInputDialog类继承于QDialog&#xff0c;是一个简单方便的对话框&#xff0c;用于从用户获取单个值。 对话框窗口 QDialog QInputDialog输入对话框带有一个文本标签、一个输入框和标准按钮。输入内容可以字符、数字和选项&#xff0c;文本标签用来告诉用户应该要输入…

LAMP介绍与配置

一.LAMP 1.1.LAMP架构的组成 CGI&#xff08;通用网关接口&#xff09;和FastCGI&#xff08;快速公共网关接口&#xff09;都是用于将Web服务器与后端应用程序&#xff08;如PHP、Python等&#xff09;进行交互的协议/接口。 特点 CGI FastCGI 运行方式 每个请求启动…

死信队列理解与使用

一、简介 在rabbitMQ中常用的交换机有三种&#xff0c;直连交换机、广播交换机、主题交换机&#xff1b; 直连交换机中队列与交换机需要约定好routingKey去进行绑定&#xff1b; 广播交换机并不需要routingKey绑定,只需队列与交换机绑定即可&#xff1b; 主题交换机最大的特…

​7.1 项目1 学生通讯录管理:文本文件增删改查(C++版本)(自顶向下设计+断点调试) (A)​

C自学精简教程 目录(必读) 作业目标&#xff1a; 这个作业中&#xff0c;你需要综合运用之前文章中的知识&#xff0c;来解决一个相对完整的应用程序。 作业描述&#xff1a; 1 在这个作业中你需要在文本文件中存储学生通讯录的信息&#xff0c;并在程序启动的时候加载这些…

python+requests实现接口自动化测试

这两天一直在找直接用python做接口自动化的方法&#xff0c;在网上也搜了一些博客参考&#xff0c;今天自己动手试了一下。 一、整体结构 上图是项目的目录结构&#xff0c;下面主要介绍下每个目录的作用。 Common:公共方法:主要放置公共的操作的类&#xff0c;比如数据库sql…

简单了解网络传输介质

目录 一、同轴电缆 二、双绞线 三、光纤 四、串口电缆 一、同轴电缆 10BASE前面的数字表示传输带宽为10M&#xff0c;由于带宽较低、现在已不再使用。 50Ω同轴电缆主要用来传送基带数字信号&#xff0c;因此也被称作为基带同轴电缆&#xff0c;在局域网中得到了广泛的应用…

Prompt GPT推荐社区

大家好&#xff0c;我是荷逸&#xff0c;这次给大家带来的是我日常学习Prompt社区推荐 Snack Prompt 访问地址&#xff1a;http://snackprompt.com Snack Prompt是一个采用的Prompts诱导填空式的社区&#xff0c;它提供了一种简单的prompt修改方式&#xff0c;你只需要输入关…