SpringBoot日常:封装rabbitmq starter组件

文章目录

    • 逻辑实现
      • RabbitExchangeEnum
      • RabbitConfig
      • RabbitModuleInfo
      • RabbitModuleInitializer
      • RabbitProperties
      • RabbitProducerManager
      • POM.xml
      • spring.factories
    • 功能测试
      • application.yml配置
      • 生产者:
      • 消费者:
      • 测试结果:
      • 总结

本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter

逻辑实现

下面直接进入主题,介绍整体用到的文件和逻辑内容

RabbitExchangeEnum

交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机

/*** @Author 码至终章* @Version 1.0*/
public enum RabbitExchangeEnum {DIRECT,TOPIC,FANOUT,HEADERS;
}

RabbitConfig

初始化配置文件

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author 码至终章* @Version 1.0*/
@Configuration
public class RabbitConfig {/*** 通过yaml配置,创建队列、交换机初始化器*/@Bean@ConditionalOnMissingBeanpublic RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);}
}

RabbitModuleInfo

配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性

import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;import java.util.Map;/*** 队列和交换机机绑定关系实体对象** @Author 码至终章* @Version 1.0*/
@Data
public class RabbitModuleInfo {/*** 路由Key*/private String routingKey;/*** 队列信息*/private Queue queue;/*** 交换机信息*/private Exchange exchange;/*** 交换机信息类*/@Datapublic static class Exchange {/*** 交换机类型* 默认直连交换机*/private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;/*** 交换机名称*/private String name;/*** 是否持久化* 默认true持久化,重启消息不会丢失*/private boolean durable = true;/*** 当所有队绑定列均不在使用时,是否自动删除交换机* 默认false,不自动删除*/private boolean autoDelete = false;/*** 交换机其他参数*/private Map<String, Object> arguments;}/*** 队列信息类*/@Datapublic static class Queue {/*** 队列名称*/private String name;/*** 是否持久化* 默认true持久化,重启消息不会丢失*/private boolean durable = true;/*** 是否具有排他性* 默认false,可多个消费者消费同一个队列*/private boolean exclusive = false;/*** 当消费者均断开连接,是否自动删除队列* 默认false,不自动删除,避免消费者断开队列丢弃消息*/private boolean autoDelete = false;/*** 绑定死信队列的交换机名称*/private String deadLetterExchange;/*** 绑定死信队列的路由key*/private String deadLetterRoutingKey;private Map<String, Object> arguments;}}

RabbitModuleInitializer

执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。

import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** @Author cys* @Date 2024/6/17 14:23* @Version 1.0*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {AmqpAdmin amqpAdmin;RabbitProperties rabbitProperties;public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {this.amqpAdmin = amqpAdmin;this.rabbitProperties = rabbitProperties;}@Overridepublic void afterSingletonsInstantiated() {log.info("初始化rabbitmq交换机、队列----------------start");declareRabbitModule();log.info("初始化rabbitmq交换机、队列----------------end");}/*** RabbitMQ 根据配置动态创建和绑定队列、交换机*/private void declareRabbitModule() {List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();if (CollectionUtils.isEmpty(rabbitModuleInfos)) {return;}for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {configParamValidate(rabbitModuleInfo);// 队列Queue queue = convertQueue(rabbitModuleInfo.getQueue());// 交换机Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());// 绑定关系String routingKey = rabbitModuleInfo.getRoutingKey();String queueName = rabbitModuleInfo.getQueue().getName();String exchangeName = rabbitModuleInfo.getExchange().getName();Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);// 创建队列if (!isExistQueue(queueName)) {amqpAdmin.declareQueue(queue);}// 创建交换机amqpAdmin.declareExchange(exchange);// 队列 绑定 交换机amqpAdmin.declareBinding(binding);}}/*** RabbitMQ动态配置参数校验** @param rabbitModuleInfo 队列和交换机机绑定关系*/public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {String routingKey = rabbitModuleInfo.getRoutingKey();Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));}/*** 转换生成RabbitMQ队列** @param queue 队列* @return Queue*/public Queue convertQueue(RabbitModuleInfo.Queue queue) {Map<String, Object> arguments = queue.getArguments();// 转换ttl的类型为longif (arguments != null && arguments.containsKey("x-message-ttl")) {arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));}// 是否需要绑定死信队列String deadLetterExchange = queue.getDeadLetterExchange();String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {if (arguments == null) {arguments = new HashMap<>(4);}arguments.put("x-dead-letter-exchange", deadLetterExchange);arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);}return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);}/*** 转换生成RabbitMQ交换机** @param exchangeInfo 交换机信息* @return Exchange*/public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {AbstractExchange exchange = null;RabbitExchangeEnum exchangeType = exchangeInfo.getType();String exchangeName = exchangeInfo.getName();boolean isDurable = exchangeInfo.isDurable();boolean isAutoDelete = exchangeInfo.isAutoDelete();Map<String, Object> arguments = exchangeInfo.getArguments();switch (exchangeType) {case DIRECT:// 直连交换机exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case TOPIC:// 主题交换机exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case FANOUT://扇形交换机exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case HEADERS:// 头交换机exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);break;}return exchange;}/*** 判断队列是否存在** @param queueName 队列名* @return boolean*/private boolean isExistQueue(String queueName) {if (StringUtils.isBlank(queueName)) {throw new RuntimeException("队列名称为空");}boolean flag = true;Properties queueProperties = amqpAdmin.getQueueProperties(queueName);if (queueProperties == null) {flag = false;}return flag;}}

RabbitProperties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;/*** @Author 码至终章* @Version 1.0*/
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {private List<RabbitModuleInfo> modules;
}

RabbitProducerManager

发送消息的生产者方法

public class RabbitProducerManager {private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);private final RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String rabbitRouting, Object message) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message);}public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});}public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}
}

