Kafka源码分析(四) - Server端-请求处理框架

系列文章目录

Kafka源码分析-目录

一. 总体结构

先给一张概览图:
在这里插入图片描述

服务端请求处理过程涉及到两个模块:kafka.networkkafka.server

1.1 kafka.network

该包是kafka底层模块,提供了服务端NIO通信能力基础。

有4个核心类:SocketServer、Acceptor、Processor、RequestChannel。各自角色如下:

  • SocketServer:服务端的抽象,是服务端通信的入口;

  • Acceptor:Reactor通信模式中处理连接ACCEPT事件的线程/线程池所执行的任务;

  • Processor:Reactor通信模式中处理连接可读/可写事件的线程/线程池所执行的任务;

  • RequestChannel:请求队列,存储已经解析好的请求以等待处理;

对于上层模块而言,该基础模块有两个输入和一个输出

  1. 输入:IP+端口号,该模块会对目标端口实现监听;

  2. 输出:解析好的请求,通过RequestChannel进行输出;

  3. 输入:待发送的Response,通过Processor.responseQueue来完成输入;

1.2 kafka.server

该包在kafka.network的基础上实现各种请求的处理逻辑,主要包含KafkaServer和KafkaApis两个类。其中:

  • KafkaServer:Kafka服务端的抽象,统一维护Kafka服务端的各流程和状态;

  • KakfaApis:维护了各类请求对应的业务逻辑,通过KafkaServer.apis字段组合到KafkaServer之中;

二. Server的端口监听

整体流程如图:
在这里插入图片描述

接下来按调用顺序依次分析各方法

2.1 KafkaServer.startup()

关于端口监听的核心逻辑分4步,代码如下(用注释说明各部分的目的):

def startup() {// 省略无关代码... ...// 1. 创建SocketServersocketServer = new SocketServer(config, metrics, time, credentialProvider)// 2. 启动端口监听// (在这里完成了Acceptor的创建和端口ACCEPT事件的监听)// (startupProcessors = false表示暂不启动Processor处理线程)socketServer.startup(startupProcessors = false)// 3. 启动请求处理过程中的相关依赖// (这也是第2步中不启动Processor处理线程的原因,有依赖项需要处理)... ...// 4. 启动端口可读/可写事件处理线程(即Processor线程)socketServer.startProcessors()// 省略无关代码... ...
}

2.2 SocketServer.startup(Boolean)

代码及说明性注释如下:

