Kafka-服务端-网络层-源码流程

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {def startup() {socketServer = new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors = false)/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)}}

SocketServer

  def startup(startupProcessors: Boolean = true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {...endpoints.foreach { endpoint =>...val acceptor = new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {currentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket and selector.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送processNewResponses()// selector轮询各种事件,读取请求或者发送响应poll()// 封装selector.completedReceives中的请求,放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)processCompletedSends()processDisconnected()} catch {...}}} finally {...}}

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req = requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest =>shutdownComplete.countDown()returncase request: RequestChannel.Request =>try {request.requestDequeueTimeNanos = endTimeapis.handle(request)} catch {case e: FatalExitError =>shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)} finally {request.releaseBuffer()}case null => // continue}}shutdownComplete.countDown()}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/40981.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度解析Java世界中的对象镜像:浅拷贝与深拷贝的奥秘与应用

在Java编程的浩瀚宇宙中&#xff0c;对象拷贝是一项既基础又至关重要的技术。它直接关系到程序的性能、资源管理及数据安全性。然而&#xff0c;提及对象拷贝&#xff0c;不得不深入探讨其两大核心类型&#xff1a;浅拷贝&#xff08;Shallow Copy&#xff09;与深拷贝&#xf…

防爆智能手机如何解决危险环境下通信难题?

在化工厂、石油行业、矿山等危险环境中&#xff0c;通信安全一直是难题。传统手机因不具备防爆功能&#xff0c;可能引发火花、爆炸等安全风险&#xff0c;让工作人员在关键时刻难以及时沟通。但如今&#xff0c;防爆智能手机的出现彻底改变了这一现状&#xff01; 安全通信&am…

【Python】找Excel重复行

【背景】 找重复行虽然可以通过Excel实现,但是当数据量巨大时光是找结果就很费时间,所以考虑用Python实现。 【代码】 import pandas as pd# 读取Excel文件 file_path = your excel file path df = pd.read_excel(file_path)# 查找重复行 # 这里假设要检查所有列的重复项 …

手机如何充当电脑摄像头,新手使用教程分享(新)

手机如何充当电脑摄像头&#xff1f;随着科技的发展&#xff0c;智能手机已经成为我们日常生活中不可或缺的一部分。手机的摄像头除了拍摄记录美好瞬间之外&#xff0c;其实还有个妙用&#xff0c;那就是充当电脑的摄像头。手机摄像头充当电脑摄像头使用的话&#xff0c;我们就…

一分钟学习数据安全—自主管理身份SSI分布式加密密钥管理

在这篇之前&#xff0c;我们已经对SSI有了一个全局的了解。这个系列的文章可以作为一个学习笔记来参考&#xff0c;真正要实践其中的一些方案、协议&#xff0c;还需要参考专业的书籍和官方文档。作为一个SSI系列学习笔记的最后一篇&#xff0c;我们做一个简单的延伸&#xff0…

【中项第三版】系统集成项目管理工程师 | 第 9 章 项目管理概论① | 9.1 - 9.3

前言 第 9 章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节理论性较强&#xff0c;学习要以教材为准。本章分值预计在4-5分。 目录 9.1 PMBOK的发展 9.2 项目基本要素 9.2.1 项目基础 9.2.2 项目管理 9.2.3 项目成功的标准 9.2.4 项目、项目集、项目组合…

Lemo 的 AGI 应用实战博文导航

AGI系列&#xff08;1&#xff09;&#xff1a;掌握AI大模型提示词优化术&#xff0c;提问准确率飙升秘籍 AGI系列&#xff08;2&#xff09;&#xff1a;掌握AI大模型提示词优化术&#xff0c;从容应对各种提问场景 AGI系列&#xff08;3&#xff09;&#xff1a;2024年国内最…

多态的优点

多态的优点 1、多态的优点1.1 可替换性&#xff08;Substitutability&#xff09;2、可扩充性&#xff08;Extensibility&#xff09; 2、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、多态的优点 在面向对象编程&#xff08;OOP…

无服务器【Serverless】架构的深度剖析:组件介绍、优缺点与适用场景

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《未来已来&#xff1a;云原生之旅》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、云计算的发展趋势 2、无服务器计算简介 二、无服务…

项目迭代中新老逻辑切流和对比 - 异步查询新接口

项目迭代中新老逻辑切流-切换入口项目迭代中新老逻辑切流 - 异步查询新接口项目迭代中新老逻辑切流 - 新老数据对比—待更新 前言 前面说到了 项目迭代中新老逻辑切换入口 &#xff0c;有兴趣的可以先看一下&#xff0c;不知道大家有什么疑问吗&#xff1f; 怎么知道自己新接…

红海云签约海新域集团,产业服务运营领军企业加速人力资源数字化转型

北京海新域城市更新集团有限公司&#xff08;以下简称“海新域集团”&#xff09;是北京市海淀国有资产投资集团有限公司一级监管企业&#xff0c;致力于成为国内领先的产业服务运营商。集团积极探索城市和产业升级新模式&#xff0c;通过对老旧、低效等空间载体重新定位规划、…

FPGA基本资源介绍

文章目录 FPGA资源介绍1.可编程输入输出单元(IOB)2.可配置逻辑块(CLB)3.数字时钟管理模块(DCM)4.嵌入式块RAM(BLOCK RAM / BRAM)4.1其他ram 5.丰富的布线资源6.底层内嵌功能单元7.内嵌专用硬核软核、硬核、以及固核的概念 FPGA资源介绍 1.可编程输入输出单元(IOB) 可编程输入…

C++视觉开发 五.答题卡识别

答题卡识别主要步骤 (1)反二值化&#xff0c;选项处理为前景&#xff08;白色&#xff09;&#xff0c;其它处理为背景&#xff08;黑色&#xff09;。 (2)每个选项提取出来&#xff0c;计算各选项白色像素点个数。 (3)筛选出白色像素点最多的选项作为考生答案。 (4)与标准答案…

【机器学习】连续字段的特征变换

介绍 除了离散变量的重编码外&#xff0c;有的时候我们也需要对连续变量进行转化&#xff0c;以提升模型表现或模型训练效率。在之前的内容中我们曾介绍了关于连续变量标准化和归一化的相关内容&#xff0c;对连续变量而言&#xff0c;标准化可以消除量纲影响并且加快梯度下降…

微信扫码进入小程序的webview页面,发现左上角没有home键

问题描述&#xff1a; 微信扫小程序二维码进到web-view内嵌h5页面&#xff0c;左上角没有返回小程序主页的home键&#xff0c;ios正常显示&#xff0c;Android 没有显示 导致的原因&#xff1a; 因为顶部导航栏我是自定义的 【如果是使用小程序原生的顶部导航栏是不会出现这种…

DiffSynth-Studio全面解析与应用示例

DiffSynth-Studio简介 1.1 DiffSynth-Studio的定义与目标用户 DiffSynth-Studio 是一个创新的扩散引擎&#xff0c;专门设计用于实现图片和视频的风格转换。它通过先进的机器学习技术&#xff0c;为用户提供了一种全新的创作方式&#xff0c;使得风格转换变得更加高效和直观。…

latex中引用参考文献的命令

在LaTeX中&#xff0c;常用于引用参考文献的命令有几种&#xff0c;它们的区别主要在于引用的风格和输出的格式。以下是几种常见的引用命令及其区别&#xff1a; 命令输出效果区别与特点\cite{key}[1]基本的引用命令&#xff0c;输出带方括号的引用编号\citet{key}Author (Yea…

循环序列模型

循环序列模型 1.为什么选择序列模型&#xff1f; 2.数学符号 3.循环神经网络模型 4.通过时间的反向传播 5.不同类型的循环神经网络 6.语言模型和序列生成 7.对新序列采样 8.循环神经网络的梯度消失 9.GRU单元 10.长短期记忆 11.双向循环神经网络 12.深层循环神经网…

年化达21%(K=1),最大回撤35%,K=3时,卡玛比最优,最大回撤20%(年化15.2%)| Quantlab5.0代码发布

原创文章第578篇&#xff0c;专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 Quantlab5.0代码发布&#xff1a; 值得说明&#xff0c;Quantlab5与4没有继承关系&#xff0c;5开始的思路是&#xff1a; 1、尽量少封装&#xff0c;保留回测框架最原始的功能。…

【LabView学习篇 - 1】:初始LabView

文章目录 初始LabView前面板和程序框图前面板&#xff08;Front Panel&#xff09;程序框图&#xff08;Block Diagram&#xff09;交互和工作流程 练手小案例&#xff1a;LabView中实现加法操作 初始LabView LabVIEW&#xff08;Laboratory Virtual Instrument Engineering W…