207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

目录

  • ★ 发送消息
  • ★ 创建队列的两种方式
  • 代码演示
    • 需求1:发送消息
      • 1、ContentUtil 先定义常量
      • 2、RabbitMQConfig 创建队列的两种方式之一:
        • 配置式:
          • 问题:
      • 3、MessageService 编写逻辑
      • PublishController 控制器
      • application.properties 配置属性
      • 测试:消息发送
  • ★ 接收消息
    • 代码演示:
      • 测试: 消息接收
  • ★ 定制监听器容器工厂
  • 完整代码:
    • application.properties RabbitMQ的连接等属性配置
    • ContentUtil 常量工具类
    • RabbitMQConfig 配置式创建消息队列
    • MessageService 发送消息的业务代码
    • PublishController.java 发送消息的控制层
    • MyRabbitMQListener 监听器,监听消息队列
    • pom.xml


在这里插入图片描述

★ 发送消息

- Spring Boot可以将AmqpAdmin和AmqpTemplate注入任何其他组件,接下来该组件即可通过AmqpAdmin来管理Exchange、队列和绑定,还可通过AmqpTemplate来发送消息。 - Spring Boot还会自动配置一个RabbitMessagingTemplate Bean(RabbitAutoConfiguration负责配置),如果想使用它来发送、接收消息,可使用RabbitMessagingTemplate代替上面的AmqpTemplate,两个Template的注入方式完全相同。

在这里插入图片描述

★ 创建队列的两种方式

 方式一(编程式):在程序中通过AmqpAdmin创建队列。方式二(配置式):在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。

代码演示

需求1:发送消息

1、ContentUtil 先定义常量

在这里插入图片描述

2、RabbitMQConfig 创建队列的两种方式之一:

配置式:

在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。
就是在配置类中创建一个生成消息队列的@Bean。

问题:

用 @Configuration 注解声明为配置类,但是项目启动的时候没有自动生成这个队列。
据了解是因为RabbitMQ使用了懒加载,大概是没有消费者监听这个队列,就没有创建。
但是当我写后面的代码后,这个消息队列就生成了,但是并没有消费者去监听这个队列。
这有点想不通。
不知道后面是哪里的代码让这个配置类能成功声明这个消息队列出来。
在这里插入图片描述
水落石出:
经过测试:
在下面的MessageService 这个类中,依赖注入了 AmqpAdmin 和 AmqpTemplate 这两个对象,当我们通过这两个对象去声明队列、Exchange 和绑定的时候,配置类中的创建消息队列的bean就能成功创建队列。
这张图结合下面的 MessageService 中的代码就可得知:
这是依赖注入 AmqpAdmin 和 AmqpTemplate 这两个对象的有参构造器中声明的。
在这里插入图片描述

3、MessageService 编写逻辑

声明Exchange 、 消息队列 、 Exchange和消息队列的绑定、发送消息的方法等
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

PublishController 控制器

在这里插入图片描述

application.properties 配置属性

在这里插入图片描述
在这里插入图片描述

测试:消息发送

成功生成队列
在这里插入图片描述
发送消息测试
在这里插入图片描述
在这里插入图片描述




★ 接收消息

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

 【备注】:该注解可通过queues属性指定它要监听的、已有的消息队列,它也可使用queuesToDeclare来声明队列,并监听该队列。- 如果没有显式配置监听器容器工厂(RabbitListenerContainerFactory),Spring Boot会在容器中自动配置一个SimpleRabbitListenerContainerFactory Bean作为监听器容器工厂,如果希望使用DirectRabbitListenerContainerFactory,可在application.properties文件中添加如下配置:spring.rabbitmq.listener.type=direct▲ 如果在容器中配置了MessageRecoverer或MessageConverter,它们会被自动关联到默认的监听器容器工厂。



代码演示:

创建个消息队列的监听器就可以了。

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

该注解可通过queues属性指定它要监听的、已有的消息队列
它也可使用queuesToDeclare来声明队列,并监听该队列
还可以用 bindings 进行 Exchange和queue的绑定操作。
在这里插入图片描述

测试: 消息接收

在这里插入图片描述
发送消息和监听消息
在这里插入图片描述




★ 定制监听器容器工厂

▲ 如果要定义更多的监听器容器工厂或覆盖默认的监听器容器工厂,可通过Spring Boot提供的SimpleRabbitListenerContainerFactoryConfigurer
或DirectRabbitListenerContainerFactoryConfigurer来实现,它们可对SimpleRabbitListenerContainerFactory
或DirectRabbitListenerContainerFactory进行与自动配置相同的设置。 ▲ 有了自定义的监听器容器工厂之后,可通过@RabbitListener注解的containerFactory属性来指定使用自定义的监听器容器工厂,
例如如下注解代码:@RabbitListener(queues = "myQueue1", containerFactory="myFactory")

