手写超级好用的rabbitmq-spring-boot-start启动器

手写超级好用的rabbitmq-spring-boot-start启动器

文章目录

  • 1.前言
  • 2.工程目录结构
  • 3.主要实现原理
    • 3.1spring.factories配置
    • 3.2EnableZlfRabbitMq配置
    • 3.3RabbitAutoConfiguration配置
    • 3.4ZlfRabbitMqRegistrar配置
  • 4.总结

1.前言

  由于springBoot官方提供的默认的rabbitMq自动装配不是那么好用,一个项目中只能配置使用一个rabbitMq的服务器,队列也需要编码的方式定义,这种繁杂且不易使用,用一次需要写一次硬编码,之前有一个想法是,能不能使用springBoot官方提供的自动装配实现一个多rabbitMq多队列配置并且支持多种延迟队列的这种多对多关系的实现,但是左思右想,springBoot官方提供的这个rabbitMq自动装配不能满足我的需求,所以我在酝酿了很久,也把官方那个自动装配的源码看了一遍又一遍,随着我之前手写实现了好几个starter启动器,然后就想实现一个rabbitMq的starter,只要简单的配置即可轻松的实现上面的功能,然后提供了一套好用的api,使用的时候只需要在项目中引入这个启动器,节省很大的硬编码和配置灵活,配置改变只需要重启项目即可,对业务使用友好的一个starter启动器,再也不用为如何使用rabbitMq的集成而烦恼了,只需要简单的配置就可以实现好用的功能,让我们把精力放在业务上,而不是代码和代码集成上,大大的提升开发效率和节省我们宝贵的时间,让我们用宝贵的时间来享受时光,生命和生活,效率至上,远离加班,简约也简单,优雅也优美,简单配置就可以实现交换机、队列、绑定关系等根据配置自动装配,然后实现发送普通消息和3种延迟队列发送延迟消息,3中延迟队列实现如下:

  一:延迟插件实现延迟队列

    交换机类型必须CustomExchange

  二:TTL + 死信队列/延迟交换机实现延迟队列

  三: 延迟交换机 + 消息设置setHeader(“x-delay”, xxx)

  还可以配置相同的rabbitMq服务器不同的虚拟机,单独配置,遵循下标递增不重复即可

实现思路如下。

2.工程目录结构

image-20240312212331654

3.主要实现原理

3.1spring.factories配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.zlf.config.RabbitConfig,\com.zlf.config.ExchangeQueueConfig,\com.zlf.starter.RabbitAutoConfiguration

3.2EnableZlfRabbitMq配置

package com.zlf.starter;import org.springframework.context.annotation.Import;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 使用需要在启动类上加入@EnableZlfRabbit注解* 启动类上排除默认的自动装配RabbitAutoConfiguration** @author zlf* 启动类上加入如下配置* @SpringBootApplication(exclude = {* RabbitAutoConfiguration.class})* @Import(value = {RabbitService.class, ZlfMqSpringUtils.class})*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ZlfRabbitMqRegistrar.class)
public @interface EnableZlfRabbitMq {}

3.3RabbitAutoConfiguration配置

package com.zlf.starter;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Configuration;@Configuration(proxyBeanMethods = false)
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public class RabbitAutoConfiguration {}

3.4ZlfRabbitMqRegistrar配置