def startup(startupProcessors: Boolean = true) {this.synchronized {// 省略无关代码... ...// 1. 创建Accetpor和Processor的实例,// 同时页完成了Acceptor对端口ACCEPT事件的监听createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)// 2. [可选]启动各Acceptor对应的Processor线程if (startupProcessors) {startProcessors()}}
}

2.3 ScocketServer.createAcceptorAndProcessor()

直接上注释版的代码,流程分3步:

// 入参解释
// processorsPerListener: 对于每个IP:Port, 指定Reactor模式子线程池大小, 
//                        即处理端口可读/可写事件的线程数(Processor线程);
// endpoints: 接收请求的IP:Port列表;
def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {// 省略无关代码... ...endpoints.foreach { endpoint =>// 省略无关代码... ...// 1. 创建Acceptor对象// 在此步骤中调用Acceptor.openServerSocket, 完成了对端口ACCEPT事件的监听val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)// 2. 创建了与acceptor对应的Processor对象列表// (这里并未真正启动Processor线程)addProcessors(acceptor, endpoint, processorsPerListener)// 3. 启动Acceptor线程KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()// 省略无关代码... ...}}

2.4 Acceptor.openServerSocket()

该方法中没什么特殊点,就是java NIO的标准流程:

def openServerSocket(host: String, port: Int): ServerSocketChannel = {// 1. 构建InetSocketAddress对象val socketAddress =if (host == null || host.trim.isEmpty)new InetSocketAddress(port)elsenew InetSocketAddress(host, port)// 2. 构建ServerSocketChannel对象, 并设置必要参数值val serverChannel = ServerSocketChannel.open()serverChannel.configureBlocking(false)if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)serverChannel.socket().setReceiveBufferSize(recvBufferSize)// 3. 端口绑定, 实现事件监听try {serverChannel.socket.bind(socketAddress)info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))} catch {case e: SocketException =>throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)}// 4. 返回ServerSocketChannel对象, 用于后续register到Selector中serverChannel
}

2.5 SocketServer.startProcessor()

从这步开始,仅剩的工作就是启动Processor线程,代码都非常简单。比如本方法只是遍历Acceptor列表,并调用Acceptor.startProcessors()

def startProcessors(): Unit = synchronized {acceptors.values.asScala.foreach { _.startProcessors() }info(s"Started processors for ${acceptors.size} acceptors")
}

2.6 Acceptor.startProcessors()

该方法很简明,直接上代码

def startProcessors(): Unit = synchronized {if (!processorsStarted.getAndSet(true)) {startProcessors(processors)}
}def startProcessors(processors: Seq[Processor]): Unit = synchronized {processors.foreach { processor =>KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",processor).start()}
}

三. 请求/响应的格式

3.1 格式概述

在这里插入图片描述
请求和响应都由两部分组成:Header和Body。RequestHeader中包含ApiKey、ApiVersion、CorrelationId、ClientId;ResponseHeader中只包含CorrelationId字段。接下来逐个讲解这些字段。

  • ApiKey

    2字节整型,指明请求的类型;比如0代表Produce请求,1代表Fetch请求;具体id和请求类型之间的映射关系可在 org.apache.kafka.common.protocol.ApiKeys 中找到;

  • ApiVersion

    随着API的升级迭代,各类型请求的请求体格式可能有变更;这个2字节的整型指明了请求体结构的版本;

  • CorrelationId

    4字节整型,在Response中传回,Kafka Server端不处理,用于客户端内部关联业务数据;

  • ClientId

    可变长字符串,标识客户端;

3.2 请求体/响应体的具体格式

各业务操作(比如Produce、Fetch等)对应的请求体和响应体格式都维护在 org.apache.kafka.common.protocol.ApiKeys 中。接下来以Produce为例讲解ApiKeys是如何表达数据格式的。

ApiKeys是个枚举类,其核心属性如下:

public enum ApiKeys {// 省略部分代码... ...// 上文提到的请求类型对应的idpublic final short id;// 业务操作名称public final String name;// 各版本请求体格式public final Schema[] requestSchemas;// 各版本响应体格式public final Schema[] responseSchemas;// 省略部分代码... ...
}

其中PRODUCE枚举项的定义如下

PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions())

可以看到各版本的请求格式维护在 ProduceRequest.schemaVersions(),代码如下

public static Schema[] schemaVersions() {return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
}

这里只是简单返回了一个Schema数组。一个Schema对象代表了一种数据格式。请求头中的ApiVersion指明了请求体的格式对应数组的第几项(从0开始)。

接下来我们看看Schema是如何表达数据格式的。其结构如下
在这里插入图片描述
Schema有两个字段:fields和fieldsByName。其中fields是体现数据格式的关键,它指明了字段的排序和各字段类型;而fieldsByName只是按字段名重新组织的Map,用于根据名称查找对应字段。

BoundField只是Field的简单封装。Field有两个核心字段:name和type。其中name表示字段名称,type表示字段类型。常见的Type如下:

Type.BOOLEAN;
Type.INT8;
Type.INT16;
Type.INT32;// 可通过org.apache.kafka.common.protocol.types.Type查看全部类型
... ...

回到PRODUCE API,通过查看Schema的定义,能看到其V0版本的请求体和响应体的结构如下:
在这里插入图片描述

四. 请求的处理流程

在这里插入图片描述

  1. Acceptor监听到ACCEPT事件(TCP创建连接"第一次握手"的SYN);

  2. Acceptor将将连接注册到Processor列表内的其中一个,由该Processor监听这个连接的后续可读可写事件;

  3. Processor接收到完整请求后,会将Request追加到RequestChannel中进行排队,等待后续处理;

  4. KafkaServer中有个requestHandlerPool的字段,KafkaRequestHandlerPool类型,代表请求处理线程池;KafkaRequestHandler就是其中的线程,会从RequestChannel拉请求进行处理;

  5. KafkaRequestHandler将拉到的Request传入KafkaApis.handle(Request)方法进行处理;

  6. KafkaApis根据不同的ApiKey调用不同的方法进行处理,处理完毕后会将Response最终写入对应的Processor的ResponseQueue中等待发送;KafkaApis.handle(Request)的方法结构如下:

    def handle(request: RequestChannel.Request) {try {// 省略部分代码... ...request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)case ApiKeys.FETCH => handleFetchRequest(request)case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)case ApiKeys.METADATA => handleTopicMetadataRequest(request)case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)// 省略部分代码... ...}} catch {case e: FatalExitError => throw ecase e: Throwable => handleError(request, e)} finally {request.apiLocalCompleteTimeNanos = time.nanoseconds}
    }
    
  7. Processor从自己的ResponseQueue中拉取待发送的Respnose;

  8. Processor将Response发给客户端;

五. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/1504.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Django】django.core.exceptions.AppRegistryNotReady: Apps aren‘t loaded yet.

其中django后台manage.py入口程序报错,检索很多问题解决方案,这里记录下个人问题原因 1.django启动异常问题详情 django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet. 2.问题原因 Python第三方包安装版本不一致或缺少依赖包&…

Flink窗口机制

1.窗口的概念 时间是为窗口服务的。窗口是什么?为什么会有窗口呢? (1)Flink要处理的数据,一般是从Kafka过来的流式数据,如果只是单纯地统计流的数据量,是没办法统计的。 (2&#xff…

C语言程序设计:简易版的printf函数实现

简易版的printf函数实现 功能说明 (1)使用putchar函数、va可变参完成printf函数基本功能的实现; (2)函数说明: 实现对下列数据类型的输出,并返回成功输出打印的字符个数: 整数&…

在CSDN创作了6个月,我收获了什么?文末送书~

作者主页:阿玥的小东东主页! 正在学习:python和C/C 期待大家的关注哦 目录 一次很好的机会,让我开始了CSDN之旅 首先来看看我的几位领路人 创作动力 1W粉丝 在CSDN我收获了什么? 很高的展现量 认证创作者身份 社…

【Linux】系统安全及应用

目录 一、账号安全基本措施 1.系统账号清理 2.密码安全控制 3.历史命令安全管理 4.限制su切换用户 1)将信任的用户加入到wheel组中 2)修改su的PAM认证配置文件 5.ssh远程登录输入三次密码错误则锁定用户 二、Linux中的PAM安全认证 1.su命令的…

Redis入门到通关之数据结构解析-动态字符串SDS

文章目录 Redis数据结构-动态字符串动态扩容举例二进制安全SDS优点与C语言中的字符串的区别 Redis数据结构-动态字符串 我们都知道 Redis 中保存的Key是字符串,value 往往是字符串或者字符串的集合。可见字符串是 Redis 中最常用的一种数据结构。 不过 Redis 没有…

Android Studio超级详细讲解下载、安装配置教程(建议收藏)

博主介绍:✌专注于前后端、机器学习、人工智能应用领域开发的优质创作者、秉着互联网精神开源贡献精神,答疑解惑、坚持优质作品共享。本人是掘金/腾讯云/阿里云等平台优质作者、擅长前后端项目开发和毕业项目实战,深受全网粉丝喜爱与支持✌有…

贪吃蛇游戏实现(VS编译环境)

贪吃蛇游戏 🥕个人主页:开敲🍉 🔥所属专栏:C语言🍓 🌼文章目录🌼 0. 前言 1. 游戏背景 2. 实现后游戏画面展示 3. 技术要求 4. Win32 API介绍 4.1 Win32 API 4.2 控制台程序 4.…

Java之类和对象

一面向对象的初步认知 1.什么是面向对象 Java是一门纯面向对象的语言(Object Oriented Program,简称OOP),在面向对象的世界里,一切皆为对象。面向对象是解决问题的一种思想,主要依靠对象之间的交互完成一件事情。用面向对象的思想…

嵌入式物联网实战开发笔记-乐鑫ESP32开发环境ESP-IDF搭建【doc.yotill.com】

乐鑫ESP32入门到精通项目开发参考百例下载: 链接:百度网盘 请输入提取码 提取码:4e33 3.1 ESP-IDF 简介 ESP-IDF(Espressif IoT Development Framework)是乐鑫(Espressif Systems)为 ESP 系列…

大型网站系统架构演化实例_2.使用缓存改善网站性能

1.使用缓存改善网站性能 网站访问的特点和现实世界的财富分配一样遵循二八定律:80%的业务访问集中在20%的数据上。既然大部分业务访问集中在一小部分数据上,那么如果把这一小部分数据缓存在内存中,就可以减少数据库的访问压力&#xf…

【Python】自定义修改pip下载模块默认的安装路径

因为电脑下载了Anaconda提供的默认Python 3.9 以及后期下载的python3.10所以在Pychram进行项目开发时,发现一些库怎么导入都导入不了,手动install也是失败,后期在cmd里面发现python以及pip配置有点儿混乱,导致执行命令时&#xff…

碳循环、人类、遥感之间的关联

1. 碳与碳循环 碳是自然界中很常见的一种元素,它以多种形式广泛存在于大气和地壳之中。碳单质很早就被人认识和利用,碳的一系列化合物——有机物是生命的根本。 1.1 自然界中的碳 地球上最大的两个碳库是岩石圈和化石燃料,含碳量约占…

在RISC-V64架构的CV1811C开发板上应用perf工具进行多线程程序性能分析及火焰图调试

CV1811C环境编译 SDK目录结构 . ├── build // 编译目录,存放编译脚本以及各board差异化配置 ├── buildroot-2021.05 // buildroot开源工具 ├── freertos // freertos系统 ├── fsbl // fsbl启动固件,prebuilt形式存在…

Android14 - WindowManagerService之客户端Activity布局

Android14 - WindowManagerService之客户端Activity布局 一、主要角色 WMS作为一个服务端,有多种客户端与其交互的场景。我们以常见的Activity为例: Activity:在ActivityThread构建一个Activity后,会调用其attach方法,…

[docker] volume 补充 环境变量 参数

[docker] volume 补充 & 环境变量 & 参数 这里补充一下 volume 剩下的内容,以及添加参数(ARG) 和 环境变量 ENV 的内容 read only volumes ❯ docker run-p 3000:80--rm--name feedback-app-v feedback:/app/feedback-v "$(pwd):/app"-v /app/…

【C++初阶】vector使用特性 vector模拟实现

1.vector的介绍及其使用 1.1 vector的介绍 vector文档介绍 1. vector是表示可变大小数组的序列容器。 2. 就像数组一样,vector也采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问,和数组一样高效。但是又不像数组&#…

第24天:安全开发-PHP应用文件管理模块显示上传黑白名单类型过滤访问控制

第二十四天 一、PHP文件管理-显示&上传功能实现 如果被抓包抓到数据包,并修改Content-Type内容 则也可以绕过筛查 正常进行上传和下载 二、文件上传-$_FILES&过滤机制实现 无过滤机制 黑名单过滤机制 使用 explode 函数通过点号分割文件名,…

VTC视频时序控制器原理以及Verilog实现

文章目录 一、前言二、视频时序控制原理三、Verilog实现3.1 代码3.2 仿真以及分析 一、前言 VTC(Video Timing Controller)是一种用于产生视频时序的控制器,在FPGA图像领域经常用到。Xilinx Vivado 也有专门用于生成视频时序的 IP&#xff0c…

webpack-babel2

浏览器的兼容性问题 浏览器的兼容性问题不知包括随屏幕大小而变化,还包括针对浏览器支持的特性(如css特性,js特性) 做处理。 目前市场上有很多浏览器:Chrome,Safari,IE,Edge等,要根据它们的市场占有率来决…