完整代码:

application.properties RabbitMQ的连接等属性配置

# 配置连接 RabbitMQ 的基本信息------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 下面属性可配置多个以逗号隔开的连接地址,一旦配置了该属性,host 和 port 属性就会被忽略
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# 连接虚拟主机
spring.rabbitmq.virtual-host=my-vhost01# 配置RabbitMQ的缓存相关信息--------------------------------------------------------
# 指定缓存 connection ,还是缓存 channel
spring.rabbitmq.cache.connection.mode=channel
# 指定可以缓存多少个 Channel
spring.rabbitmq.cache.channel.size=50
# 如果选择的缓存模式是 connection , 那么就可以配置如下属性
# spring.rabbitmq.cache.connection.size=15# 配置 和 RabbitTemplate 相关的属性--------------------------------------------------
# 指定 RabbitTemplate 发送消息失败时会重新尝试
spring.rabbitmq.template.retry.enabled=true
# RabbitTemplate 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.template.retry.initial-interval=1s
# RabbitTemplate 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.template.retry.max-attempts=5
# 设置每次尝试重新发送消息的时间间隔是一个等比数列:1s, 2s, 4s, 8s, 16s
# 第一次等1s后尝试,第二次等2s后尝试,第三次等4s后尝试重新发送消息......
spring.rabbitmq.template.retry.multiplier=2
# 指定发送消息时默认的Exchange名
spring.rabbitmq.template.exchange=""
# 指定发送消息时默认的路由key
spring.rabbitmq.template.routing-key="test"# 配置和消息监听器的容器工厂相关的属性--------------------------------------------------
# 指定监听器容器工厂的类型
spring.rabbitmq.listener.type=simple
# 指定消息的确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 指定获取消息失败时,是否重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.listener.simple.retry.initial-interval=1s

ContentUtil 常量工具类

