Kafka-服务端-网络层

Reactor模式

Kafka网络层采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程应该了解JavaNIO提供了实现Reactor模式的API。常见的单线程Java NIO的编程模式如图所示。

在这里插入图片描述
为了满足高并发的需求,也为了充分利用服务器的资源,服务端需要使用多线程来执行业务逻辑。我们对上述架构稍作调整,将网络读写的逻辑与业务处理的逻辑进行拆分,让其由不同的线程池来处理,从而实现多线程处理。设计架构如图所示。

在这里插入图片描述
通过将网络处理与业务逻辑进行切分后实现了上述设计,此设计中读取、写入、业务处理都实现了多线程处理,不再存在性能瓶颈。

但是,如果同一时间出现大量I/O事件,单个Selector就可能在分发事件时阻塞(或延时)而成为瓶颈。

我们可以将上述设计中单独的Selector对象扩展成多个,让它们监听不同的I/O事件,这样就可以避免单个Selector带来的瓶颈问题。设计如图所示。

在这里插入图片描述

SocketServer

Kafka的网络层是采用多线程、多个Selector的设计实现的。核心类是SocketServer,其中包含一个Acceptor用于接受并处理所有的新连接,每个Acceptor对应多个Processor线程,每个Processor线程拥有自己的Selector,主要用于从连接中读取请求和写回响应。每个Acceptor对应多个Handler线程,主要用于处理请求并将产生响应返回给Processor线程。Processor线程与Handler线程之间通过RequestChannel进行通信。整个网络层的结构如图所示。

在这里插入图片描述
下面介绍SocketServer的具体实现。首先来看SocketServer依赖的组件,如图所示。

在这里插入图片描述

AbstractServerThread

Acceptor和Processor都继承了AbstractServerThread,如图所示,AbstractServerThread是实现了Runnable接口的抽象类。在AbstractServerThread中为Acceptor和Processor提供了一些启动关闭相关的控制类方法。

在这里插入图片描述

Acceptor

Acceptor的主要功能是接收客户端建立连接的请求,创建Socket连接并分配给Processor处理。

Acceptor中有两个比较重要的字段:一个是Java NIO Selector,注意不要与前面介绍的KSelector混淆;二是用于接收客户端请求的ServerSocketChannel对象。在创建Acceptor时会初始化上面两个字段,同时还会创建并启动其管理的Processors线程。

Acceptor.accept()方法实现了对OP_ACCEPT事件的处理,它会创建SocketChannel并将其交给Processoraccept方法处理,同时还会增加ConnectionQuotas中记录的连接数。

Processor

Processor主要用于完成读取请求和写回响应的操作,Processor不参与具体业务逻辑的处理。Processor的核心字段如下所述,在创建Processor对象时会初始化这些字段。

  • newConnections:ConcurrentLinkedQueue[SocketChannel]类型,其中保存了由此Processor处理的新建的SocketChannel。
  • inflightResponses:保存未发送的响应。有读者可能会将inflightResponses与客户端的InFlightRequests进行类比,但也要注意其区别,客户端并不会对服务端发送的响应消息再次发送确认,所以infightResponse中的响应会在发送成功后移除,而InFlightRequests中的请求是在收到响应后才移除。
  • selector:KSelector类型,负责管理网络连接。
  • requestChannel:Processor与Handler线程之间传递数据的队列。

在Acceptoraccept方法中创建的SocketChannel会通过Processor.accept方法交给Processor进行处理。

Processoraccpet方法接收到一个新的SocketChannel时会先将其放入newConnections队列中,然后会唤醒Processor线程来处理newConnections队列。

注意,newConnections队列由Acceptor线程和Processor线程并发操作,所以选择线程安全的ConcurrentLinkedQueue。

在Processor.run()方法中实现了从网络连接上读写数据的功能。run()方法的流程如图所示。

在这里插入图片描述
如果Response是SendAction类型,表示该Response需要发送给客户端,则查找对应的KafkaChannel,为其注册OP_WRITE事件,并将KafkaChannel.send字段指向待发送的Response对象。

同时还会将Response从responseQueue队列中移出,放入infightResponses中。如果关心OP_WRITE事件的取消时机,可以回顾KafkaChannel.send方法,即发送完一个完整的响应后,会取消此连接注册的OP_WRITE事件。

如果Response是NoOpAction类型,表示此连接暂无响应需要发送,则为KafkaChannel注册OP_READ,允许其继续读取请求。

如果Response是CloseConnectionAction类型,则关闭对应的连接。

RequestChannel

Processor线程与Handler线程之间传递数据是通过RequestChannel完成的。

在RequestChannel中包含了一个requestQueue队列和多个responseQueues队列,每个Processor线程对应一个responseQueue。

