rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
@Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE = "X";//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列QA QBpublic static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE = "QD";////声明X_EXCHANGE@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGE@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QA@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QB@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGE@Beanpublic Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列QB与交换机X_EXCHANGE@Beanpublic Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列QD与交换机Y_Exchange@Beanpublic Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/senMsg/{message}")public void sendMes(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/@Slf4j
@Component
public class DeadLetterQueueConsumer {//接受消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);}
}

rabbitmq的配置文件:

spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?

结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。

示例验证:

增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。

在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:

    // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时public static final  String QUEUE_C ="QC";//声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Bean("queueC")public Queue queueC(){Map<String,Object> arguments = new HashMap<>(2);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置routing-keyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}
//绑定队列QC与交换机X_EXCHANGE   优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Beanpublic Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者:

@GetMapping("/sendttl/{message}/{ttlTime}")public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/*** 死信队列做延迟时的缺陷:* rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。*/log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,mes->{mes.getMessageProperties().setExpiration(ttlTime);return mes;});}

消费者不变,启动服务!

生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000

在这里插入图片描述
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/158998.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

迪文科技工业串口屏(DMG10600C070-03WTC)更新程序烧录刷机

迪文科技工业串口屏(DMG10600C070-03WTC)更新程序烧录刷机 问题 使用SD卡上电烧录&#xff0c;SD卡文件路径如下&#xff1a; 烧录时&#xff0c;无法写入&#xff0c;成功烧录文件数为0 解决方法 格式化读卡器 格式化脚本 echo off %1 %2 ver|find "5.">…

【Redis篇】简述Redis | 详解Redis命令

文章目录 &#x1f38d;什么是Redis&#x1f38d;Redis特点&#x1f38d;Redis应用场景&#x1f354;Windows安装Redis⭐启动Redis &#x1f33a;Redis数据类型&#x1f33a;Redis常用命令⭐字符串string操作命令⭐哈希hash操作命令⭐列表list操作命令⭐集合set操作命令⭐有序集…

C++实战学习笔记

文章目录 erase()uniquevector的insert()std::string::npos erase() &#xff08;1&#xff09;erase(pos,n); 删除从pos开始的n个字符&#xff0c;比如erase(0,1)就是删除第一个字符 &#xff08;2&#xff09;erase(position);删除position处的一个字符(position是个string类…

linux下流媒体压力测试工具的使用

前言 因为领导要求做linux的推拉流时服务器压力测试&#xff0c;于是在网上找了找。一顿操作下来&#xff0c;发现很多软件盗用一款名为srs-bench的开源软件。 该代码仓库有详细的使用说明&#xff0c;而且可以在issues中找到可能会遇到的问题的解决办法 需要下载该仓库的源…

RK3568开发板在工控工业物联网网关方面的应用

在数字化转型的浪潮中&#xff0c;工控物联网关产品扮演着重要的角色。这些产品通过连接工业设备和网络&#xff0c;为数据传输和分析提供了便利。而迅为RK3568核心板作为一款高性能的芯片&#xff0c;为工控物联网关产品的性能提升和功能扩展提供了强大的支持。 迅为RK3568核心…

基于JAVA+SpringBoot+VUE+微信小程序的前后端分离咖啡小程序

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 随着社会的快速发展和…

2023年约特干故城夜间演艺《万方乐奏有于阗》完美谢幕

11月19日&#xff0c;记者走进约特干故城看到演员在欢乐地跳着刀郎舞和古典舞&#xff0c;庆祝今年以来夜间演艺《万方乐奏有于阗》演出200场完美谢幕。 11月19日在约特干故城&#xff0c;演员正在表演迎宾乐舞。阿卜力克木依卜拉依木摄 当天晚上&#xff0c;城楼上旌旗猎猎&am…

Java-接口

接口 接口 接口就是公共的行为规范,只要实现时符合标准就可以通用. 接口可以看成是: 多个类的公共规范,是一种引用数据类型. 使用关键字interface实现接口. 接口是不能被实例化的. 接口中的成员变量默认是 public static final 接口中只能有抽象方法,当中的方法不写,也是pu…

C语言中的多线程调用

功能 开启一个线程&#xff0c;不断打印传进去的参数&#xff0c;并且每次打印后自增1 代码 #include<windows.h> #include<pthread.h> #include<stdio.h>void* print(void *a) {int *ic(int*)a;float *fc(float*)(asizeof(int)*2);double *dc(double*)(as…