package cn.ljh.app.rabbitmq.util;//常量
public class ContentUtil
{//定义Exchange的常量-----fanout:扇形,就是广播类型public static final String EXCHANGE_NAME = "boot.fanout";//消息队列数组public static final String[] QUEUE_NAMES =new String[] {"queue_boot_01","queue_boot_02","queue_boot_03"};}

RabbitMQConfig 配置式创建消息队列

package cn.ljh.app.rabbitmq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//配置式:在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列
//声明这个类为配置类
@Configuration
public class RabbitMQConfig
{//用配置式的方式在RabbitMQ中定义队列@Beanpublic Queue myQueue(){//在容器中配置一个 Queue Bean,Spring 就会为它在 RabbitMQ 中自动创建对应的 Queuereturn new Queue("queue_boot",   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);}
}

MessageService 发送消息的业务代码

声明Exchange 、Queue ,Exchange 绑定Queue,发送消息代码

package cn.ljh.app.rabbitmq.service;import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;//业务逻辑:声明Exchange 和 Queue 消息队列,发送消息的方法
@Service
public class MessageService
{//AmqpAdmin来管理Exchange、队列和绑定private final AmqpAdmin amqpAdmin;//AmqpTemplate来发送消息private final AmqpTemplate amqpTemplate;//通过有参构造器进行依赖注入public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate){this.amqpAdmin = amqpAdmin;this.amqpTemplate = amqpTemplate;//由于声明 Exchange 、 队列 、 绑定(Exchange绑定队列),都只需要做一次即可,因此放在此处构造器中完成即可//创建 fanout 类型的 Exchange ,使用FanoutExchange实现类FanoutExchange exchange = new FanoutExchange(ContentUtil.EXCHANGE_NAME,true,    /* Exchange是否持久化 */false, /* 是否自动删除 */null   /* 额外的参数属性 */);//声明 Exchangethis.amqpAdmin.declareExchange(exchange);//此处循环声明 Queue ,也相当于代码式创建 Queuefor (String queueName : ContentUtil.QUEUE_NAMES){Queue queue = new Queue(queueName,   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);//此处声明 Queue ,也相当于【代码式】创建 Queuethis.amqpAdmin.declareQueue(queue);//声明 Queue 的绑定Binding binding = new Binding(queueName,  /* 指定要分发消息目的地的名称--这里是要发送到这个消息队列里面去 */Binding.DestinationType.QUEUE, /* 分发消息目的的类型,指定要绑定 queue 还是 Exchange */ContentUtil.EXCHANGE_NAME, /* 要绑定的Exchange */"x", /* 因为绑定的Exchange类型是 fanout 扇形(广播)模式,所以路由key随便写,没啥作用 */null);//声明 Queue 的绑定amqpAdmin.declareBinding(binding);}}//发送消息的方法public void publish(String content){//发送消息amqpTemplate.convertAndSend(ContentUtil.EXCHANGE_NAME, /* 指定将消息发送到这个Exchange */"",  /* 因为Exchange是fanout 类型的(广播类型),所以写什么路由key都行,都没意义 */content /* 发送的消息体 */);}
}

PublishController.java 发送消息的控制层

package cn.ljh.app.rabbitmq.controller;import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;//发送消息
@RestController
public class PublishController
{private final MessageService messageService;//有参构造器进行依赖注入public PublishController(MessageService messageService){this.messageService = messageService;}@GetMapping("/publish/{message}")//因为{message}是一个路径参数,所以方法接收的时候需要加上注解 @PathVariablepublic String publish(@PathVariable String message){//发布消息messageService.publish(message);return "消息发布成功";}}

MyRabbitMQListener 监听器,监听消息队列

package cn.ljh.app.rabbitmq.listener;import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//监听器:监听消息队列并进行消费
@Component
public class MyRabbitMQListener
{//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_01")public void onQ1Message(String message){System.err.println("从 queue_boot_01 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_02")public void onQ2Message(String message){System.err.println("从 queue_boot_02 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列//还可以用 queuesToDeclare 直接声明并监听该队列,还可以用 bindings 进行Exchange和queue的绑定@RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03",durable = "true",exclusive = "false",autoDelete = "false"),admin = "amqpAdmin" /*指定声明Queue,绑定Queue所用的 amqpAdmin,不指定的话就用容器中默认的那一个 */)public void onQ3Message(String message){System.err.println("从 queue_boot_03 消息队列接收到的消息:" + message);}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.5</version></parent><groupId>cn.ljh</groupId><artifactId>rabbitmq_boot</artifactId><version>1.0.0</version><name>rabbitmq_boot</name><properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- RabbitMQ 的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- web 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 开发者工具的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- lombok 依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/105635.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

思维模型 峰终定律

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。 1 峰-终定律的应用 1.1 迪士尼游乐园 迪士尼乐园采用了多种策略来创造令人难忘的体验&#xff0c;从而遵循峰终定律的原则。具体如下&#xff1a; 迪士尼乐园的入口设计和服务体验&…

Interlay采用Moonbeam路由流动性,为波卡发展更多流动性

波卡上的首选多链开发平台Moonbeam宣布Interlay现在支持由Carrier赋能的Moonbeam路由流动性。Carrier是一个功能强大的token和NFT跨链桥&#xff0c;支持超过12个网络。Interlay是波卡上的一条平行链&#xff0c;与HydraDX一起通过Wormhole、Moonbeam和Carrier为波卡生态挖掘流…

基于workbench的PTFE矩形密封圈压缩回弹仿真分析

研究背景&#xff1a; 近年来随着工业发展和科技进步&#xff0c;高压容器使用场景逐渐增大&#xff0c;使用环境越发苛刻&#xff0c;如高温、高压以及内部压力的波动&#xff0c;这都对容器端面密封性能的要求更为严格。端面密封所用的密封件必须具备优良的回弹性能和耐化学…

【Vue基础-数字大屏】加载动漫效果

一、需求描述 当网页正在加载而处于空白页面状态时&#xff0c;可以在该页面上显示加载动画提示。 二、步骤代码 1、全局下载npm install -g json-server npm install -g json-server 2、在src目录下新建文件夹mock&#xff0c;新建文件data.json存放模拟数据 {"one&…

推荐《金田一少年事件簿》

天树征丸原作&#xff0c;佐藤文也作画的漫画 金田一少年事件簿 播报编辑讨论7上传视频 《金田一少年事件簿》是一部日本推理漫画。早期原作为金成阳三郎&#xff08;后担任剧本&#xff09;&#xff0c;原作为天树征丸&#xff08;前原案&#xff09;&#xff0c;由漫画家佐…

【网安】网络安全防止个人信息泄露

网络安全防止个人信息泄露 1、尝试检查自己的网络隐私数据是否泄漏过&#xff0c;可以使用下面的网站2、使用安全非盈利组织的浏览器3、安装浏览器插件&#xff0c;防止网络跟踪4、保持安全的访问方式 1、尝试检查自己的网络隐私数据是否泄漏过&#xff0c;可以使用下面的网站 …

【react基础02】编写函数式组件和类组件

函数式组件和类组件 React组件的规范函数式组件类组件 React组件的规范 1、命名遵循PascalCase原则&#xff0c;即所有单词首字母大写&#xff0c;然后拼接在一起 如&#xff1a;Age 、FirstName 函数式组件 代码&#xff1a; import ReactDOM from react-dom/client;const…

springboot的配置项ENC加解密

在web项目中我们看到application文件中很多出现配置项是ENC(xxxxx)&#xff0c;这就表示xxx这个参数是经过加密之后的结果。 我们想要在其他地方使用参数必须要做解密。以下是实现方法。 加解密的实现依赖jasypt。所以需要引入以下jar包 <dependency><groupId>org…

【教程】使用vuepress构建静态文档网站,并部署到github上

官网 快速上手 | VuePress (vuejs.org) 构建项目 我们跟着官网的教程先构建一个demo 这里我把 vuepress-starter 这个项目名称换成了 howtolive 创建并进入一个新目录 mkdir howtolive && cd howtolive使用你喜欢的包管理器进行初始化 yarn init 这里的问题可以一…

2023-2024-1 for循环-1(15-38)

7-15 输出闰年 输出21世纪中截止某个年份以来的所有闰年年份。注意&#xff1a;闰年的判别条件是该年年份能被4整除但不能被100整除、或者能被400整除。 输入格式: 输入在一行中给出21世纪的某个截止年份。 输出格式: 逐行输出满足条件的所有闰年年份&#xff0c;即每个年…

06-Scala面向对象

面向对象编程 ​ Scala是一门完全面向对象的语言&#xff0c;摒弃了Java中很多不是面向对象的语法。 ​ 虽然如此&#xff0c;但其面向对象思想和 Java的面向对象思想还是一致的 Scala包 1&#xff09;基本语法 Scala中基本的package包语法和 Java 完全一致 例如&#xf…

前端TypeScript学习day04-交叉类型与泛型

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 交叉类型 泛型 创建泛型函数 调用泛型函数&#xff1a; 简化调用泛型函数&#xff1a; 泛型约束 指定…

Marin说PCB之BGA焊盘削焊盘带来的焊接问题和解决办法

每周日上午10点钟都是小编最开心的时间了&#xff0c;这个点是斗破苍穹播出的时间。小编我从萧炎从这个动漫开播到现在都追了好多年了&#xff0c;强烈推荐喜欢这个小说的可以看这个动漫&#xff0c;拍的还不错&#xff0c;只是萧炎的配音不再是张沛老师了&#xff0c;有点可惜…

基于Java的宠物领养管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

编写HTTP协议代理的一些知识(源码)

早期上网经常需要使用代理服务器&#xff0c;现在用的比较少了&#xff0c;大家更耳熟能详的反而是“反向代理”如Nginx。 代理服务器一般用作局域网上网&#xff0c;而反向代理则是把来自互联网的连接转发到局域网上&#xff0c;作用刚好相反。 HTTP协议自身就带有对代理服务器…

在 Windows 平台上启动 MATLAB

目录 在 Windows 平台上启动 MATLAB 选择 MATLAB 图标 从 Windows 系统命令行调用 matlab 从 MATLAB 命令提示符调用 matlab 打开与 MATLAB 相关联的文件 从 Windows 资源管理器工具中选择 MATLAB 可执行文件 在 Windows 平台上启动 MATLAB 选择以下一种方式启动 MATLAB…

105AspectRatio调整宽高比组件_flutter

AspectRatio组件 AspectRatio 的作用是根据设置调整子元素 child 的宽高比。 AspectRatio 首先会在布局限制条件允许的范围内尽可能的扩展&#xff0c;widget 的高度是由宽 度和比率决定的&#xff0c;类似于 BoxFit 中的 contain&#xff0c;按照固定比率去尽量占满区域。 …

Java 继承与实现

一、继承&#xff08;extends&#xff09; 1.1 继承概念 继承是面向对象的基本特征&#xff0c;它允许子类继承父类的特征和行为&#xff0c;以提高代码的复用率和维护性等。下面一张图生动地展示了继承和类之间的关系&#xff1a; 继承图 上图中&#xff0c;“动物”、“食草…

数据结构与算法-栈

栈和队列是两种常用的线性结构&#xff0c;属于特殊的线性表&#xff0c;是线性表相关运算的一个子集。一般来说&#xff0c;线性表上的插入和删除操作不受任何限制&#xff0c;但栈只能在表的一端进行插入和删除操作&#xff0c;而队列则只能在一端进行插入操作&#xff0c;在…

物联网市场规模迅速增加,在交通、医疗、农业等方面发展势头迅猛

物联网&#xff08;Internet of things&#xff09;是一系列用于解决物的信息识别、交换、控制等技术的集合应用形成的网络。当连接从互联网时代的人与人走向万物互联&#xff0c;万物的数字化、智能化依赖物联网技术。因此&#xff0c;物联网是指利用各类信息识别设备&#xf…