package com.zlf.starter;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.zlf.config.ExchangeQueueConfig;
import com.zlf.config.ExchangeQueueProperties;
import com.zlf.config.RabbitConfig;
import com.zlf.config.RabbitProperties;
import com.zlf.config.RabbitProperties.AmqpContainer;
import com.zlf.config.RabbitProperties.Cache;
import com.zlf.config.RabbitProperties.Cache.Connection;
import com.zlf.config.RabbitProperties.ContainerType;
import com.zlf.config.RabbitProperties.DirectContainer;
import com.zlf.config.RabbitProperties.ListenerRetry;
import com.zlf.config.RabbitProperties.Retry;
import com.zlf.config.RabbitProperties.SimpleContainer;
import com.zlf.config.RabbitProperties.Template;
import com.zlf.constants.ErrorExchangeQueueInfo;
import com.zlf.dto.ExchangeQueueDto;
import com.zlf.enums.DelayTypeEnum;
import com.zlf.enums.ExchangeTypeEnum;
import com.zlf.enums.FunctionTypeEnum;
import com.zlf.service.RabbitService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.CollectionUtils;import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;/*** spring:* rabbitmq:* listener:* simple:* acknowledge-mode: auto #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack* manual:手动ack,需要在业务代码结束后,调用api发送ack。* auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack* none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)* <p>* 原文链接:https://blog.csdn.net/m0_53142956/article/details/127792054** @author zlf*/
@Slf4j
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties(RabbitConfig.class)
public class ZlfRabbitMqRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware, BeanFactoryAware {private BeanFactory beanFactory;private RabbitConfig rabbitConfig;private ExchangeQueueConfig exchangeQueueConfig;@SneakyThrows@Overridepublic void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {List<RabbitProperties> rps = rabbitConfig.getRps();if (CollectionUtils.isEmpty(rps)) {throw new RuntimeException("rabbitMq的rps配置不为空,请检查配置!");}log.info("zlf.registerBeanDefinitions:rps.size:{},rps:{}", rps.size(), JSON.toJSONString(rps));List<ExchangeQueueProperties> eqps = exchangeQueueConfig.getEqps();if (CollectionUtils.isEmpty(eqps)) {throw new RuntimeException("rabbitMq的eqps配置不为空,请检查配置!");}log.info("zlf.registerBeanDefinitions:eqps.size:{},rps:{}", eqps.size(), JSON.toJSONString(eqps));for (int i = 0; i < rps.size(); i++) {this.checkRabbitProperties(rps.get(i));CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactoryBean(rps.get(i)).getObject());cachingConnectionFactory.setAddresses(rps.get(i).determineAddresses());cachingConnectionFactory.setPublisherReturns(rps.get(i).getPublisherReturns());cachingConnectionFactory.setPublisherConfirmType(rps.get(i).getPublisherConfirmType());Cache.Channel channel = rps.get(i).getCache().getChannel();if (Objects.nonNull(channel.getSize())) {cachingConnectionFactory.setChannelCacheSize(channel.getSize());}if (Objects.nonNull(channel.getCheckoutTimeout())) {Duration checkoutTimeout = channel.getCheckoutTimeout();cachingConnectionFactory.setChannelCheckoutTimeout(checkoutTimeout.toMillis());}Connection connection = rps.get(i).getCache().getConnection();if (Objects.nonNull(connection.getMode())) {cachingConnectionFactory.setCacheMode(connection.getMode());}if (Objects.nonNull(connection.getSize())) {cachingConnectionFactory.setConnectionCacheSize(connection.getSize());}// 注册cachingConnectionFactory的bean((ConfigurableBeanFactory) this.beanFactory).registerSingleton(CachingConnectionFactory.class.getName() + i, cachingConnectionFactory);log.info("zlf.ConfigurableBeanFactory注册完成,beanName:{}", CachingConnectionFactory.class.getName() + i);// 注册RabbitAdmin的beanRabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitAdmin.class.getName() + i, rabbitAdmin);log.info("zlf.RabbitAdmin注册完成,beanName:{}", RabbitAdmin.class.getName() + i);//构建发送的RabbitTemplate实例关联连接工厂Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);Template template = rps.get(i).getTemplate();ConfirmType publisherConfirmType = rps.get(i).getPublisherConfirmType();log.info("第{}个配置的publisherConfirmType:{}", i, JSON.toJSONString(publisherConfirmType));//生产者confirm/*** publish-confirm-type:开启publisher-confirm,这里支持两种类型:* simple:【同步】等待confirm结果,直到超时(可能引起代码阻塞)* correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback* publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback* template.mandatory:* 定义当消息从交换机路由到队列失败时的策略。* 【true,则调用ReturnCallback;false:则直接丢弃消息】*/if (ConfirmType.CORRELATED.equals(publisherConfirmType)) {rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (Objects.nonNull(correlationData)) {if (Objects.nonNull(ack) && ack) {log.info("消息发送成功->correlationData:{}", JSON.toJSONString(correlationData));} else if (StringUtils.isNotBlank(cause)) {log.error("消息->correlationData:{}->发送失败原因->{}", JSON.toJSONString(correlationData), cause);}}if (Objects.nonNull(ack) && ack) {log.info("消息发送成功ack:{}", ack);}if (StringUtils.isNotBlank(cause)) {log.error("消息发送失败原因->cause:{}", cause);}if (Objects.isNull(correlationData) && Objects.isNull(ack) && StringUtils.isEmpty(cause)) {log.info("消息发送成功,收到correlationData,ack,cause都是null");}});}Boolean publisherReturns = rps.get(i).getPublisherReturns();Boolean mandatory = template.getMandatory();log.info("第{}个配置的publisherReturns:{},mandatory:{}", i, publisherReturns, mandatory);//消息回调//开启强制回调if (mandatory && publisherReturns) {rabbitTemplate.setMandatory(template.getMandatory());rabbitTemplate.setMandatoryExpression(new ValueExpression<>(template.getMandatory()));//设置消息回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("消息->{}路由失败", message);// 如果有需要的话,重发消息});}Retry retry = rps.get(i).getTemplate().getRetry();if (retry.isEnabled()) {RetryTemplate retryTemplate = new RetryTemplate();SimpleRetryPolicy policy = new SimpleRetryPolicy();retryTemplate.setRetryPolicy(policy);policy.setMaxAttempts(retry.getMaxAttempts());ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setMultiplier(retry.getMultiplier());if (Objects.nonNull(retry.getMaxInterval())) {backOffPolicy.setMaxInterval(retry.getMaxInterval().toMillis());}rabbitTemplate.setRetryTemplate(retryTemplate);}Duration receiveTimeout = template.getReceiveTimeout();if (Objects.nonNull(receiveTimeout)) {rabbitTemplate.setReceiveTimeout(receiveTimeout.toMillis());}Duration replyTimeout = template.getReplyTimeout();if (Objects.nonNull(replyTimeout)) {rabbitTemplate.setReplyTimeout(replyTimeout.toMillis());}String exchange = template.getExchange();if (StringUtils.isNotBlank(exchange)) {rabbitTemplate.setExchange(exchange);}String routingKey = template.getRoutingKey();if (StringUtils.isNotBlank(routingKey)) {rabbitTemplate.setRoutingKey(routingKey);}String defaultReceiveQueue = template.getDefaultReceiveQueue();if (StringUtils.isNotBlank(defaultReceiveQueue)) {rabbitTemplate.setDefaultReceiveQueue(defaultReceiveQueue);}((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitTemplate.class.getName() + i, rabbitTemplate);log.info("zlf.RabbitTemplate注册完成,beanName:{}", RabbitTemplate.class.getName() + i);// 不注册RabbitServiceRabbitService rabbitService = new RabbitService();//构建监听工厂实例并注入ContainerType type = rps.get(i).getListener().getType();if (ContainerType.SIMPLE.equals(type)) {Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);SimpleContainer simple = rps.get(i).getListener().getSimple();ConstructorArgumentValues cas3 = new ConstructorArgumentValues();MutablePropertyValues values3 = new MutablePropertyValues();this.getAmqpContainer(simple, values3, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);if (Objects.nonNull(simple.getConcurrency())) {values3.add("concurrentConsumers", simple.getConcurrency());}if (Objects.nonNull(simple.getMaxConcurrency())) {values3.add("maxConcurrentConsumers", simple.getMaxConcurrency());}if (Objects.nonNull(simple.getBatchSize())) {values3.add("batchSize", simple.getBatchSize());}RootBeanDefinition rootBeanDefinition3 = new RootBeanDefinition(SimpleRabbitListenerContainerFactory.class, cas3, values3);beanDefinitionRegistry.registerBeanDefinition(SimpleRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition3);log.info("zlf.SimpleRabbitListenerContainerFactory注册完成,beanName:{}", SimpleRabbitListenerContainerFactory.class.getName() + i);} else if (ContainerType.DIRECT.equals(type)) {Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);DirectContainer direct = rps.get(i).getListener().getDirect();ConstructorArgumentValues cas4 = new ConstructorArgumentValues();MutablePropertyValues values4 = new MutablePropertyValues();this.getAmqpContainer(direct, values4, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);if (Objects.nonNull(direct.getConsumersPerQueue())) {values4.add("consumersPerQueue", direct.getConsumersPerQueue());}RootBeanDefinition rootBeanDefinition4 = new RootBeanDefinition(DirectRabbitListenerContainerFactory.class, cas4, values4);beanDefinitionRegistry.registerBeanDefinition(DirectRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition4);log.info("zlf.DirectRabbitListenerContainerFactory注册完成,beanName:{}", DirectRabbitListenerContainerFactory.class.getName() + i);}//解析注册交换机、队列和绑定关系ExchangeQueueProperties exchangeQueueProperties = eqps.get(i);log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));Integer index = exchangeQueueProperties.getIndex();log.info("zlf.exchangeQueueProperties.index:{}", index);if (Objects.isNull(index)) {throw new RuntimeException("exchangeQueueProperties.index不为空");}if (Objects.nonNull(exchangeQueueProperties)) {log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));List<ExchangeQueueDto> eqs = exchangeQueueProperties.getEqs();if (CollectionUtil.isNotEmpty(eqs)) {for (int k = 0; k < eqs.size(); k++) {String bindingIndex = index.toString() + k;log.info("zlf.bindingIndex:{}", bindingIndex);ExchangeQueueDto exchangeQueueDto = eqs.get(k);log.info("zlf.exchangeQueueDto:{}", JSON.toJSONString(exchangeQueueDto));String functionType = exchangeQueueDto.getFunctionType();log.info("zlf.functionType:{}", functionType);if (FunctionTypeEnum.NORMAL.getFunctionType().equals(functionType)) {this.createRelationship(FunctionTypeEnum.NORMAL, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);} else if (FunctionTypeEnum.DELAY.getFunctionType().equals(functionType)) {Integer delayType = exchangeQueueDto.getDelayType();log.info("zlf.delayType:{}", delayType);if (DelayTypeEnum.ONE.getDelayType().equals(delayType)) {//延迟插件实现延迟队列String exchangeType = exchangeQueueDto.getExchangeType();if (!ExchangeTypeEnum.CUSTOM.getExchangeType().equals(exchangeType)) {throw new RuntimeException("延迟插件实现延迟队列交换机类型exchangeType必须定义为:custom");}this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);} else if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {//TTL + 死信队列实现延迟队列this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);} else if (DelayTypeEnum.THREE.getDelayType().equals(delayType)) {//延迟交换机 + 消息设置setHeader("x-delay", xxx)this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, true);}}}}}}}/*** 检查rabbitProperties配置的主要参数** @param rabbitProperties*/private void checkRabbitProperties(RabbitProperties rabbitProperties) {String virtualHost = rabbitProperties.getVirtualHost();if (StringUtils.isEmpty(virtualHost)) {throw new RuntimeException("RabbitProperties.virtualHost不为空");}String addresses = rabbitProperties.getAddresses();if (StringUtils.isEmpty(addresses)) {throw new RuntimeException("RabbitProperties.addresses不为空");}Integer port = rabbitProperties.getPort();if (Objects.isNull(port)) {throw new RuntimeException("RabbitProperties.port不为空");}String username = rabbitProperties.getUsername();if (StringUtils.isEmpty(username)) {throw new RuntimeException("RabbitProperties.username不为空");}String password = rabbitProperties.getPassword();if (StringUtils.isEmpty(password)) {throw new RuntimeException("RabbitProperties.password不为空");}}/*** 创建关系** @param functionTypeEnum* @param exchangeQueueDto* @param rabbitService* @param rabbitAdmin* @param bindingIndex*/private void createRelationship(FunctionTypeEnum functionTypeEnum, ExchangeQueueDto exchangeQueueDto, RabbitService rabbitService, RabbitAdmin rabbitAdmin, String bindingIndex, Boolean isDelayed) {String exchangeName = exchangeQueueDto.getExchangeName();String exchangeType = exchangeQueueDto.getExchangeType();HashMap<String, Object> exchangeArgs = exchangeQueueDto.getExchangeArgs();log.info("zlf" + functionTypeEnum.getFunctionType() + ".exchangeName:{},exchangeType:{},exchangeArgs:{}", exchangeName, exchangeType, JSON.toJSONString(exchangeArgs));AbstractExchange exchange1 = rabbitService.createExchange(rabbitAdmin, exchangeName, exchangeType, exchangeArgs, isDelayed);exchangeName = exchangeName + bindingIndex;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange1);log.info("zlf." + functionTypeEnum.getFunctionType() + ".Exchange注册完成,beanName:{}", exchangeName);String queueName = exchangeQueueDto.getQueueName();HashMap<String, Object> queueArgs = exchangeQueueDto.getQueueArgs();String routingKey1 = exchangeQueueDto.getRoutingKey();log.info("zlf." + functionTypeEnum.getFunctionType() + ".queueName:{},queueArgs:{},routingKey1:{}", queueName, JSON.toJSONString(queueArgs), routingKey1);Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, queueArgs);queueName = queueName + bindingIndex;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);log.info("zlf." + functionTypeEnum.getFunctionType() + ".Queue注册完成,beanName:{}", queueName);Binding binding = rabbitService.binding(rabbitAdmin, exchange1, queue, routingKey1);((ConfigurableBeanFactory) this.beanFactory).registerSingleton(Binding.class.getName() + bindingIndex, binding);log.info("zlf." + functionTypeEnum.getFunctionType() + ".Binding注册完成bindingIndex:{},beanName:{}", bindingIndex, Binding.class.getName() + bindingIndex);Integer delayType = exchangeQueueDto.getDelayType();if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {String dlxExchangeName = exchangeQueueDto.getDlxExchangeName();if (StringUtils.isEmpty(dlxExchangeName)) {throw new RuntimeException("TTL + 死信队列实现延迟队列配置参数dlxExchangeName不为空!");}String dlxExchangeType = exchangeQueueDto.getDlxExchangeType();if (StringUtils.isEmpty(dlxExchangeType)) {throw new RuntimeException("TTL + 死信队列实现延迟队列配置参数dlxExchangeType不为空!");}AbstractExchange exchange2 = rabbitService.createExchange(rabbitAdmin, dlxExchangeName, dlxExchangeType, exchangeArgs, false);dlxExchangeName = dlxExchangeName + bindingIndex;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxExchangeName, exchange2);log.info("zlf.TTL + 死信队列实现延迟队列,死信交换机注册完成,beanName:{}", dlxExchangeName);String dlxQueueName = exchangeQueueDto.getDlxQueueName();Queue queue2 = rabbitService.createQueue(rabbitAdmin, dlxQueueName, null);dlxQueueName = dlxQueueName + bindingIndex;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxQueueName, queue2);log.info("zlf.TTL + 死信队列实现延迟队列,死信队列注册完成,beanName:{}", dlxQueueName);String dlxKey = exchangeQueueDto.getDlxKey();Binding binding2 = rabbitService.binding(rabbitAdmin, exchange2, queue2, dlxKey);String dlxBeanName = "dlx" + Binding.class.getName() + bindingIndex + 1;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxBeanName, binding2);log.info("zlf.TTL + 死信队列实现延迟队列,死信交换机绑定队列的绑定关系注册完成,beanName:{}", dlxBeanName);}}private Map<String, String> createErrorExchangeQueueRelationship(String index, RabbitService rabbitService, RabbitAdmin rabbitAdmin) {Map<String, String> resultMap = new HashMap<>();String exchangeName = ErrorExchangeQueueInfo.ERROR_EXCHANGE_PREFIX + index;AbstractExchange exchange = rabbitService.createExchange(rabbitAdmin, exchangeName, ErrorExchangeQueueInfo.ERROR_EXCHANGE_TYPE, null, false);((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange);log.info("zlf.ErrorExchange注册完成,beanName:{}", exchangeName);String queueName = ErrorExchangeQueueInfo.ERROR_QUEUE_PREFIX + index;Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, null);((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);log.info("zlf.ErrorQueue注册完成,beanName:{}", queueName);String errorRoutingKey = ErrorExchangeQueueInfo.ERROR_KEY_PREFIX + index;log.info("zlf.errorRoutingKey:{}", errorRoutingKey);Binding errorBinding = rabbitService.binding(rabbitAdmin, exchange, queue, errorRoutingKey);String errorBingBeanName = ErrorExchangeQueueInfo.ERROR_BINDING_BANE_NAME_PREFIX + Binding.class.getSimpleName() + index;((ConfigurableBeanFactory) this.beanFactory).registerSingleton(errorBingBeanName, errorBinding);log.info("zlf.ErrorBing注册完成,beanName:{}", errorBingBeanName);resultMap.put("errorExchange", exchangeName);resultMap.put("errorRoutingKey", errorRoutingKey);return resultMap;}private void getAmqpContainer(AmqpContainer amqpContainer, MutablePropertyValues values, CachingConnectionFactory cachingConnectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter, RabbitTemplate rabbitTemplate, Map<String, String> errorExchangeQueueRelationship) {values.add("connectionFactory", cachingConnectionFactory);values.add("autoStartup", amqpContainer.isAutoStartup());values.add("messageConverter", jackson2JsonMessageConverter);if (Objects.nonNull(amqpContainer.getAcknowledgeMode())) {values.add("acknowledgeMode", amqpContainer.getAcknowledgeMode());}if (Objects.nonNull(amqpContainer.getPrefetch())) {values.add("prefetch", amqpContainer.getPrefetch());}if (Objects.nonNull(amqpContainer.getDefaultRequeueRejected())) {values.add("defaultRequeueRejected", amqpContainer.getDefaultRequeueRejected());}if (Objects.nonNull(amqpContainer.getIdleEventInterval())) {values.add("idleEventInterval", amqpContainer.getIdleEventInterval());}values.add("missingQueuesFatal", amqpContainer.isMissingQueuesFatal());ListenerRetry retry2 = amqpContainer.getRetry();if (retry2.isEnabled()) {RetryInterceptorBuilder<?, ?> builder = (retry2.isStateless()) ? RetryInterceptorBuilder.stateless(): RetryInterceptorBuilder.stateful();RetryTemplate retryTemplate = new RetryTemplate();SimpleRetryPolicy policy = new SimpleRetryPolicy();retryTemplate.setRetryPolicy(policy);policy.setMaxAttempts(retry2.getMaxAttempts());ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setMultiplier(retry2.getMultiplier());if (Objects.nonNull(retry2.getMaxInterval())) {backOffPolicy.setMaxInterval(retry2.getMaxInterval().toMillis());}builder.retryOperations(retryTemplate);/*** 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:** RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式* ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)* RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机*///消息接受拒绝后发送到异常队列String errorExchange = errorExchangeQueueRelationship.get("errorExchange");String errorRoutingKey = errorExchangeQueueRelationship.get("errorRoutingKey");MessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, errorExchange, errorRoutingKey);log.info("zlf.MessageRecoverer.errorExchange:{},errorRoutingKey:{}", errorExchange, errorRoutingKey);builder.recoverer(recoverer);values.add("adviceChain", builder.build());}}private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties)throws Exception {PropertyMapper map = PropertyMapper.get();RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();map.from(properties::determineHost).whenNonNull().to(factory::setHost);map.from(properties::determinePort).to(factory::setPort);map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);map.from(properties::getRequestedChannelMax).to(factory::setRequestedChannelMax);RabbitProperties.Ssl ssl = properties.getSsl();if (ssl.determineEnabled()) {factory.setUseSSL(true);map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);map.from(ssl::getKeyStore).to(factory::setKeyStore);map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);map.from(ssl::getTrustStore).to(factory::setTrustStore);map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);map.from(ssl::isValidateServerCertificate).to((validate) -> factory.setSkipServerCertificateValidation(!validate));map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);}map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);factory.afterPropertiesSet();return factory;}@Overridepublic void setEnvironment(Environment environment) {// 通过Binder将environment中的值转成对象rabbitConfig = Binder.get(environment).bind(getPropertiesPrefix(RabbitConfig.class), RabbitConfig.class).get();exchangeQueueConfig = Binder.get(environment).bind(getPropertiesPrefix(ExchangeQueueConfig.class), ExchangeQueueConfig.class).get();}private String getPropertiesPrefix(Class<?> tClass) {return Objects.requireNonNull(AnnotationUtils.getAnnotation(tClass, ConfigurationProperties.class)).prefix();}@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.beanFactory = beanFactory;}}

4.总结

