从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ

前言

最近有个想法想整理一个内容比较完整springboot项目初始化Demo。

SpringBoot集成RabbitMQ

 RabbitMQ中的一些角色:

  • publisher:生产者

  • consumer:消费者

  • exchange个:交换机,负责消息路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 

junit用于测试。

一、pom引入依赖amqp

        <!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!--rabbitmq-->

二、application-dev.yaml 增加RabbitMQ相关配置

spring:#RabbitMQ服务器配置,地址账号密码,virtualhost等配置rabbitmq:host: 127.0.0.1port: 5672username: murgpassword: 123456virtual-host: murg-host#队列中没有消息,阻塞等待时间template:receive-timeout: 2000
logging:level:org.springframework.security: debug

三、发布/订阅

RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。

此模式下根据交换机类型又分为三种。

1.Fanout类型:   广播模式    把消息交给所有绑定到交换机的队列

2.Direct类型:    路由模式     把消息交给符合指定routing key 的队列

3Topic类型:     主题模式      把消息交给符合主题通配符的队列

3.1 Fanout类型

在广播模式下,消息发送流程是这样的:

1) 可以有多个队列

2) 每个队列都要绑定到Exchange(交换机)

3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

4) 交换机把消息发送给绑定过的所有队列

5) 订阅队列的消费者都能拿到消

3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起

创建配置类FanoutConfig 

声明一个Fanout类型交换机命名为murg.fanout

声明两个Queue队列分别为fanout.queue1和fanout.queue2

分别将两个队列和交换机绑定,后续用于消费消息。

package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Fanout 广播模式下*声明交换机和队列*/
@Configuration
public class FanoutConfig {/*** 声明交换机和队列 将交换机和队列绑定在一起* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("murg.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

3.1.2创建消息生产服务

创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。

 注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列

新增测试广播模式下发送消息方法

package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** Rabbitmq消息生产者*/
@Service
@RequiredArgsConstructor
public class MessageProducerService {private final RabbitTemplate rabbitTemplate;/*** 测试广播模式下发送消息* @param msg*/public void testFanoutExchange(String msg) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.fanout","",msg);}}

3.13创建消息消费服务

创建消息生产服务MessageConsumerService 

@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener 注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。

package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** 消息消费者**/
@Service
@RequiredArgsConstructor
public class MessageConsumerService {/*** 下面是Fanout,广播模式的监听* 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2* @param msg*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}}

3.14修改测试类进行测试

修改创建项目时生成的 BootdemoApplicationTests.class

增加以下注解

@ActiveProfiles("dev") 

指定运行环境为开发环境
@RunWith(SpringRunner.class)

指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。

@SpringBootTest(classes={BootdemoApplication.class})

指定启动类

package com.murg.bootdemo;import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {@AutowiredMessageProducerService messageProducerService;@Testpublic void contextLoads() {System.out.printf("aaaaaaaaaaaaa");}//测试@Testpublic void testFanoutExchange(){String msg = "遍身罗绮者,不是养蚕人";for (int i=0;i<10;i++){messageProducerService.testFanoutExchange(msg);}}}

3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果

两个队列都可消费。 

3.2 Direct类型

3.2.1MessageProducerService生产服务增加方法testDirectExchange

在Direct模型下:
 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
 Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

/*** 测试Direct模式下发送消息* 在Direct模型下:** 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)** 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。** Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息* murg.Direct* @param msg*/public void testDirectExchange(String msg,String rountingkey) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);}

3.2.2MessageConsumerService消费服务增加方法testDirectExchange

Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息

direct.queue1 消费 rountingkey为“蚕妇”的消息

direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息

    /**** 下面是rountingKey 路由key 模式的消费监听* 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。** 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:**/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"蚕妇"} //指定消费rountingkey))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}

3.2.3测试类增加测试方法

    @Testpublic void testDirectExchange() throws InterruptedException {String msg1 = "遍身罗绮者,不是养蚕人";String key1 ="蚕妇";String msg2 = "朱门酒肉臭,路有冻死骨";String key2 ="自京赴奉先县咏怀五百字";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testDirectExchange(msg2,key2);Thread.sleep(1000);}else {messageProducerService.testDirectExchange(msg1,key1);Thread.sleep(1000);}}}

调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果

3.3 Topic类型

3.3.1MessageProducerService生产服务增加方法testTopicExchange

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu    #.demo写法也可以
demo.*:只能匹配demo.spu

  public void testTopicExchange(String msg, String key) {rabbitTemplate.convertAndSend("murg.topic",key,msg);}

