文章目录
- 1 认识MQ
- 1.1 同步和异步通讯
- 1.1.1 同步通讯
- 1.1.2 异步通讯
- 1.2 技术对比
- 2 RabbitMQ入门
- 2.1 RabbitMQ单机部署
- 2.2 RabbitMQ基本结构
- 2.3 RabbitMQ队列模型
- 2.3.1 简单队列模型(Simple Queue Model)
- 2.3.2 工作队列模型(Work Queue Model)
- 2.3.3 发布/订阅模型(Publish/Subscribe Model)
- 2.3.3.1 广播模型(Fanout)
- 2.3.3.2 定向模型(Direct)
- 2.3.3.3 通配符模型(Topic)
- 2.4 使用案例
- 2.4.1 Publisher实现
- 2.4.2 Consumer实现
1 认识MQ
1.1 同步和异步通讯
微服务间的通讯有同步和异步两种方式:
- 同步通讯:就像打电话,需要实时响应;
- 异步通讯:就像发邮件,不需要马上回复。
两种方式各有优劣,打电话可以立即得到响应,但是却不能跟多个人同时通话;发邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
1.1.1 同步通讯
微服务之间采用Feign调用就属于同步方式,调用可以实时得到结果,但存在下面的问题:
- 耦合度高:每次加入新的需求,都要修改原来的代码;
- 性能下降:调用者需要等待服务提供者的响应,如果调用链过长则响应时间等于各个调用时间之和;
- 资源浪费:调用者在等待响应时,不能释放请求所占用的资源,高并发场景下会极度浪费系统资源;
- 级联失败:如果服务提供者出现问题,则调用者也会出现问题,甚至导致集群故障。
1.1.2 异步通讯
异步调用则可以避免上述问题。以购买商品为例,用户支付成功后需要调用订单服务完成订单状态修改,调用物流服务从仓库减库存并发货。
在该案例中,支付服务是事件发布者(Publisher),在支付成功后只需要发布一个支付成功的事件(Event),事件中带上订单id。而订单服务和物流服务是事件订阅者(Consumer),监听支付成功的事件,监听到支付成功事件后完成自己业务即可。
如上图所示,为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,而不关心谁来订阅事件;订阅者从Broker订阅事件,也不关心是谁发来的消息。
Broker就像数据总线一样,所有的服务要接收数据和发送数据都在这个总线上。这个总线就像协议一样,让服务间的通讯变得标准和可控。
异步调用的优点:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速;
- 故障隔离:服务没有直接调用,不存在级联失败问题;
- 资源释放:调用间没有阻塞,不会造成无效的资源占用;
- 耦合度极低:每个服务都可以灵活插拔,可替换;
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件。
异步调用的缺点:
- 架构复杂:业务没有明显的流程线,不好管理;
- 依赖性强:需要依赖于Broker的可靠、安全、性能。
现在开源软件或云平台上Broker的软件是非常成熟的,比较常见的一种就是MQ技术。
1.2 技术对比
MQ(MessageQueue),即消息队列,也就是存放消息的队列,一般充当事件驱动架构中的Broker。
比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP, SMTP,STOMP | OpenWire,STOMP, REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
根据以上表格,选择哪种MQ的策略可以参考如下:
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
2 RabbitMQ入门
2.1 RabbitMQ单机部署
在Centos7虚拟机中使用Docker来单机部署RabbitMQ:
- 1)在线拉取镜像
docker pull rabbitmq:3-management
- 2)创建并运行RabbitMQ容器
docker run \-e RABBITMQ_DEFAULT_USER=rabbitmq \-e RABBITMQ_DEFAULT_PASS=123321 \--name rmq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
- 3)在浏览器访问
http://192.168.146.129:15672
:
输入账号密码:
至此,单机RabbitMQ部署完毕。
2.2 RabbitMQ基本结构
如上图所示,RabbitMQ中包含如下一些角色:
- Publisher:生产者
- Consumer:消费者
- Exchange:交换机,负责消息路由
- Queue:队列,存储消息
- VirtualHost:虚拟主机,不同租户的exchange、queue、消息的隔离
2.3 RabbitMQ队列模型
2.3.1 简单队列模型(Simple Queue Model)
简单队列模型是最基础的RabbitMQ模型。它包括单个生产者和单个消费者。生产者将消息发送到一个队列中,然后消费者从这个队列中读取消息并处理。
这种模式实现简单,易于理解和部署;但不支持并发消费,不支持多个消费者共同消费一个队列,因为一旦消息被一个消费者接收,它就会从队列中删除。因此,这种模式适用于单生产者和单消费者之间的点对点通信。
2.3.2 工作队列模型(Work Queue Model)
工作队列模型允许多个消费者协同地从一个队列中接收、处理和分发消息。在这种模型中,消息在不同的消费者之间进行负载均衡,被平均分配给不同的消费者。同时,当一个消费者正在处理一个消息时,它不能接收新的消息。
这种模型支持多个消费者同时处理同一个队列中的消息,实现了并发消费,具有更高的消息吞吐量。
2.3.3 发布/订阅模型(Publish/Subscribe Model)
2.3.3.1 广播模型(Fanout)
广播模型允许一个生产者向多个消费者广播一条消息。在这种模型中,生产者将消息发送到一个交换机(图中的X)中,然后这个交换机将消息路由到所有与之绑定的队列。每个队列对应一个消费者,可以独立地处理这个队列中的消息。
这种模型适用于将一条消息发送给多个消费者的场景,例如事件通知或新闻发布。
2.3.3.2 定向模型(Direct)
定向模型允许生产者根据路由将消息发送到指定的队列中。在这种模型中,生产者将消息发送到一个交换机中,交换机会将消息路由到与它所绑定的、且路由完全匹配的队列中。每个队列对应一个消费者,可以独立地处理这个队列中的消息。
这种模型适用于根据某些特定属性或条件将消息路由到指定队列的场景,例如日志记录或按优先级处理任务。
2.3.3.3 通配符模型(Topic)
通配符模型是定向模型的扩展实现。在这种模型中,允许使用通配符匹配来匹配路由。生产者将消息发送到一个交换机中,交换机会将消息路由到与它所绑定的、且与通配符匹配的路由所指向的队列中。每个队列对应一个消费者,可以独立地处理这个队列中的消息。
这种模型适用于根据消息内容将消息路由到不同队列的场景,例如按标签或关键字分发和处理不同的任务。
2.4 使用案例
下面基于简单队列模型实现一个使用案例,该案例中包含三个角色,分别是:
- Publisher:消息发布者,将消息发送到队列Queue
- Queue:消息队列,负责接受并缓存消息
- Consumer:订阅队列,处理队列中的消息
创建一个maven项目,引入RabbitMQ依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.4.1 Publisher实现
@Test
public void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.146.129");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("rabbitmq");factory.setPassword("123321");Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();
}
执行以上单元测试,控制台打印结果如下:
此时在RabbotMQ管理页面可以看到这条消息:
2.4.2 Consumer实现
@Test
public void testGetMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.146.129");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("rabbitmq");factory.setPassword("123321");Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");
}
执行以上单元测试,控制台打印结果如下:
…
本节完,更多内容请查阅分类专栏:微服务学习笔记
感兴趣的读者还可以查阅我的另外几个专栏:
- SpringBoot源码解读与原理分析
- MyBatis3源码深度解析
- Redis从入门到精通
- MyBatisPlus详解
- SpringCloud学习笔记