Spring Boot | Spring Boot 整合 “RabbitMQ“ ( 消息中间件 ) 实现

目录:

  • Spring Boot 整合 "RabbitMQ" ( 消息中间件 )实现 :
    • 一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )
      • 1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 AmqpAdmin "定制消息发送组件"
        • (3) 查看 "RabbitMQ消息组件" 的定制效果
        • (4) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (5) "消息消费者" 接收消息
      • 1.2 基于 "配置类" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "配置类"的方式 "定制消息发送组件"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) "消息消费者" 接收消息
      • 1.3 基于 "注解" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式定制 "消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台查看 "消费者" 消费信息情况
    • 二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式
      • 2.1 基于 "注解" 的方式 ( 实现 Routing "路由模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况
    • 三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式
      • 3.1 基于 "注解" 的方式 ( 实现 Topics"通配符模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况

Spring Boot 整合 “RabbitMQ” ( 消息中间件 )实现 :

在这里插入图片描述

作者简介 :一只大皮卡丘,计算机专业学生,正在努力学习、努力敲代码中! 让我们一起继续努力学习!

该文章参考学习教材为:
《Spring Boot企业级开发教程》 黑马程序员 / 编著
文章以课本知识点 + 代码为主线,结合自己看书学习过程中的理解和感悟 ,最终成就了该文章

文章用于本人学习使用 , 同时希望能帮助大家。
欢迎大家点赞👍 收藏⭐ 关注💖哦!!!

(侵权可联系我,进行删除,如果雷同,纯属巧合)


  • Spring Boot 可对 RabbitMQ工作模式 ( 6种工作模式 )进行了 整合,并支持 多种整合方式,包括 : 基于API 的方式 基于配置类的方式 基于注解的方式
  • 下面将 选取常用 Publish/Subscribe ( 发布订阅 )工作模式 Routing ( 路由 )工作模式Topics ( 通配符模式 ) 工作模式 3种工作模式完成在 Spring Boot 项目 中的 消息服务整合实现 ( 整合实现"消息中间件" )。

一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )

  • Spring Boot 整合 RabbitMQ 中间件实现消息服务,主要围绕3个部分的工作进行展开 :
    定制中间件消息发送者发送消息消息消费者接收消息。其中,定制中间件比较麻烦的工作,且必须预先定制
  • 下面我们以 用户注册成功同时 “发送邮件通知”发送短信通知 这一场景为例,分别使用 : 基于API 的方式 基于配置类的方式 基于注解的方式3种 方式实现 Publish/Subscribe ( 发布订阅 )工作模式 整合

1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用

  • 基于API 的方式主要讲的是使用 Spring 框架提供的 API管理类 : AmqpAdmin 定制 消息发送组件,并进行消息发送
  • 这种 定制消息发送组件方式与在 RabbitMQ 可视化界面上通过 对应面板 进行 组件操作实现基本一样,都是通过管理员的身份预先手动声明交换器队列路由键等,然后 “组装消息队列” 供应用程序调用,从而 实现消息服务。下面我们就对这种 基于 API的方式进行 讲解和演示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 AmqpAdmin “定制消息发送组件”
  • 在项目的测试类 : Chapter22ApplicationTests , 在该测试类先引入 AmqpAdmin 管理类 定制 Publish/Subscribe 工作模式 所需的 消息组件代码如下所示 :

    Chapter22ApplicationTests.Java ( 测试类 ) :

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}//自动注入 RabbitMQ消息中间件的API操作类: AmqpAdmin@Autowiredprivate AmqpAdmin amqpAdmin;@Testpublic void amqpAdmin() {//1.定义"fanout"类型的交换器amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));//2.定义两个"默认持久化队列" , 分别处理email 和 smsamqpAdmin.declareQueue(new Queue("fanout_queue_email")); //该队列处理"邮件信息"amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));//该队列处理"短信信息"//3.将两个队列分别与"交换器"进行绑定amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));}}
    

    上面的代码中,使用 Spring 框架提供的消息管理组件 AmqpAdmin定制了消息组件。其中定义了一个 fanout 类型交换器 : fanout_exchange ;
    还定义两个消息队列 : fanout_queue_emailfanout_queue_sms,分别用来处理邮件信息短信信息 ;最后 将定义的两个队列分别交换器绑定