  到此,手写rabbitMq的starter实现思路就已经全部分享完了,思路比代码更重要,代码只是一个参考,用这个思路实现更多更方便简单快捷高效的轮子,制造轮子也是一种提升和给你带来成就感的事情,累并快乐着,后面我会将我之前手写的starter全部开源出来,然后将gitHub或码云地址分享给大家,制造轮子,开源给大家使用,这本身就是一种开源的精神和乐趣,Java生态最缺的就不是轮子是好用的轮子,请在看我的文章或者是转发,请把本文的原出处和作者写上去,尊重版权,创作不易,禁止原模原样的搬过去就是自己的原创,这种是不道德的行为,见到请如实举报,或者联系本作者来举报,这个starter,说实话也是构思酝酿了好久,猛干了2-3天才干出来,颈椎都干的酸,我得休息加强锻炼身体了,说实话写这个starter还是挺累的,但是搞出来的成就感满满,也方便以后集成快速使用,配置多个的rabbitMq服务器也测试过了的,也是ok的,但是配置一个rabbitMq和多个交换机、队列和绑定关系以及3种延迟队列实现是亲自测试OK的,希望我的分享能给你帮助和启发,请一键三连,么么么哒!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/742789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python代码操作PPT:PowerPoint演示文稿的合并与拆分

多个PowerPoint演示文稿的处理可能会成为非常麻烦的工作。有时需要将多个演示文稿合并为一个演示文稿&#xff0c;从而不用在演示时重复打开演示文稿&#xff1b;有时又需要将单个演示文稿拆分为多个演示文稿&#xff0c;用于其他目的或进行分发。手动进行这些操作会非常麻烦&a…

阿里云环境下,从仅知的一个外网域名如何找出前端程序的部署所在和启动命令

一、背景 一个年久失修的前端项目&#xff0c;临时接到需求要迭代&#xff0c;三四年未迭代过的项目&#xff0c;交接更无从谈起。 所幸的是&#xff0c;源码还在&#xff0c;知道外网访问的入口地址。 本文试着带你一起找到该前端项目部署在哪台机器&#xff0c;以及发布的过…

opencv的approxPolyDP函数

cv2.approxPolyDP() 是 OpenCV 库中的一个函数&#xff0c;用于逼近多边形曲线。它可以将一条曲线用更少的点来表示&#xff0c;同时尽可能地保持其形状。原来是使用Douglas-Peucker算法&#xff0c;表示曲线上的点与逼近后的多边形之间的最大距离d&#xff0c;若d小于epsilon&…

SpringCloudEureka理论与入门

文章目录 1. 前置工作1.1 搭建 user-server1.1.1 pom1.1.2 po&#xff0c;mapper&#xff0c;controller1.1.3 yml1.1.4 启动类1.1.5 启动并访问 1.2 搭建 order-server1.2.1 pom1.2.2 po mapper controller1.2.3 yml1.2.4 启动类1.2.5 启动并访问 1.3 两个服务通信 2. Eureka2…

【Swing】Java Swing实现省市区选择编辑器

【Swing】Java Swing实现省市区选择编辑器 1.需求描述2.需求实现3.效果展示 系统&#xff1a;Win10 JDK&#xff1a;1.8.0_351 IDEA&#xff1a;2022.3.3 1.需求描述 在公司的一个 Swing 的项目上需要实现一个选择省市区的编辑器&#xff0c;这还是第一次做这种编辑器&#xf…

开源办公系统CRM管理系统

基于ThinkPHP6 Layui MySQL的企业办公系统。集成系统设置、人事管理、消息管理、审批管理、日常办公、客户管理、合同管理、项目管理、财务管理、电销接口集成、在线签章等模块。系统简约&#xff0c;易于功能扩展&#xff0c;方便二次开发。 服务器运行环境要求 PHP > 7.…

Mybatis Plus + Spring 分包配置 ClickHouse 和 Mysql 双数据源

目录 一、背景 二、各个配置文件总览&#xff08;文件位置因人而异&#xff09; 2.1 DataSourceConfig 2.2 MybatisClickHouseConfig &#xff08;ClickHouse 配置类&#xff09; 2.3 MybatisMysqlConfig&#xff08;Mysql 配置类&#xff09; 2.4 application.propertie…

《安富莱嵌入式周报》第334期:开源SEM扫描电子显微镜,自制编辑器并搭建嵌入式环境,免费产品设计审查服务,实用电子技术入门,USB资料汇总,UDS统一诊断

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 视频版&#xff1a; https://www.bilibili.com/video/BV1om411Z714/ 《安富莱嵌入式周报》第334期&#xff1a;开源SEM…

ABCDE联合创始人和普通合伙人BMAN确认出席Hack .Summit() 2024

ABCDE联合创始人和普通合伙人BMAN确认出席Hack .Summit() 2024&#xff01; ABCDE联合创始人和普通合伙人BMAN确认出席由 Hack VC 主办&#xff0c;并由 AltLayer 和 Berachain 联合主办&#xff0c;与 SNZ 和数码港合作&#xff0c;由 Techub News 承办的Hack.Summit() 2024区…

【Paper Reading】6.RLHF-V 提出用RLHF的1.4k的数据微调显著降低MLLM的虚幻问题

分类 内容 论文题目 RLHF-V: Towards Trustworthy MLLMs via Behavior Alignment from Fine-grained Correctional Human Feedback 作者 作者团队&#xff1a;由来自清华大学和新加坡国立大学的研究者组成&#xff0c;包括Tianyu Yu, Yuan Yao, Haoye Zhang, Taiwen He, Y…

VB播放器(动态服务器获取歌词)-183-(代码+说明)

转载地址: http://www.3q2008.com/soft/search.asp?keyword183 VBASP vb动态从服务器读取歌词 VB asp交互 程序, 模式不一样, 与普通的MP3播放器不一样, 这个是可以实现歌词从服务器上查询功能的. 看好了在咨询 我可以給您演示 目  录 前  言 1 1 . 简述 2 1.1 开发…

阿里云国际DDoS高防接入配置最佳实践

业务接入DDoS高防产品后&#xff0c;可以将攻击流量引流到DDoS高防&#xff0c;有效避免业务在遭受大流量DDoS攻击时出现服务不可用的情况&#xff0c;确保源站服务器的稳定可靠。本文九河云的接入配置和防护策略最佳实践&#xff0c;在各类场景中使用DDoS高防更好地保护您的业…

Linux系统下基于VSCode和Cmake进行C++开发

目录 简介一、GCC编译器1.1创建cpp文件1.2编译过程1.3g重要编译参数 二、GDB调试器三、IDE-VScode3.1 VSCode常用快捷键3.2 swap测试 四、CMake4.1CMake介绍4.2 CMake语法特性介绍4.3 CMake重要指令和常用变量4.4 CMake编译流程4.5CMake代码实践 五、使用VSCode进行完整项目开发…

【JavaScript】面试手撕柯里化函数

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 引入柯里化定义实现快速使用柯里化的作用提高自由度bind函数 参考资料 引入 上周…

开源绘图工具 PlantUML 入门教程(常用于画类图、用例图、时序图等)

文章目录 一、类图二、用例图三、时序图 一、类图 类的UML图示 startuml skinparam classAttributeIconSize 0 class Dummy {-field1 : String#field2 : int~method1() : Stringmethod2() : void } enduml定义能见度&#xff08;可访问性&#xff09; startumlclass Dummy {-f…

ES6(一):let和const、模板字符串、函数默认值、剩余参数、扩展运算符、箭头函数

一、let和const声明变量 1.let没有变量提升&#xff0c;把let放下面打印不出来&#xff0c;放上面可以 <script>console.log(a);let a1;</script> 2.let是一个块级作用域,花括号里面声明的变量外面找不到 <script>console.log(b);if(true){let b1;}//und…

vscode插件开发-发布插件

安装vsce vsce是“Visual Studio Code Extensions”的缩写&#xff0c;是一个用于打包、发布和管理VS Code扩展的命令行工具。 确保您安装了Node.js。然后运行&#xff1a; npm install -g vscode/vsce 您可以使用vsce轻松打包和发布扩展&#xff1a; // 打包插件生成name…

一.java介绍和idea基础使用

java技术体系 Java技术体系说明Java SE(Java standard Edition):标准版Java技术的核心和基础Java EE(Java Enterprise Edition):企业版企业级应用开发的一套解决方案Java ME(Java Micro Edition):小型版针对移动设备应用的解决方案&#xff08;认可少&#xff0c;主要是安卓和…

目标跟踪SORT算法原理浅析

SORT算法 Simple Online and Realtime Tracking(SORT)是一个非常简单、有效、实用的多目标跟踪算法。在SORT中&#xff0c;仅仅通过IOU来进行匹配虽然速度非常快&#xff0c;但是ID switch依然非常严重。 SORT最大特点是基于Faster RCNN的目标检测方法&#xff0c;并利用卡尔…

Unity开发中Partial 详细使用案例

文章目录 **1. 分割大型类****2. 与 Unity 自动生成代码协同工作****3. 团队协作****4. 共享通用逻辑****5. 自定义编辑器相关代码****6. 配合 Unity 的 ScriptableObjects 使用****7. 多人协作与版本控制系统友好** 在 Unity 开发中&#xff0c; partial 关键字是 C# 语言提供…