Kafka-服务端-网络层

Reactor模式

Kafka网络层采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程应该了解JavaNIO提供了实现Reactor模式的API。常见的单线程Java NIO的编程模式如图所示。

在这里插入图片描述
为了满足高并发的需求,也为了充分利用服务器的资源,服务端需要使用多线程来执行业务逻辑。我们对上述架构稍作调整,将网络读写的逻辑与业务处理的逻辑进行拆分,让其由不同的线程池来处理,从而实现多线程处理。设计架构如图所示。

在这里插入图片描述
通过将网络处理与业务逻辑进行切分后实现了上述设计,此设计中读取、写入、业务处理都实现了多线程处理,不再存在性能瓶颈。

但是,如果同一时间出现大量I/O事件,单个Selector就可能在分发事件时阻塞(或延时)而成为瓶颈。

我们可以将上述设计中单独的Selector对象扩展成多个,让它们监听不同的I/O事件,这样就可以避免单个Selector带来的瓶颈问题。设计如图所示。

在这里插入图片描述

SocketServer

Kafka的网络层是采用多线程、多个Selector的设计实现的。核心类是SocketServer,其中包含一个Acceptor用于接受并处理所有的新连接,每个Acceptor对应多个Processor线程,每个Processor线程拥有自己的Selector,主要用于从连接中读取请求和写回响应。每个Acceptor对应多个Handler线程,主要用于处理请求并将产生响应返回给Processor线程。Processor线程与Handler线程之间通过RequestChannel进行通信。整个网络层的结构如图所示。

在这里插入图片描述
下面介绍SocketServer的具体实现。首先来看SocketServer依赖的组件,如图所示。

在这里插入图片描述

AbstractServerThread

Acceptor和Processor都继承了AbstractServerThread,如图所示,AbstractServerThread是实现了Runnable接口的抽象类。在AbstractServerThread中为Acceptor和Processor提供了一些启动关闭相关的控制类方法。

在这里插入图片描述

Acceptor

Acceptor的主要功能是接收客户端建立连接的请求,创建Socket连接并分配给Processor处理。

Acceptor中有两个比较重要的字段:一个是Java NIO Selector,注意不要与前面介绍的KSelector混淆;二是用于接收客户端请求的ServerSocketChannel对象。在创建Acceptor时会初始化上面两个字段,同时还会创建并启动其管理的Processors线程。

Acceptor.accept()方法实现了对OP_ACCEPT事件的处理,它会创建SocketChannel并将其交给Processoraccept方法处理,同时还会增加ConnectionQuotas中记录的连接数。

Processor

Processor主要用于完成读取请求和写回响应的操作,Processor不参与具体业务逻辑的处理。Processor的核心字段如下所述,在创建Processor对象时会初始化这些字段。

  • newConnections:ConcurrentLinkedQueue[SocketChannel]类型,其中保存了由此Processor处理的新建的SocketChannel。
  • inflightResponses:保存未发送的响应。有读者可能会将inflightResponses与客户端的InFlightRequests进行类比,但也要注意其区别,客户端并不会对服务端发送的响应消息再次发送确认,所以infightResponse中的响应会在发送成功后移除,而InFlightRequests中的请求是在收到响应后才移除。
  • selector:KSelector类型,负责管理网络连接。
  • requestChannel:Processor与Handler线程之间传递数据的队列。

在Acceptoraccept方法中创建的SocketChannel会通过Processor.accept方法交给Processor进行处理。

Processoraccpet方法接收到一个新的SocketChannel时会先将其放入newConnections队列中,然后会唤醒Processor线程来处理newConnections队列。

注意,newConnections队列由Acceptor线程和Processor线程并发操作,所以选择线程安全的ConcurrentLinkedQueue。

在Processor.run()方法中实现了从网络连接上读写数据的功能。run()方法的流程如图所示。

在这里插入图片描述
如果Response是SendAction类型,表示该Response需要发送给客户端,则查找对应的KafkaChannel,为其注册OP_WRITE事件,并将KafkaChannel.send字段指向待发送的Response对象。

同时还会将Response从responseQueue队列中移出,放入infightResponses中。如果关心OP_WRITE事件的取消时机,可以回顾KafkaChannel.send方法,即发送完一个完整的响应后,会取消此连接注册的OP_WRITE事件。

如果Response是NoOpAction类型,表示此连接暂无响应需要发送,则为KafkaChannel注册OP_READ,允许其继续读取请求。

如果Response是CloseConnectionAction类型,则关闭对应的连接。

RequestChannel

Processor线程与Handler线程之间传递数据是通过RequestChannel完成的。

在RequestChannel中包含了一个requestQueue队列和多个responseQueues队列,每个Processor线程对应一个responseQueue。

Processor线程将读取到的请求存入requestQueue中,Handler线程从requestQueue队列中取出请求进行处理;Handler线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程从其对应的responseQueue中取出响应并发送给客户端。RequestChannel的结构如图所示。