拆解现货黄金隔夜利息计算公式

在讨论现货黄金投资手续费的时候&#xff0c;隔夜利息是经常被忽略的一个方面&#xff0c;但它是投资者不得不考虑的成本因素&#xff0c;特别是在中长线交易的情况下。隔夜利息是根据投资者的持仓数量和交易方向所计算出的利息&#xff0c;如果投资者需要持仓过夜&#xff0c;…

9.3 Windows驱动开发:内核解析PE结构节表

在笔者上一篇文章《内核解析PE结构导出表》介绍了如何解析内存导出表结构&#xff0c;本章将继续延申实现解析PE结构的PE头&#xff0c;PE节表等数据&#xff0c;总体而言内核中解析PE结构与应用层没什么不同&#xff0c;在上一篇文章中LyShark封装实现了KernelMapFile()内存映…

Transformer的一点理解,附一个简单例子理解attention中的QKV

Transformer用于目标检测的开山之作DETR&#xff0c;论文作者在附录最后放了一段简单的代码便于理解DETR模型。 DETR的backbone用的是resnet-50去掉了最后的AdaptiveAvgPool2d和Linear这两层。 self.backbone nn.Sequential(*list(resnet50(pretrainedTrue).children())[:-2…

LabVIEW如何获取波形图上游标所在位置的数值

LabVIEW如何获取波形图上游标所在位置的数值 获取游标所在位置数值的一种方法是利用波形图的游标列表属性。 在VI的程序框图中&#xff0c;右键单击波形图并选择创建引用 &#xff0c;然后将创建的引用节点放在程序框图上。 在程序框图上放置一个属性节点&#xff0c;并将其…

聚焦数字化项目管理——2023年PMI项目管理大会亮点回顾

11月18日-19日&#xff0c;由PMI&#xff08;中国&#xff09;主办的2023年PMI项目管理大会在上海浦东嘉里大酒店圆满召开。本次大会以“数智时代&#xff0c;汇创未来”为主题&#xff0c;聚焦数智时代大背景下的项目管理行业发展和人才培养&#xff0c;吸引了海内外千余名项目…

基于Android校园交流uniAPP+vue 微信小程序v7e1

本系统结合现今XX校园交流APP的功能模块以及设计方式进行分析&#xff0c;使用Android平台和Ssm框架进行开发设计&#xff0c;具体研究内容如下&#xff1a; (1) 系统管理员主要对用户管理、类型管理、娱乐天地管理、投诉举报管理、学习平台、我的收藏管理、系统管理等功能进…

wvp-gb28181-pro打包

生成可执行jar cd wvp-GB28181-pro mvn package复制错误已复制 生成war cd wvp-GB28181-pro mvn package -P war 生成的包的路径 wvp-GB28181-pro\target

海康威视监控相机的SDK与opencv调用(非工业相机)

1.研究内容 本篇主要对海康威视的监控相机的SDK回调进行研究&#xff0c;并于opencv结合&#xff0c;保存图像,以供后续其他处理&#xff0c;开发语言为C 2.步骤及方法 2.1 海康SDK介绍 海康SDK下载地址 根据自身编译环境&#xff0c;下载对应的SDK&#xff0c;需要注意的是…

身份证号码校验

根据《新版外国人永久居留身份证适配性改造要点》&#xff0c;公司需要把代码中对身份证的校验进行优化 就文档内容可以看到需要优化的要点是&#xff1a; 新版永居证号码以 9 开头 受理地区代码出生日期顺序码校验码&#xff1b;&#xff08;共18位&#xff09; eg&#xff…

渗透工具---BurpSuite 插件开发之HelloWorld

本文主要记录如何利用burp官方的新版API即MontoyaApi 写helloworld&#xff08;上一篇的demo使用旧版api写的&#xff0c;这篇及后续开发将采用新版api&#xff09; 先看效果图 更多详细内容见下方 这里有更详细更全面的代码内容 以及配置相关的内容 https://mp.weixin.qq.co…

HarmonyOS ArkTS 基础组件的使用(四)

1 组件介绍 组件&#xff08;Component&#xff09;是界面搭建与显示的最小单位&#xff0c;HarmonyOS ArkUI声明式开发范式为开发者提供了丰富多样的UI组件&#xff0c;我们可以使用这些组件轻松的编写出更加丰富、漂亮的界面。 组件根据功能可以分为以下五大类&#xff1a;…