Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)

这里写目录标题

  • 一、序言
  • 二、配置文件application.yml
  • 三、RabbitMQ交换机和队列配置
    • 1、定义4个队列
    • 2、定义Fanout交换机和队列绑定关系
    • 2、定义Direct交换机和队列绑定关系
    • 3、定义Topic交换机和队列绑定关系
    • 4、定义Header交换机和队列绑定关系
  • 四、RabbitMQ消费者配置
  • 五、RabbitMQ生产者
  • 六、测试用例
    • 1、发送到FanoutExchage
    • 2、发送到DirectExchage
    • 3、发送到TopicExchange
    • 4、发动到HeadersExchage
  • 七、结语

一、序言

在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


二、配置文件application.yml

我们先上应用启动配置文件application.yml,如下:

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /listener:type: simplesimple:acknowledge-mode: autoconcurrency: 5max-concurrency: 20prefetch: 5

备注:这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory,并且指定消息确认模式为自动确认

三、RabbitMQ交换机和队列配置

Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

1、定义4个队列

/*** 定义4个队列*/
@Configuration
protected static class QueueConfig {@Beanpublic Queue queue1() {return QueueBuilder.durable("queue-1").build();}@Beanpublic Queue queue2() {return QueueBuilder.durable("queue-2").build();}@Beanpublic Queue queue3() {return QueueBuilder.durable("queue-3").build();}@Beanpublic Queue queue4() {return QueueBuilder.durable("queue-4").build();}
}

2、定义Fanout交换机和队列绑定关系

/*** 定义Fanout交换机和对应的绑定关系*/
@Configuration
protected static class FanoutExchangeBindingConfig {@Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-exchange").build();}/*** 定义多个Fanout交换机和队列的绑定关系* @param fanoutExchange* @param queue1* @param queue2* @param queue3* @param queue4* @return*/@Beanpublic Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);}}

备注:这里我们将4个队列绑定到了名为fanout-exchange的交换机上。

2、定义Direct交换机和队列绑定关系

@Configuration
protected static class DirectExchangeBindingConfig {@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange("direct-exchange").build();}@Beanpublic Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");}
}

备注:这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-keyqueue-3绑定到了该交换机上。


3、定义Topic交换机和队列绑定关系

@Configuration
protected static class TopicExchangeBindingConfig {@Beanpublic TopicExchange topicExchange() {return ExchangeBuilder.topicExchange("topic-exchange").build();}@Beanpublic Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");return new Declarables(queue1Binding, queue2Binding);}
}

这里我们定义了名为topic-exchange类型的交换机,该类型交换机支持路由key通配符匹配,*代表一个任意字符,#代表一个或多个任意字符。

备注:

  1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
  2. 通过路由key com.#queue-2也绑定到了该交换机上。

4、定义Header交换机和队列绑定关系

@Configuration
protected static class HeaderExchangeBinding {@Beanpublic HeadersExchange headersExchange() {return ExchangeBuilder.headersExchange("headers-exchange").build();}@Beanpublic Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");}
}

备注:这里我们定义了名为headers-exchange类型的交换机,并通过参数function=loggingqueue-4绑定到了该交换机上。


四、RabbitMQ消费者配置

Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "queue-1")public void handleMsgFromQueue1(String msg) {log.info("Message received from queue-1, message body: {}", msg);}@RabbitListener(queues = "queue-2")public void handleMsgFromQueue2(String msg) {log.info("Message received from queue-2, message body: {}", msg);}@RabbitListener(queues = "queue-3")public void handleMsgFromQueue3(String msg) {log.info("Message received from queue-3, message body: {}", msg);}@RabbitListener(queues = "queue-4")public void handleMsgFromQueue4(String msg) {log.info("Message received from queue-4, message body: {}", msg);}
}

备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