Processor线程将读取到的请求存入requestQueue中,Handler线程从requestQueue队列中取出请求进行处理;Handler线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程从其对应的responseQueue中取出响应并发送给客户端。RequestChannel的结构如图所示。

在这里插入图片描述
在RequestChannel中保存的是RequestChannel.Request和RequestChannel.Response两个类的对象。

RequestChannel.Request会对请求进行解析,形成requestld(请求类型 ID)、header(请求头)、body(请求体)等字段,供Handler线程使用,并提供了一些记录操作时间的字段供监控程序使用。

RequestChannel.Response需要注意其responseAction字段,有SendAction、NoOpAction、CloseConnectionAction三种 类 型。

当请求放入RequestChannel.requestQueue之后,会有多个Handler线程并发处理从其中取出请求处理,那如何保证客户端请求的顺序性呢?在Processorrun方法,其中有多处注册/取消OP_READ事件以及注册/取消OP_WRITE事件的操作,通过这些操作的组合可以保证每个连接上只有一个请求和一个对应的响应,从而实现请求的顺序性。

现在回头来总结一个请求数据从生产者发送到服务端的流转过程,如图所示。

在这里插入图片描述
KafkaProducer线程创建ProducerRecord后,会将其缓存进RecordAccumulator。

Sender线程从RecordAccumulator中获取缓存的消息,放入KafkaChannel.send字段中等待发送,同时放入InFlightRequests队列中等待响应。

之后,客户端会通过KSelector将请求发送出去。

在服务端,Processor线程使用KSelector读取请求并暂存到stageReceives队列中,KSelector.poll方法结束后,请求被移转移到completeReceives队列中。之后,Processor将请求进行一些解析操作后,放入RequestChannel.requestQueue队列。

Handler线程会从RequestChannel.requestQueue队列中取出请求进行处理,将处理之后生成的响应放入RequestChannel.responseQueue队列。

Processor线程从其对应的RequestChannel.responseQueue队列中取出响应并放入inflightResponses队列中缓存,当响应发送出去之后会将其从inflightResponse中删除。生产者读取响应的过程与服务端读取请求的过程类似,主要的区别是生产者需要对InFlightRequest中的请求进行确认。

Kafka网络层的设计原理和实现就介绍到这里了。在高性能的分布式框架中经常采用这种Reactor模式的设计,例如,HDFS RPC框架的服务端、ZooKeeper等。也有实现了Reactor模式的框架,例如,Netty和Mina。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640723.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Selenium 自动化截取网页指定区域截图

一. 需求 在本篇博客中,我将分享一段使用Python编写的自动化网页截图代码,该代码基于Selenium和PIL库,可用于截取网页中指定区域的截图。这样的功能对于需要定期监控特定网页内容或进行网页数据采集的任务非常有用。 二. 代码解析 首先&am…

中间件存储设计 - 数组与链表

文章目录 数组ArrayListLinkedListHashMap小结 中间件主要包括如下三方面的基础:数据结构、JUC 和 Netty,接下来,我们先讲数据结构。 数据结构主要解决的是数据的存储方式问题,是程序设计的基座。 按照重要性和复杂程度&#xf…

Python fork方法:创建新进程

