文章目录
- 逻辑实现
- RabbitExchangeEnum
- RabbitConfig
- RabbitModuleInfo
- RabbitModuleInitializer
- RabbitProperties
- RabbitProducerManager
- POM.xml
- spring.factories
- 功能测试
- application.yml配置
- 生产者:
- 消费者:
- 测试结果:
- 总结
本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter
逻辑实现
下面直接进入主题,介绍整体用到的文件和逻辑内容
RabbitExchangeEnum
交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机
/*** @Author 码至终章* @Version 1.0*/
public enum RabbitExchangeEnum {DIRECT,TOPIC,FANOUT,HEADERS;
}
RabbitConfig
初始化配置文件
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author 码至终章* @Version 1.0*/
@Configuration
public class RabbitConfig {/*** 通过yaml配置,创建队列、交换机初始化器*/@Bean@ConditionalOnMissingBeanpublic RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);}
}
RabbitModuleInfo
配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;import java.util.Map;/*** 队列和交换机机绑定关系实体对象** @Author 码至终章* @Version 1.0*/
@Data
public class RabbitModuleInfo {/*** 路由Key*/private String routingKey;/*** 队列信息*/private Queue queue;/*** 交换机信息*/private Exchange exchange;/*** 交换机信息类*/@Datapublic static class Exchange {/*** 交换机类型* 默认直连交换机*/private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;/*** 交换机名称*/private String name;/*** 是否持久化* 默认true持久化,重启消息不会丢失*/private boolean durable = true;/*** 当所有队绑定列均不在使用时,是否自动删除交换机* 默认false,不自动删除*/private boolean autoDelete = false;/*** 交换机其他参数*/private Map<String, Object> arguments;}/*** 队列信息类*/@Datapublic static class Queue {/*** 队列名称*/private String name;/*** 是否持久化* 默认true持久化,重启消息不会丢失*/private boolean durable = true;/*** 是否具有排他性* 默认false,可多个消费者消费同一个队列*/private boolean exclusive = false;/*** 当消费者均断开连接,是否自动删除队列* 默认false,不自动删除,避免消费者断开队列丢弃消息*/private boolean autoDelete = false;/*** 绑定死信队列的交换机名称*/private String deadLetterExchange;/*** 绑定死信队列的路由key*/private String deadLetterRoutingKey;private Map<String, Object> arguments;}}
RabbitModuleInitializer
执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。
import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** @Author cys* @Date 2024/6/17 14:23* @Version 1.0*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {AmqpAdmin amqpAdmin;RabbitProperties rabbitProperties;public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {this.amqpAdmin = amqpAdmin;this.rabbitProperties = rabbitProperties;}@Overridepublic void afterSingletonsInstantiated() {log.info("初始化rabbitmq交换机、队列----------------start");declareRabbitModule();log.info("初始化rabbitmq交换机、队列----------------end");}/*** RabbitMQ 根据配置动态创建和绑定队列、交换机*/private void declareRabbitModule() {List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();if (CollectionUtils.isEmpty(rabbitModuleInfos)) {return;}for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {configParamValidate(rabbitModuleInfo);// 队列Queue queue = convertQueue(rabbitModuleInfo.getQueue());// 交换机Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());// 绑定关系String routingKey = rabbitModuleInfo.getRoutingKey();String queueName = rabbitModuleInfo.getQueue().getName();String exchangeName = rabbitModuleInfo.getExchange().getName();Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);// 创建队列if (!isExistQueue(queueName)) {amqpAdmin.declareQueue(queue);}// 创建交换机amqpAdmin.declareExchange(exchange);// 队列 绑定 交换机amqpAdmin.declareBinding(binding);}}/*** RabbitMQ动态配置参数校验** @param rabbitModuleInfo 队列和交换机机绑定关系*/public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {String routingKey = rabbitModuleInfo.getRoutingKey();Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));}/*** 转换生成RabbitMQ队列** @param queue 队列* @return Queue*/public Queue convertQueue(RabbitModuleInfo.Queue queue) {Map<String, Object> arguments = queue.getArguments();// 转换ttl的类型为longif (arguments != null && arguments.containsKey("x-message-ttl")) {arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));}// 是否需要绑定死信队列String deadLetterExchange = queue.getDeadLetterExchange();String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {if (arguments == null) {arguments = new HashMap<>(4);}arguments.put("x-dead-letter-exchange", deadLetterExchange);arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);}return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);}/*** 转换生成RabbitMQ交换机** @param exchangeInfo 交换机信息* @return Exchange*/public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {AbstractExchange exchange = null;RabbitExchangeEnum exchangeType = exchangeInfo.getType();String exchangeName = exchangeInfo.getName();boolean isDurable = exchangeInfo.isDurable();boolean isAutoDelete = exchangeInfo.isAutoDelete();Map<String, Object> arguments = exchangeInfo.getArguments();switch (exchangeType) {case DIRECT:// 直连交换机exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case TOPIC:// 主题交换机exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case FANOUT://扇形交换机exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case HEADERS:// 头交换机exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);break;}return exchange;}/*** 判断队列是否存在** @param queueName 队列名* @return boolean*/private boolean isExistQueue(String queueName) {if (StringUtils.isBlank(queueName)) {throw new RuntimeException("队列名称为空");}boolean flag = true;Properties queueProperties = amqpAdmin.getQueueProperties(queueName);if (queueProperties == null) {flag = false;}return flag;}}
RabbitProperties
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;/*** @Author 码至终章* @Version 1.0*/
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {private List<RabbitModuleInfo> modules;
}
RabbitProducerManager
发送消息的生产者方法
public class RabbitProducerManager {private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);private final RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String rabbitRouting, Object message) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message);}public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});}public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}
}
POM.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.7.18</version></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties
功能测试
application.yml配置
spring:profiles:active: dev## rabbitmq链接配置 rabbitmq:host: 192.168.199.199port: 5672username: testpassword: 123456789virtual-host: testcys:rabbit:modules:- exchange:name: mytest#type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Directtype: DIRECTqueue:name: default.queuearguments:# 队列中所有消息的最大存活时间。单位毫秒。 1分钟x-message-ttl: 60000# routing-key可以为空routing-key: default.queue.key
生产者:
@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {/*** 主键*/@TableId(type = IdType.AUTO)@TableField(value = "cust_id")private Long custId;
}@RestController
@RequestMapping("/mqtest")
public class MqController {@AutowiredRabbitProducerManager rabbitProducerManager;@AutowiredMailService mailService;@GetMapping("/mqtest")public void test(){TaskEntity taskEntity = new TaskEntity();taskEntity.setCustId(211212L);rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));}
}
消费者:
@Component
public class MyListener {@RabbitListener(queues = "default.queue")public void handMessage(String message){TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);System.out.println("接收到的消息"+taskEntity);}
}
测试结果:
请求接口/mqtest/mqtest
总结
到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。