SpringBoot整合RabbitMq企业级使用封装
- 1、RabbitMq基础说明
- 2、SpringBoot整合RabbitMq,以及RabbitMq的封装和高级用法
- 2.1、pom.xml
- 2.2、application.yml
- 2.3、Mq配置类MessageQueueConfiguration
- 2.3.1、代码
- 2.3.2、设置replyTimeout
- 2.3.3、publisher-confirm-type和mandatory
- 2.4、自定义发送消息帮助类MessageQueueHelper
- 2.4、消费者
- 2.5、两个pojo
- 2.6、Controller
- 3、SpringBoot整合RabbitMq,用SimpleMessageListenerContainer更复杂业务的封装
- 4、死信队列和延迟队列
1、RabbitMq基础说明
本文主要是企业级的SpringBoot整合RabbitMq,妥妥的符合任务使用MQ的业务配置。
下面实际讲的就是高级部分,rabbitmq的初级部分没说,也就分为以下几个点,你理解即可:
1、消息应答,就是下面的ACK机制,分为自动应答和手动应答。
2、发布确认,就是下面ConfirmCallback,ReturnCallback这两个回调函数的执行场景。
3、交换机:分为3中Fanout,Direct ,Topic 。随便百度理解下概念,一般最常用的就是Topic ,因为Topic 扩展性强,下面案例中也就是使用的Topic 。
如果下面案例你真正理解了,那么你的MQ可以适用于任何复杂业务的封装场景了。
2、SpringBoot整合RabbitMq,以及RabbitMq的封装和高级用法
说明:下面这个例子,是把邮件和短信放到MQ里,然后消费端去消费。
列子中列举了MQ的所有的高级特性,具体看下代码注释,很详细。
2.1、pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.5</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--整合rabbitMq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.5</version></dependency></dependencies>
2.2、application.yml
server:port: 8080spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rooturl: jdbc:mysql://127.0.0.1:3306/rightcloud?useUnicode=true&autoReconnect=true&failOverReadOnly=false&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=CONVERT_TO_NULLrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestrequested-heartbeat: 60spublisher-confirm-type: correlatedpublisher-returns: true
2.3、Mq配置类MessageQueueConfiguration
2.3.1、代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class MessageQueueConfiguration {@Value("${spring.rabbitmq.template.reply-timeout:1800000}")private Integer replyTimeout;/*** 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),* 如果想自定义来区分开 需要改变bean 的名称* 配置的其他的bean也都遵循这个规则配置* @param connectionFactory* @return*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);/*** 单位:毫秒* 同步消息方法convertSendAndReceive(),发送端等待接收消费端给出return msg的时间*/template.setReplyTimeout(replyTimeout);template.setMessageConverter(new Jackson2JsonMessageConverter());initMessageSendConfirm(template);return template;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置手动ACKfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}@Bean(name = "connectionFactory")@Primarypublic ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,@Value("${spring.rabbitmq.port}") int port,@Value("${spring.rabbitmq.username}") String username,@Value("${spring.rabbitmq.password}") String password) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setRequestedHeartBeat(60);/*** CORRELATED:异步回调,消息发送到交换机时会回调这个ConfirmCallback* SIMPLE:则不会出发ConfirmCallback*/connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}private void initMessageSendConfirm(RabbitTemplate rabbitTemplate) {/*** ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息发送到exchange成功");} else {log.error("消息发送到exchange失败,原因: {}, CorrelationData: {}", cause,correlationData);}});/*** Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发ReturnCallback* 为false时,匹配不到会直接被丢弃*//*** Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发ReturnCallback* 为false时,匹配不到会直接被丢弃** spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级* 一般不设置publisher-returns* spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true.* spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值* spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定*/rabbitTemplate.setMandatory(true);/*** ReturnCallback为路由不到队列时触发,成功则不触发;*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", message,replyCode, replyText,exchange, routingKey);});}
}
2.3.2、设置replyTimeout
单位:毫秒,同步消息,方法convertSendAndReceive(),发送端等待接收消费端给出return msg的时间。
convertSendAndReceive方法是mq的同步方法,调用该方法会阻塞主方法,直到消费端消费完才继续往下走。replyTimeout设置的是消费端执行的最大时间,如果超过设置的时间还没执行完,则会报错。
2.3.3、publisher-confirm-type和mandatory
这两个配置代码注释很详细,可以看注释理解,直接拿来用即可。
2.4、自定义发送消息帮助类MessageQueueHelper
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@Slf4j
@Component
public class MessageQueueHelper {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate RabbitAdmin rabbitAdmin;@PostConstructpublic void init() {rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());}/*** 发送异步消息,根据参数动态创建交换机、队列,和业务更解耦** @param exchangeName* @param queueName* @param sendMessage*/public void sendMessage(String exchangeName, String queueName, String routingKey, Object sendMessage) {try {TopicExchange exchange = new TopicExchange(exchangeName);rabbitAdmin.declareExchange(exchange);Queue queue = new Queue(queueName);rabbitAdmin.declareQueue(queue);String simpleName = sendMessage.getClass().getSimpleName();/*** *(星号)可以代替一个单词* #(井号)可以替代零个或多个单词*/rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(simpleName.toLowerCase() + ".#"));rabbitTemplate.convertAndSend(exchangeName, routingKey, sendMessage, message -> {/*** 指定消费结果返回的队列*/message.getMessageProperties().setReplyTo("result-stu");message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());return message;});} catch (Exception e) {e.printStackTrace();}}/*** 发送同步消息** @param exchangeName* @param queueName* @param sendMessage*/public void sendMessageAndReceive(String exchangeName, String queueName, Object sendMessage) {try {TopicExchange exchange = new TopicExchange(exchangeName);rabbitAdmin.declareExchange(exchange);Queue queue = new Queue(queueName);rabbitAdmin.declareQueue(queue);/*** *(星号)可以代替一个单词* #(井号)可以替代零个或多个单词*/String routingKey = "vm.#";rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));Object o = rabbitTemplate.convertSendAndReceive(exchangeName, "vm.fff", sendMessage);System.out.println(o);} catch (Exception e) {e.printStackTrace();}}
}
2.4、消费者
import cn.yx.zg.pojo.Mail;
import cn.yx.zg.pojo.Sms;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class TestConsumer {@RabbitListener(queues = "mail.send")@RabbitHandlerpublic String testConsumer(Mail mail, Channel channel, Message message) throws Exception {log.info("消费消息:{}", mail.toString());/*** ACK,用的最多的一种* deliveryTag:该消息的index* false:表示不是批量*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);/*** Nack:手动拒绝* deliveryTag:该消息的index* false:表示不是批量* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);/*** Reject:手动拒绝,和Nack相比少一个参数* deliveryTag:该消息的index* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return "消费-result";}@RabbitListener(queues = "sms.send")@RabbitHandlerpublic String testConsumer2(Sms sms, Channel channel, Message message) throws Exception {log.info("消费消息:{}", sms.toString());/*** ACK,用的最多的一种* deliveryTag:该消息的index* false:表示不是批量*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);/*** Nack:手动拒绝* deliveryTag:该消息的index* false:表示不是批量* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);/*** Reject:手动拒绝,和Nack相比少一个参数* deliveryTag:该消息的index* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return "消费-result";}
2.5、两个pojo
import lombok.Data;
import lombok.ToString;import java.io.Serializable;/*** 邮件*/
@Data
@ToString
public class Mail implements Serializable {private String mailId;private String content;public Mail() {}public Mail(String mailId, String content) {this.mailId = mailId;this.content = content;}
}
import lombok.Data;
import lombok.ToString;import java.io.Serializable;
@Data
@ToString
public class Sms implements Serializable {private String smsId;private String content;public Sms() {}public Sms(String smsId, String content) {this.smsId = smsId;this.content = content;}}
2.6、Controller
@Resourceprivate MessageQueueHelper messageQueueHelper;@RequestMapping("send")public void sendMsage() {Mail mail = new Mail("1","我是邮件");messageQueueHelper.sendMessage("message_ex", "mail.send", "mail", mail);Sms sms = new Sms("1","我是短信");messageQueueHelper.sendMessage("message_ex", "sms.send", "sms", sms);}
访问地址:http://localhost:8080/send 测试消息
3、SpringBoot整合RabbitMq,用SimpleMessageListenerContainer更复杂业务的封装
下面封装的这个属于比较复杂的业务,很多公司也是用不到的,有兴趣的可以了解一下。
先说下上面的缺点,其实也不算缺点,比如想让同一个消费者消费多个队列的数据,这样我们就得写多个@RabbitListener(queues = “mail.send”), 这里有个比较高级的写法,就是通过SimpleMessageListenerContainer动态的设置队列的消费类。
还是以上面代码为基础, 先把上面的消费者给注释,我们重新写个消费者。
新建类:TestConsumerListener.java
import cn.yx.zg.pojo.Mail;
import cn.yx.zg.pojo.Sms;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class TestConsumerListener {/*** 注意方法名称,一定要是handleMessage* @param mail* @return* @throws Exception*/public String handleMessage(Mail mail) throws Exception {log.info("消费消息listener:{}", mail.toString());/*** ACK,用的最多的一种* deliveryTag:该消息的index* false:表示不是批量*/
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);/*** Nack:手动拒绝* deliveryTag:该消息的index* false:表示不是批量* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);/*** Reject:手动拒绝,和Nack相比少一个参数* deliveryTag:该消息的index* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return "消费-result";}public String handleMessage(Sms sms) throws Exception {log.info("消费消息listener:{}", sms.toString());/*** ACK,用的最多的一种* deliveryTag:该消息的index* false:表示不是批量*/
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);/*** Nack:手动拒绝* deliveryTag:该消息的index* false:表示不是批量* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);/*** Reject:手动拒绝,和Nack相比少一个参数* deliveryTag:该消息的index* false:被拒绝的是否重新入队列,一般默认false,因为第一次被拒绝后,后面多次肯定也被拒绝*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return "消费-result";}
}
修改类:MessageQueueHelper.java
import cn.yx.zg.consumer.TestConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@Slf4j
@Component
public class MessageQueueHelper {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate RabbitAdmin rabbitAdmin;@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Resourceprivate TestConsumerListener testConsumerListener;@PostConstructpublic void init() {rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());startListenerForConsumer(testConsumerListener);}/*** 发送异步消息,根据参数动态创建交换机、队列,和业务更解耦** @param exchangeName* @param queueName* @param sendMessage*/public void sendMessage(String exchangeName, String queueName, String routingKey, Object sendMessage) {try {TopicExchange exchange = new TopicExchange(exchangeName);rabbitAdmin.declareExchange(exchange);Queue queue = new Queue(queueName);rabbitAdmin.declareQueue(queue);String simpleName = sendMessage.getClass().getSimpleName();/*** *(星号)可以代替一个单词* #(井号)可以替代零个或多个单词*/rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(simpleName.toLowerCase() + ".#"));rabbitTemplate.convertAndSend(exchangeName, routingKey, sendMessage, message -> {/*** 指定消费结果返回的队列*/message.getMessageProperties().setReplyTo("result-stu");message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());return message;});} catch (Exception e) {e.printStackTrace();}}/*** 发送同步消息** @param exchangeName* @param queueName* @param sendMessage*/public void sendMessageAndReceive(String exchangeName, String queueName, Object sendMessage) {try {TopicExchange exchange = new TopicExchange(exchangeName);rabbitAdmin.declareExchange(exchange);Queue queue = new Queue(queueName);rabbitAdmin.declareQueue(queue);/*** *(星号)可以代替一个单词* #(井号)可以替代零个或多个单词*/String routingKey = "vm.#";rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));Object o = rabbitTemplate.convertSendAndReceive(exchangeName, "vm.fff", sendMessage);System.out.println(o);} catch (Exception e) {e.printStackTrace();}}public void startListenerForConsumer(Object listener) {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(cachingConnectionFactory);MessageListenerAdapter adapter = new MessageListenerAdapter(listener,new Jackson2JsonMessageConverter());simpleMessageListenerContainer.setMessageListener(adapter);//针对哪些队列(参数为可变参数)simpleMessageListenerContainer.setQueueNames("mail.send","sms.send");//同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。simpleMessageListenerContainer.setConcurrentConsumers(6);//最大的消费者线程数simpleMessageListenerContainer.setMaxConcurrentConsumers(6);/*** 这种设置监听对象的方式,需要重新设置ACK方式,* 不过这里我们设置了手动ACK和MessageListener,并不会触发消费者,就先不设置了,很多业务也不用手动ACK。* 队列消费的结果,还是回放到我们发送消息时设置的返回队列*/
// //手动确认(单条确认)
// simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
// log.info("消费端接收到的消息:[{}]", message);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// });//消费端限流simpleMessageListenerContainer.setPrefetchCount(1);simpleMessageListenerContainer.start();}}
访问地址:http://localhost:8080/send 测试消息
4、死信队列和延迟队列
很简单,这里不举列子了