POM.xml

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.7.18</version></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties

功能测试

application.yml配置

spring:profiles:active: dev## rabbitmq链接配置  rabbitmq:host: 192.168.199.199port: 5672username: testpassword: 123456789virtual-host: testcys:rabbit:modules:- exchange:name: mytest#type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Directtype: DIRECTqueue:name: default.queuearguments:# 队列中所有消息的最大存活时间。单位毫秒。 1分钟x-message-ttl: 60000# routing-key可以为空routing-key: default.queue.key

生产者:

@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {/*** 主键*/@TableId(type = IdType.AUTO)@TableField(value = "cust_id")private Long custId;
}@RestController
@RequestMapping("/mqtest")
public class MqController {@AutowiredRabbitProducerManager rabbitProducerManager;@AutowiredMailService mailService;@GetMapping("/mqtest")public void test(){TaskEntity taskEntity = new TaskEntity();taskEntity.setCustId(211212L);rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));}
}

消费者:

@Component
public class MyListener {@RabbitListener(queues = "default.queue")public void handMessage(String message){TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);System.out.println("接收到的消息"+taskEntity);}
}

测试结果:

请求接口/mqtest/mqtest
在这里插入图片描述

总结

到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42121.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

stm32 USB CDC类虚拟串口初体验

1. 目标 本文介绍CubeMX生成 USB CDC类虚拟串口工程的操作步骤。 2. 配置流程 时钟配置 usb外设需要48M时钟输入 stm32405使用外部时钟源HSE,否则配不出来48M时钟stm32h750内部有一个48M时钟 stm32f405时钟配置 stm32h750时钟配置 Connectivity ->USB_OTG_FS 和 Connect…

C++初阶:从C过渡到C++的入门基础

✨✨所属专栏&#xff1a;C✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ C发展历史 C的起源可以追溯到1979年&#xff0c;当时BjarneStroustrup(本贾尼斯特劳斯特卢普&#xff0c;这个翻译的名字不同的地⽅可能有差异)在⻉尔实验室从事计算机科学和软件⼯程的研究⼯作。⾯对项⽬中复…

JavaDS —— 顺序表ArrayList

顺序表 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构&#xff0c;一般情况下采用数组存储。在数组上完成数据的增删查改。在物理和逻辑上都是连续的。 模拟实现 下面是我们要自己模拟实现的方法&#xff1a; 首先我们要创建一个顺序表&#xff0c;顺序表…

关于Mars3d的入门

关于Mars3d的入门 一. 创建地球&#xff0c;加载瓦片图层二 矢量图层2.1 常用矢量图层2.1.1 GraphicLayer2.1.2 GeoJsonLayer 2.2 矢量图层的点击事件 三 矢量数据四 事件机制 一. 创建地球&#xff0c;加载瓦片图层 // 1. 创建地球let map new mars3d.Map("mars3dContai…

从零开始做题:My_lllp

