Kafka-服务端-网络层-源码流程

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {def startup() {socketServer = new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors = false)/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)}}

SocketServer

  def startup(startupProcessors: Boolean = true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {...endpoints.foreach { endpoint =>...val acceptor = new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {currentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket and selector.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送processNewResponses()// selector轮询各种事件,读取请求或者发送响应poll()// 封装selector.completedReceives中的请求,放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)processCompletedSends()processDisconnected()} catch {...}}} finally {...}}

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req = requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest =>shutdownComplete.countDown()returncase request: RequestChannel.Request =>try {request.requestDequeueTimeNanos = endTimeapis.handle(request)} catch {case e: FatalExitError =>shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)} finally {request.releaseBuffer()}case null => // continue}}shutdownComplete.countDown()}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/40981.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度解析Java世界中的对象镜像:浅拷贝与深拷贝的奥秘与应用

在Java编程的浩瀚宇宙中&#xff0c;对象拷贝是一项既基础又至关重要的技术。它直接关系到程序的性能、资源管理及数据安全性。然而&#xff0c;提及对象拷贝&#xff0c;不得不深入探讨其两大核心类型&#xff1a;浅拷贝&#xff08;Shallow Copy&#xff09;与深拷贝&#xf…

防爆智能手机如何解决危险环境下通信难题?

在化工厂、石油行业、矿山等危险环境中&#xff0c;通信安全一直是难题。传统手机因不具备防爆功能&#xff0c;可能引发火花、爆炸等安全风险&#xff0c;让工作人员在关键时刻难以及时沟通。但如今&#xff0c;防爆智能手机的出现彻底改变了这一现状&#xff01; 安全通信&am…

手机如何充当电脑摄像头,新手使用教程分享(新)

手机如何充当电脑摄像头&#xff1f;随着科技的发展&#xff0c;智能手机已经成为我们日常生活中不可或缺的一部分。手机的摄像头除了拍摄记录美好瞬间之外&#xff0c;其实还有个妙用&#xff0c;那就是充当电脑的摄像头。手机摄像头充当电脑摄像头使用的话&#xff0c;我们就…

一分钟学习数据安全—自主管理身份SSI分布式加密密钥管理

在这篇之前&#xff0c;我们已经对SSI有了一个全局的了解。这个系列的文章可以作为一个学习笔记来参考&#xff0c;真正要实践其中的一些方案、协议&#xff0c;还需要参考专业的书籍和官方文档。作为一个SSI系列学习笔记的最后一篇&#xff0c;我们做一个简单的延伸&#xff0…

【中项第三版】系统集成项目管理工程师 | 第 9 章 项目管理概论① | 9.1 - 9.3

前言 第 9 章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节理论性较强&#xff0c;学习要以教材为准。本章分值预计在4-5分。 目录 9.1 PMBOK的发展 9.2 项目基本要素 9.2.1 项目基础 9.2.2 项目管理 9.2.3 项目成功的标准 9.2.4 项目、项目集、项目组合…

多态的优点

多态的优点 1、多态的优点1.1 可替换性&#xff08;Substitutability&#xff09;2、可扩充性&#xff08;Extensibility&#xff09; 2、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、多态的优点 在面向对象编程&#xff08;OOP…

无服务器【Serverless】架构的深度剖析:组件介绍、优缺点与适用场景

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《未来已来&#xff1a;云原生之旅》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、云计算的发展趋势 2、无服务器计算简介 二、无服务…

红海云签约海新域集团,产业服务运营领军企业加速人力资源数字化转型

北京海新域城市更新集团有限公司&#xff08;以下简称“海新域集团”&#xff09;是北京市海淀国有资产投资集团有限公司一级监管企业&#xff0c;致力于成为国内领先的产业服务运营商。集团积极探索城市和产业升级新模式&#xff0c;通过对老旧、低效等空间载体重新定位规划、…

FPGA基本资源介绍

文章目录 FPGA资源介绍1.可编程输入输出单元(IOB)2.可配置逻辑块(CLB)3.数字时钟管理模块(DCM)4.嵌入式块RAM(BLOCK RAM / BRAM)4.1其他ram 5.丰富的布线资源6.底层内嵌功能单元7.内嵌专用硬核软核、硬核、以及固核的概念 FPGA资源介绍 1.可编程输入输出单元(IOB) 可编程输入…

C++视觉开发 五.答题卡识别

答题卡识别主要步骤 (1)反二值化&#xff0c;选项处理为前景&#xff08;白色&#xff09;&#xff0c;其它处理为背景&#xff08;黑色&#xff09;。 (2)每个选项提取出来&#xff0c;计算各选项白色像素点个数。 (3)筛选出白色像素点最多的选项作为考生答案。 (4)与标准答案…

【机器学习】连续字段的特征变换

介绍 除了离散变量的重编码外&#xff0c;有的时候我们也需要对连续变量进行转化&#xff0c;以提升模型表现或模型训练效率。在之前的内容中我们曾介绍了关于连续变量标准化和归一化的相关内容&#xff0c;对连续变量而言&#xff0c;标准化可以消除量纲影响并且加快梯度下降…

年化达21%(K=1),最大回撤35%,K=3时,卡玛比最优,最大回撤20%(年化15.2%)| Quantlab5.0代码发布

原创文章第578篇&#xff0c;专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 Quantlab5.0代码发布&#xff1a; 值得说明&#xff0c;Quantlab5与4没有继承关系&#xff0c;5开始的思路是&#xff1a; 1、尽量少封装&#xff0c;保留回测框架最原始的功能。…

【LabView学习篇 - 1】:初始LabView

文章目录 初始LabView前面板和程序框图前面板&#xff08;Front Panel&#xff09;程序框图&#xff08;Block Diagram&#xff09;交互和工作流程 练手小案例&#xff1a;LabView中实现加法操作 初始LabView LabVIEW&#xff08;Laboratory Virtual Instrument Engineering W…

【CT】LeetCode手撕—93. 复原 IP 地址

目录 题目1- 思路2- 实现⭐93. 复原 IP 地址——题解思路 3- ACM 实现 题目 原题连接&#xff1a;93. 复原 IP 地址 1- 思路 模式识别&#xff1a;给一个 String 字符串 ——> 复原 IP 地址 ——> 回溯三部曲 &#xff0c;回溯的切割问题 ——> 实现一个左闭右闭区间…

利用redis数据库管理代理库爬取cosplay网站-cnblog

爬取cos猎人 数据库管理主要分为4个模块&#xff0c;代理获取模块&#xff0c;代理储存模块&#xff0c;代理测试模块&#xff0c;爬取模块 cos猎人已经倒闭&#xff0c;所以放出爬虫源码 api.py 为爬虫评分提供接口支持 import requests import concurrent.futures import …

Artificial Intelligence Self-study

Artificial Intelligence Self-study Traditional AI (Symbolic AI) 基于&#xff1a;符号表示 数理逻辑 搜索 - 有明确规则&#xff0c;依靠算力。Appliance &#xff1a; 数学难题(Heuristic Algorithm)&#xff0c;棋牌对抗(围棋)&#xff0c;专家系统(输入病症&#xf…

linux安装jdk1.8(无废话版)

文章目录 1、下载安装包2、创建文件目录&#xff0c;并将安装包上传到该目录下3、解压安装包4、配置环境变量5、加载配置文件6、验证 前言&#xff1a;linux系统以ubuntu20.04.6版本为例&#xff0c;jdk版本jdk-8u411-linux-x64.tar.gz版本为例 1、下载安装包 jdk下载地址&am…

2. 创建kvm虚拟机

创建kvm虚拟机 一、创建kvm虚拟机1、virt-manager 图形化工具2、virt-install 命令行工具3、查看虚拟机 一、创建kvm虚拟机 1、virt-manager 图形化工具 2、virt-install 命令行工具 [rootlocalhost ~]# virt-install --namevm02_centos79 \ > --graphics vnc,listen0.0.0…

Java的数据类型(复习版)

思维导图 一.字面常量 什么是常量&#xff1f;在我的理解看来常量就是在一个程序运行期间&#xff0c;保持不变的量就是常量。 例如&#xff1a; System.out.println(100);System.out.println(a);System.out.println(3.114);这些都可以称为常量。 字面常量的分类&#xff1a;…

护眼落地灯哪个牌子好?盘点五款必入不踩雷的护眼大路灯

护眼落地灯哪个牌子好&#xff1f;在这个快节奏的时代&#xff0c;护眼落地灯已经从一种高端选择转变为日常用眼生活中的必须品。不论是提升普通照明&#xff0c;还是针对孩子学习是改善光线质量环境&#xff0c;一款优秀的护眼落地灯都能成为我们生活中的照明神器。怎么选择一…