文章目录
- 前言
- 初识MQ
- SpringAMQP如何首发消息?
- 消费者
- 交换机
- Fanout:广播
- Direct交换机
- Topic交换机
- 声明队列和交换机
- 总结
前言
微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?
同步通信和异步通信(如下图):
● 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
● 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。
所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
初识MQ
整体架构及核心概念
● publisher:消息发送者
● consumer:消息的消费者
● queue:队列,存储消息
● exchange:交换机,负责路由消息
● virtual-host:虚拟注解,起到数据隔离的作用
SpringAMQP如何首发消息?
1、引入spring-boot-starter-amqp依赖
2、配置rabbitmq服务端信息
3、利用RabbitTemplate发送消息
4、利用@RabbitListener注解声明要监听的队列,监听消息
消费者
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一次消费者。并没有考虑到消费者是否已经处理完消息,可能出现消息堆积情况。需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息
spring:rabbitmq:host: localhostport: 5672virtual-host: /username: adminpassword: adminlistener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成之后才能获取下一个消息
Work模型的使用:
● 多个消费者绑定到一个队列,可以加快消息处理速度
● 同一条消息只会被一个消费者处理
● 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机
真正生产环境都会经过exchange:来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
● Fanout:广播
● Direct:定向
● Topic:话题
Fanout:广播
Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
void testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);}
hmall类型的交换机上绑定了2个队列
交换机作用:
● 接收oublisher发送的消息
● 将消息按照规则路由到与之绑定的队列
● FanoutExchangel的会将消息路由到每个绑定的队列
Direct交换机
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
● 每一个Queue都与Exchange设置一个BindingKey
● 发布者发送消息时,指定消息的RoutingKey
● Exchange:将消息路由到BindingKey与消息RoutingKey一致的队列
Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以“.”分割。
Queue与Exchange指定BindingKeyl时可以使用通配符:
● #代表0个或多个单词
● *代表一个单词
声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
● ·Queue:用于声明队列,可以用工厂类QueueBuilder构建
● Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
● Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
队列和交换机是在消费者方进行声明
@Configuration
public class FanoutConfiguration {// 声明交换机@Beanpublic FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}// 声明队列@Beanpublic Queue fanoutQueue3(){// QueueBuilder.durable("ff").build();return new Queue("fanout.queue3");}@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}
SpringAMQP:还提供了基于@RabbitListener注解来声明队列和交换机的方式
总结
在RabbitMQ的基础部分,我们了解了如何利用消息队列来实现应用程序之间的可靠通信。通过RabbitMQ,我们可以轻松地实现生产者-消费者模式,确保消息的可靠传递,并实现解耦和可扩展性。
总的来说,RabbitMQ具有以下几个关键特点和优势:
消息队列模型:RabbitMQ采用了先进的AMQP协议,提供了灵活且可靠的消息队列模型,支持多种消息传递模式。
可靠性:RabbitMQ具有高度的可靠性,能够确保消息的传递和处理,即使在生产者或消费者发生故障的情况下也能够保证消息不丢失。
灵活性:RabbitMQ支持多种消息传递模式,包括点对点、发布/订阅、路由和主题等,可以根据具体需求选择最合适的模式。
解耦和可扩展性:通过引入消息队列,可以将系统中的各个组件解耦,从而提高系统的可维护性和可扩展性,同时也能够减少系统之间的依赖性。
性能优化:RabbitMQ提供了丰富的性能优化选项,包括消息持久化、消息优先级、消息确认和流控制等,可以根据具体场景进行灵活配置。
在使用RabbitMQ时,需要注意以下几点:
消息持久化:确保重要的消息被持久化到磁盘,以防止消息丢失。
错误处理:处理消息传递过程中可能出现的错误,包括消息投递失败、消费者处理失败等情况。
监控和管理:定期监控RabbitMQ的性能指标,及时发现并解决潜在的问题,保证系统的稳定运行。
安全性:采取必要的安全措施,保护RabbitMQ服务器免受未经授权的访问和攻击。
综上所述,RabbitMQ作为一种可靠、灵活和高性能的消息队列系统,为构建分布式应用程序提供了强大的支持。通过合理的设计和配置,可以充分发挥其优势,提高系统的可靠性、可维护性和可扩展性,从而实现更加稳定和高效的应用程序。