Springboot集成RabbitMq+延时队列

1. 引入jar包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置yml

2.1 配置生产者yml

 

spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #开启发送失败退回# simple:同步等待confirm结果,直到超时#correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated

2.2 配置消费者yml

spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestcloud:stream:bindings:delayed-topic-input:destination: delayed-topic-demo #将消费者队列绑定到指定交换机group: group-1 #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息consumer:delayed-exchange: true #开启延时,生产者和消费者端都需要开启这个配置

 3.生产者生产消息

3.1 direct 直连

把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中

3.1.1 直连队列消息发送

/***直接交换机 **/public static final String directExchange = "directExchangeOne";public static final String routingKey1 = "directKey1";public static final String routingKey2 = "directKey2";public static final String directQueue1 = "directQueueOne";public static final String directQueue2 = "directQueueTwo";/*** 直接交换机 一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者* 通过指定路由键directRouting发送给交换机directExchange* 交互机directExchange通过指定的路由键把消息msg投递到对应的队列上面去* @param map*/public void directToQueue(Map<String, String> map) {map.put("direct-路由key:",RabbitConstants.routingKey1);rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey1, map);map.put("direct-路由key:",RabbitConstants.routingKey2);rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey2, map);}

3.1.2 直连队列消息绑定

package rabbit.config;import config.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置类 : 创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去**  */
@Configuration
public class DirectConfig {/*** Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列* @return*/@Beanpublic DirectExchange directExchangeOne(){return new DirectExchange(RabbitConstants.directExchange);}@Beanpublic Queue directQueueOne(){return new Queue(RabbitConstants.directQueue1);}@Beanpublic Queue directQueueTwo(){return new Queue(RabbitConstants.directQueue2);}/*** 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去* @param directQueueOne* @param directExchangeOne* @return*/@Beanpublic Binding directBindingOne(Queue directQueueOne, DirectExchange directExchangeOne){return BindingBuilder.bind(directQueueOne).to(directExchangeOne).with(RabbitConstants.routingKey1);}/*** 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去* @param directQueueTwo* @param directExchangeOne* @return*/@Beanpublic Binding directBindingTwo(Queue directQueueTwo, DirectExchange directExchangeOne) {return BindingBuilder.bind(directQueueTwo).to(directExchangeOne).with(RabbitConstants.routingKey2);}}

3.1.3 直连队列消息接收

@RabbitListener(queues = RabbitConstants.directQueue1)@RabbitHandler // 指定对消息的处理public void directClientOne(HashMap<String,String> mes){System.out.println("直连队列消息1:" + mes);}/** @RabbitListener(queues = {"directQueue1","directQueue2"}):这样就可以一次消费两条消息 **/@RabbitListener(queues = RabbitConstants.directQueue2)@RabbitHandlerpublic void directClientTwo(HashMap<String,String> mes){System.out.println("直连队列消息2: " + mes);}

 3.1.4 结果:

3.2 fanout 扇形

把消息发送到所有与它绑定的Queue中,没有路由概念

3.2.1 扇形消息发送

@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);}/**** 扇形交换机* 这个交换机没有路由键概念,就算你绑了路由键也是无视的* 消息会发送到所有绑定的队列上。* @param fanoutMap1*/public void fanoutToQueue(Map<String, String> fanoutMap1) {fanoutMap1.put("fanout-交换机:",RabbitConstants.fanoutExchange1);rabbitTemplate.convertAndSend(RabbitConstants.fanoutExchange1,null,fanoutMap1);}

3.2.2 扇形消息绑定

/*** 扇形交换机* Fanout:转发消息到所有绑定队列,没有路由key* */
@Configuration
public class FanoutConfig {/*** 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。* @return*/@Beanpublic FanoutExchange fanoutExchange1(){return new FanoutExchange(RabbitConstants.fanoutExchange1);}@Beanpublic Queue fanoutQueue1(){return new Queue(RabbitConstants.fanoutQueue1);}@Beanpublic Queue fanoutQueue2(){return new Queue(RabbitConstants.fanoutQueue2);}/** 扇形交换机没有路由key */@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);}/** 扇形交换机没有路由key */@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);}}

3.2.3 扇形消息接收

