RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?

引言:在当今的大数据和分布式系统中,消息队列扮演着至关重要的角色,它们作为系统之间通信和数据传输的媒介,为各种场景下的数据流动提供了可靠的基础设施支持。在消息队列的设计中,推拉模式是两种常见的消息传递机制,它们各自具有独特的优势和适用场景。本文将聚焦于两个著名的消息队列系统:RocketMQ 和 Kafka,并探讨它们在消息传递过程中是如何实现拉模式的。虽然两者都选择了拉模式,但它们的具体实现方式略有不同,从内部机制到性能优化,都反映了对不同应用场景的思考和针对性的改进。

题目

RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?

推荐解析

那到底是推还是拉?

推模式和拉模式各有优缺点,到底该如何选择呢?

RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。

我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。

而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。

虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。

那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。

长轮询

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。

为了简单化,下面我把消息不满足本次拉取的条数啊、总大小啊等等都统一描述成还没有消息,反正都是不满足条件。

RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已

因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。

后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。

这一部分代码我不截了,就是这么个事儿,稍后会用图来展示。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回,如果没有消息怎么办呢?我们来看一下代码。

Snipaste_2024-06-17_05-49-17.jpg

我们再来看下 suspendPullRequest 方法做了什么。

Snipaste_2024-06-17_05-50-03.jpg

而 PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒。

Snipaste_2024-06-17_05-51-17.jpg

简单的说就是 5 秒会检查一次消息时候到了,如果到了则调用 processRequest 再处理一次。这好像不太实时啊? 5秒?

别急,还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟

代码我就不截了,就是消息写入并且会调用 pullRequestHoldService 的 notifyMessageArriving 方法。

最后我再来画个图,描述一下整个流程。

Snipaste_2024-06-17_05-53-18.jpg

Kafka 中的长轮询

像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

我们来简单的看一下源码,为了突出重点,我会删减一些代码。

先来看消费者端的代码。

Snipaste_2024-06-17_05-54-55.jpg

上面那个 poll 接口想必大家都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。

Snipaste_2024-06-17_05-55-33.jpg

我们再来看下最终 client.poll 调用的是什么。

Snipaste_2024-06-17_05-55-58.jpg

最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)

现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的

Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。

Snipaste_2024-06-17_05-57-27.jpg

这个方法进来,我截取最重要的部分。

Snipaste_2024-06-17_05-57-59.jpg

下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,大家放大图片看下即可。

Snipaste_2024-06-17_05-58-31.jpg

这个炼狱名字取得很有趣,简单的说就是利用时间轮来执行定时任务,例如这里是 delayedFetchPurgatory ,专门用来处理延迟拉取操作。

我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。

这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:

  • isCompleted 检查条件是否满足的方法
  • tryComplete 条件满足之后执行的方法
  • onComplete 执行完毕之后调用的方法
  • onExpiration 过期之后需要执行的方法

判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?

这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager 和 appendRecords 方法内部再深入个两方法可以看到。

不过虽说代码不贴,图还是要画一下的。

Snipaste_2024-06-17_06-01-34.jpg

小结一下

可以看到 RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。

一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。

最后

总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更适合于消息队列。

其他补充

鱼聪明 AI 的回答:

鱼聪明 AI 地址:https://www.yucongming.com/

RocketMQ

优点:

  1. 高吞吐量和低延迟: RocketMQ 能够处理每秒数百万条消息的高吞吐量,并提供低延迟的消息传递能力,适合需要快速数据处理和响应的场景。
  2. 丰富的消息模型支持: RocketMQ 支持丰富的消息模型,包括顺序消息、事务消息等,能够满足复杂应用中对消息可靠性和顺序性的需求。
  3. 分布式特性和高可用性: RocketMQ 设计为分布式系统,具备良好的水平扩展性和高可用性,支持消息的持久化存储和数据备份,保障数据的安全性和可靠性。
  4. 灵活的部署模式: RocketMQ 支持多种部署模式,包括单机部署、主从模式和集群部署,可以根据应用需求选择合适的部署方式。

缺点:

  1. 社区和生态系统相对较小: 相较于Kafka,RocketMQ 的社区规模和生态系统相对较小,可能在某些特定功能支持和第三方集成方面不如Kafka成熟。
  2. 复杂性较高: RocketMQ 在配置和管理上相对较复杂,需要一定的运维经验和专业知识来保障系统的稳定性和性能表现。

主要使用场景:

  • 电商和金融行业的实时数据处理: RocketMQ 的低延迟和高吞吐量特性使其非常适合处理电商和金融领域的实时数据流,如订单处理、支付通知等。
  • 分布式事务应用: RocketMQ 的事务消息模型支持确保消息的可靠性传递,适合需要事务性保证的应用场景,如分布式事务、应用程序集成等。
  • 大规模日志收集与分析: RocketMQ 的高吞吐量和顺序消息特性,使其成为大规模日志收集和分析系统的理想选择。

