kafka生产端之架构及工作原理

文章目录

  • 整体架构
  • 元数据更新

整体架构

消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。

在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据更新

上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
在这里插入图片描述

我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。

在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/69506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 Unix时间戳

Unix时间戳 Unix 时间戳&#xff08;Unix Timestamp&#xff09;定义为从UTC/GMT的1970年1月1日0时0分0秒开始所经过的秒数&#xff0c;不考虑闰秒 时间戳存储在一个秒计数器中&#xff0c;秒计数器为32位/64位的整型变量 世界上所有时区的秒计数器相同&#xff0c;不同时区通过…

SSM仓库物品管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.用户登录代码&#xff1a;2.保存物品信息代码&#xff1a;3.删除仓库信息代码&#xff1a; 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SSM框架开发的仓库…

[渗透测试]热门搜索引擎推荐— — shodan篇

[渗透测试]热门搜索引擎推荐— — shodan篇 免责声明&#xff1a;本文仅用于分享渗透测试工具&#xff0c;大家使用时&#xff0c;一定需要遵守相关法律法规。 除了shodan&#xff0c;还有很多其他热门的&#xff0c;比如&#xff1a;fofa、奇安信的鹰图、钟馗之眼等&#xff0…

绕组电感 - Ansys Maxwell 磁通链与电流

在本博客中&#xff0c;我将演示如何使用 Ansys Maxwell 中磁瞬态求解器的磁通链和电流结果来计算绕组电感。Ansys Maxwell 磁瞬态求解器在场计算中考虑了涡流效应&#xff0c;我将展示一种使用磁通链和电流结果来计算绕组电感的简单方法。 实际上&#xff0c;电感是非线性的…

Spring Boot牵手Redisson:分布式锁实战秘籍

一、引言 在当今的分布式系统架构中,随着业务规模的不断扩大和系统复杂度的日益增加,如何确保多个服务节点之间的数据一致性和操作的原子性成为了一个至关重要的问题。在单机环境下,我们可以轻松地使用线程锁或进程锁来控制对共享资源的访问,但在分布式系统中,由于各个服务…

Fiddler Classic(HTTP流量代理+半汉化)

目录 一、关于Fiddler (一) Fiddler Classic (二) Fiddler Everywhere (三) Fiddler Everywhere Reporter (四) FiddlerCore (五) 总结 二、 软件安全性 1. 软件安装包 2. 软件汉化dll 三、安装与半汉化 1. 正常打开安装包点击下一步安装即可&#xff0c;安装路径自…

C++适用于所有输入法的解决方案(切换输入法)

文章目录 1、方法 1&#xff1a;模拟按键切换到英文模式2、&#x1f680; 方法 2&#xff1a;直接切换到美式键盘&#xff08;适用于所有输入法&#xff09;3、&#x1f680; 方法 3&#xff1a;遍历所有输入法&#xff0c;选择第一个英文输入法4、&#x1f525; 结论5、&#…

AI大语言模型

一、AIGC和生成式AI的概念 1-1、AIGC Al Generated Content&#xff1a;AI生成内容 1-2、生成式AI&#xff1a;generative ai AIGC是生成式 AI 技术在内容创作领域的具体应用成果。 目前有许多知名的生成式 AI&#xff1a; 文本生成领域 OpenAI GPT 系列百度文心一言阿里通…

Visual Studio Code中文出现黄色框子的解决办法

Visual Studio Code中文出现黄色框子的解决办法 一、vsCode中文出现黄色框子-如图二、解决办法 一、vsCode中文出现黄色框子-如图 二、解决办法 点击 “文件”点击 “首选项”点击 “设置” 搜索框直接搜索unicode选择“文本编辑器”&#xff0c;往下滑动&#xff0c;找到“Un…

《艾尔登法环》运行时弹窗“由于找不到vcruntime140.dll,无法继续执行代码”要怎么解决?

宝子们&#xff0c;是不是在玩《艾尔登法环》的时候&#xff0c;突然弹出一个提示&#xff1a;“由于找不到vcruntime140.dll&#xff0c;无法继续执行代码”&#xff1f;这可真是让人着急上火&#xff01;别慌&#xff0c;今天就给大家唠唠这个文件为啥会丢&#xff0c;还有怎…

