RocketMQ 5.0 POP 消费模式探秘

简介: POP Consumer—使客户端无状态,更轻量!

作者:凯易&耘田

前言:随着 RocketMQ 5.0 preview 的发布,5.0 的重大特性逐步与大家见面。POP Consumer 作为 5.0 的一大特性,POP 消费模式展现了一种全新的消费模式。其具备的轻量级,无状态,无队列独占等特点,对于消息积压场景,Streaming 消费场景等都非常友好。在介绍 POP Consumer 之前,我们先回顾一下目前使用较多的 Push Consumer。

Push Consumer

熟悉 RocketMQ 的同学对 Push Consumer 肯定不会陌生,客户端消费一般都会使用这种消费模式,使用这种消费模式也比较简单。我们只需简单设置,并在回调方法 ConsumeMessage 中写好业务逻辑即可,启动客户端应用就可以正常消费消息了。

public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("test_topic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

那么 Push Consumer 是如何消费消息的呢?

1.png

当然,Consumer 收到消息的前提是 Producer 先发消息发到 Topic 当中。Producer 使用轮询的方式分别向每个 Queue 中发送消息,一般消费端都不止一个,客户端启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。

这里有个小问题:可以一直增加客户端的数量提升消费能力吗?当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。

客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。不是 Push 客户端吗?怎么会是客户端向 Broker 拉消息,不应该是 Broker 推消息到客户端吗?这是一个很有意思的点,因为 RocketMQ 无论是 Push Consumer,还是 Pull Consumer,还是后面要介绍的 POP Consumer,都是客户端拉的方式消费消息。Push Consumer 只是通过客户端 API 层面的封装让我们感觉是 Broker 推送的。

经过客户端负载均衡以及拉消息,客户端就可以正常消费消息了。

2.png

完整的的Push Consumer处理逻辑可以看下上面这张图,我们可以看到Push Consumer完整处理流程。

首先客户端 Rebalance 确定哪些 Consumer 客户端处理哪些 Queue,然后通过 PullMessageService 服务拉取消息,拉取到消息以后 ConsumeMessageConcurrentlyService 提交消费请求到消息消费线程池,然后调用回调方法 ConsumeMessage,到这里就可以拿到消息处理业务了,最后消费成功更新本地 offset 并上报 offset 到 Broker。如果消费失败(抛异常,超时等),客户端会发送 sendBack 告诉 Broker 哪些消息消费失败了,Broker会将消费失败的消息发送到延时队列,延时后再放到retry Topic,客户端消费retry Topic完成消息重投。这样做的好处是不会因为部分消费失败的消息而影响正常消息的消费。想了解细节的同学可以到 github 下载源码对照这张图看一下实际的代码处理流程。

3.png

通过前面 Push Consumer 的介绍,我们对 Push Consumer 原理有了一定的认识。我们可以发现,RocketMQ 的客户端做了很多事情,负载均衡,拉消息,消费位点管理,消费失败后的 sendBack 等等。这对多语言支持无疑是不友好的。参与过多语言开发的同学应该会感同身受,将这么多的逻辑移植到不同的语言,肯定不是一件简单的事情。同时客户端的升级运维也会增加难度。

所以我们思考可不可为客户端瘦身,把一部分逻辑从客户端移到 Broker?当然是可以的,前面介绍 Push Consumer 客户端负责均衡的时候,我们可以发现,负载均衡需要的信息,所有ConsumerId,原本就是客户端从 Broker 获取的,所有 Queue 信息,Broker 也可以通过 nameServer 拿到,负责均衡算法在客户端还是 Broker 端调用也没有什么大的差异,所以把 Rebalance 移植到 Broker 是一个不错选择,Broker 负载均衡可以跟客户端负责均衡达到基本相同的效果,客户端逻辑会减少,多语言实现更加简单,后续升级运维也会更加可控。除此以外因为 Broker 相对客户端具有全局信息,还可以做一些更有意思的事情。例如在负责均衡的时候根据 Queue 的积压情况做负载均衡,将一些压力比较大的客户端上的 Queue 分配给其它客户端处理等等。

POP Consumer

通过前面 Push Consumer 的介绍,我们了解到 Push Consumer 的一些特点。

  • 队列独占:Broker 上的每个队列只能分配到相同 Consumer group 的一台 Push Consumer 机器上。 
  • 消费后更新 offset:每次 Pull 请求拉取批量消息到本地队列缓存,本地消费成功才会 commit offset。

以上特点可能会带来一些问题,比如客户端异常机器 hang,导致分配队列消息堆积,无法消费。

RocketMQ 的 Push Consumer 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,处于半死不活的状态,与 Broker 的心跳没有断掉的时候,客户端 Rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 Rebalance 导致消费延迟影响等无法避免的问题。如下图所示:

4.png

当 Push Consumer 2 机器发生 hang 的时候,它所分配到的 Broker 上的 Q2 出现严重的堆积。我们目前处理这种问题,一般可能是找到这台机器重启,或者下线。保证业务不受异常机器影响,但是如果队列挤压到一定程度可能机器恢复了也没办法快速追赶消费进度,这也是受 Push Consumer 的能力限制。

我们总结下 Push Consumer 存在的一些痛点问题:

  • 富客户端,客户端逻辑比较重,多语言支持不友好;
  • 客户端或者 Broker 升级发布,重启等 Rebalance 可能导致消费挤压;
  • 队列占位,单队列与单 Consumer 绑定,单个 Queue 消费能力无法横向扩展;
  • 机器 hang,会导致挤压。

基于上述问题,RocketMQ 5.0 实现了全新的消费模型-POP Consumer。

POP Consumer 能够解决上述稳定性和解除队列占位的扩展能力。

我们下面来简单看一下 POP Consumer 是如何消费消息的:

5.png

POP Client 从 Broker 的队列中发出 POP 请求消息,Broker 返回消息 message。在消息的系统属性里面有一个比较重要的属性叫做 POP_CK,POP_CK 为一条消息的 handler,通过一个 handler 就可以定位到一条消息。当消息消费成功之后,POP client 发送 ackMessage 并传递 handler 向 broker 确认消息消费成功。

6.png

对于消息的重试,当 POP 出一条消息之后,这条消息就会进入一个不可见的时间,在这段时间就不会再被 POP 出来。如果没有在这段不可见时间通过 ackMessage 确认消息消费成功,那么过了不可见时间之后,这条消息就会再一次的可见。

另外,对于消息的重试,我们的重试策略是一个梯度的延迟时间,重试的间隔时间是一个逐步递增的。所以,还有一个 changeInvisibleTime 可以修改消息的不可见时间。

7.png

从图上可以看见,本来消息会在中间这个时间点再一次的可见的,但是我们在可见之前提前使用 changeInvisibleTime延长了不可见时间,让这条消息的可见时间推迟了。当用户业务代码返回 reconsumeLater 或者抛异常的时候,我们就可以通过 changeInvisibleTime 按照重试次数来修改下一次的可见时间了。另外如果消费 RT 超过了 30 秒(默认值,可以修改),则 Broker 也会把消息放到重试队列。

除此以外,POP 消费的位点是由 Broker 保存和控制,而且 POP 消费是可以多个 Client 消费同一个队列,如下图所示:

8.png

三个客户端并不需要 Rebalance 去分配 Queue,取而代之的是,它们都会使用 POP 请求所有的 Broker 获取消息进行消费。即使 POP Consumer 2 出现 hang,其内部消息也会让 POP Consumer1 和 POP Consumer3 进行消费。这样就解决了 hang 机器可能造成的消费堆积问题。

从整体流程可见,POP 消费可以避免 Rebalance 带来的消费延时,同时客户端可以消费 Broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。

同时扩展能力提升,POP Consumer 可以消费同一 Topic 下所有 Queue,相比 Push Consumer 解除了每个 Queue 必须 Rebalance 到一台客户端消费的限制,Push Consuner 客户端数量最多只能等于 Queue 的数量。POP Consumer 可以突破这个限制,多个 POP Consumer 可以消费同一个 Queue。

Broker 实现

POP Consumer 在 Broker 端是如何实现的呢?

9.png

POP Consumer 拉取消息后,会在 Queue 维度上加锁,保证同一时刻只有一个客户端可以拉去到同一个 Queue 的消息。获取到消息后,会保存 checkPoint 信息在 Broker,checkPoint 信息主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等信息。checkPoint 信息会优先保存到 buffer 当中,等待 ack 消息,在一段时间内收到客户端回复的 ack 消息,对应的 checkPoint 信息从 buffer 中移除,并且更新消费进度,标识消息消费成功。

10.png

当 checkPoint 消息在 buffer 中等待一段时间,一直未等到 ack 消息时,checkPoint 信息会清理出 buffer 并发送 ck msg 到 store,ck msg 首先被发送到延时队列 SCHEDULE_Topic_XXXX 中,延时完成以后会进入 REVIVE_LOG Topic,REVIVE_LOG Topic 是保存在 store 当中待处理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的消息放到一个 map 当中,如果 ck 有对应的 ack 则会更新 REVIVE_LOG 的消费位点,标识消息消费完成,超时未被确认的 ck msg,会查询到 ck msg 对应的真实的消息,并把这个消息放到 retry Topic 当中,等待客户端消费,POP Consumer 正常消费的时候会概率性的消费到 retry Topic 中的消息。我们从这块设计中可以看到 RocketMQ 的常用设计,通过一些内部的 Topic 实现业务逻辑,事务消息,定时消息都用了这种设计方式。

我们简单终结一下 POP Consumer 的优势:

  • 无状态,offset 信息 Broker 维护,客户端与 Queue 无绑定。
  • 轻量级,客户端只需要收发消息,确认消息。
  • 无队列占位,Queue 不再与客户端绑定。
  • 多语言友好,方便多语言移植。
  • 升级更可控,逻辑都收敛到 Broker,升级更加方便可控。

POP&Push 融合

既然 POP 有这么多优势,我们能否使用 POP 解决 Push 的一些问题呢?前面我们提到 Push Consumer 当一个队列因为 Consumer 问题已经堆积很多的时候,受限于单个 Consumer 的消费能力,也无法快速的追赶消费进度,延迟会很高。核心问题是单队列单 Consumer 的限制,导致消费能力无法横向扩展。

我们希望通过 POPAPI 的形式,当一个队列堆积太多的情况下,可以切换到 POP 模式,有机会让多个 Consumer 来一起消费该队列,追赶进度,我们在 5.0 的实现中也实现了这一点。

POP/Push 模式切换方式

可以通过两种方式进行切换。

1、命令行

mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8

2、代码切换

    public static final String CONSUMER_GROUP = "CID_JODIE_1";public static final String TOPIC = "TopicTest";// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8private static void switchPop() throws Exception {DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();mqAdminExt.start();ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());for (String brokerAddr : brokerAddrs) {mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);}}

通过下面 POP Consumer Demo,我们看到 POP Consumer 跟 Push API 基本是统一,使用也比较简单,相比 Push API 只是多了一步消费模式切换。

11.png

Push & POP Retry 队列差异

在使用 POP 消费模式时我们只需要在 Push API 的基础上切换模式即可,对于 Broker 来说还是需要做一些处理的。主要需要处理的地方是 retry 队列。

Push 和 POP 模式对 retry 队列处理不一样

  • Push 的 retry 处理
  • 服务端有一个 %RETRY%ConsumerGroup 队列
  • 客户端会有拉取任务拉取这个队列的消息。
  • POP 的 retry 处理
  • 服务端针对每个Topic,都有一个名为 %RETRY%ConsumerGroup_Topic 的 retry 队列
  • 客户端没有专门针对 retry 队列的拉任务,每次普通 POP 请求都有一定概率消费相应的 retry 队列

模式切换之后,老模式的 retry 里的消息还需要继续处理,否则就丢消息了。

Push & POP 切换

Push 切换到 POP

  • 正常队列切换到 POP 模式
  • 正常队列的 POP 请求会处理对应的 POP retry 队列
  • 针对 Push retry 队列,我们保留原来 Push retry 队列的拉取任务,并且是工作在 Push 模式。

POP 切换到 Push

  • 正常队列切换到 Push 模式
  • Push retry 队列自然有相应的拉取任务
  • 之前 POP 的 retry 队列,我们在客户端自动创建拉取任务,以Push 模式去拉取。注意这里的拉取任务只拉取 POP 的 retry 队列。

总结下来就是,对于 retry 队列,我们会特殊处理不参与模式切换。

总结

最后我们总结下 POP Consumer。POP 作为一种全新的消费模式,解决了 Push 模式的一些痛点,使客户端无状态,更加轻量,消费逻辑也基本都收敛到了 Broker,对多语言的支持十分的友好。在 API 层面也与 Push 完成了融合,继承了 Push API 的简单易用,同时实现了 Push,POP 之间的自由切换。

原文链接
本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ESSD技术解读-01】 云原生时代,阿里云块存储 ESSD 快照服务如何被企业级数据保护所集成?

简介&#xff1a; 本文描述了阿里云块存储快照服务基于高性能 ESSD 云盘提升快照服务性能&#xff0c;提供轻量、实时的用户体验及揭秘背后的技术原理。依据行业发展及云上数据保护场景&#xff0c;为企业用户及备份厂商提供基于快照高级特性的数据保护的技术方案&#xff0c;满…

一把王者的时间,我就学会了Nginx

作者 | 步尔斯特来源 | CSDN博客Nginx 简介Nginx("engine x")是一个高性能的 HTTP 和反向代理服务器,特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上 nginx 的并发能力确实在同类型的网页服务器中表现较好&#xff0c;中国大陆使用 nginx 网站用户有&…

【ESSD技术解读-02】企业级利器,阿里云 NVMe 盘和共享存储

简介&#xff1a; 当前 NVMe 云盘结合了业界最先进的软硬件技术&#xff0c;在云存储市场&#xff0c;首创性同时实现了 NVMe 协议 共享访问 IO Fencing 技术。它在 ESSD 之上获得了高可靠、高可用、高性能&#xff0c;同时基于 NVMe 协议实现了丰富的企业特性&#xff0c;如…

php数组json函数,php数组转json的函数是什么

php数组转json的函数是json_encode()。json_encode()函数可以对变量进行JSON编码&#xff0c;将其转换为json字符串数据&#xff0c;语法格式“json_encode (value)”。本教程操作环境&#xff1a;windows7系统、PHP7.1版&#xff0c;DELL G3电脑php数组如何转为json&#xff1…

使用友盟+的APM服务实现对移动端APP的性能监控

简介&#xff1a; 对于信息系统服务&#xff0c;一般我们的重点监控对象都是核心的后端服务&#xff0c;通常会采用一些主流的APM(Application Performance Management)框架进行监控、告警、分析。那么对于移动端的APP、小程序的运行时状态如何进行实时监控与分析呢&#xff1f…

首届“中国物联网数据基础设施最佳案例评选”结果出炉

供稿 | 映云科技 出品 | CSDN云计算 随着物联网技术的成熟与普及&#xff0c;如今的世界早已进入万物互联的时代&#xff0c;全球年活跃连接的物联网设备已达数百亿规模 &#xff08;IoT Analytics, 2021&#xff09;。海量物联设备产生的数据&#xff0c;需要通过统一汇聚和…

Serverless 工程实践 | 快速搭建 Kubeless 平台

简介&#xff1a; Kubeless 是基于 Kubernetes 的原生无服务器框架。其允许用户部署少量的代码&#xff08;函数&#xff09;&#xff0c;而无须担心底层架构。 快速搭建 Kubeless 平台 Kubeless 简介 Kubeless 是基于 Kubernetes 的原生无服务器框架。其允许用户部署少量的…

并发编程实践之公平有界阻塞队列实现

简介&#xff1a; JUC 工具包是 JAVA 并发编程的利器。本文讲述在没有 JUC 工具包帮助下&#xff0c;借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。希望你也能在文后体会到并发编程的复杂之处&#xff0c;以及 JUC 工具包的强。 作者 | 李新然 来源 | 阿里技术公…

iOS App 启动优化

简介&#xff1a; 作为程序猿来说&#xff0c;“性能优化”是我们都很熟悉的词&#xff0c;也是我们需要不断努⼒以及持续进⾏的事情&#xff1b;其实优化是⼀个很⼤的课题&#xff0c;因为细分来说的话有⼤⼤⼩⼩⼗⼏种优化⽅向 &#xff0c;但是切忌在实际开发过程中不能盲⽬…

apache1.3 php编译,安装Apache1.3.29 - Linux+Apache+Mysql+PHP典型配置详解_Linux教程_Linux公社-Linux系统门户网站...

2.安装Apache1.3.29。我没有选择安装Apache2.0是我对他还是不放心&#xff0c;因为网上最新公布的apache的漏洞基本上是针对2.0&#xff0c;当然大家可以自己选择安装相应的版本。我这里讲的都是采用DSO动态编译的方法编译Apache.至于有关apache的编译方法&#xff0c;可以参考…

前后端、多语言、跨云部署,全链路追踪到底有多难?

简介&#xff1a; 完整的全链路追踪可以为业务带来三大核心价值&#xff1a;端到端问题诊断&#xff0c;系统间依赖梳理&#xff0c;自定义标记透传。 作者 | 涯海 全链路追踪的价值 链路追踪的价值在于“关联”&#xff0c;终端用户、后端应用、云端组件&#xff08;数据库…

供应商太多,怎么才能高效比价?

本篇文章暨 CSDN《中国 101 计划》系列数字化转型场景之一。 《中国 101 计划——探索企业数字化发展新生态》为 CSDN 联合《新程序员》、GitCode.net 开源代码仓共同策划推出的系列活动&#xff0c;寻访一百零一个数字化转型场景&#xff0c;聚合呈现并开通评选通道&#xff0…

7张图揭晓RocketMQ存储设计的精髓

简介&#xff1a; RocketMQ 作为一款基于磁盘存储的中间件&#xff0c;具有无限积压能力&#xff0c;并提供高吞吐、低延迟的服务能力&#xff0c;其最核心的部分必然是它优雅的存储设计。 存储概述 RocketMQ 存储的文件主要包括 Commitlog 文件、ConsumeQueue 文件、Index 文…

庖丁解InnoDB之UNDO LOG

简介&#xff1a; Undo Log是InnoDB十分重要的组成部分&#xff0c;它的作用横贯InnoDB中两个最主要的部分&#xff0c;并发控制&#xff08;Concurrency Control&#xff09;和故障恢复&#xff08;Crash Recovery&#xff09;&#xff0c;InnoDB中Undo Log的实现亦日志亦数据…

Ampere Altra Max 对比测试数据公布,性能能效双领先

在云计算领域&#xff0c;发展创新的脚步永不停歇。十多年前&#xff0c;伴随着虚拟化及高速网络的发展和成熟&#xff0c;云计算应运而生。在将工作负载迁移到云端的过程中&#xff0c;为了更好地适应云环境&#xff0c;软件架构得以重建&#xff0c;就如同搬进新家时&#xf…

钉钉宜搭入选Forrester《中国低代码平台市场分析报告》

简介&#xff1a; &#x1f389; 最新&#xff1a;钉钉宜搭入选Forrester《中国低代码平台市场分析报告》&#xff01; 11月12日&#xff0c;全球知名研究机构Forrester发布《中国低代码平台市场分析报告&#xff08;The State Of Low-Code Platforms In China&#xff09;》&…

被自己的行为蠢哭了,意识到原因后真香!

作者 | 零一来源 | 前端印象这两天在学习 node 相关的知识时&#xff0c;做出了一些错误的行为~在做用户登录相关业务时涉及到了 cookie、session 的存取&#xff0c;一搜就找到了 express-session 这个中间件&#xff0c;真香&#xff01;配几个配置就可以自动生成 cookie、se…

一种命令行解析的新思路(Go 语言描述)

简介&#xff1a; 本文通过打破大家对命令行的固有印象&#xff0c;对命令行的概念解构后重新梳理&#xff0c;开发出一种功能强大但使用极为简单的命令行解析方法。这种方法支持任意多的子命令&#xff0c;支持可选和必选参数&#xff0c;对可选参数可提供默认值&#xff0c;支…

云原生 DevOps,模型化应用交付能力很重要

简介&#xff1a; DevOps 文化及其支撑其落地实践的自动化工具与平台能力在云原生架构渐为普及的背后&#xff0c;发挥了关键的价值。 撰稿&#xff1a;溪洋 云原生正在成为企业业务创新和解决规模化挑战的加速器。 云原生带来的变革绝不限于基础设施和应用架构等技术层面&a…

如何在 Kubernetes Pod 内进行网络抓包

作者 | Addo Zhang来源 | 云原生指北使用 Kubernetes 时&#xff0c;经常会遇到一些棘手的网络问题需要对 Pod 内的流量进行抓包分析。然而所使用的镜像一般不会带有 tcpdump 命令&#xff0c;过去常用的做法简单直接暴力&#xff1a;登录到节点所在节点&#xff0c;使用 root …