五、RabbitMQ生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToFanoutExchange(String body) {log.info("开始发送消息到fanout-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);}public void sendMsgToDirectExchange(String body) {log.info("开始发送消息到direct-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("direct-exchange", "queue3-route-key", message);}public void sendMsgToTopicExchange(String routingKey, String body) {log.info("开始发送消息到topic-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("topic-exchange", routingKey, message);}public void sendMsgToHeadersExchange(String body) {log.info("开始发送消息到headers-exchange, 消息体:{}", body);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);}}

六、测试用例

这里写了个简单的Controller用来测试,如下:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/fanout")public ResponseEntity<String> sendMsgToFanoutExchange(String body) {rabbitMqProducer.sendMsgToFanoutExchange(body);return ResponseEntity.ok("广播消息发送成功");}@RequestMapping("/exchange/direct")public ResponseEntity<String> sendMsgToDirectExchange(String body) {rabbitMqProducer.sendMsgToDirectExchange(body);return ResponseEntity.ok("消息发送到Direct交换成功");}@RequestMapping("/exchange/topic")public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);return ResponseEntity.ok("消息发送到Topic交换机成功");}@RequestMapping("/exchange/headers")public ResponseEntity<String> sendMsgToHeadersExchange(String body) {rabbitMqProducer.sendMsgToHeadersExchange(body);return ResponseEntity.ok("消息发送到Headers交换机成功");}}

1、发送到FanoutExchage

直接访问http://localhost:8080/exchange/fanout?body=hello,可以看到该消息广播到了4个队列上。

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2、发送到DirectExchage

访问http://localhost:8080/exchange/direct?body=hello,可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3、发送到TopicExchange

访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key为 com.order.create的消息分别发送到了queue-1queue-2上。

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4、发动到HeadersExchage

访问http://localhost:8080/exchange/headers?body=hello,消息通过头部信息function=logging发送到了headers-exchange上。

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello

七、结语

下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/134849.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【GEE】8、Google 地球引擎中的时间序列分析【时间序列】

1简介 在本模块中&#xff0c;我们将讨论以下概念&#xff1a; 处理海洋的遥感图像。 从图像时间序列创建视频。 GEE 中的时间序列分析。 向图形用户界面添加基本元素。 2背景 深水地平线漏油事件被认为是有史以来最大的海上意外漏油事件。该井释放了超过 490 万桶石油&am…

Sysmon 日志监控

系统监视器 &#xff08;Sysmon&#xff09; 是一个 Windows 日志记录加载项&#xff0c;它提供精细的日志记录功能并捕获默认情况下通常不记录的安全事件。它提供有关进程创建、网络连接、文件系统更改等的信息。分析 Sysmon 日志对于发现恶意活动和安全威胁至关重要。 在不断…

基于单片机的甲醛检测器设计

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 技术交流认准下方 CSDN 官方提供的联系方式 文章目录 概要 一、设计的主要内容二、系统硬件设计三、软件设计4.1 程序结构流程图原理图 四、结论五、 文章目录 概要 本文将要提…

海思SD3403/SS928开发板 开发记录二: 设置网络 telnet连接开发板

1.设置网络 设置桥接网络 并修改虚拟机IP网段 问题1.参照前一篇博客 2.ping 测试 主机 虚拟机 板端 相互通信 3.telnet 登录板端

什么是超级托斯卡纳葡萄酒?

超级托斯卡纳葡萄酒通常被认为是在托斯卡纳用国际葡萄品种制成的葡萄酒&#xff0c;如赤霞珠、品丽珠或梅洛&#xff0c;而不是传统的托斯卡纳葡萄桑娇维塞。来自云仓酒庄品牌雷盛红酒分享这些葡萄酒可能包含一些桑娇维塞&#xff0c;但这通常不是混合中的主要葡萄。这些大胆的…

SQL Server SSIS ETL job执行相关操作

创建SSIS项目 Excel导入SQL Server 构建Excel源 配置Excel源信息 配置SQL Server目标 双击“ADO NET目标” job执行 新建job 右键“SQL Server代理”的“作业”&#xff0c;点击“新建作业”&#xff0c;弹出“新建作业”的选项页 首先是“常规”选项页&#xff0c;…

四川竹哲电子商务有限公司是真的还是假的?

随着数字科技的飞速发展&#xff0c;电子商务的形式也在不断变化。近年来&#xff0c;抖音带货服务成为了电商领域的新风向。许多公司纷纷涌入这一市场&#xff0c;希望通过这种新型的商业模式获取更多的商业机会。在这其中&#xff0c;四川竹哲电子商务有限公司以其卓越的服务…

Redis Desktop Manager安装和使用

Redis Desktop Manager&#xff08;RDM&#xff09;是一款用于管理和操作Redis数据库的图形化界面工具。提供了简单易用的界面&#xff0c;使用户能够方便地执行各种Redis数据库操作&#xff0c;并且支持多个Redis服务器的连接RDM功能介绍&#xff1a;1.连接管理&#xff1a;RD…

哪款手机便签软件支持存储录音文件并支持转文字?

手机便签类软件带有存储录音转文字功能是比较实用的&#xff0c;很多人通常会整理很多录音类型的文件&#xff0c;录音文件整合在一起后&#xff0c;后续有需要可以逐条点开播放收听。尤其是在工作中&#xff0c;当领导说一些重点时&#xff0c;大家无法借助灵活的大脑来成功的…

LeetCode148.排序链表

看完题目的想法是&#xff0c;直接把所有节点的值都遍历出来放进优先队列里面&#xff0c;然后从头节点遍历一次&#xff0c;每次把优先队列poll()的值赋给节点的val即可&#xff0c;说实话&#xff0c;想完还觉得估计有问题怎么可能这么简单&#xff0c;但是不管了&#xff0c…

人工智能在汽车业应用的五项挑战

在汽车行业扩展人工智能应用时需要注意的问题 随着更多企业投资于汽车人工智能 (AI) 解决方案&#xff0c;我们也愈加接近大规模部署 5 级全自动驾驶汽车。汽车行业的组织如果希望加入这场 AI 带来的颠覆性变革&#xff0c;就应该已提前考虑如何成功和大规模地将人工智能部署到…

MoveFunsDAO 星航计划|从Move入门Web3与深入实践「公益课堂」

Move 语言作为最安全的编程语言之一&#xff0c;在资产的安全性和保护方面有着显著优势&#xff0c;被寄予引领 Web3 世界的全新叙事的厚望。 随着 Sui 在今年五月主网上线&#xff0c;它为 Move 生态带来一股新的浪潮。上线以来&#xff0c;Sui 公链的开发活跃度持续数月位居…

【Qt之绘制兔纸】

效果 代码 class drawRabbit: public QWidget { public:drawRabbit(QWidget *parent nullptr) : QWidget(parent) {}private:void paintEvent(QPaintEvent *event) {QPainter painter(this);painter.setRenderHint(QPainter::Antialiasing, true);// 绘制兔子的耳朵painter.s…

Read-Easy Excel源码解析(一)

Read&Write-Easy Excel 当我们需要导入大Excel时候&#xff0c;用POI会内存溢出&#xff0c;这时候我们用EasyExcel来解决&#xff0c;它底层采用的是SAX&#xff08;Simple Api for Xml&#xff09;事件驱动&#xff0c;解析xml的方式来解析excel文件。 首先我们看他的re…

十四、W5100S/W5500+RP2040树莓派Pico<NetBIOS>

文章目录 1 前言2 简介2 .1 什么是NetBIOS&#xff1f;2.2 NetBIOS的优点2.3 NetBIOS工作原理2.4 NetBIOS应用场景 3 WIZnet以太网芯片4 NetBIOS网络设置示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接 1 前言 随着…

响应式成人高考自考教育机构网站模板源码下载带后台

模板信息&#xff1a; 模板编号&#xff1a;30558 模板编码&#xff1a;UTF8 模板分类&#xff1a;学校、教育、培训、科研 适合行业&#xff1a;教育机构类企业 模板介绍&#xff1a; 本模板自带eyoucms内核&#xff0c;无需再下载eyou系统&#xff0c;原创设计、手工书写DIVC…

多级缓存之JVM进程缓存

1.什么是多级缓存 传统的缓存策略一般是请求到达Tomcat后&#xff0c;先查询Redis&#xff0c;如果未命中则查询数据库&#xff0c;如图&#xff1a; 存在下面的问题&#xff1a; 请求要经过Tomcat处理&#xff0c;Tomcat的性能成为整个系统的瓶颈 Redis缓存失效时&#xff0…

【Linux】了解文件的inode元信息,以及日志分析

目录 一、inode表结构&#xff0c;以及元信息 1、了解inode信息有哪些 2、关于inode表的说明 Linux中访问文件的过程&#xff1a; 3、硬连接与软连接的区别&#xff0c;&#xff08;请看前面&#xff0c;写过的&#xff09; 二、文件系统的备份与恢复 三、几种常见的日志…

idea 模板参数注释 {@link}

1. 新增组 2. 设置方法注释及变量 增加模板文本 ** * $param$ * return {link $return$} */3. 设置变量表达式 勾选跳过param 参数表达式 groovyScript("def result ;def params \"${_1}\".replaceAll([\\\\[|\\\\]|\\\\s], ).split(,).toList();def param…

小白学爬虫:手机app分享商品短连接获取淘宝商品链接接口|淘宝淘口令接口|淘宝真实商品链接接口|淘宝商品详情接口

通过手机APP分享的商品短链接&#xff0c;我们可以调用相应的接口来获取淘口令真实URL&#xff0c;进而获取到PC端的商品链接及商品ID。具体步骤如下&#xff1a; 1、通过手机APP分享至PC端的短链接&#xff0c;调用“item_password”接口。 2、该接口将返回淘口令真实URL。 3…