引言:在当今的大数据和分布式系统中,消息队列扮演着至关重要的角色,它们作为系统之间通信和数据传输的媒介,为各种场景下的数据流动提供了可靠的基础设施支持。在消息队列的设计中,推拉模式是两种常见的消息传递机制,它们各自具有独特的优势和适用场景。本文将聚焦于两个著名的消息队列系统:RocketMQ 和 Kafka,并探讨它们在消息传递过程中是如何实现拉模式的。虽然两者都选择了拉模式,但它们的具体实现方式略有不同,从内部机制到性能优化,都反映了对不同应用场景的思考和针对性的改进。
题目
RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?
推荐解析
那到底是推还是拉?
推模式和拉模式各有优缺点,到底该如何选择呢?
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。
而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。
虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。
那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。
长轮询
RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。
为了简单化,下面我把消息不满足本次拉取的条数啊、总大小啊等等都统一描述成还没有消息,反正都是不满足条件。
RocketMQ 中的长轮询
RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。
因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。
后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。
这一部分代码我不截了,就是这么个事儿,稍后会用图来展示。
然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回,如果没有消息怎么办呢?我们来看一下代码。
我们再来看下 suspendPullRequest 方法做了什么。
而 PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒。
简单的说就是 5 秒会检查一次消息时候到了,如果到了则调用 processRequest 再处理一次。这好像不太实时啊? 5秒?
别急,还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟
代码我就不截了,就是消息写入并且会调用 pullRequestHoldService 的 notifyMessageArriving 方法。
最后我再来画个图,描述一下整个流程。
Kafka 中的长轮询
像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。
简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。
并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。
我们来简单的看一下源码,为了突出重点,我会删减一些代码。
先来看消费者端的代码。
上面那个 poll 接口想必大家都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。
我们再来看下最终 client.poll 调用的是什么。
最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)。
现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的。
Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。
这个方法进来,我截取最重要的部分。
下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,大家放大图片看下即可。
这个炼狱名字取得很有趣,简单的说就是利用时间轮来执行定时任务,例如这里是 delayedFetchPurgatory ,专门用来处理延迟拉取操作。
我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。
这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:
- isCompleted 检查条件是否满足的方法
- tryComplete 条件满足之后执行的方法
- onComplete 执行完毕之后调用的方法
- onExpiration 过期之后需要执行的方法
判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?
这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager 和 appendRecords 方法内部再深入个两方法可以看到。
不过虽说代码不贴,图还是要画一下的。
小结一下
可以看到 RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。
一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。
最后
总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更适合于消息队列。
其他补充
鱼聪明 AI 的回答:
鱼聪明 AI 地址:https://www.yucongming.com/
RocketMQ
优点:
- 高吞吐量和低延迟: RocketMQ 能够处理每秒数百万条消息的高吞吐量,并提供低延迟的消息传递能力,适合需要快速数据处理和响应的场景。
- 丰富的消息模型支持: RocketMQ 支持丰富的消息模型,包括顺序消息、事务消息等,能够满足复杂应用中对消息可靠性和顺序性的需求。
- 分布式特性和高可用性: RocketMQ 设计为分布式系统,具备良好的水平扩展性和高可用性,支持消息的持久化存储和数据备份,保障数据的安全性和可靠性。
- 灵活的部署模式: RocketMQ 支持多种部署模式,包括单机部署、主从模式和集群部署,可以根据应用需求选择合适的部署方式。
缺点:
- 社区和生态系统相对较小: 相较于Kafka,RocketMQ 的社区规模和生态系统相对较小,可能在某些特定功能支持和第三方集成方面不如Kafka成熟。
- 复杂性较高: RocketMQ 在配置和管理上相对较复杂,需要一定的运维经验和专业知识来保障系统的稳定性和性能表现。
主要使用场景:
- 电商和金融行业的实时数据处理: RocketMQ 的低延迟和高吞吐量特性使其非常适合处理电商和金融领域的实时数据流,如订单处理、支付通知等。
- 分布式事务应用: RocketMQ 的事务消息模型支持确保消息的可靠性传递,适合需要事务性保证的应用场景,如分布式事务、应用程序集成等。
- 大规模日志收集与分析: RocketMQ 的高吞吐量和顺序消息特性,使其成为大规模日志收集和分析系统的理想选择。
Kafka
优点:
- 极高的吞吐量和存储容量: Kafka 是为大规模数据流设计的,能够处理每秒数百万条消息和PB级别的存储数据,适合大数据场景下的实时数据处理和分析。
- 持久性和可靠性: Kafka 提供了持久化存储和数据复制机制,确保消息的可靠传递和数据的安全性,支持高可用性和故障容错。
- 灵活的消息发布订阅模型: Kafka 提供了灵活的发布订阅模型,支持多种消费者订阅方式和数据分发模式,如分区、复制和副本机制,能够满足复杂数据流的处理需求。
- 成熟的生态系统和社区支持: Kafka 拥有庞大的开发社区和丰富的生态系统,支持大量的第三方集成和工具,如数据流处理、流媒体处理等。
缺点:
- 运维复杂度较高: Kafka 的配置和管理较为复杂,需要专业的运维团队来维护和管理整个集群,包括监控、扩展和故障处理等方面的工作。
- 实时性稍逊: 尽管 Kafka 在吞吐量上表现出色,但在一些低延迟的场景下可能无法满足实时性要求,特别是对于顺序消息的处理。
主要使用场景:
- 大数据流处理和实时分析: Kafka 的高吞吐量和大容量存储能力使其成为大数据场景下的首选,如日志收集、实时监控、用户行为分析等。
- 事件驱动架构: Kafka 的消息发布订阅模型适合构建事件驱动的微服务架构和实时数据处理流水线,如事件溯源、实时通知等。
- 流媒体处理和持续集成: Kafka 的持久化存储和可靠传输特性使其适合于流媒体处理和持续集成的场景,如实时推荐系统、数据流管道等。
总结
RocketMQ 和 Kafka 都是功能强大的消息队列系统,各自在不同的应用场景中有着显著的优势和适用性。选择合适的系统取决于具体的业务需求,包括数据处理的速度、可靠性要求以及整体架构设计等方面的考量。
欢迎交流
本文主要介绍两种不同的消息队列的推拉模式、以及各种的优缺点和使用场景,在文末还有三个关于消息队列的问题,欢迎小伙伴在评论区留言!近期面试鸭小程序已全面上线,想要刷题的小伙伴可以积极参与!
1)消息队列系统在分布式系统中的角色和优化策略是什么?
2)消息队列系统如何确保消息顺序性?
3)消息队列如何保证消息的可靠性传递?