Kafka

优点:

  1. 极高的吞吐量和存储容量: Kafka 是为大规模数据流设计的,能够处理每秒数百万条消息和PB级别的存储数据,适合大数据场景下的实时数据处理和分析。
  2. 持久性和可靠性: Kafka 提供了持久化存储和数据复制机制,确保消息的可靠传递和数据的安全性,支持高可用性和故障容错。
  3. 灵活的消息发布订阅模型: Kafka 提供了灵活的发布订阅模型,支持多种消费者订阅方式和数据分发模式,如分区、复制和副本机制,能够满足复杂数据流的处理需求。
  4. 成熟的生态系统和社区支持: Kafka 拥有庞大的开发社区和丰富的生态系统,支持大量的第三方集成和工具,如数据流处理、流媒体处理等。

缺点:

  1. 运维复杂度较高: Kafka 的配置和管理较为复杂,需要专业的运维团队来维护和管理整个集群,包括监控、扩展和故障处理等方面的工作。
  2. 实时性稍逊: 尽管 Kafka 在吞吐量上表现出色,但在一些低延迟的场景下可能无法满足实时性要求,特别是对于顺序消息的处理。

主要使用场景:

  • 大数据流处理和实时分析: Kafka 的高吞吐量和大容量存储能力使其成为大数据场景下的首选,如日志收集、实时监控、用户行为分析等。
  • 事件驱动架构: Kafka 的消息发布订阅模型适合构建事件驱动的微服务架构和实时数据处理流水线,如事件溯源、实时通知等。
  • 流媒体处理和持续集成: Kafka 的持久化存储和可靠传输特性使其适合于流媒体处理和持续集成的场景,如实时推荐系统、数据流管道等。

总结

RocketMQ 和 Kafka 都是功能强大的消息队列系统,各自在不同的应用场景中有着显著的优势和适用性。选择合适的系统取决于具体的业务需求,包括数据处理的速度、可靠性要求以及整体架构设计等方面的考量。

欢迎交流

本文主要介绍两种不同的消息队列的推拉模式、以及各种的优缺点和使用场景,在文末还有三个关于消息队列的问题,欢迎小伙伴在评论区留言!近期面试鸭小程序已全面上线,想要刷题的小伙伴可以积极参与!

1)消息队列系统在分布式系统中的角色和优化策略是什么?

2)消息队列系统如何确保消息顺序性?

3)消息队列如何保证消息的可靠性传递?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32517.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

02 Shell编程之条件语句(补充实验部分)

1、双分支if语句的补充(实验部分) 例如,要编写一个连通性测试脚本,通过位置参数来提供目标主机地址,然后根据ping检测结果给出相应的提示 (能ping通的,回馈一个信息:该服务器是开启…

浔川AI社宣布正式开创“浔川AI助手”——浔川AI社

这是浔川AI社的标志。 2024.6.22晚8点35分宣布,浔川AI社正式开创“浔川AI助手” 全面发展。 据浔川AI社报道称‘“浔川AI助手”内容包含全部,写作、聊天......都有。’ 让我们敬请期待!

【JAVA】精致的五角星

输出的这幅图像中,一颗精致的金色五角星跃然于深红色背景之上,绽放出迷人的光彩。 要绘画这颗五角星,首先要了解五角星的构造和角度问题。我们可以分为内五边形,和外五边形。内五边形从他的中心到每个外点,连接起来&am…

ECharts词云图(案例一)+配置项详解

ECharts词云图(案例一)配置项详解 ECharts 是一款由百度团队开发的基于 JavaScript 的开源可视化图表库,它提供了丰富的图表类型,包括常见的折线图、柱状图、饼图等,以及一些较为特殊的图表,如词云图。从版…

带百分比的进度条控件(ProgressBar)源码

带百分比的进度条控件(ProgressBar): 源码下载地址:https://download.csdn.net/download/wgxds/89472915

打破数据分析壁垒:SPSS复习必备(六)

一、数据的报表呈现 1.报表概述 (1).SPSS中的报表功能 1)Base 模块 2)Custom Tables 模块 3) Original Tables 模块 (2).报表的基本绘制步骤 步骤一:确定基本结构 步骤二:使用对话框绘制表格的基本结构 步骤三:完善细节 步骤四:添加其余变…

Javase.图书管理系统基本框架

图书管理系统基本框架 1.核心类介绍2. book包详解2.1 Book 类2.1.2 代码展示2.1.2 代码解析 2.2 BookList 类2.2.2 代码展示2.2.2 代码解析 2.3Book类和BookList类的联系 3. 用户角色与管理3.1 User 类3.1.1 代码展示3.1.2 代码解析 3.2 adminUser 类3.2.1 代码展示3.2.2代码解…

我做了个Hexo博客