(3) 查看 “RabbitMQ消息组件” 的定制效果
  • 执行上述单元测试方法 : amgpAdmin()验证 RabbitMQ 消息组件的定制效果单元测试方法执行成功,可通过 RabbitMQ 可视化管理页面具体操作为: 打开RabbitMQ安装目录中的 sbin目录,双击 rabbitmq-server.bat ,后在浏览器访问访问 : http://localhost:15672 ,输入密码和密码 : guest , 此时 可在可视化界面查询 “RabbitMQ消息组件定制效果 ,如下图所示

    在这里插入图片描述


    在这里插入图片描述

    通过上面图片看出,在 Queues 队列面板页面中,展示有定制消息队列信息与程序中 "定制"消息队列一致。可以单击消息队列名称查看每个队列详情

    通过上述操作可以发现,在管理页面中提供了 消息组件交换器队列的定制功能。在程序中使用 Spring 框架提供的管理员 API组件 AmqpAdmin 定制消息组件和在管理页面上手动定制消息组件本质是一样 的。

(4) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
  • 再次执行 psubPublisher( ) 方法,该方法 执行成功后,查看 RabbitMQ可视化管理页面 中的 Queues面板信息,具体如下图所示

    在这里插入图片描述

  • 进入 队列详情页面查看消息 , 如下图所示

    在这里插入图片描述

    上图看出,在 消息队列存储有指定发送消息详情其他参数信息,这与程序指定发送信息完全一致