在这里插入图片描述
在RequestChannel中保存的是RequestChannel.Request和RequestChannel.Response两个类的对象。

RequestChannel.Request会对请求进行解析,形成requestld(请求类型 ID)、header(请求头)、body(请求体)等字段,供Handler线程使用,并提供了一些记录操作时间的字段供监控程序使用。

RequestChannel.Response需要注意其responseAction字段,有SendAction、NoOpAction、CloseConnectionAction三种 类 型。

当请求放入RequestChannel.requestQueue之后,会有多个Handler线程并发处理从其中取出请求处理,那如何保证客户端请求的顺序性呢?在Processorrun方法,其中有多处注册/取消OP_READ事件以及注册/取消OP_WRITE事件的操作,通过这些操作的组合可以保证每个连接上只有一个请求和一个对应的响应,从而实现请求的顺序性。

现在回头来总结一个请求数据从生产者发送到服务端的流转过程,如图所示。

在这里插入图片描述
KafkaProducer线程创建ProducerRecord后,会将其缓存进RecordAccumulator。

Sender线程从RecordAccumulator中获取缓存的消息,放入KafkaChannel.send字段中等待发送,同时放入InFlightRequests队列中等待响应。

之后,客户端会通过KSelector将请求发送出去。

在服务端,Processor线程使用KSelector读取请求并暂存到stageReceives队列中,KSelector.poll方法结束后,请求被移转移到completeReceives队列中。之后,Processor将请求进行一些解析操作后,放入RequestChannel.requestQueue队列。

Handler线程会从RequestChannel.requestQueue队列中取出请求进行处理,将处理之后生成的响应放入RequestChannel.responseQueue队列。

Processor线程从其对应的RequestChannel.responseQueue队列中取出响应并放入inflightResponses队列中缓存,当响应发送出去之后会将其从inflightResponse中删除。生产者读取响应的过程与服务端读取请求的过程类似,主要的区别是生产者需要对InFlightRequest中的请求进行确认。

Kafka网络层的设计原理和实现就介绍到这里了。在高性能的分布式框架中经常采用这种Reactor模式的设计,例如,HDFS RPC框架的服务端、ZooKeeper等。也有实现了Reactor模式的框架,例如,Netty和Mina。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640723.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中间件存储设计 - 数组与链表

文章目录 数组ArrayListLinkedListHashMap小结 中间件主要包括如下三方面的基础:数据结构、JUC 和 Netty,接下来,我们先讲数据结构。 数据结构主要解决的是数据的存储方式问题,是程序设计的基座。 按照重要性和复杂程度&#xf…

Python fork方法:创建新进程