最近花了两个周末的时间边学变做Hexo博客,最终成品地址如下: https://blog.mybatis.io 下面先说说做博客的经过,想做Hexo博客一开始是因为看到了 hexo-theme-icarus 主题,这个主题样式如下: 首页 内容页 这个主题是…

KEIL5软件仿真观察PIN脚电平(软件仿真逻辑分析仪的使用)

仿真前的调整: 例:STM32F103C8T6 (如果是F4的板子稍微对着修改一下) 逻辑分析仪的使用 输入 PORTA.6( PORAT(哪一组).(哪一个引脚) )

通过rpm命令查看特定rpm包的安装时间

通过rpm命令查看特定rpm包的安装时间 命令解读 [aqjgmaster ~]$ rpm -q --qf "%{INSTALLTIME}\n" kernel 1681468253 [aqjgmaster ~]$ [aqjgmaster ~]$ date -d rpm -q --qf "%{INSTALLTIME}\n" kernel Fri Apr 14 18:30:53 CST 2023 [aqjgmaster ~]$ [a…

已解决java.rmi.AlreadyBoundException异常的正确解决方法,亲测有效!!!

已解决java.rmi.AlreadyBoundException异常的正确解决方法,亲测有效!!! 目录 问题分析 出现问题的场景 报错原因 解决思路 解决方法 分析错误日志 检查重复绑定情况 解除已有的绑定 优化代码逻辑 使用同步机制 总结 …

基于格网的边缘点检测(python)

1、背景介绍 前文已介绍对点云进行格网处理,可以计算平面点云面积、格网拓扑关系构建,相关博客如下: (1)点云格网过程可视化(C PCL)-CSDN博客 (2)平面点云格网过程及可…

Kimichat使用案例026:AI翻译英语PDF文档的3种方法

文章目录 一、介绍二、腾讯交互翻译TranSmart https://transmart.qq.com/三、沉浸式翻译三、谷歌网页翻译一、介绍 短的文章,直接丢进kimichat、ChatGPT里面很快就可以翻译完成,而且效果很佳。但是,很长的PDF文档整篇需要翻译,怎么办呢? 二、腾讯交互翻译TranSmart https…

VScode如何调节编辑器字体大小

首先,在vscode界面,依照顺序输入“Ctrlk”、“Ctrls”,即可进入键盘快捷方式设定界面。(如下图所示) 其次,在搜索框中输入“缩小”或者“放大”,就会出现对应的“缩小编辑器字体”或者“放大编…

算法刷题总结

1. 排序算法 1.1 快速排序算法 public abstract class Sort<T extends Comparable<T>> {public abstract void sort(T[] array);protected boolean less(T first, T two) {return first.compareTo(two) < 0;}protected void swap(T[] array, int i, int j) {T…

Python数据分析-糖尿病数据集数据分析

一、研究背景介绍 糖尿病是美国最普遍的慢性病之一&#xff0c;每年影响数百万美国人&#xff0c;并对经济造成重大的经济负担。糖尿病是一种严重的慢性疾病&#xff0c;其中个体失去有效调节血液中葡萄糖水平的能力&#xff0c;并可能导致生活质量和预期寿命下降。。。。糖尿…

CentOS系统查看版本的各个命令

cat /etc/centos-release 查看CentOS版本 uname -a 命令的结果分别代表&#xff1a;当前系统的内核名称、主机名、内核发型版本、节点名、系统时间、硬件名称、硬件平台、处理器类型以及操作系统名称 cat /proc/version 命令用于查看Linux内核的版本信息。执行该命令后&#xf…

Springboot整合MinIO实现系统文件的便捷式管理实例

一、MinIO简介 1.基础描述 MinIO 是一个高性能的对象存储系统&#xff0c;用于存储大量非结构化数据。它以简洁、高效、可靠和高扩展性著称&#xff0c;能够胜任各种数据密集型任务。MinIO 采用了与 Amazon S3 兼容的 API&#xff0c;使得用户无需额外学习即可上手使用。下面…

DNF手游攻略:云手机辅助流光星陨刀详细攻略大全!

DNF手游中&#xff0c;流光星陨刀是鬼剑士的专属神器之一&#xff0c;拥有快速的攻击速度和优秀的物理与法术攻击属性&#xff0c;因其出色的性能和未来升级的潜力&#xff0c;成为广大玩家关注的焦点。 流光星陨刀的背景与起源 流光星陨刀作为鬼剑士的标志性武器之一&#xf…

STM32单片机开发入门(十)SSCOM串口通信助手软件安装及使用提供软件网盘链接

文章目录 一.概要二.SSCOM软件下载安装三.串口通讯配置及应用实例1.串口通讯基本配置2.字符串数据发送和接收的配置操作3.16进制数据发送和接收的配置操作4.定时自动发送数据配置操作5.wifi模块AT指令调试配置操作6.用串口烧录STM32单片机代码配置操作 四.以太网TCP服务器端配置…