题目 给出一张png图片 解题 ┌──(holyeyes㉿kali2023)-[~/Misc/题目/zulu/My_lllp] └─$ python2 lsb.py extract my_lllp.png out.txt my_lllp [] Image size: 1080x1079 pixels. [] Written extracted data to out.txt. ┌──(holyeyes㉿kali2023)-[~/Misc/题目/zul…

简易Qt串口助手

界面显示如下 关于串口类 初始化 设置串口号 设置波特率 打开串口 发送按钮功能实现 接收数据显示在控件中 关闭串口

使用 MFA 保护对企业应用程序的访问

多因素身份验证&#xff08;MFA&#xff09;是在授予用户访问特定资源的权限之前&#xff0c;使用多重身份验证来验证用户身份的过程&#xff0c;仅使用单一因素&#xff08;传统上是用户名和密码&#xff09;来保护资源&#xff0c;使它们容易受到破坏&#xff0c;添加其他身份…

springboot非物质文化遗产管理系统-计算机毕业设计源码16087

目录 摘要 1 绪论 1.1 选题背景与意义 1.2国内外研究现状 1.3论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1系统开发流程 2.2.2 用户登录流程 2.2.3 系统操作流程 2.2.4 添加信息流程 2.2.5 修改信息流程 2.2.6 删除信息流程 2.3 系统功能…

前端开发过程中经常遇到的问题以及对应解决方法 (持续更新)

我的朋友已经工作了 3 年&#xff0c;他过去一直担任前端工程师。 不幸的是&#xff0c;他被老板批评了&#xff0c;因为他在工作中犯了一个错误&#xff0c;这是一个非常简单但容易忽视的问题&#xff0c;我想也是很多朋友容易忽视的一个问题。 今天我把它分享出来&#xff…

Linux三剑客(grep、awk和sed)操作及与管道结合使用

1. 总览 grep、sed和awk被称为Linux三剑客&#xff0c;是因为它们在文本处理和数据操作方面极其强大且常用。 Linux三剑客在文件处理中的作用&#xff1a; grep&#xff08;数据查找定位&#xff09;&#xff1a;文本搜索工具&#xff0c;在文件中搜索符合正则表达式的文本内容…

Redis原理-数据结构

Redis原理篇 1、原理篇-Redis数据结构 1.1 Redis数据结构-动态字符串 我们都知道Redis中保存的Key是字符串&#xff0c;value往往是字符串或者字符串的集合。可见字符串是Redis中最常用的一种数据结构。 不过Redis没有直接使用C语言中的字符串&#xff0c;因为C语言字符串存…

【大模型LLM面试合集】大语言模型架构_attention

1.attention 1.Attention 1.1 讲讲对Attention的理解&#xff1f; Attention机制是一种在处理时序相关问题的时候常用的技术&#xff0c;主要用于处理序列数据。 核心思想是在处理序列数据时&#xff0c;网络应该更关注输入中的重要部分&#xff0c;而忽略不重要的部分&…

BJT的结构(晶体管电压/电流+β+晶体管特性曲线/截止与饱和+直流负载线(Q点))+单片机数码管基础

2024-7-8&#xff0c;星期一&#xff0c;20:23&#xff0c;天气&#xff1a;晴&#xff0c;心情&#xff1a;晴。今天没有什么特殊的事情发生&#xff0c;周末休息了两天&#xff0c;周一回来继续学习啦&#xff0c;加油加油&#xff01;&#xff01;&#xff01; 今日完成模电…

视频号矩阵管理系统:短视频内容营销的智能助手

随着短视频行业的蓬勃发展&#xff0c;视频号矩阵管理系统应运而生&#xff0c;为内容创作者和品牌提供了一站式的短视频管理和营销解决方案。本文将深入探讨视频号矩阵管理系统的核心功能&#xff0c;以及它如何助力用户在短视频营销领域取得成功。 视频号矩阵管理系统概述 …

在PyTorch中使用TensorBoard

文章目录 在PyTorch中使用TensorBoard1.安装2.TensorBoard使用2.1创建SummaryWriter实例2.2利用add_scalar()记录metrics2.3关闭Writer2.4启动TensorBoard 3.本地连接服务器使用TensorBoard3.1方法一&#xff1a;使用SSH命令进行本地端口转发3.2方法二&#xff1a;启动TensorBo…

如何将资源前端通过 Docker 部署到远程服务器

作为一个程序员&#xff0c;在开发过程中&#xff0c;经常会遇到项目部署的问题&#xff0c;在现在本就不稳定的大环境下&#xff0c;前端开发也需要掌握部署技能&#xff0c;来提高自己的生存力&#xff0c;今天就详细说一下如何把一个前端资源放到远程服务器上面通过docker部…

【Python】不小心卸载pip后(手动安装pip的两种方式)

文章目录 方法一&#xff1a;使用get-pip.py脚本方法二&#xff1a;使用easy_install注意事项 不小心卸载pip后&#xff1a;手动安装pip的两种方式 在使用Python进行开发时&#xff0c;pip作为Python的包管理工具&#xff0c;是我们安装和管理Python库的重要工具。然而&#x…

产品经理技能揭秘:如何巧妙启发需求,引领市场新潮流

文章目录 引言一、需求启发的定义二、需求启发的艺术三、需求启发的重要性四、需求启发的流程五、需求启发的问题与挑战内部自身的问题与挑战&#xff1a;挑战一&#xff1a;知识的诅咒挑战二&#xff1a;做与定义的不同挑战三&#xff1a;沟通障碍挑战四&#xff1a;需求变更频…

solidity:构造函数和修饰器、事件

构造函数​ 构造函数&#xff08;constructor&#xff09;是一种特殊的函数&#xff0c;每个合约可以定义一个&#xff0c;并在部署合约的时候自动运行一次。它可以用来初始化合约的一些参数&#xff0c;例如初始化合约的owner地址&#xff1a; address owner; // 定义owner变…

电脑找回彻底删除文件?四个实测效果的方法【一键找回】

电脑数据删除了还能恢复吗&#xff1f;可以的&#xff0c;只要我们及时撤销上一步删除操作&#xff0c;还是有几率找回彻底删除文件。 当我们的电脑文件被彻底删除后&#xff0c;尽管恢复的成功率可能受到多种因素的影响&#xff0c;但仍有几种方法可以尝试找回这些文件。本文整…