(5) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** Publish/Subscribe 工作模式接收、处理 "邮件业务"** @RabbitListener(queues = "fanout_queue_email") :*  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。*  */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerEmail(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("邮件业务接收到的信息为: "+str);}/*** Publish/Subscribe 工作模式接收、处理 "短信业务"*/
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerSms(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("短信业务接收到的信息为: "+str);}}
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听 的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法立即接收并消费队列消息 ( 即 注解对应方法会被调用 )。另外,在接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果使用与消息类型对应参数接收消息的话,只能够得到 具体的消息体信息如果使用 Object或者 Message 类型参数接收消息的话,还可以获得除了消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.2 基于 “配置类” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用

  • 基于 配置类方式 主要讲的是使用 Spring Boot 框架提供的 @Configuration 注解 配置类 定制消息发送组件并进行消息发送例子代码如下所示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 "配置类"的方式 “定制消息发送组件”
  • 使用 "配置类"的方式 “定制消息发送组件(Publish/Subscribe 工作模式) 代码如下所示 :

    RabbitMQConfig.Java ( 配置类 ) :

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"/*** 使用基于"配置类"的方式定义"消息发送组件"*///1.定义定义fanout类型的交换器@Beanpublic Exchange fanout_exchange() {//创建一个名为"fanout_exchange" 的 "交换器"return ExchangeBuilder.fanoutExchange("fanout_exchange").build();}//2.定义两个不同名称的消息队列@Beanpublic Queue fanout_queue_email() { //创建消息队列//创建一个名为 "fanout_queue_email" 的 "消息队列"return new Queue("fanout_queue_email");}@Beanpublic Queue fanout_queue_sms() { //创建消息队列//创建一个名为 "fanout_queue_sms" 的 "消息队列"return new Queue("fanout_queue_sms");}//3.将两个不同名称的"消息队列"@Beanpublic Binding bindingEmail() { //将"fanout_queue_email"消息队列 和 "fanout_exchange"交换器 进行"绑定"return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();}@Beanpublic Binding bindingSms() { //将"fanout_queue_sms"消息队列 和 "fanout_exchange"交换器 进行"绑定"return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();}}
    
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** Publish/Subscribe 工作模式接收、处理 "邮件业务"** @RabbitListener(queues = "fanout_queue_email") :*  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。*  */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerEmail(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("邮件业务接收到的信息为: "+str);}/*** Publish/Subscribe 工作模式接收、处理 "短信业务"*/
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerSms(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("短信业务接收到的信息为: "+str);}}
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法会**立即接收并消费队列消息 ( 即注解对应方法会被调用 )。另外,在 接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果 使用与消息类型对应参数**接收消息的话,只能够得到 具体的消息体信息如果 使用 Object 或者 Message 类型参数 接收消息 的话,还可以获得 除了 消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.3 基于 “注解” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用

  • 基于注解的方式指的是使用 Spring框架@RabbitListener注解 定制消息发送组件 "消息消费者" 接收信息 , 当然还要结合之前的代码,才能完整实现个功能需求
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>--></project>
(2) 使用 “注解” 的方式定制 “消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import com.myh.chapter_24.domain.User;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** 使用基于注解的方式实现消息服务 ( 使用"注解"的方式 ①定制消息组件 ②接收信息)* 1.1 Publish/Subscribe工作模式接收、处理"邮件业务"*/@RabbitListener(bindings = @QueueBinding(value =@Queue("fanout_queue_email"),exchange =@Exchange(value = "fanout_exchange",type = "fanout")))public void psubConsumerEmailAno(User user) {System.out.println("邮件业务接收到消息: "+user);}/**** 1.2 Publish/Subscribe工作模式接收、处理"短信业务"*/@RabbitListener(bindings = @QueueBinding(value =@Queue("fanout_queue_sms"),exchange =@Exchange(value = "fanout_exchange",type = "fanout")))public void psubConsumerSmsAno(User user) {System.out.println("短信业务接收到消息: "+user);}}
    

    上面代码中,使用 @RabbitListener 注解及 其相关属性定制了两个消息组件消费者,这两个消费者都接收实体类 User 并消费。在 @RabbitListener 注解中,bindings 属性用于创建并绑定交换器消息队列组件,需要注意的是,为了能使两个消息组件的消费者接受实体类User,需要我们在 定制交换器将交换器类型 type 设置为 fanout。另外,bindings 属性@QueueBinding 注解除了有 valuetype 属性外,还有key 属性用于定制路由键 : routingKey (当前发布订阅模式不需要)。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行psubPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述

  • 至此,在 Spring Boot 中完成了 使用基于 API基于配置类基于注解3 种方式来实现 Publish/Subscribe 工作模式整合讲解 在这 3 种实现消息服务的方式中,

  • 上面讲述过三种配置方式区别
    基于 API 的方式
    相对简单、直观
    ,但容易与业务代码产生 耦合

    基于 配置类方式相对隔离 容易统一管理、符合Spring Boot 框架思想;

    基于 注解的方式 清晰明了方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用 基于配置类的方式 和 基于注解的方式定制组件 实现消息服务较为常见。使用 基于 API的方式偶尔使用,具体还需要根据实际情况进行选择

二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式