3.3.2MessageConsumerService消费服务增加方法testDirectExchange

@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息

topic.queue1 消费 rountingkey为“罗隐.*”的消息

topic.queue2 消费 rountingkey为“#.贫女”的消息

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"罗隐.*"} //指定消费rountingkey))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"#.贫女"}//指定消费rountingkey))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}

3.3.3测试类增加测试方法 

    @Testpublic void testTopicExchange() throws InterruptedException {String msg = "采得百花成蜜后,为谁辛苦为谁甜";String key ="罗隐.蜂";String msg2 = "苦恨年年压金线,为他人作嫁衣裳";String key2 ="秦韬玉.贫女";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testTopicExchange(msg,key);Thread.sleep(1000);}else {messageProducerService.testTopicExchange(msg2,key2);Thread.sleep(1000);}}}

调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue 使用腾讯地图

项目要求web端用若依框架引用腾讯地图 &#xff0c;搜了些配合官网完成后&#xff0c;记录中间的曲折&#xff0c; 一、引用vue web端引用腾讯地图 注册腾讯地图appkey WebService API | 腾讯位置服务 注意点 这是一个Vue集成腾讯地图的demo 项目中需要在index.html上…

牧神记开分9.7,2024新国漫巅峰出现了

现在国漫越来越卷了&#xff0c;卷播放量也卷评分。最近&#xff0c;b站上线不久的国漫《牧神记》开分9.7&#xff0c;口碑还是相当不错的&#xff0c;已经和《凡人修仙传》评分齐平。这部国漫仅仅播出4集&#xff0c;为什么就能获得这么高的评分呢&#xff1f;下面就一起来看看…

了解GPT大模型,读这本书就够了!(文末送书)

小异最近发现&#xff0c;当国外OpenAI的权力的游戏反转再反转的时候&#xff0c;国内的AI创业者们除了吃瓜之外也没闲着。 有很多程序员正在AIGC赛道中默默发财&#xff0c;有通过短视频做内容爆火&#xff0c;接广告的&#xff1b;有卖AI解决方案的&#xff1b;有卖AI课程的…

〔 MySQL 〕数据库基础

1. 数据库基础&#xff08;重点&#xff09; 1.1 什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库?文件保存数据有以下几个缺点&#xff1a; ● 文件的安全性问题 ● 文件不利于数据查询和管理 ● 文件不利于存储海量数据 …

【软件工程】耦合

耦合性指软件结构中模块相互紧密连接的紧密程度。 耦合性由高到低分别为&#xff1a;内容耦合、公共耦合、外部耦合、控制耦合、标记耦合、数据耦合、非直接耦合。 1.内容耦合&#xff1a; 一个模块直接使用或修改另一个模块的内部数据或逻辑。 例如&#xff1a;一个函数直…

关于C++友元函数的优缺点和应用场景!

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 友元函数的相关内容&#xff01; 关于…

笔记本怎么开启TPM2.0_笔记本开启TPM2.0教程(不同笔记本开启tpm2.0方法)

在win11最低要求是提示&#xff0c;电脑必须满足 TPM 2.0&#xff0c;并开需要开启TPM 才能正常安装windows11系统&#xff0c;有很多笔记本的用户问我&#xff0c;笔记本怎么开启tpm功能呢&#xff1f;下面小编就给大家详细介绍一下笔记本开启tpm功能的方法。 如何确认你笔记本…

HTML5+css3(浮动,浮动的相关属性,float,解决浮动的塌陷问题,clear,overflow,给父亲盒子加高度,伪元素)

浮动的相关属性 以下使浮动的常用属性值&#xff1a; float&#xff1a; 设置浮动 以下属性&#xff1a; left : 设置左浮动 right : 设置右浮动 none &#xff1a;不浮动&#xff0c;默认值clear 清除浮动 清除前面兄弟元素浮动元素的响应 以下属性&#xff1a; left &…

Fastjson反序列化漏洞部署复现1.2.24版本反序列RCE笔记

环境部署 docker容器地址&#xff1a;https://github.com/vulhub/vulhub/tree/master/fastjson/1.2.24-rce 启动docker环境 docker容器每次一启动就报错 根据docker log [容器id] 查看报错日志发现有如下报错 library initialization failed - unable to allocate file desc…

PostgreSQL pg-xact(clog)目录文件缺失处理

