SpringBoot整合RabbitMQ学习笔记
以下三种类型的消息,生产者和消费者需各自启动一个服务,模拟生产者服务发送消息,消费者服务监听消息,分布式开发。
一 Fanout类型信息
- . RabbitMQ创建交换机和队列
在RabbitMQ控制台,新建交换机hmall.fanout,新建两个队列,fanout.queue1和fanout.queue2,并将连个队列和交换机进行绑定即可。
操作如下图所示:
一下操作可以通过代码实现,具体参考配置类
(1)创建队列
(2)创建交换机
(3)绑定
2. 代码实现
(1)引入依赖
<dependency><groupId>org.springframework.book</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置MQ配置信息
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.fanoutproducer:queue1: fanout.queue1
(3)声明队列和交换机配置类
@Component
public class FanoutConfg{@Value("${spring.rabbitmq.exchange}")private String exchange@Value("${spring.rabbitmq.producer.queue1}")private String queueName1// 声明fanout交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(exchanage);}// 声明队列@Beanpublic Queue fanoutQueue1(){return new Queue(queueName1);}//绑定队列和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder,build(fanoutQueue1).to(fanoutExchange);}
}
(4)生产者
@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue})private String queueName;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void send(String msg){rabbitTemplete.covertAndSend(queueName,null,msg);}}
(4)消费者
@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue}")public void counsume(String msg){System.out.pringln("消费者收到 fanout.queue队列发的消息",msg);}
}
(5)测试类
@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendFanoutMsg(){producer.send("fanout类型发送消息!!!");}
}
二 direct类型发送消息
- 控制台操作
(1)交换机和队列的创建参考fanout的操作
(2)绑定:与fanout不同的是 给交换机绑定队列的同时需要指定路由键,如下图所示:
- 代码实现
(1)依赖引入参考fanout类型的消息
(2)mq消息配置
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.directproducer:queue1: direct.queue1queue2: direct.queue2routingKey1: redroutingKey2: red2
(3)MQ配置类
以下配置可以在消费者注解上实现
@Component
public class RabbitMqConfig {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.exchange}")private String exchange;@value("${spring.rabbitmq.producer.queue1}")private String queueName1;@value("${spring.rabbitmq.producer.queue2}")private String queueName2;@value("${spring.rabbitmq.producer.routingKey1}")private String routingKey1;@value("${spring.rabbitmq.producer.routingKey2}")private String routingKey2;// 创建交换机@Bean("directExchange")public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型,交换机名称.durable(true) //ture为持久化,存到磁盘,false存到内存.build();}// 创建队列@Bean("directQueue1")public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列@beanpublic Binging bindDirectQueue1(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs(); }// 创建队列@Bean("directQueue2")public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列@beanpublic Binging bindDirectQueue2(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs(); }}
(4)生产者发送消息
@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue1})private String queueName1;@value("${spring.rabbitmq.producer.queue2})private String queueName2;@value("${spring.rabbitmq.producer.routingKey2})private String routingKey1;@value("${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}
(5)消费者监听消息
第一种:已经编写了配置类
@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(String msg){System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);}@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")public void counsume(String msg){System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);}
}
第二种:在注解上配置交换机和队列以及路由键
@Component
public class RabbitMqListener {@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.producer.queue1}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"} ))public void counsume(String msg){System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);}@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.producer.queue2}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"} ))public void counsume(String msg){System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);}
}
(6)测试类
@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendDirectMsg(){producer.send("direct类型发送消息!!!");}
}
三 Topic类型消息
- 控制台操作
参考前面的创建交换机,队列,以及绑定关系操作 - 代码实现
(1)依赖引入参考fanout类型的消息
(2)mq消息配置
路由键使用通配符进行匹配,#代表多个,*代表一个
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.topicproducer:queue1: topic.queue1queue2: topic.queue2routingKey1: china.#routingKey2: #.news
(3)MQ配置类
@Component
public class RabbitMqConfig {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.exchange}")private String exchange;@value("${spring.rabbitmq.producer.queue1}")private String queueName1;@value("${spring.rabbitmq.producer.queue2}")private String queueName2;@value("${spring.rabbitmq.producer.routingKey1}")private String routingKey1;@value("${spring.rabbitmq.producer.routingKey2}")private String routingKey2;// 创建交换机@Bean("topicExchange")public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型,交换机名称.durable(true) //ture为持久化,存到磁盘,false存到内存.build();}// 创建队列@Bean("topicQueue1")public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列@beanpublic Binging bindDirectQueue1(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs(); }// 创建队列@Bean("topicQueue2")public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列@beanpublic Binging bindDirectQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs(); }}
(4)生产者发送消息
@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue1})private String queueName1;@value("${spring.rabbitmq.producer.queue2})private String queueName2;@value("${spring.rabbitmq.producer.routingKey2})private String routingKey1;@value("${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}
(5)消费者监听消息
@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(String msg){System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);}@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")public void counsume(String msg){System.out.pringln("消费者收到 topic.queue2队列发的消息",msg);}
}
(6)测试类
@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendDirectMsg(){producer.send("direct类型发送消息!!!");}
}
四 消息转换器
MQ会把消息体变成字节码
解决办法:使用消息转换器,实现如下:
- 在生产者和消费者两个服务引入依赖
<dependency><groupId>com.fasterxml.jackson</groupId><artifactId>jasckson-databind</artifactId>
</dependency>
- 在生产者和消费者两个服务编写消息转换器配置
@Component
public class JacksonMessageConvertor{@Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}
- 消息体
对于生产者来说,是map类型的,则生成者接收的时候也是map类型
例如:
@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(Map<String,Objecct> msg){System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);}}
五 案例演示
支付服务支付成功后通知交易服务进行后续操作
生产者和消费者两个服务都需要进行1,2,3步骤
- 添加依赖
<!--mq依赖-->
<dependency><groupId>org.springframework.book</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--消息转换器依赖-->
<dependency><groupId>com.fasterxml.jackson</groupId><artifactId>jasckson-databind</artifactId>
</dependency>
- 添加MQ配置信息
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: pay.topicqueue: mark.order.pay.queueroutKingKey: pay.success
- 消息转换器配置类
@Component
public class JacksonMessageConvertor{@Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}
- 生产者
(1)生产者的配置
@Component
public class Rabbitroducer {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.queue})private String queueName;@value("${spring.rabbitmq.routingKey})private String routingKey;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendMsg(String msg){// 发送消息rabbitTemplete.covertAndSend(queueName,routingKey, msg);}
}
(2)业务代码支付成功发送消息
public class payOrderServiceImpl impletement PayOrderService{@Autowridprivate RabbitProducer payProducer;@Overirid@Transactional(rollback = Exception.class)public void payOrder(PayOrderDto payOrder){// 一些列操作最终交易成功// 发送消息通知try{payProducer.send(payOrder.getId());}catch(AmqpException e){log.error("交易成功,发送消息异常:{}",e.getMessages(););}}
}
- 消费者
@Component
public class PaySatusListener {@Autowiredprivate OrderService orderService;@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.queue}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.exchange)",type=ExchangeType.TOPIC),key = {"${spring.rabbitmq.routingKey}"} ))public void listenOrderPay(Long orderId){//标记订单为已支付orderService.markOrderPaySuccess(orderId);}}