初识MQ
1.1.同步和异步通讯
微服务间通讯有同步和异步两种⽅式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发邮件,不需要⻢上回复。
两种⽅式各有优劣,打电话可以⽴即得到响应,但是你却不能跟多个⼈同时通话。发送邮件可以同
时与多个⼈收发邮件,但是往往响应会有延迟。
1.1.1.同步通讯
Feign调⽤就属于同步⽅式,虽然调⽤可以实时得到结果,但存在下⾯的问题
总结:
同步调⽤的优点:
- 时效性较强,可以⽴即得到结果
同步调⽤的问题:
- 耦合度⾼
- 性能和吞吐能⼒下降
- 有额外的资源消耗
- 有级联失败问题(由于⼀个故障导致了连锁反应,使得系统中的其他组件或节点也相继失败)
1.1.2.异步通讯
异步调⽤则可以避免上述问题:
我们以购买商品为例,⽤户⽀付后需要调⽤订单服务完成订单状态修改,调⽤物流服务,从仓库分配响应的库存并准备发货。
在事件模式中,⽀付服务是事件发布者(publisher),在⽀付完成后只需要发布⼀个⽀付成功的事件 (event),事件中带上订单id。
订单服务和物流服务是事件订阅者(Consumer),订阅⽀付成功的事件,监听到事件后完成⾃⼰业务即可。
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,⽽是有⼀个中间⼈(Broker)。发布者发布事件到Broker,不关⼼谁来订阅事件。订阅者从Broker订阅事件,不关⼼谁发来的消息
Broker 是⼀个像数据总线⼀样的东⻄,所有的服务要接收数据和发送数据都发到这个总线上,这
个总线就像协议⼀样,让服务间的通讯变得标准和可控。
好处:
吞吐量提升:⽆需等待订阅者处理完成,响应更快速
故障隔离:服务没有直接调⽤,不存在级联失败问题
调⽤间没有阻塞,不会造成⽆效的资源占⽤
耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多⼤,都由Broker接收,订阅者可以按照⾃⼰的速度去处理
事件
缺点:
架构复杂了,业务没有明显的流程线,不好管理
需要依赖于Broker的可靠、安全、性能
好在现在开源软件或云平台上 Broker 的软件是⾮常成熟的,⽐较常⻅的⼀种就是我们今天要学习
的MQ技术。
1.2.技术对⽐
MQ,中⽂是消息队列(MessageQueue),字⾯来看就是存放消息的队列。也就是事件驱动架构
中的Broker。
⼏种常⻅MQ的对⽐:
追求可⽤性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能⼒:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
不同的消息队列系统在不同场景下有各⾃的优势和适⽤性。以下是各个消息队列系统在不同场合下的最佳选择:
Kafka:
最佳场合:⼤规模数据处理、实时⽇志收集和分析、流式处理。
优势:⾼吞吐量、低延迟、⽔平扩展能⼒强、⻓期消息存储,适合构建⼤规模的实时数据流处理平台,如实时⽇志收集和分析、事件流处理等。
RabbitMQ:
最佳场合:传统的企业级应⽤、轻量级的消息传递场景。
优势:简单易⽤、⽀持多种消息协议、适合点对点和发布/订阅模式,对于传统的企业应⽤和中⼩规模的消息传递需求,是⼀种可靠的选择
ActiveMQ:
最佳场合:中⼩规模的企业应⽤、Java⽣态系统中的集成需求。
优势:Java开发环境友好、⽀持多种消息协议,适合与Java⽣态系统的其他组件集成,如Spring框架等。
RocketMQ:
最佳场合:⼤规模的分布式系统、互联⽹应⽤、⾦融领域的消息处理。
优势:⾼吞吐量、低延迟、丰富的消息存储模式,适⽤于处理⼤规模的消息传递场景,特别是在互联⽹
和⾦融领域。
综合考虑以上因素,可以做如下简单总结:
如果需要处理⼤规模的实时数据流、⽇志收集和分析等⾼吞吐量场景,⾸选Kafka。
如果对于消息传递的简单性和易⽤性有较⾼要求,适合中⼩规模的企业应⽤和轻量级消息传递需求,可以选择RabbitMQ或ActiveMQ。
如果在⼤规模的分布式系统、互联⽹应⽤或⾦融领域需要处理消息传递,RocketMQ是⼀个较好的选择。
2.RocketMQ简介
官⽹: http://rocketmq.apache.org/
RocketMQ是阿⾥巴巴2016年MQ中间件,使⽤Java语⾔开发,RocketMQ 是⼀款开源的分布式消息系 统,基于⾼可⽤分布式集群技术,提供低延时的、⾼可靠的消息发布与订阅服务。同时,⼴泛应⽤于多个领域,包括异步通信解耦、企业解决⽅案、⾦融⽀付、电信、电⼦商务、快递物流、⼴告营销、社交、即时通信、移动应⽤、⼿游、视频、物联⽹、⻋联⽹等。
RocketMQ的设计⽬标是⽀持⼤规模消息处理,具有⾼并发、⾼可⽤和容错能⼒。它在多个⽅⾯提供了强⼤的功能和特性:
分布式架构:RocketMQ采⽤分布式架构,⽀持在多个节点之间进⾏消息的发送和接收,实现了⽔平扩展能⼒。
⾼吞吐量:RocketMQ可以在⼤规模并发场景下实现⾼吞吐量的消息处理,适⽤于⾼并发的业务场景。
低延迟:RocketMQ具有较低的消息传递延迟,适⽤于需要实时性的应⽤场景。
消息可靠性:RocketMQ提供了多种消息存储模式,可以确保消息的可靠传递,包括同步刷盘和异步刷盘等⽅式。
消息顺序性:RocketMQ⽀持消息的顺序传递,可以确保同⼀消息队列中的消息按照发送顺序被消费。
⽀持多种消息模式:RocketMQ⽀持发布/订阅模式和点对点模式,可以根据业务需求选择合适的消息模式。
灵活的部署⽅式:RocketMQ⽀持多种部署⽅式,可以在单机上运⾏,也可以搭建集群部署。
丰富的监控和管理⼯具:RocketMQ提供了丰富的监控和管理⼯具,⽅便管理员对消息队列进⾏监控和管理。
核⼼概念
Producer:消息的发送者,⽣产者;举例:发件⼈。
Consumer:消息接收者,消费者;举例:收件⼈。
Broker:消息队列的中间服务器,负责存储消息并将消息传递给消费者;举例:快递。
NameServer:可以理解为是⼀个注册中⼼,主要是⽤来保存topic路由信息,管理Broker。在 NameServer的集群中,NameServer与NameServer之间是没有任何通信的;举例:各个快递公司的管理机构相当于broker的注册中⼼,保留了broker的信息。
Queue:队列,消息存放的位置,⼀个Broker中可以有多个队列。
Topic:消息的逻辑分类,⽣产者发送消息到指定的Topic,消费者从指定的Topic订阅消息。⼀个Topic可以有多个Producer和多个Consumer。
ProducerGroup:⽣产者组 。
ConsumerGroup:消费者组,多个消费者组可以同时消费⼀个主题的消息。
⼯作流程
Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册⾃⼰的信息,这些信息包括⾃⼰的ip和端⼝号,⾃⼰这台Broker有哪些topic等信息。
Producer在启动之后会跟会NameServer建⽴连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许⾃动创建topic来决定是否发送消息。
Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份
Consumer启动之后也会跟会NameServer建⽴连接,定期从NameServer中获取Broker和对应topic的信息,然后根据⾃⼰需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建⽴连接,获取消息,进⾏消费
3.RocketMQ安装
本⽂档所涉及的是单机版的RocketMQ安装教程,能够满⾜基本的学习使⽤,属于⼊⻔级的教程,
如果想要搭集群部署,可以参考其他资料,进⾏配置即可
进⼊[RocketMQ官⽹下载](下载 | RocketMQ (apache.org))
1、选择Binary 下载
2、将压缩包解压⾄⾃定路径
3、配置系统中的环境变量
4.启动RocketMQ
4.启动RocketMQ
若出现如上图所示的命令框,说明启动成功,保留窗⼝切勿关闭
继续启动broker
与上述同样的路径下呼出cmd,执⾏如下命令:
Start-Process "mqbroker.cmd" -ArgumentList "-n 127.0.0.1:9876","autoCreateTopicEnable=true"
5.配置可视化⻚⾯
下载可视化插件源码
github下载地址:https://github.com/apache/rocketmq-dashboard
复制下载链接后使⽤git下载
可⾃建⽂件夹,进⼊后使⽤git bash下载
git clone https://github.com/apache/rocketmq-dashboard.git
下载完成后,进⼊ application.yml 中查看配置
yarn-v1.22.10.tar.gz 下载超时了Downloading https://github.com/yarnpkg/yarn/releases/download/v1.22.10/yarn-v1.22.10.tar.gz toD:\Maven\mvn_resp\com\github\eirslett\yarn\1.22.10\yarn-1.22.10.tar.gz [INFO] No proxies configured[INFO] No proxy was configured, downloading directly这⾥直接去github拉去就⾏,存⼊你的maven仓库
在该⽬录下打开cmd,输⼊指令==(请保证已经运⾏NameServer和broker)==:java -jar rocketmq-dashboard-2.0.1-SNAPSHOT.jar
6.集成springboot
SpringBoot集成RocketMQ
⾸先,在pom.xml中添加RocketMQ的依赖,具体如下所示:
< dependency >< groupId > org . apache . rocketmq < /groupId>< artifactId > rocketmq - spring - boot - starter < /artifactId>< version > 2.1 . 1 < /version>< /dependency>
然后,在application.yml中添加RocketMQ的基本配置:
# RocketMqrocketmq:name - server : 127.0.0 . 1 : 9876producer:group: producer - groupconsumer:group: consumer - group
然后,在application.yml中添加RocketMQ的基本配置:
# RocketMqrocketmq:name - server : 127.0.0 . 1 : 9876producer:group: producer - groupconsumer:group: consumer - group
创建消息⽣产者
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic = "demo-topic";// 1.同步发送消息// 同步发送是指发送⽅发送⼀条消息后,会等待服务器返回确认信息后再进⾏后续操作。这种⽅式适⽤于需要可靠性保证的场景。public void sendSyncMessage(String message){rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.printf("同步发送结果: %s\n", message);}// 2.异步发送消息// 异步发送是指发送⽅发送消息后,不等待服务器返回确认信息,⽽是通过回调接⼝处理返回结果。这种⽅式适⽤于对响应时间要求较⾼的场景。public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("异步发送成功: %s\n", sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("异步发送失败: %s\n", throwable.getMessage());}});}// 3.单向发送消息// 单向发送是指发送⽅只负责发送消息,不关⼼服务器的响应。该⽅式适⽤于对可靠性要求不⾼的场景,如⽇志收集。public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println("单向消息发送成功");}
}
创建消息消费者
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "demo-topic", consumerGroup = "consumer-g
roup", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.printf("收到消息: %s\n", s);}
}