1 架构变化
RocketMQ 5.0 架构上的变化主要是为了更好的走向云原生
RocketMQ 4.x 架构如下:
Broker 向 Name Server 注册 Topic 路由信息,Producer 和 Consumer 则从 Name Server 获取路由信息,然后 Producer 根据路由信息向 Broker 发送消息,Consumer 则根据路由信息从 Broker 拉取消息。
这个架构存在以下几个问题:
- 富客户端,客户端同时支持 Java、C++、Go 等各种语言,如果为了跟应用程序隔离,把客户端部署到 sidecar 中,这个 sidecar 会很大,部署难度高;
2.Broker 同时承担计算和存储的功能,不利于云原生环境下的资源解耦。
RocketMQ 5.0 架构如下图:
RocketMQ 5.0 在架构上主要做了两个优化:1. 通过引入无状态的代理模块,将 Broker 原来的协议适配、权限管理、消息管理等计算功能抽离到了代理模块中,Broker 则专注于存储功能。这样在云环境下可以更好地进行资源调度;
2.RocketMQ 5.0 基于 gRPC 支持多语言 SDK,各语言 SDK API 在本地语言层面对齐,API 非常轻量级,更容易被使用和集成。
2 集成事件、流处理
RocketMQ 5.0 采用事件驱动架构来支持消息流式处理和轻计算,可以实现消息的就近计算和分析。
RocketMQ 5.0 增加了 RocketMQ-EventBridge 组件,这个组件兼容标准 CloudEvents 协议标准,既可以链接社区活跃的生态,又可以跟各大云厂商的产品进行集成,对云原生的支持非常友好。下面这张图来自官网:
2.1 流式处理
为了更好地支持流失处理,RocketMQ 5.0 在原有 MessageQueue 的基础上抽象出了逻辑队列。一个逻辑队列可以包含多个物理队列,以此拼接出流式队列。如下图:
这样可以更加轻量级,做到秒级的扩缩容,即使物理节点发生变化也不需要复制迁移数据,数据存储的调度也更加灵活。
2.2 计算框架
在计算框架方面,RocketMQ 5.0 主要有两个变化:
-
引入流式处理框架 RSteams,这样 RocketMQ 自身就可以完成轻量级的理和计算;
-
引入轻量级 SQL 查询引擎 RSQLDB,RSQLDB 可以兼容了 Flink/Blink SQL 标准,实现了 RocketMQ 和 Flink/Blink 的融合。比如对于轻量级的计算,可以使用 SQL 在 RocketMQ 完成,而对于重量级的计算,RocketMQ 资源受限时,可以从 RocketMQ 迁移到 Flink 处理。
3 高可用
在 RocketMQ 5.0 之前,高可用架构有两个阶段:
1.RocketMQ 4.5 之前采用 Master-Slave 部署,这种架构 Master 发生故障后是不能自动切换的,对集群的影响会比较大;
2.RocketMQ 4.5 之后采用基于 raft 协议的 DLedger 算法来进行主从切换,架构如下图:
3.1 Master-Slave 架构优化
RocketMQ 5.0 对 Master-Slave 架构和基于 Raft 的架构都做了优化。
对于 Master-Slave 架构的升级,RocketMQ 5.0 引入了 BrokerContainer 的概念,一个 BrokerContainer 中可以部署多个 Broker,这些 Broker 拥有独立的端口,功能完全独立,可以通过 admin 来增加或减少 Broker。如下图:
这样一个 BrokerContainer 中的多个 Broker 可以共享同一个节点的资源,提高资源利用率。
同时,在一个 BrokerContainer 中可以同时部署 Broker 的 Master 和 Slave 节点,这样就可以通过 Master/Slave 交叉部署来实现节点对等,如下图两节点对等部署:
即使 Node1 挂了,Node2 节点中的 Broker1 可以提供读功能,并不会丢消息,Broker2 可以提供读写功能。
再看下面三个节点的对等部署架构图:
每个 Node 的 BrokerContainer 中都有 1 个 Master 跟 2 个 Slave 节点,如果其中一个 Node 挂了,其他两个 Node 上的 Broker 可以继续提供读写服务。
3.2 Raft 架构优化
基于 Raft 架构虽然可以实现主节点故障后自动切换,但也存在几个问题:
-
消息日志副本数必须是 3 个以上,这个是 Raft 协议自动选主的要求,造成资源浪费;
-
Raft 选主过程中必须有多数节点同意才能选主成功,副本数越多时间开销会越大,这会加大 ACK 延时;
-
CommitLog 主从同步需要使用 DLedger 库,也就是说 CommitLog 被看作是 Raft log 进行复制,这样 RocketMQ 原生的零拷贝、堆外内存的优势无法保留了。
RocketMQ 5.0 专门增加了轻量级的 DLedgerControlller 选主组件,将选主的切换能力上移,这个组件是可拔插的,既可以部署在 NameServer 中,也可以部署在本地。如下图:
引入了 DLedgerControlller 组件后,消息主备复制还是采用 RocketMQ 原生的基于 Master-Slave 架构的复制能力,复制效率高。
4. 定时/延时消息
5.0 支持毫秒级任意时间点定时任务
参考
https://blog.csdn.net/agonie201218/article/details/133610576
//定时/延时消息发送MessageBuilder messageBuilder = new MessageBuilderImpl();;//以下示例表示:延迟时间为10分钟之后的Unix时间戳。Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag").setDeliveryTimestamp(deliverTimeStamp)//消息体.setBody("messageBody".getBytes()).build();try {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);System.out.println(sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();}//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。MessageListener messageListener = new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView.getDeliveryTimestamp());//根据消费结果返回状态。return ConsumeResult.SUCCESS;}};//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。List<MessageView> messageViewList = null;try {messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);//消费处理完成后,需要主动调用ACK提交消费结果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {e.printStackTrace();}});} catch (ClientException e) {//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。e.printStackTrace();}
5 消息(Message)
消息类型
- 定义:当前消息的类型。
- 取值:从客户端SDK接口获取。Apache RocketMQ 支持的消息类型如下:
- Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
- FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
- Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
- Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
6 流控机制
消息流控基本概念
消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。
触发条件
Apache RocketMQ 的消息流控触发条件如下:
- 存储压力大:参考消费进度管理的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
- 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
7.无状态代理模
https://blog.csdn.net/alisystemsoftware/article/details/127248969
RocketMQ 5.0 文档地址 https://rocketmq.apache.org/zh/docs/