2.1 基于 “注解” 的方式 ( 实现 Routing "路由模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类//使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"/*** 2.1路由模式消息接收、处理error级别日志信息* (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")*/@RabbitListener(bindings = @QueueBinding(value =@Queue("routing_queue_error"), exchange =@Exchange(value = "routing_exchange", type = "direct"),key = "error_routing_key"))public void routingConsumerError(String message) {System.out.println("接收到error级别日志信息: "+message);}/*** 2.2路由模式消息接收、处理info、error、warning级别日志信息* (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")*/@RabbitListener(bindings = @QueueBinding(value =@Queue("routing_queue_all"), exchange =@Exchange(value = "routing_exchange", type = "direct"),key = {"error_routing_key","info_routing_key","warning_routing_key"}))public void routingConsumerAll(String message) {System.out.println("接收到info、error、warning级别日志信息: "+message);}}

    上述代码中,在消息业务处理类 : RabbitMQService 中编写了两个用来 处理 Routing 路由模式消息消费者方法,在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制路由模式消息服务组件

    Routing路由模式下的交换器类型 : type属性direct,而且还必须 指定 key 属性
    ( 每个消息队列可以
    映射多个路由键
    但在 Spring Boot 1.X版本中@QueueBinding 中的 key 属性只接收 Spring 类型不接收 Spring [ ] 类型 )。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;//Routing工作模式消息发送端 ( "消息发布者" "发送信息" )@Testpublic void routingPublisher() {//发送信息//给指定的"交换器" "发送信息"rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send 错误信息...");}}
    

    上述代码中,通过调用 RabbitTemplateconverAndSend ( String exchange , String routingKey , Object object )方法来 "发送消息"。在 Routing 工作模式下发送消息时,必须指定路由键参数该参数要与消息队列映射路由键保持一致,否则发送的消息将会丢失

    本次示例中使用的是error_routing_key 路由键,根据定制规则,编写的两个消息消费者方法应该都可以正常接收并消费发送端发送消息

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行routingPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述


    此时,修改 routingPublisher( )方法中的消息发送参数调整发送info 级别日志信息( 注意同修改 info_routing_key 路由键 ),再次启动 routingPublisher( )方法控制台效果如下图所示

    在这里插入图片描述

    上图所示,控制台打印出使用 info_routing_key 路由键发送 info 级别日志信息,说明只有配置映射 info_routing_key 路由键消息消费者 的方法 消费了消息

三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式

