目录
一、单个RabbitMQ配置
1.1、导入Maven坐标
1.2、yaml配置
1.3、java配置类
1.3.1、交换机配置
1.3.2、队列配置
1.3.3、绑定配置
1.3.4、连接配置
1.4、生产者与消费者操作配置
1.4.1、生产者操作配置
1.4.2、消费者操作配置
二、多个RabbitMQ配置
2.1、yaml配置
2.2、java配置类
2.3、生产者与消费者操作配置
2.3.1、生产者操作配置
2.3.1、消费者操作配置
三、总结
需求描述:原SpringBoot工程已经配置了一个RabbitMQ,现需求是再配置一个RabbitMQ,实现效果是不同RabbitMQ推送到不同的队列中,且互不干扰影响使用。
一、单个RabbitMQ配置
1.1、导入Maven坐标
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.4</version></dependency>
1.2、yaml配置
rabbitmq:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxvirtual-host: xxxxpublisher-returns: truepublisher-confirms: truelistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000
1.3、java配置类
1.3.1、交换机配置
package com.ruoyi.report.config;import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class ExchangeConfig {public static final String ecoa_exchange = "ecoaExchange";/*** 1.定义direct exchange* 2.durable="true" rabbitmq重启的时候不需要创建新的交换机* 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列*/@Beanpublic DirectExchange ecoaExchange() {DirectExchange directExchange = new DirectExchange(ecoa_exchange, true, false);return directExchange;}}
1.3.2、队列配置
package com.ruoyi.report.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @ClassName QueueConfig* @Description* @Author Mr.Huang* @Date 2023/9/22 16:26* @Version 1.0**/
@Component
public class QueueConfig {private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";@Beanpublic Queue ecoaFileUploadDispatchQueue() {/**durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列auto-delete 表示消息队列没有在使用时将被自动删除 默认是falseexclusive 表示该消息队列是否只在当前connection生效,默认是false*/return new Queue(ecoa_file_upload_queue, true, false, false);}
}
1.3.3、绑定配置
package com.ruoyi.report.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @ClassName BindingConfig* @Description* @Author Mr.Huang* @Date 2023/9/22 16:31* @Version 1.0**/
@Component
public class BindingConfig {@Autowiredprivate QueueConfig queueConfig;@Autowiredprivate ExchangeConfig exchangeConfig;public static final String ECOA_file_upload_key = "ecoa_file_upload_key";@Beanpublic Binding ecoaFileUploadDispatchBinding() {return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);}
}
1.3.4、连接配置
package com.ruoyi.report.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMqConfig* @Description* @Author Mr.Huang* @Date 2023/9/22 16:14* @Version 1.0**/
@Configuration
public class RabbitMqConfig {/*** 连接工厂*/@Autowiredprivate ConnectionFactory connectionFactory;/*** 自定义rabbit template用于数据的接收和发送* 可以设置消息确认机制和回调** @return*/@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory);return template;}
}
1.4、生产者与消费者操作配置
1.4.1、生产者操作配置
package com.ruoyi.report.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @ClassName MessageUtils* @Description* @Author Mr.Huang* @Date 2023/9/22 16:36* @Version 1.0**/
@Component
public class MessageUtils {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* 发送随货单信息* @param message 消息*/public void sendMessage(Object message) {String uuid = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuid);Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);}
}
1.4.2、消费者操作配置
package com.ruoyi.report.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @ClassName PrintFeedbackConsumer* @Description* @Author Mr.Huang* @Date 2024/4/30 10:23* @Version 1.0**/
@Slf4j
@Component
public class PrintFeedbackConsumer {@Autowiredprivate PrintSendLogService printSendLogService;@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")public void receiveMq(Message message, Channel channel) {try {String body = new String(message.getBody());log.info("接受【Print结果推送】RabbitMQ消息:"+body);JSONObject objJson = JSONObject.parseObject(body);Thread.sleep(1000);PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);printSendLogService.updatePrintSendLog(printResult);}catch (Exception e){log.error("",e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException ex) {ex.printStackTrace();}}}
}
二、多个RabbitMQ配置
Maven坐标与上面单个RabbitMQ配置一致
2.1、yaml配置
rabbitmq:first:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxvirtual-host: xxxxpublisher-returns: truepublisher-confirms: truelistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000 second:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxpublisher-returns: truepublisher-confirms: truevirtual-host: xxxxlistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000
2.2、java配置类
package com.ruoyi.report.config;import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** @ClassName RabbitMqConfig* @Description* @Author Mr.Huang* @Date 2023/9/22 16:14* @Version 1.0**/
@Configuration
public class RabbitMqConfig {// 第一个MQ电子药检队列与keypublic static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";public static final String ECOA_file_upload_key = "ecoa_file_upload_key";// 第二个MQ单据打印平台队列与keypublic static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";/** 交换机名称 */public static final String EXCHANGE = "ecoaExchange";public static final String EXCHANGE2 = "tms_exchange";/** 第一个rabbitMq队列 */@Bean(name = "ECOAConnectionFactory")@Primarypublic ConnectionFactory ECOAConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,@Value("${spring.rabbitmq.first.port}") int port,@Value("${spring.rabbitmq.first.username}") String username,@Value("${spring.rabbitmq.first.password}") String password,@Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/** 第二个rabbitMq队列 */@Bean(name = "printConnectionFactory")public ConnectionFactory printConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,@Value("${spring.rabbitmq.second.port}") int port,@Value("${spring.rabbitmq.second.username}") String username,@Value("${spring.rabbitmq.second.password}") String password,@Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/** 第一个rabbitMq操作模板 */@Bean(name="ECOARabbitTemplate")@Primarypublic RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory){RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);return firstRabbitTemplate;}/** 第二个rabbitMq操作模板 */@Bean(name="printRabbitTemplate")public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory){RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);return secondRabbitTemplate;}/** 第一个rabbitMq连接工厂 */@Bean(name="ECOAContainerFactory")public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(5);factory.setConcurrentConsumers(1);factory.setPrefetchCount(1);configurer.configure(factory, connectionFactory);return factory;}/** 第二个rabbitMq连接工厂 */@Bean(name="printContainerFactory")public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(5);factory.setConcurrentConsumers(1);factory.setPrefetchCount(1);configurer.configure(factory, connectionFactory);return factory;}/** 第一个mq绑定队列绑定交换机 */@Beanpublic String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {System.out.println("configuration ECOAQueue ........................");Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false);try {channel.exchangeDeclare(EXCHANGE, "direct", true, false, null);// 单据推送电子药检队列channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);} catch (Exception e) {e.printStackTrace();} finally {return "ECOAQueue";}}/** 第二个mq绑定队列绑定交换机 */@Beanpublic String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {System.out.println("configuration printQueue ........................");Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false);try {channel.exchangeDeclare(EXCHANGE2, "direct", true, false, null);// 单据推送单据打印平台队列channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);// 单据打印平台反馈队列channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);} catch (Exception e) {e.printStackTrace();} finally {return "printQueue";}}
}
注意:需将原MQ:交换机、队列、绑定配置类注释掉,只留这一个配置文件即可,这个配置文件已经将对应的:交换机、队列绑定好,只是需要注意队列名字、交换机不要绑定错了
2.3、生产者与消费者操作配置
2.3.1、生产者操作配置
package com.ruoyi.report.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** @ClassName MessageUtils* @Description* @Author Mr.Huang* @Date 2023/9/22 16:36* @Version 1.0**/
@Component
public class MessageUtils {@Resource(name = "ECOARabbitTemplate")private RabbitTemplate ECOARabbitTemplate;@Resource(name = "printRabbitTemplate")private RabbitTemplate printRabbitTemplate;/*** 向ECOA发送消息* 发送随货单信息* @param message 消息*/public void sendMessage(Object message) {String uuid = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuid);Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);}/*** 向print发送消息* 发送派车单信息* @param message 消息*/public void sendPrintMessage(Object message) {String uuid = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuid);Message msg = MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);}
}
2.3.1、消费者操作配置
package com.ruoyi.report.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @ClassName PrintFeedbackConsumer* @Description* @Author Mr.Huang* @Date 2024/4/30 10:23* @Version 1.0**/
@Slf4j
@Component
public class PrintFeedbackConsumer {@Autowiredprivate PrintSendLogService printSendLogService;@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")public void receiveMq(Message message, Channel channel) {try {String body = new String(message.getBody());log.info("接受【Print结果推送】RabbitMQ消息:"+body);JSONObject objJson = JSONObject.parseObject(body);Thread.sleep(1000);PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);printSendLogService.updatePrintSendLog(printResult);}catch (Exception e){log.error("",e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException ex) {ex.printStackTrace();}}}
}
与单个RabbitMQ消费者操作一致,只是注意要消费的队列和连接工厂不要搞错了
三、总结
配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的,当把yaml内容填好它会自动配置连接工厂,只需要把交换机、队列、配置绑定起来即可。 当需要配置多个mq时才需要自己手动配置连接工厂,并不是只能配置两个RabbitMQ,可以按这个格式配置更多个。唯一注意的是不要把这些队列和交换机搞混了即可。