207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

目录

  • ★ 发送消息
  • ★ 创建队列的两种方式
  • 代码演示
    • 需求1:发送消息
      • 1、ContentUtil 先定义常量
      • 2、RabbitMQConfig 创建队列的两种方式之一:
        • 配置式:
          • 问题:
      • 3、MessageService 编写逻辑
      • PublishController 控制器
      • application.properties 配置属性
      • 测试:消息发送
  • ★ 接收消息
    • 代码演示:
      • 测试: 消息接收
  • ★ 定制监听器容器工厂
  • 完整代码:
    • application.properties RabbitMQ的连接等属性配置
    • ContentUtil 常量工具类
    • RabbitMQConfig 配置式创建消息队列
    • MessageService 发送消息的业务代码
    • PublishController.java 发送消息的控制层
    • MyRabbitMQListener 监听器,监听消息队列
    • pom.xml


在这里插入图片描述

★ 发送消息

- Spring Boot可以将AmqpAdmin和AmqpTemplate注入任何其他组件,接下来该组件即可通过AmqpAdmin来管理Exchange、队列和绑定,还可通过AmqpTemplate来发送消息。 - Spring Boot还会自动配置一个RabbitMessagingTemplate Bean(RabbitAutoConfiguration负责配置),如果想使用它来发送、接收消息,可使用RabbitMessagingTemplate代替上面的AmqpTemplate,两个Template的注入方式完全相同。

在这里插入图片描述

★ 创建队列的两种方式

 方式一(编程式):在程序中通过AmqpAdmin创建队列。方式二(配置式):在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。

代码演示

需求1:发送消息

1、ContentUtil 先定义常量

在这里插入图片描述

2、RabbitMQConfig 创建队列的两种方式之一:

配置式:

在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。
就是在配置类中创建一个生成消息队列的@Bean。

问题:

用 @Configuration 注解声明为配置类,但是项目启动的时候没有自动生成这个队列。
据了解是因为RabbitMQ使用了懒加载,大概是没有消费者监听这个队列,就没有创建。
但是当我写后面的代码后,这个消息队列就生成了,但是并没有消费者去监听这个队列。
这有点想不通。
不知道后面是哪里的代码让这个配置类能成功声明这个消息队列出来。
在这里插入图片描述
水落石出:
经过测试:
在下面的MessageService 这个类中,依赖注入了 AmqpAdmin 和 AmqpTemplate 这两个对象,当我们通过这两个对象去声明队列、Exchange 和绑定的时候,配置类中的创建消息队列的bean就能成功创建队列。
这张图结合下面的 MessageService 中的代码就可得知:
这是依赖注入 AmqpAdmin 和 AmqpTemplate 这两个对象的有参构造器中声明的。
在这里插入图片描述

3、MessageService 编写逻辑

声明Exchange 、 消息队列 、 Exchange和消息队列的绑定、发送消息的方法等
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

PublishController 控制器

在这里插入图片描述

application.properties 配置属性

在这里插入图片描述
在这里插入图片描述

测试:消息发送

成功生成队列
在这里插入图片描述
发送消息测试
在这里插入图片描述
在这里插入图片描述




★ 接收消息

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

 【备注】:该注解可通过queues属性指定它要监听的、已有的消息队列,它也可使用queuesToDeclare来声明队列,并监听该队列。- 如果没有显式配置监听器容器工厂(RabbitListenerContainerFactory),Spring Boot会在容器中自动配置一个SimpleRabbitListenerContainerFactory Bean作为监听器容器工厂,如果希望使用DirectRabbitListenerContainerFactory,可在application.properties文件中添加如下配置:spring.rabbitmq.listener.type=direct▲ 如果在容器中配置了MessageRecoverer或MessageConverter,它们会被自动关联到默认的监听器容器工厂。



代码演示:

创建个消息队列的监听器就可以了。

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

该注解可通过queues属性指定它要监听的、已有的消息队列
它也可使用queuesToDeclare来声明队列,并监听该队列
还可以用 bindings 进行 Exchange和queue的绑定操作。
在这里插入图片描述

测试: 消息接收

在这里插入图片描述
发送消息和监听消息
在这里插入图片描述




★ 定制监听器容器工厂

▲ 如果要定义更多的监听器容器工厂或覆盖默认的监听器容器工厂,可通过Spring Boot提供的SimpleRabbitListenerContainerFactoryConfigurer
或DirectRabbitListenerContainerFactoryConfigurer来实现,它们可对SimpleRabbitListenerContainerFactory
或DirectRabbitListenerContainerFactory进行与自动配置相同的设置。 ▲ 有了自定义的监听器容器工厂之后,可通过@RabbitListener注解的containerFactory属性来指定使用自定义的监听器容器工厂,
例如如下注解代码:@RabbitListener(queues = "myQueue1", containerFactory="myFactory")