除可以进行多线程编程之外,Python 还支持使用多进程来实现并发编程。 Python 的 os 模块提供了一个 fork() 方法,该方法可以 fork 出来一个子进程。简单来说,fork() 方法的作用在于,程序会启动两个进程(一个是父进程&…

《WebKit 技术内幕》学习之十(1): 插件与JavaScript扩展

虽然目前的浏览器的功能很强 ,但仍然有其局限性。早期的浏览器能力十分有限,Web前端开发者希望能够通过一定的机制来扩展浏览器的能力。早期的方法就是插件机制,现在流行次啊用混合编程(Hybird Programming)模式。插件…

大模型实战营 Day5作业

基础作业: 使用 LMDeploy 以本地对话、网页Gradio、API服务中的一种方式部署 InternLM-Chat-7B 模型,生成 300 字的小故事(需截图) TurboMind 推理命令行本地对话 lmdeploy chat turbomind /share/temp/model_repos/internlm-cha…

RK3568平台 TinyAlsa集成第三方音频算法

一.tinyalsa介绍 ALSA(Advanced Linux Sound Architecture)是一个开源项目,涵盖了用户空间和内核空间对音频设备的操作接口,通过应用层使用alsalib可以实现对音频设备的控制 TinyAlsa是android推出的一个精简的ALSA库&#xff0c…

c++中STL的vector简单实现

文章目录 vector构造函数 vector()拷贝构造 vector()析构函数 ~vector()iterator 的定义begin()与const版本end()与const版本增删改查尾插push_back()尾删pop_back()指定位置插入insert()指定位置删除 erase() operator[]与const版本容量增容reserve()设置容量 resize() 成员函…

npm源更换、卡住解决方式

sill idealTree buildDeps解决方案 1. 删除用户C:\Users\{账户}\下的.npmrc文件 2. npm cache verify 3. npm config set registry https://registry.npmmirror.com更换镜像源 //任选其一 npm config set registry https://registry.npmmirror.com npm config set registry h…

Python 基于pytorch从头写GPT模型;实现gpt实战

1.定义缩放点积注意力类 import numpy as np # 导入 numpy 库 import torch # 导入 torch 库 import torch.nn as nn # 导入 torch.nn 库 d_k 64 # K(Q) 维度 d_v 64 # V 维度 # 定义缩放点积注意力类 class ScaledDotProductAttention(nn.Module):def __init__(self):super…

spring boot kafka 发送消息 完整的例子工程

以下是一个简单的Spring Boot Kafka发送消息的完整例子&#xff1a; 首先&#xff0c;添加Spring Boot Kafka的依赖到你的pom.xml文件&#xff1a; xml <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId…

【教程】npm的时候ssh报错ssh://git@github.com/frozeman/bignumber.js-nolookahead.git

问题&#xff1a; fiscoubuntu:~/fisco/benchmarks$ npm install install web30.20.7 npm ERR! code 128 npm ERR! An unknown git error occurred npm ERR! command git --no-replace-objects ls-remote ssh://gitgithub.com/frozeman/bignumber.js-nolookahead.git npm ERR! …

DAY27:回溯(组合问题39、40、分割回文串131)

Leetcode: 39 组合总和 基本思路 本题没有组合数字的要求&#xff0c;只有对组合总和的要求&#xff0c;因此返回条件有两个&#xff0c;等于sum的时候收集结果&#xff0c;如果大于sum了就直接跳过。 组合没有数量要求元素可以重复拾取 这题的难点在于可以反复取值。因此对…

纯c实现栈和队列 数据结构大全

栈 栈是一种后进先出的数据结构&#xff0c;可以用数组来模拟实现&#xff0c;掌握必要的数据结构是非常的有必要的 一样是先打出头文件 #pragma once#include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> #include &…

Spring Cloud 系列:Seata 中TCC模式具体实现

概述 https://seata.io/zh-cn/docs/dev/mode/tcc-mode https://seata.io/zh-cn/docs/user/mode/tcc TCC模式与AT模式非常相似&#xff0c;每阶段都是独立事务&#xff0c;不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法&#xff1a; Try&#xff1a;资源的检测和…

第4章-IP基本原理

目录 1. IP协议概述 1.1. 定义 1.2. 功能 1.3. IP网络的结构 1.4. IP头格式 2. IP地址和地址映射 3. IP包转发 4. 其他相关协议介绍 1. IP协议概述 1.1. 定义 IP协议&#xff1a;IP协议是网际互连协议&#xff1b; 工作层次&#xff1a;网络层&#xff1b; 封装&#…

dubbo:服务暴露

节点角色说明&#xff1a; Provider:暴露服务的服务提供方。 Consumer::调用远程服务的服务消费方。 Registry:服务注册与发现的注册中心。 Monitor:统计服务的调用次调和调用时间的监控中心。 Container:服务运行容器。 调用关系说明&#xff1a; 0.服务容器负责启动&#xff…

js之执行上下文和作用域

定义 变量和函数的上下文决定了它们可以访问那些数据&#xff0c;以及它们的行为 每个上下文都有一个关联的变量对象&#xff0c;而这个上下文中定义的所有变量和函数都在存在于这个变量对象之上 上下文再其所以代码都执行完毕之后会被销毁&#xff0c;包括定义在它上面的所有…

研究生开题报告撰写:文言一心VSChatgpt3.5

文言一心 问&#xff1a;我是一名研二学生&#xff0c;请帮我生成一份研究生毕设开题答辩ppt框架。 答&#xff1a;好的&#xff0c;以下是一份研究生毕设开题答辩PPT的框架&#xff0c;供您参考&#xff1a; 幻灯片1&#xff1a;封面页 标题&#xff1a;研究生毕设开题答辩…

大数据学习之Flink、10分钟了解Flink的核心组件以及它们的工作原理

第一章、Flink的容错机制 第二章、Flink核心组件和工作原理 第三章、Flink的恢复策略 第四章、Flink容错机制的注意事项 第五章、Flink的容错机制与其他框架的容错机制相比较 目录 第二章、Flink核心组件和工作原理 Ⅰ、核心组件 1. Checkpoint组件&#xff1a; 2. Sav…

HTML+JavaScript-03

JavaScript函数 定义函数的格式 JavaScript 函数是通过 function 关键词定义的。 function sum(a, b){//函数体return a b;//返回值为a、b的和 }函数的调用 调用函数时直接书写函数名即可 function show(){alert("函数show被调用"); } show();函数的相互调用 …