LabVIEW商业软件开发

在商业软件开发和仪器自动测试领域&#xff0c;LabVIEW以其图形化编程方式、高效的数据采集能力和强大的硬件集成优势&#xff0c;成为众多工程项目的核心开发工具。然而&#xff0c;商业软件的开发远不止编写代码和实现功能那么简单&#xff0c;尤其是在仪器自动测试领域&…

第40天:Web开发-JS应用VueJS框架Vite构建启动打包渲染XSS源码泄露代码审计

#知识点 1、安全开发-VueJS-搭建启动&打包安全 2、安全开发-VueJS-源码泄漏&代码审计 一、Vue搭建创建项目启动项目 1、Vue 框架搭建->基于nodejs搭建&#xff0c;安装nodejs即可 参考&#xff1a;https://cn.vuejs.org/ 已安装18.3或更高版本的Node.js 2、Vue 创建…

6、使用one-api管理统一管理大模型,并开始使用本地大模型

文章目录 本节内容介绍集中接入&#xff1a;将大模型统一管理起来当使用了大模型代理大模型代理示例 开源模型&#xff1a;如何使用Hugging Face上的模型modelscope使用 pipeline 调用模型用底层实现调用模型流式输出 如何在项目中使用开源模型使用 LangChain使用集中接入开始使…

Winform开发框架(蝇量级) MiniFramework V2.1

C/S框架网与2022年发布的一款蝇量级开发框架&#xff0c;适用于开发Windows桌面软件、数据管理应用系统、软件工具等轻量级软件&#xff0c;如&#xff1a;PLC上位机软件、数据采集与分析软件、或企业管理软件&#xff0c;进销存等。适合个人开发者快速搭建软件项目。 适用开发…

【漫话机器学习系列】087.常见的神经网络最优化算法(Common Optimizers Of Neural Nets)

常见的神经网络优化算法 1. 引言 在深度学习中&#xff0c;优化算法&#xff08;Optimizers&#xff09;用于更新神经网络的权重&#xff0c;以最小化损失函数&#xff08;Loss Function&#xff09;。一个高效的优化算法可以加速训练过程&#xff0c;并提高模型的性能和稳定…

傅里叶公式推导(一)

文章目录 三角函数系正交证明图观法数学证明法计算当 n不等于m当 n等于m&#xff08;重点&#xff09; 其它同理 首先要了解的一点基础知识&#xff1a; 三角函数系 { sin ⁡ 0 , cos ⁡ 0 , sin ⁡ x , cos ⁡ x , sin ⁡ 2 x , cos ⁡ 2 x , … , sin ⁡ n x , cos ⁡ n x ,…

1. 构建grafana(版本V11.5.1)

一、grafana官网 https://grafana.com/ 二、grafana下载位置 进入官网后点击downloads&#xff08;根据自己的需求下载&#xff09; 三、grafana安装&#xff08;点击下载后其实官网都写了怎么安装&#xff09; 注&#xff1a;我用的Centos&#xff0c;就简略的写下我的操作步…

macOS 上部署 RAGFlow

在 macOS 上从源码部署 RAGFlow-0.14.1&#xff1a;详细指南 一、引言 RAGFlow 作为一款强大的工具&#xff0c;在人工智能领域应用广泛。本文将详细介绍如何在 macOS 系统上从源码部署 RAGFlow 0.14.1 版本&#xff0c;无论是开发人员进行项目实践&#xff0c;还是技术爱好者…

快速集成DeepSeek到项目

DeepSeek API-KEY 获取 登录DeekSeek 官网&#xff0c;进入API 开放平台 2. 创建API-KEY 复制API-KEY进行保存&#xff0c;后期API调用使用 项目中集成DeepSeek 这里只展示部分核心代码&#xff0c;具体请查看源码orange-ai-deepseek-biz-starter Slf4j AllArgsConstructo…

保姆级教程Docker部署Zookeeper模式的Kafka镜像

目录 一、安装Docker及可视化工具 二、Docker部署Zookeeper 三、单节点部署 1、创建挂载目录 2、命令运行容器 3、Compose运行容器 4、查看运行状态 5、验证功能 四、部署可视化工具 1、创建挂载目录 2、Compose运行容器 3、查看运行状态 一、安装Docker及可视化工…