完整代码:

application.properties RabbitMQ的连接等属性配置

# 配置连接 RabbitMQ 的基本信息------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 下面属性可配置多个以逗号隔开的连接地址,一旦配置了该属性,host 和 port 属性就会被忽略
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# 连接虚拟主机
spring.rabbitmq.virtual-host=my-vhost01# 配置RabbitMQ的缓存相关信息--------------------------------------------------------
# 指定缓存 connection ,还是缓存 channel
spring.rabbitmq.cache.connection.mode=channel
# 指定可以缓存多少个 Channel
spring.rabbitmq.cache.channel.size=50
# 如果选择的缓存模式是 connection , 那么就可以配置如下属性
# spring.rabbitmq.cache.connection.size=15# 配置 和 RabbitTemplate 相关的属性--------------------------------------------------
# 指定 RabbitTemplate 发送消息失败时会重新尝试
spring.rabbitmq.template.retry.enabled=true
# RabbitTemplate 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.template.retry.initial-interval=1s
# RabbitTemplate 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.template.retry.max-attempts=5
# 设置每次尝试重新发送消息的时间间隔是一个等比数列:1s, 2s, 4s, 8s, 16s
# 第一次等1s后尝试,第二次等2s后尝试,第三次等4s后尝试重新发送消息......
spring.rabbitmq.template.retry.multiplier=2
# 指定发送消息时默认的Exchange名
spring.rabbitmq.template.exchange=""
# 指定发送消息时默认的路由key
spring.rabbitmq.template.routing-key="test"# 配置和消息监听器的容器工厂相关的属性--------------------------------------------------
# 指定监听器容器工厂的类型
spring.rabbitmq.listener.type=simple
# 指定消息的确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 指定获取消息失败时,是否重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.listener.simple.retry.initial-interval=1s

ContentUtil 常量工具类