3.1 基于 “注解” 的方式 ( 实现 Topics"通配符模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>--></project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类//使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"/*** 3.1使用"通配符模式"消息接收、进行"邮件业务"订阅处理*   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )*/@RabbitListener(bindings = @QueueBinding(value =@Queue("topic_queue_email"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.email.#"))public void topicConsumerEmail(String message) {System.out.println("接收到邮件订阅需求处理信息: "+message);}/*** 3.2使用"通配符模式"消息接收、进行"短信业务"订阅处理*   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )*/@RabbitListener(bindings = @QueueBinding(value =@Queue("topic_queue_sms"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.sms.#"))public void topicConsumerSms(String message) {System.out.println("接收到短信订阅需求处理信息: "+message);}}
    

    上述代码中,在消息业务处理类RabbitMQService 中编写了两个处理 Topics 通配符模式消息消费者方法 ( 这两个方法即创建了"消息组件",又创建了"消息消费者接收且消费信息 "),在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了通配符模式下的 消息组件

    从上述示例可以看出Topics 通配符模式下注解使用方式与 Routing 路由模式使用基本一样,主要是将交换器类型 type 修改为了 topic,还分别使用通配符的样式指定路由键 routingKey

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    

    针对不同的用户订阅需求,使用 RabbitTemplate 模板工具类的 convertAndSend ( Stringexchange,String routingKey,Object object )方法发送不同的消息发送消息时,必须 根据具体需求已经定制路由键通配符 设置 具体的路由键

② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;//3.Topcis工作模式下的"消息发布者" "发送消息"@Testpublic void topicPublisher() {//(1)只发送"邮件"订阅用户信息 ---(给指定的"交换器" "发送信息")rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message...");//(2)只发送"短信"订阅用户信息rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send  sms message...");//(3)"邮件"订阅用户 和 "短信"订阅用户 都发送消息rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms  message...");}}
    
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 执行topicPublisher( ) 方法 中的 步骤(1)邮件订阅用户的消息发送控制台效果下图所示 :

    在这里插入图片描述


    在这里插入图片描述

  • topicPublisher( )方法步骤(1) 进行 注释打开步骤(2) 中只进行 短信订阅用户消息发送方法,并 再次启动该测试方法控制台效果如下图所示 : 在这里插入图片描述


在这里插入图片描述

  • 为了查看 topicPublisher( )方法步骤(3)的效果,我们需要把 步骤(2) 的代码注释步骤(3) 的代码主要进行 邮件短信订阅用户消息发送方法项目重新启动 后的效果如下图所示 :

    在这里插入图片描述


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8559.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 MobaXterm 链接 Ubuntu(Windows子系统)

MobaXterm_Personal_22.1 Ubuntu&#xff08;Windows子系统&#xff09;

难定取舍,静观其变

今&#xff08;2024年5月8日&#xff09;天&#xff0c;本“人民体验官”在推广人民日报官方微博文化产品《带着笑意的眼睛&#xff0c;能看见最美的风景》的同时&#xff0c;还要联系4月初至今期间&#xff0c;与隐藏在《麻辣论坛》幕后的那位昵称“800727”者所爆发的一连串&…

BACnet到OPC UA的楼宇自动化系统与生产执行系统(MES)整合

在智能制造的浪潮下&#xff0c;一家位于深圳的精密电子制造企业面临着前所未有的挑战&#xff1a;如何高效地将楼宇自动化系统与生产执行系统&#xff08;MES&#xff09;整合&#xff0c;实现能源管理与生产流程的精细化控制。这家企业的楼宇控制系统使用的是BACnet协议&…

Java 线程池之 ThreadPoolExecutor

Java线程池&#xff0c;特别是ThreadPoolExecutor&#xff0c;是构建高性能、可扩展应用程序的基石之一。它不仅关乎效率&#xff0c;还直接关系到资源管理与系统稳定性。想象一下&#xff0c;如果每来一个请求就创建一个新的线程&#xff0c;服务器怕是很快就要举白旗了。而Th…

【图书推荐】《图神经网络基础、模型与应用实战》

本书目的 详解PyTorch 图神经网络基础理论、模型与十多个应用案例&#xff0c;带领读者掌握图神经网络在自然语言处理、计算机视觉、推荐系统、社交网络4个领域的应用开发方法&#xff0c;丰富读者利用深度学习算法解决实际问题的能力。 本书案例 图卷积网络实现图注意力网络…

Comate,一款基于文心大模型的智能编程助手

一、官网 Baidu Comate官网 二、安装VSCode 如何下载安装VSCode 三、VSCode安装Comate 安装方式1 访问Comate官网点击 立即安装Comate插件 按钮快速安装 安装方式2 访问VSCode市场中的BaiduComate 点击 Install 按钮访问扩展详情界面 2.打开VSCode 3.安装Comate 四、…

先经营好自己,才是成事最坚实的基础!做事要稳!

电影《教父》里有句著名的台词说&#xff1a;花半秒钟就能看透事物本质的人&#xff0c;和花一辈子也看不透事物本质的人&#xff0c;注定是截然不同的命运。而这所谓的“看透本质”&#xff0c;就是事物的底层逻辑。 底层逻辑是一种解决问题的思维模式。底层逻辑越坚固&#x…

中金:如何把握不断轮动的资产“风口”

从比特币到日股&#xff0c;到黄金与铜再到当前的港股&#xff0c;每次超预期大涨后都透支回调。 今年以来资产的“风口”不断轮动&#xff0c;从比特币到日股&#xff0c;到黄金与铜&#xff0c;再到当前的港股&#xff0c;资产仿佛“接力”般交替领先&#xff0c;同时“风口”…

js api part6

正则表达式 正则表达式 &#xff08;Regular Expression&#xff09;是用于 匹配字符串中字符组合 的模式。在 JavaScript中&#xff0c;正则表达式也是对象。通常用来查找、替换那些符合正则表达式的文本&#xff0c;许多语言都支持正则表达式。 正则表达式在 JavaScript中的…

spring框架学习记录(3)

Spring事务 Spring事务简介 事务作用&#xff1a;在数据层保障一系列的数据库操作同成功同失败Spring事务作用&#xff1a;在数据层或业务层保障一系列的数据库操作同成功或同失败 Spring事务角色 事务管理员&#xff1a;发起事务方&#xff0c;在Spring中通常指代业务层开…

AI智能分析视频监控行业的发展趋势和市场发展浅析

监控视频AI智能分析技术的现状呈现出蓬勃发展的态势&#xff0c;这一技术源于计算机视觉和人工智能的研究&#xff0c;旨在将图像与事件描述之间建立映射关系&#xff0c;使计算机能够从视频图像中分辨出目标信息。 在技术上&#xff0c;监控视频AI智能分析技术已经实现了对视…

Ps中 饱和度 和 自然饱和度 的区别?

1.饱和度&#xff08;Saturation&#xff09;&#xff1a;在Photoshop中&#xff0c;饱和度是一个全局性调整&#xff0c;它影响图像中所有颜色的鲜艳程度。当你增加饱和度时&#xff0c;所有的颜色都会变得更浓烈、更鲜艳&#xff1b;相反&#xff0c;减小饱和度会使图像整体变…

小猪APP分发:重塑应用分发市场的创新力量

在移动互联网蓬勃发展的今天&#xff0c;应用分发平台作为连接开发者与用户的桥梁&#xff0c;扮演着至关重要的角色。然而&#xff0c;随着市场的饱和&#xff0c;如何在众多平台中脱颖而出&#xff0c;为开发者提供更宽广的舞台&#xff0c;同时确保用户能够便捷、安全地获取…

程序员必备的7大神器,效率飞起!

我们都知道程序员在工作时&#xff0c;会经常遇到任务繁重的情况&#xff0c;为了提高效率&#xff0c;程序员们也会借助一些软件&#xff0c;那么哪些软件可以帮助程序员们提高工作效率呢&#xff1f; 整理不易&#xff0c;关注一波&#xff01;&#xff01; 1. Xftp 7 Xft…

06-beanFactoryPostProcessor的执行

文章目录 invokeBeanFactoryPostProcessors(beanFactory)invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors())invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);invokeBeanFactoryPostProcessors(regularPostProc…

将ESP工作为AP路由模式并当成服务器

将ESP8266模块通过usb转串口接入电脑 ATCWMODE3 //1.配置成双模ATCIPMUX1 //2.使能多链接ATCIPSERVER1 //3.建立TCPServerATCIPSEND0,4 //4.发送4个字节在链接0通道上 >ATCIPCLOSE0 //5.断开连接通过wifi找到安信可的wifi信号并连接 连接后查看自己的ip地址变为192.168.4.…

Java中next()与nextLine()的区别[不废话,直接讲例子]

在使用牛客进行刷题时&#xff0c;我们很多时候会遇到这样的情况&#xff1a; 区别很简单&#xff0c;如果你要输入用空格或者回车分开的数据如&#xff1a; abc_def_ghi 这三组数据&#xff08; _ 是空格&#xff09; 用hasNext: 执行结果&#xff1a; 如果只用换行符号进行…

6层板学习笔记1

说明:笔记基于6层全志H3消费电子0.65MM间距BGA 目的:掌握各类接口的布局思路和布线,掌握DDR高速存储设计 1、网表的导入是原理图的元件电气连接关系,位号,封装,名称等参数信息的总和 2、原理图文件包含(历史版本记录,功能总框图,电源树,GPIO分配,DDR功能,CPU,US…

Mysql:Before start of result set

解决方法&#xff1a;使用resultSet.getString&#xff08;&#xff09;之前一定要调用resultSet.next() ResultSet resultSet statement1.executeQuery();while (resultSet.next()){String username1 resultSet.getString("username");int id1 resultSet.getInt…

pytorch基础: torch.unbind()

1. torch.unbind 作用 说明&#xff1a;移除指定维后&#xff0c;返回一个元组&#xff0c;包含了沿着指定维切片后的各个切片。 参数&#xff1a; tensor(Tensor) – 输入张量dim(int) – 删除的维度 2. 案例 案例1 x torch.rand(1,80,3,360,360)y x.unbind(dim2)print(&…