一、 背景 前些天晚上突然收到业务反馈&#xff0c;查询DB中的一个表报错 Could not open file "pg-xact/005E": No such file or directory. 两眼一黑难道是文件损坏了...登录查看DB日志&#xff0c;还好没有其他报错&#xff0c;业务也反馈只有这一个表在从库查询报…

【Python】项目结构

【Python】项目结构 前言前置知识Python 的基本项目结构int main() 与 def main() 的区别举例&#xff1a;基于 KNN 的 OpenCV 数字识别的项目结构 前言 本文总结了 Python 项目结构的知识&#xff0c;规范项目结构能使得项目开发过程高效流畅&#xff0c;提升代码可读性、团队…

arcgis pro 学习笔记

二维三维集合在一起&#xff0c;与arcgis不同 一、首次使用&#xff0c;几个基本设置 1.选项——常规里面设置自动保存时间 2.新建工程文件&#xff0c;会自动加载地图&#xff0c;可以在选项里面设置为无&#xff0c;以提高启动效率。 3.设置缓存位置&#xff0c;可勾选每次…

OpenCV视觉分析之目标跟踪(11)计算两个图像之间的最佳变换矩阵函数findTransformECC的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 根据 ECC 标准 78找到两幅图像之间的几何变换&#xff08;warp&#xff09;。 该函数根据 ECC 标准 ([78]) 估计最优变换&#xff08;warpMatri…

.vue文件中定义变量和在引用的.ts文件中定义变量的区别

在 Vue 3 项目中&#xff0c;你可以在 .vue 文件和单独的 .ts 文件中定义变量。这两种方式有一些关键的区别&#xff1a; 在 .vue 文件中定义变量 局部作用域&#xff1a; 在 .vue 文件的 <script setup> 或 <script> 标签中定义的变量&#xff0c;它们的作用域仅限…

开源项目工具:LeanTween - 为Unity 3D打造的高效缓动引擎详解(比较麻烦的API版)之二———补间动画控制API系列

1.文档中的cancel,resume,pause LeanTween.cancel ( gameObject:GameObject id:int )LeanTween.cancel ( ltRect:LTRect id:int )LeanTween.cancel ( gameObject:GameObject )LeanTween.pause ( gameObject:GameObject )LeanTween.resume ( id:int )LeanTween.resume ( game…

Redis 典型应用 - 缓存(cache)

一、什么是缓存 缓存(cache)是计算机中的⼀个经典的概念.在很多场景中都会涉及到. 核⼼思路就是把⼀些常⽤的数据放到触⼿可及(访问速度更快)的地⽅,⽅便随时读取. 这⾥所说的"触⼿可及"是个相对的概念. 对于硬件的访问速度来说,通常情况下: CPU寄存器>内存>…

第十六章 TCP 客户端 服务器通信

文章目录 第十六章 TCP 客户端/服务器通信TCP 客户端/服务器通信TCP 连接概述TCP 设备的 OPEN 命令 第十六章 TCP 客户端/服务器通信 TCP 客户端/服务器通信 本章介绍如何使用 TCP/IP 在 IRIS 数据平台进程之间设置远程通信。 IRIS 支持两种互联网协议 (IP)&#xff1a;TCP …

【数学二】线性代数-矩阵-初等变换、初等矩阵

考试要求 1、理解矩阵的概念,了解单位矩阵、数量矩阵、对角矩阵、三角矩阵、对称矩阵、反对称矩阵和正交矩阵以及它们的性质. 2、掌握矩阵的线性运算、乘法、转置以及它们的运算规律,了解方阵的幂与方阵乘积的行列式的性质. 3、理解逆矩阵的概念,掌握逆矩阵的性质以及矩阵可…

DevCheck Pro手机硬件检测工具v5.33

前言 DevCheck Pro是一款手机硬件和操作系统信息检测查看工具&#xff0c;该软件的功能非常强大&#xff0c;为用户提供了系统、硬件、应用程序、相机、网络、电池等一系列信息查看功能 安装环境 [名称]&#xff1a;DevCheckPro [版本]&#xff1a;5.33 [大小]&a…

教程:FFmpeg结合GPU实现720p至4K视频转换

将一个 720p 的视频放大编码到 4K&#xff0c;这样的视频处理在很多业务场景中都会用到。很多视频社交、短视频、视频点播等应用&#xff0c;都会需要通过服务器来处理大量的视频编辑需求。 本文我们会探讨一下做这样的视频处理&#xff0c;最低的 GPU 指标应该是多少。利用开源…