/** 扇形交换机 */public static final String fanoutExchange1 = "fanout_exchange1";public static final String fanoutQueue1 = "fanout_queue1";public static final String fanoutQueue2 = "fanout_queue2";@RabbitListener(queues = RabbitConstants.fanoutQueue1)@RabbitHandlerpublic void fanoutQueue1(HashMap<String,String> fanoutMes){System.out.println("扇形队列消息1: " + fanoutMes);}@RabbitListener(queues = RabbitConstants.fanoutQueue2)@RabbitHandlerpublic void fanoutQueue2(HashMap<String,String> fanoutMes){System.out.println("扇形队列消息2: " + fanoutMes);}

3.2.4 扇形--结果

3.3  topic 主题

将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中--多了匹配的概念

3.3.1 主题队列消息发送

@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);}/***主题交换机:模糊匹配队列* *:星号表示任意一个字符* 	#:表示任意一个或者多个字符*/// topic 的 routingKeypublic static final String topicA = "helloTopic.world";public static final String topicB = "helloTopic.#";public static final String topicAll = "#";public static final String topicExchange = "topic_exchange";/** 绑定 topicA = "helloTopic.world"*/public static final String topicQueue1 = "topic_queue1";/** 绑定 topicB="helloTopic.#"*/public static final String topicQueue2 = "topic_queue2";/** 绑定 #,匹配所有 */public static final String topicQueue3 = "topic_queue3";/*** 主题交换机:模糊匹配队列* topic.# 可匹配topic topic.add topic.add.add* topic.* 可匹配topic.add  topic.delete* @param map*/public void topicToQueue(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息map.put("Topic-路由key:",RabbitConstants.topicA);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicA, map);map.put("Topic-路由key:",RabbitConstants.topicB);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicB, map);map.put("Topic-路由key:",RabbitConstants.topicAll);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicAll, map);}

3.3.2 主题队列消息绑定

/**** 按规则转发消息*/
@Configuration
public class TopicConfig {/*** Topic Exchange 转发消息主要是根据通配符* 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中* @return*/@Beanpublic TopicExchange topicExchange1(){return new TopicExchange(RabbitConstants.topicExchange);}@Beanpublic Queue topicQueue1(){return new Queue(RabbitConstants.topicQueue1);}@Beanpublic Queue topicQueue2(){return new Queue(RabbitConstants.topicQueue2);}@Beanpublic Queue topicQueue3(){return new Queue(RabbitConstants.topicQueue3);}/*** 消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,* Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中* @param topicQueue1* @param topicExchange1* @return*/@Beanpublic Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue1).to(topicExchange1).with(RabbitConstants.topicA);}@Beanpublic Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue2).to(topicExchange1).with(RabbitConstants.topicB);}@Beanpublic Binding topicBinding3(Queue topicQueue3, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue3).to(topicExchange1).with(RabbitConstants.topicAll);}}

3.3.3 主题队列消息接收

@RabbitListener(queues = RabbitConstants.topicQueue1)@RabbitHandlerpublic void topicQueue1(HashMap<String,String> topicMes){System.out.println("主题消息队列1: " + topicMes);}@RabbitListener(queues = RabbitConstants.topicQueue2)@RabbitHandlerpublic void topicQueue2(HashMap<String,String> topicMes){System.out.println("主题消息队列2: " + topicMes);}@RabbitListener(queues = RabbitConstants.topicQueue3)@RabbitHandlerpublic void topicQueue3(HashMap<String,String> topicMes){System.out.println("主题消息队列匹配所有: " + topicMes);}

3.3.4 主题--结果

3.4 Delayed 延时(需要延时插件,参考我另一篇插件安装)

3.4.1 延时队列消息发送

 /** 延迟队列 */public static final String DELAYED_EXCHANGE_NAME = "myDelayedExchange";public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";/*** 死信延迟队列* @param message*/public void sendDelayedMessage(String message) {System.out.println("Send time 开始: " + LocalDateTime.now());rabbitTemplate.convertAndSend(RabbitConstants.DELAYED_EXCHANGE_NAME,RabbitConstants.DELAYED_ROUTING_KEY,message,messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(10000); // 设置消息的延长时间延,单位毫秒return messagePostProcessor;});System.out.println("Send time 结束: " + LocalDateTime.now() );}

3.4.2 延时队列消息绑定

