基于Kafka2.1解读Producer原理

文章目录

  • 前言
  • 一、Kafka Producer是什么?
  • 二、主要组件
    • 1.Kafka Producer
      • 1.1 partitioner
      • 1.2 keySerializer
      • 1.3 valueSerializer
      • 1.4 accumulator
      • 1.5 sender
    • 2.Sender
      • 2.1 acks
      • 2.2 client
      • 2.3 inFlightBatches
    • 3. Selector
      • 3.1 nioSelector
      • 3.2 channels
    • 4. 全局总览
    • 5. 一点思考
  • 总结


前言

相信现在的javer对于Kafka应该都很熟悉了,不管是八股文还是工作中使用。虽然Kafka server是scala写的,但是client是java写的,所以咱们理解client的代码还是比较容易的,今天先来基于源码解读下Kafka Producer的主体流程


一、Kafka Producer是什么?

这个应该不用过多介绍了吧,我们发送消息到MQ就是通过Kafka Producer来实现的。

二、主要组件

1.Kafka Producer

Kafka Producer的主要流程

1.1 partitioner

partitioner的作用是用来对发送的消息进行partition选择的,譬如msg要发送某个topic,topic里有多个partition,需要选择发往哪个partition。
如果我们发送消息时设置了partition key,那么就按照partition key进行hash,然后选择发送的partition
如果没有设置partition key,那么就默认使用轮询的方式来选择partition

1.2 keySerializer

顾名思义,就是对key进行序列化的工具类。因为我们的数据需要发送到网络里,所以数据必须序列化成二进制,所以需要进行序列化

1.3 valueSerializer

同上,和keySerializer类似的作用

1.4 accumulator

accumulator是比较重要的一个组件,主要功能包括:

  1. 选择/创建需要append数据的batch
  2. 把msg添加到该batch里面,获得batch里produceFuture

1.5 sender

sender是真正执行消息发送的组件,主要功能包括:

  1. 把可以发送的batch,组装成clientRequest,给底层的nioSelector注册写事件
  2. nioSelector处理读写事件,写事件处理时,将消息发送出去,更新batch的produceFuture让accumulator感知到消息发送结果
  3. 以下详细讲讲Sender的流程

2.Sender

Sender的主要流程

2.1 acks

acks是用来描述msg发送要怎样才能确认发送成功。
“-1”:全部副本应答,默认值
“0”:不需要任何应答,这种情况发送完立马认为发送成功
“1”:leader副本应该即认为发送成功

2.2 client

接口是KafkaClient,实现类是NetWorkClient

  1. send:
记录当前发送中的request=》inFlightRequests,
调用Selector进行消息发送
  1. poll:
调用Selector进行写事件处理:发送消息
清理一些数据:处理inFlightRequests
执行回调:handleProduceResponse:producerBatch.complete=》会更新batch里的produceFuture

2.3 inFlightBatches

记录当前当前正在被发送的batch

Map<TopicPartition, List<ProducerBatch>> inFlightBatches;

可以看到该Map的value是list类型,保证了同一个partition中的batch数据有序性

3. Selector

其实Selector的作用上面已经描述了,就是基于partition的node找到对应的channel,执行写事件注册和真实的消息发送

3.1 nioSelector

这个应该不用详细讲了,学过NIO的同学都知道,java的NIO Selector可以进行读写事件处理,就是通过selector的select方法,找到可处理的keys,然后基于不同的keys,拿到对应的channel,往channel写数据或者从channel读数据

3.2 channels

该channel是Kafka对java的channel进行的封装,得到的KafkaChannel,其实能够实现的功能就是暂存下可发送消息,以及调用java的channel 发送数据。此处是保存partition node和channel的映射关系。

4. 全局总览

Kafka Producer全局总览图

5. 一点思考

为什么Producer要分为accumulator和sender这两个重要组件呢?
为啥不像我们写业务代码一样,把消息append到batch里面之后,就直接发送消息?
答案是:功能定位不同,面对的处理对象也不同,所以解耦最合适。
accumulator的功能是追加消息;Sender的功能是网络发送消息
追加消息,面对的处理对象是消息;而网络发送消息,面对的处理对象是partition node,也就是创建网络连接的节点。
为了增加消息发送吞吐量,中间引入了batch。
所以把「追加消息」和「网络发送消息」分开让两个不同组件来处理,在功能的实现上进行解耦~