除可以进行多线程编程之外,Python 还支持使用多进程来实现并发编程。 Python 的 os 模块提供了一个 fork() 方法,该方法可以 fork 出来一个子进程。简单来说,fork() 方法的作用在于,程序会启动两个进程(一个是父进程&…

《WebKit 技术内幕》学习之十(1): 插件与JavaScript扩展

虽然目前的浏览器的功能很强 ,但仍然有其局限性。早期的浏览器能力十分有限,Web前端开发者希望能够通过一定的机制来扩展浏览器的能力。早期的方法就是插件机制,现在流行次啊用混合编程(Hybird Programming)模式。插件…

大模型实战营 Day5作业

基础作业: 使用 LMDeploy 以本地对话、网页Gradio、API服务中的一种方式部署 InternLM-Chat-7B 模型,生成 300 字的小故事(需截图) TurboMind 推理命令行本地对话 lmdeploy chat turbomind /share/temp/model_repos/internlm-cha…

RK3568平台 TinyAlsa集成第三方音频算法

一.tinyalsa介绍 ALSA(Advanced Linux Sound Architecture)是一个开源项目,涵盖了用户空间和内核空间对音频设备的操作接口,通过应用层使用alsalib可以实现对音频设备的控制 TinyAlsa是android推出的一个精简的ALSA库&#xff0c…

【教程】npm的时候ssh报错ssh://git@github.com/frozeman/bignumber.js-nolookahead.git

问题: fiscoubuntu:~/fisco/benchmarks$ npm install install web30.20.7 npm ERR! code 128 npm ERR! An unknown git error occurred npm ERR! command git --no-replace-objects ls-remote ssh://gitgithub.com/frozeman/bignumber.js-nolookahead.git npm ERR! …

Spring Cloud 系列:Seata 中TCC模式具体实现

概述 https://seata.io/zh-cn/docs/dev/mode/tcc-mode https://seata.io/zh-cn/docs/user/mode/tcc TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法: Try:资源的检测和…

第4章-IP基本原理

目录 1. IP协议概述 1.1. 定义 1.2. 功能 1.3. IP网络的结构 1.4. IP头格式 2. IP地址和地址映射 3. IP包转发 4. 其他相关协议介绍 1. IP协议概述 1.1. 定义 IP协议:IP协议是网际互连协议; 工作层次:网络层; 封装&#…

dubbo:服务暴露

节点角色说明: Provider:暴露服务的服务提供方。 Consumer::调用远程服务的服务消费方。 Registry:服务注册与发现的注册中心。 Monitor:统计服务的调用次调和调用时间的监控中心。 Container:服务运行容器。 调用关系说明: 0.服务容器负责启动&#xff…

HTML+JavaScript-03

JavaScript函数 定义函数的格式 JavaScript 函数是通过 function 关键词定义的。 function sum(a, b){//函数体return a b;//返回值为a、b的和 }函数的调用 调用函数时直接书写函数名即可 function show(){alert("函数show被调用"); } show();函数的相互调用 …

使用Docker部署Apache Superset结合内网穿透实现远程访问本地服务

文章目录 前言1. 使用Docker部署Apache Superset1.1 第一步安装docker 、docker compose1.2 克隆superset代码到本地并使用docker compose启动 2. 安装cpolar内网穿透,实现公网访问3. 设置固定连接公网地址 前言 Superset是一款由中国知名科技公司开源的“现代化的…

PSEUDO-LIDAR++:自动驾驶中 3D 目标检测的精确深度

论文地址:PSEUDO-LIDAR: ACCURATE DEPTH FOR 3D OBJECT DETECTION IN AUTONOMOUS DRIVING 论文代码:https://github.com/mileyan/Pseudo_Lidar_V2 摘要 3D 检测汽车和行人等物体在自动驾驶中发挥着不可或缺的作用。现有方法很大程度上依赖昂贵的激光雷…

华为数通方向HCIP-DataCom H12-831题库(判断题:81-100)

第81题 基本QinQ能根据业务种类选择外层Tag封装的方式 正确 错误 答案: 错误 解析: 基本QinQ是基于端口方式实现的,不能根据业务种类选择外层TAG封装的方式。 第82题 display interface GE0/0/0-次,发现Total Error计数(该接口物理层的错误报文总数目)不是此时可以断定当前…

登录页面(附源码)

特色&#xff1a; 点击登录之后卡片翻转效果 话不多说&#xff0c;看展示。 还在等什么&#xff0c;赶快白嫖起来吧 HTML文件 <div id"window" style"display: none"><div class"page page-front"><div class"page-content&…

SCCB接口

文章目录 概述引脚传输时序起始/结束信号三线模式两线模式 传输周期3阶段写传输周期2阶段写传输周期2阶段读传输周期阶段一 ID Address阶段二 子地址/读数据阶段三 写数据 SCCB与IIC区别未完待续(还有代码&#xff09;... 概述 SCCB&#xff08;Serial Camera Control Bus&…

aspose-cells-20.7.jar 去除水印及次数限制

1.使用 jd-gui.exe 反编译查看&#xff0c;直接搜索 License 1.修改 public static boolean isLicenseSet() {return (a ! null);}改成 public static boolean isLicenseSet() {return true;}2.修改 public void setLicense(InputStream stream) {Document document null;if (…

【EI会议征稿通知】2024年第四届人工智能、自动化与高性能计算国际会议(AIAHPC 2024)

2024年第四届人工智能、自动化与高性能计算国际会议&#xff08;AIAHPC 2024&#xff09; 2024 4th International Conference on Artificial Intelligence, Automation and High Performance Computing 2024第四届人工智能、自动化与高性能计算国际会议(AIAHPC 2024)将于20…

SpringMVC:拦截器

一般我们会做一些统一的操作这个时候我们需要使用springmvc提供的拦截器&#xff0c;例如token的验证&#xff0c;字段必填的操作&#xff0c;接口超时判断&#xff0c;签名验证&#xff0c;字段加密等操作&#xff0c;所以我们需要了解执行先后顺序。 我们来简单介绍下实现过程…

第6章-路由器、交换机及其操作系统介绍

目录 1. 路由器与交换机的作用与特点 1.1. 路由器 1.2. 交换机 1.3. 路由交换 2. H3C路由器与交换机介绍 3. H3C网络设备操作系统Comware 1. 路由器与交换机的作用与特点 1.1. 路由器 1、定义&#xff1a;路由器&#xff08;Router&#xff09;是连接两个或多个网络的硬…

穿越Flink的时间隧道:解锁实时数据之窗,掌握流处理之巅

目录 Flink中的时间和窗口 1时间语义 1.1Flink中的时间语义 1.1.1处理时间 1.1.2事件时间 1.2那种时间语义更重要 2 水位线 2.1 事件时间和窗口 2.2 什么是水位线 2.3 如何生成水位线 2.3.1使用WatermarkGenerator 2.3.2使用SourceFunction 2.4 水位线的传递 2.5 水位…