public class DelayedConfig {/** 定义一个延迟交换机 **/@Beanpublic CustomExchange delayedExchange() {/*Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");*/return new CustomExchange(RabbitConstants.DELAYED_EXCHANGE_NAME,"x-delayed-message", // 消息类型  x-delayed-messagetrue, // 是否持久化false); // 是否自动删除}/** 延时队列 **/@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(RabbitConstants.DELAYED_QUEUE_NAME).withArgument("x-delayed-type", "direct").build();}/** 绑定队列到这个延迟交换机 */@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstants.DELAYED_ROUTING_KEY).noargs();}
}

3.4.3 延时队列消息接收

@RabbitListener(queues = RabbitConstants.DELAYED_QUEUE_NAME)public void receiveDelayedMessage(String message,  Channel channel) {System.out.println("Received delayed message: " + message);log.info("当前时间:{},接收时长信息给延迟队列:{}", LocalTime.now(),message);System.out.println("Received time: " + LocalDateTime.now() + "  Received: " + message);//    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}

3.4.4 延时--结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/810382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

主题换肤操作

有许多项目会遇见有light和dark多种颜色方案展示&#xff0c;如下&#xff1a; 这种是怎么实现的呢&#xff1f; 方案1&#xff1a;采用css变量来实现 /* 默认粉色主题 */ :root {--underline-dark: #000d8a;--gray-light: 229, 233, 240;--gray-dark: 34, 41, 57;--black: 1…

华为远程登陆管理配置:轻松掌握核心要点

实验拓扑及需求 实验步骤 A、配置相关地址及连通性测试 R1&#xff1a; [R1]int GigabitEthernet 0/0/0 [R1-GigabitEthernet0/0/0]ip address 192.168.12.1 24 R2&#xff1a; [R2]int gi 0/0/0 [R2-GigabitEthernet0/0/0]ip add 192.168.12.2 24 [R2]int gi 0/0/1 […

如何将对象转换成json字符串,以json格式输出,并获取到其中的特定字段

小王学习录 Json格式示例 1&#xff1a;简单的 JSON 对象示例 2&#xff1a;JSON 对象嵌套示例 3&#xff1a;JSON 数组示例 4&#xff1a;混合使用对象和数组 使用Gson将java对象转换成json字符串哪些数据类型的对象可以使用Gson转换为json字符串如何使用Gson将java对象转换成…

go语言学习--3.常用语句

目录 1.条件语句 1.1 if语句 1.2 if-else语句 1.3 switch语句 1.4 select语句 2.循环语句 2.1循环处理语句 2.2循环控制语句 3.go语言关键字 1.条件语句 和c语言类似&#xff0c;相关的条件语句如下表所示&#xff1a; 1.1 if语句 if 布尔表达式 {/* 在布尔表达式为 t…

小红的白色字符串

题目描述 小红拿到了一个字符串&#xff0c;她准备将一些字母变成白色&#xff0c;变成白色的字母看上去就和空格一样&#xff0c;这样字符串就变成了一些单词。 现在小红希望&#xff0c;每个单词都满足以下两种情况中的一种&#xff1a; 1.开头第一个大写&#xff0c;其余为…

echarts折线图自定义打点标记小工具

由于没研究明白echarts怎么用label和lableLine实现自定义打点标记&#xff0c;索性用markPoint把长方形压扁成线模拟了一番自定义打点标记&#xff0c;记录下来备用。&#xff08;markLine同理也能实现&#xff09; 实现代码如下&#xff1a; <!DOCTYPE html> <html…

C#基础--之数据类型

C#基础–之数据类型 在第一章我们了解了C#的输入、输出语句后&#xff0c;我这一节主要是介绍C#的基础知识&#xff0c;本节的内容也是后续章节的基础&#xff0c;好的开端等于成功的一半。在你阅读完本章后&#xff0c;你就有足够的C#知识编写简单的程序了。但还不能使用封装、…

电视盒子哪个好?2024口碑网络电视盒子排行榜

多年来电视盒子始终占据重要地位&#xff0c;功能上并没有受到影响。在这么多品牌中哪些电视盒子的评价是最好的呢&#xff1f;小编根据各大电商平台的用户评价情况整理了口碑最好的网络电视盒子排行榜&#xff0c;跟着小编一起看看市面上的电视盒子哪个好吧。 TOP 1&#xff1…

OpenHarmony 资源调度之内存管理源码分析

作者&#xff1a;张守忠 1 内存管理简介 内存管理部件位于全局资源调度管控子系统中&#xff0c;基于应用的生命周期状态&#xff0c;更新进程回收优先级列表&#xff0c;通过内存回收、查杀等手段管理系统内存&#xff0c;保障内存供给。 1.1 内存管理框架 内存管理部件主要…

外贸开发信必知技巧:高回复率不再是梦

外贸行业在Zoho的客户群体中占比较高。因为我们的国际化背景、丰富的产品组合、多语言多币种跨时区、高性价比等特点&#xff0c;成为外贸企业开展业务的选择。在和外贸客户沟通中&#xff0c;发现无论是外贸大拿还是新手小白&#xff0c;大家遇到一个共同的问题——发出去的开…

抖音上阳哥的视频号带货推荐靠谱吗?

在抖音这个短视频平台的广阔天地里&#xff0c;阳哥以其独到的眼光和精准的推荐&#xff0c;成为了众多粉丝心中的带货指南。不同于一些网红直接进行视频号带货&#xff0c;阳哥更多地是以一个推荐人的身份出现&#xff0c;为粉丝们筛选并推荐优质的带货内容。那么&#xff0c;…

python——列表(list)

概念 列表一般使用在一次性存储多个数据 语法 lst[数据1&#xff0c;数据2&#xff0c;.....]方法 #mermaid-svg-flVxgVdpSqFaZyrF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-flVxgVdpSqFaZyrF .error-icon{…

图像版PDF文件OCR识别转换为文本的3款免费工具软件

图像版PDF文件里面都是图片&#xff0c;要先通过OCR技术识别出文本&#xff0c;然后才能进行进一步处理编辑。下面是3个免费的PDF文件OCR识别软件工具&#xff1a; ●简可信PDF批量识别工具 简可信PDF批量识别工具是一款专门用于将PDF文件进行批量OCR&#xff08;光学字符识别…

ObjectiveC-第一部分-基础入门-学习导航

专题地址:MacOS一站式程序开发系列专题 第一部分:基础入门学习导航 OSX-01-Mac OS应用开发概述:简单介绍下MacOS生态、Xcode使用以及使用Xcode创建app的方法OSX-02-Mac OS应用开发系列课程大纲和章节内容设计:介绍下此系列专题的文章内容组织形式以及此系列专题的覆盖内容…

虚假贸易防控:国资委74号文解读,技术人员如何建防?

官.网地址&#xff1a;合合TextIn - 合合信息旗下OCR云服务产品 2023年12月&#xff0c;国资委发布《关于规范中央企业贸易管理严禁各类虚假贸易的通知》&#xff08;国资发财评规[2023]74号&#xff09;&#xff0c;提出“十不准”&#xff0c;严禁央企开展各类虚假贸易业务…

Vue 引入config.js后别的js访问不到window对象下的属性

Vue项目里,我们项目配置的请求服务器地址都是在public里config.js里,如下例: 然后在index.html里引入config.js,如下图: 这里要注意的是,script的src要写上<%= BASE_URL %>,代码如下: <!DOCTYPE html> <html><head><meta charset="…

NCBI 数据下载

网上介绍的那几种直接下载NCBI数据的方法大都下载速度很慢&#xff0c;但是EBI (European Bioinformatics Institute) 下载很快&#xff0c;而且它的数据库和NCBI是共享的&#xff0c;所以我们可以直接从 EBI 下载。 1 、 确定要下载的 SRA 编号&#xff1b; 2 、 EBI (https…

探索点云与KD-Tree配对的方法

比较点云是处理和分析点云数据的关键步骤。然而,由于各个扫描之间固有的差异,无法进行逐点比较。因此,点云分析的第一步也是主要步骤是将点配对以进行有意义的比较。 配对点是区分表面变形和运动分析的关键任务。这个过程不仅为变形分析提供了见解,还使我们能够通过比较不…

华为海思数字芯片设计笔试第五套

声明 下面的题目作答都是自己认为正确的答案&#xff0c;并非官方答案&#xff0c;如果有不同的意见&#xff0c;可以评论区交流。 这些题目也是笔者从各个地方收集的&#xff0c;感觉有些题目答案并不正确&#xff0c;所以在个别题目会给出自己的见解&#xff0c;欢迎大家讨论…