总结

Kafka的消息发送其实就是一个RPC的过程,有自己的网络协议、消息协议、消息序列化方式、数据批量发送(增加吞吐量)、超时处理、底层网络模型。
不过Producer对于batch的复用,其实还是有一定的厉害之处的,等下次有空分析下~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/54884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCN-Transformer时间序列预测(多输入单预测)——基于Pytorch框架

1 数据集介绍 我们使用的数据集包含以下几个重要的属性&#xff1a; date&#xff08;日期&#xff09; open&#xff08;开盘价&#xff09; high&#xff08;最高价&#xff09; low&#xff08;最低价&#xff09; close&#xff08;收盘价&#xff09; pre_close&…

使用IOT-Tree Server制作一个边缘计算设备(Arm Linux)

最近实现了一个小项目&#xff0c;现场有多个不同厂家的设备&#xff0c;用户需要对此进行简单的整合&#xff0c;并实现一些联动控制。 我使用了IOT-Tree Server这个软件轻松实现了&#xff0c;不外乎有如下过程&#xff1a; 1&#xff09;使用Modbus协议对接现有设备&#…

无人机侦测:手提式无线电侦测设备技术详解

手提式无线电侦测设备在无人机侦测中扮演着重要角色&#xff0c;它主要通过侦测无人机与遥控器或地面站之间的无线电信号来实现对无人机的监测和定位。以下是对手提式无线电侦测设备技术的详细解析&#xff1a; 一、技术原理 手提式无线电侦测设备通过无线电侦测技术&#xf…

steam上传游戏问题汇总

问题 首先是Library Logo 必须是png图片&#xff0c;还必须带上游戏名字你的宣传图不能使用游戏内部的截图。Library_Hero必须是空白的&#xff0c;不能有任何文字。他是和Library_logo合并在一起的。这个法律其实没必要填写。然后我错误的把EULA填写在这里了也报错了 如果你在…

C++刷怪笼(7)string类

目录 1.前言 2.正文 2.1标准库中的string类 2.1.1string类 2.1.2auto和范围for 2.1.3string类的常用接口说明 2.2string类的模拟实现 2.2.1经典的string类问题 2.2.2浅拷贝 2.2.3深拷贝 ​编辑 2.2.4写时拷贝 3.小结 1.前言 前面我们对C的封装这一大特性进行了详细…

题目:圆桌会议

Problem - 1214 (hdu.edu.cn) 解题思路&#xff1a; 结果的顺序就是原序列的逆序&#xff0c;例如12345就是54321为结果顺序。同时将一个顺序序列&#xff08;非环&#xff09;变成逆序需要的次数为。想要的得到最短的交换次数&#xff0c;只需要将环尽量对半分&#xff0c;然后…

【万字长文】Word2Vec计算详解(三)分层Softmax与负采样

【万字长文】Word2Vec计算详解&#xff08;三&#xff09;分层Softmax与负采样 写在前面 第三部分介绍Word2Vec模型的两种优化方案。 【万字长文】Word2Vec计算详解&#xff08;一&#xff09;CBOW模型 markdown行 9000 【万字长文】Word2Vec计算详解&#xff08;二&#xff0…

Chromium 中chrome.cookies扩展接口c++实现分析

chrome.cookies 使用 chrome.cookies API 查询和修改 Cookie&#xff0c;并在 Cookie 发生更改时收到通知。 更多参考官网定义&#xff1a;chrome.cookies | API | Chrome for Developers (google.cn) 本文以加载一个清理cookies功能扩展为例 https://github.com/Google…

针对考研的C语言学习(循环队列-链表版本以及2019循环队列大题)

题目 【注】此版本严格按照数字版循环队列的写法&#xff0c;rear所代表的永远是空数据 图解 1.初始化部分和插入部分 2出队 3.分部代码解析 初始化 void init_cir_link_que(CirLinkQue& q) {q.rear q.front (LinkList)malloc(sizeof(LNode));q.front->next NULL…

Ansible 工具从入门到使用