package cn.ljh.app.rabbitmq.util;//常量
public class ContentUtil
{//定义Exchange的常量-----fanout:扇形,就是广播类型public static final String EXCHANGE_NAME = "boot.fanout";//消息队列数组public static final String[] QUEUE_NAMES =new String[] {"queue_boot_01","queue_boot_02","queue_boot_03"};}

RabbitMQConfig 配置式创建消息队列

package cn.ljh.app.rabbitmq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//配置式:在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列
//声明这个类为配置类
@Configuration
public class RabbitMQConfig
{//用配置式的方式在RabbitMQ中定义队列@Beanpublic Queue myQueue(){//在容器中配置一个 Queue Bean,Spring 就会为它在 RabbitMQ 中自动创建对应的 Queuereturn new Queue("queue_boot",   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);}
}

MessageService 发送消息的业务代码

声明Exchange 、Queue ,Exchange 绑定Queue,发送消息代码

package cn.ljh.app.rabbitmq.service;import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;//业务逻辑:声明Exchange 和 Queue 消息队列,发送消息的方法
@Service
public class MessageService
{//AmqpAdmin来管理Exchange、队列和绑定private final AmqpAdmin amqpAdmin;//AmqpTemplate来发送消息private final AmqpTemplate amqpTemplate;//通过有参构造器进行依赖注入public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate){this.amqpAdmin = amqpAdmin;this.amqpTemplate = amqpTemplate;//由于声明 Exchange 、 队列 、 绑定(Exchange绑定队列),都只需要做一次即可,因此放在此处构造器中完成即可//创建 fanout 类型的 Exchange ,使用FanoutExchange实现类FanoutExchange exchange = new FanoutExchange(ContentUtil.EXCHANGE_NAME,true,    /* Exchange是否持久化 */false, /* 是否自动删除 */null   /* 额外的参数属性 */);//声明 Exchangethis.amqpAdmin.declareExchange(exchange);//此处循环声明 Queue ,也相当于代码式创建 Queuefor (String queueName : ContentUtil.QUEUE_NAMES){Queue queue = new Queue(queueName,   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);//此处声明 Queue ,也相当于【代码式】创建 Queuethis.amqpAdmin.declareQueue(queue);//声明 Queue 的绑定Binding binding = new Binding(queueName,  /* 指定要分发消息目的地的名称--这里是要发送到这个消息队列里面去 */Binding.DestinationType.QUEUE, /* 分发消息目的的类型,指定要绑定 queue 还是 Exchange */ContentUtil.EXCHANGE_NAME, /* 要绑定的Exchange */"x", /* 因为绑定的Exchange类型是 fanout 扇形(广播)模式,所以路由key随便写,没啥作用 */null);//声明 Queue 的绑定amqpAdmin.declareBinding(binding);}}//发送消息的方法public void publish(String content){//发送消息amqpTemplate.convertAndSend(ContentUtil.EXCHANGE_NAME, /* 指定将消息发送到这个Exchange */"",  /* 因为Exchange是fanout 类型的(广播类型),所以写什么路由key都行,都没意义 */content /* 发送的消息体 */);}
}

PublishController.java 发送消息的控制层

package cn.ljh.app.rabbitmq.controller;import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;//发送消息
@RestController
public class PublishController
{private final MessageService messageService;//有参构造器进行依赖注入public PublishController(MessageService messageService){this.messageService = messageService;}@GetMapping("/publish/{message}")//因为{message}是一个路径参数,所以方法接收的时候需要加上注解 @PathVariablepublic String publish(@PathVariable String message){//发布消息messageService.publish(message);return "消息发布成功";}}

MyRabbitMQListener 监听器,监听消息队列

package cn.ljh.app.rabbitmq.listener;import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//监听器:监听消息队列并进行消费
@Component
public class MyRabbitMQListener
{//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_01")public void onQ1Message(String message){System.err.println("从 queue_boot_01 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_02")public void onQ2Message(String message){System.err.println("从 queue_boot_02 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列//还可以用 queuesToDeclare 直接声明并监听该队列,还可以用 bindings 进行Exchange和queue的绑定@RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03",durable = "true",exclusive = "false",autoDelete = "false"),admin = "amqpAdmin" /*指定声明Queue,绑定Queue所用的 amqpAdmin,不指定的话就用容器中默认的那一个 */)public void onQ3Message(String message){System.err.println("从 queue_boot_03 消息队列接收到的消息:" + message);}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.5</version></parent><groupId>cn.ljh</groupId><artifactId>rabbitmq_boot</artifactId><version>1.0.0</version><name>rabbitmq_boot</name><properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- RabbitMQ 的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- web 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 开发者工具的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- lombok 依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/105635.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

思维模型 峰终定律

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。 1 峰-终定律的应用 1.1 迪士尼游乐园 迪士尼乐园采用了多种策略来创造令人难忘的体验&#xff0c;从而遵循峰终定律的原则。具体如下&#xff1a; 迪士尼乐园的入口设计和服务体验&…

基于workbench的PTFE矩形密封圈压缩回弹仿真分析

研究背景&#xff1a; 近年来随着工业发展和科技进步&#xff0c;高压容器使用场景逐渐增大&#xff0c;使用环境越发苛刻&#xff0c;如高温、高压以及内部压力的波动&#xff0c;这都对容器端面密封性能的要求更为严格。端面密封所用的密封件必须具备优良的回弹性能和耐化学…

【Vue基础-数字大屏】加载动漫效果

一、需求描述 当网页正在加载而处于空白页面状态时&#xff0c;可以在该页面上显示加载动画提示。 二、步骤代码 1、全局下载npm install -g json-server npm install -g json-server 2、在src目录下新建文件夹mock&#xff0c;新建文件data.json存放模拟数据 {"one&…

推荐《金田一少年事件簿》

天树征丸原作&#xff0c;佐藤文也作画的漫画 金田一少年事件簿 播报编辑讨论7上传视频 《金田一少年事件簿》是一部日本推理漫画。早期原作为金成阳三郎&#xff08;后担任剧本&#xff09;&#xff0c;原作为天树征丸&#xff08;前原案&#xff09;&#xff0c;由漫画家佐…

【教程】使用vuepress构建静态文档网站,并部署到github上

官网 快速上手 | VuePress (vuejs.org) 构建项目 我们跟着官网的教程先构建一个demo 这里我把 vuepress-starter 这个项目名称换成了 howtolive 创建并进入一个新目录 mkdir howtolive && cd howtolive使用你喜欢的包管理器进行初始化 yarn init 这里的问题可以一…

2023-2024-1 for循环-1(15-38)

7-15 输出闰年 输出21世纪中截止某个年份以来的所有闰年年份。注意&#xff1a;闰年的判别条件是该年年份能被4整除但不能被100整除、或者能被400整除。 输入格式: 输入在一行中给出21世纪的某个截止年份。 输出格式: 逐行输出满足条件的所有闰年年份&#xff0c;即每个年…

前端TypeScript学习day04-交叉类型与泛型

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 交叉类型 泛型 创建泛型函数 调用泛型函数&#xff1a; 简化调用泛型函数&#xff1a; 泛型约束 指定…

Marin说PCB之BGA焊盘削焊盘带来的焊接问题和解决办法

每周日上午10点钟都是小编最开心的时间了&#xff0c;这个点是斗破苍穹播出的时间。小编我从萧炎从这个动漫开播到现在都追了好多年了&#xff0c;强烈推荐喜欢这个小说的可以看这个动漫&#xff0c;拍的还不错&#xff0c;只是萧炎的配音不再是张沛老师了&#xff0c;有点可惜…

基于Java的宠物领养管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

在 Windows 平台上启动 MATLAB

目录 在 Windows 平台上启动 MATLAB 选择 MATLAB 图标 从 Windows 系统命令行调用 matlab 从 MATLAB 命令提示符调用 matlab 打开与 MATLAB 相关联的文件 从 Windows 资源管理器工具中选择 MATLAB 可执行文件 在 Windows 平台上启动 MATLAB 选择以下一种方式启动 MATLAB…

105AspectRatio调整宽高比组件_flutter

AspectRatio组件 AspectRatio 的作用是根据设置调整子元素 child 的宽高比。 AspectRatio 首先会在布局限制条件允许的范围内尽可能的扩展&#xff0c;widget 的高度是由宽 度和比率决定的&#xff0c;类似于 BoxFit 中的 contain&#xff0c;按照固定比率去尽量占满区域。 …

Java 继承与实现

一、继承&#xff08;extends&#xff09; 1.1 继承概念 继承是面向对象的基本特征&#xff0c;它允许子类继承父类的特征和行为&#xff0c;以提高代码的复用率和维护性等。下面一张图生动地展示了继承和类之间的关系&#xff1a; 继承图 上图中&#xff0c;“动物”、“食草…

数据结构与算法-栈

栈和队列是两种常用的线性结构&#xff0c;属于特殊的线性表&#xff0c;是线性表相关运算的一个子集。一般来说&#xff0c;线性表上的插入和删除操作不受任何限制&#xff0c;但栈只能在表的一端进行插入和删除操作&#xff0c;而队列则只能在一端进行插入操作&#xff0c;在…

物联网市场规模迅速增加,在交通、医疗、农业等方面发展势头迅猛

物联网&#xff08;Internet of things&#xff09;是一系列用于解决物的信息识别、交换、控制等技术的集合应用形成的网络。当连接从互联网时代的人与人走向万物互联&#xff0c;万物的数字化、智能化依赖物联网技术。因此&#xff0c;物联网是指利用各类信息识别设备&#xf…

Ceph介绍与部署

Ceph介绍与部署 一、存储基础1.1、单机存储设备1.1.1、单机存储的问题 1.2、商业存储解决方案1.3、分布式存储&#xff08;软件定义的存储 SDS&#xff09;1.3.1、分布式存储的类型 二、Ceph 简介三、Ceph 优势四、Ceph 架构五、Ceph 核心组件5.1、Pool中数据保存方式支持两种类…

Vue2实现图片预览功能 -- v-viewer:图片查看器

一. 先看效果图 二. 具体步骤 简介&#xff1a;一款基于 viewer.js 封装的Vue版插件&#xff0c;可用于图像查看&#xff0c;以及图片的旋转、缩放等功能预览 官网&#xff1a;v-viewer 文档说明&#xff1a;Vue图片浏览组件v-viewer&#xff0c;支持旋转、缩放、翻转等操作 - …

[小林coding]4.2TCP重传,滑动窗口,流量控制,拥塞控制_1013

1.重传 1.1 超时重传 两个情况&#xff1a; a 数据包丢失 b ack应答丢失 RTT&#xff1a;网络包往返的时间&#xff08;不是一个定值&#xff09; RTO&#xff1a;超时重传的时间间隔&#xff08;也是一个动态的&#xff09; RTO设置的时间长&#xff1a;浪费时间资源&…

grafana api创建dashboard 记录

文章目录 json model导入申请api key创建dashboard删除dashboard json model导入 直接在ui通过json model 导入&#xff0c;开发自己用还好&#xff0c;但对非开发人员不太友好&#xff0c;故考虑通过api后台自动创建 api doc : https://grafana.com/docs/grafana/v9.3/devel…

AR动态贴纸SDK,让创作更加生动有趣

在当今的社交媒体时代&#xff0c;视频已经成为了人们表达自我、分享生活的重要方式。然而&#xff0c;如何让你的视频在众多的信息中脱颖而出&#xff0c;吸引更多的关注和点赞呢&#xff1f;答案可能就在你的手中——美摄AR动态贴纸SDK。 美摄AR动态贴纸SDK是一款专为视频编辑…

R语言——赋值(= ,<- ,<<-)

R语言 R语言——赋值&#xff08; &#xff0c;<- &#xff0c;<<-&#xff09; 文章目录 R语言一、 与 <- 的区别二、 <<- ,向上一环境层写入变量 R语言中" <- " 与 " " 都可以用来赋值&#xff0c;但R中建议使用" <- “…