1. Ansible概述 Ansible是一个基于Python开发的配置管理和应用部署工具&#xff0c;现在也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点&#xff0c;Pubbet和Saltstack能实现的功能&#xff0c;Ansible基本上都可以实现。 Ansible能批量配置、部署、管理上千台主…

基于Zynq SDIO WiFi移植一(支持2.4/5G)

基于SDIO接口的WIFI&#xff0c;在应用上&#xff0c;功耗低于USB接口&#xff0c;且无须USB Device支持&#xff0c;满足某些应用场景 1 硬件连接 2 Vivado工程配置 3 驱动编译 3.1 KERNRL CONFIG (build ENV) 修改 export KERNELPATH<path of kernel header>export T…

一种压缩QRCode矩阵以用于存储的方法

通常QRCode由服务器生成&#xff0c;以图片格式发送到客户端&#xff0c;由客户端直接展示&#xff0c;也可以由客户端使用javascript或其他内置的SDK直接生成。 0、需求 QRCode生成过程中往往是先生成矩阵&#xff0c;然后使用矩阵生成图片&#xff0c;矩阵就是由01组成的一…

[单master节点k8s部署]35.ingress 反向代理(二)

成功部署ingress controller [rootmaster 35ingress]# kubectl get pods -n kube-system NAME READY STATUS RESTARTS AGE calico-kube-controllers-7dc5458bc6-fpv96 1/1 Running 10 (4d16h ago) 9d calico-…

自动化测试selenium篇(二)

1. 操作测试对象 1.1 普通操作 测试代码如下所示&#xff1a; private static void Test03() throws InterruptedException {//创建一个驱动WebDriver webDriver new ChromeDriver();//打开百度首页webDriver.get("https://www.baidu.com");//找到百度搜索输入框…

通过AI技术克服自动化测试难点(下)

前面的文章里我们对可以应用到测试中的AI技术做了整体介绍&#xff0c;详细介绍了OpenCV技术、OCR技术和神经网络&#xff0c;本文我们继续为大家介绍卷积神经网络、数据集以及AI技术在其他方面和测试相关的创新。 卷积神经网络整体上的原理是这样的&#xff0c;首先在底层特征…

【linux系统】进程

文章目录 进程和PCBlinux与进程的相关命令PS linux下的PCB进程标识符父子进程fork 进程状态磁盘睡眠 -- D暂停和跟踪暂停 -- T和t僵尸进程 -- Z孤儿进程 进程优先级 进程地址空间再谈fork进程地址空间分布虚拟地址和页表mm_struct 进程控制进程终止进程退出码信号 进程等待进程…

【翻译】在 Python 应用程序中使用Qt Designer的UI文件

原文地址&#xff1a;Using a Designer UI File in Your Qt for Python Application 直接上图&#xff0c;上代码 将UI文件转为Python 为了演示&#xff0c;我们使用 Qt Widgets 简单示例说明。 这个应用程序由一个源文件 easing.py、一个 UI 文件 form.UI、一个资源文件 ea…

考研笔记之操作系统(四) - 文件管理

文件管理 1. 简介1.1 前情回顾1.2 文件的属性1.3 文件内部数据的组织方式1.4 操作系统向上提供的文件功能1.5 文件应如何放在外存 2. 文件的逻辑结构2.1 无结构文件2.2 有结构文件2.2.1 顺序文件2.2.2 索引文件2.2.3 索引顺序文件2.2.4 多级索引顺序文件 3. 文件目录3.1 基本概…

1422. 分割字符串的最大得分【字符串】

文章目录 1422. 分割字符串的最大得分解题思路Go代码 1422. 分割字符串的最大得分 1422. 分割字符串的最大得分 给你一个由若干 0 和 1 组成的字符串 s &#xff0c;请你计算并返回将该字符串分割成两个 非空 子字符串&#xff08;即 左 子字符串和 右 子字符串&#xff09;所…

使用3080ti运行blip2的

使用3080ti运行blip2的案例 注意&#xff01;blip2很吃显存&#xff0c;需要大于80GB显存的卡。我最后安装的所有包的版本信息&#xff08;python 3.9 &#xff09;以供参考&#xff1a; 首先&#xff0c;我在运行blip2的demo的时候显存用了80G以上&#xff0c;所以大家卡的显存…