Kafka源码简要分析

目录

一、生产者的初始化流程

二、生产者到缓冲队列的流程

三、Sender拉取数据到Kafka流程

四、消费者初始化

五、主题订阅原理

六、消费者抓取数据原理

七、消费者组初始化

八、消费者组消费流程

九、提交offset原理


一、生产者的初始化流程

  1. 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
  2. 监控kafka相关情况的JmxReporter配置
  3. 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
  4. 然后key和value进行序列化
  5. 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
  6. 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
  7. 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
  8. 连接kafka集群,获取元数据,才能知道要发送到哪个分区
  9. 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
  10. sender放到后台,启动sender线程

二、生产者到缓冲队列的流程

  1. 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
  2. 然后获取元数据,要根据主题的分区放到对应的缓存队列
  3. 序列化相关操作key和value的序列化和压缩
  4. 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
  5. 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
  6. 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
  7. 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。

三、Sender拉取数据到Kafka流程

  1. 事务相关操作
  2. 获取元数据信息,为了知道发到哪个分区
  3. 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
  4. 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
  5. 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
  6. 封装了个request然后通过网络客户端把数据发送过去
  7. 然后服务端还是通过网络客户端获取结果

四、消费者初始化

  1. 消费者组平衡
  2. 获取消费者组id和客户端id
  3. 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
  4. 拦截器链相关处理
  5. key和value的反序列化
  6. 判断offset从什么位置开始消费
  7. 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
  8. 连接Kafka集群
  9. 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
  10. 指定消费者分区分配策略
  11. 创建coordinator对象
  12. 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)

五、主题订阅原理

  1. 传入要订阅的主题,如果为null直接抛出异常
  2. 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
  3. 按照主题自动订阅进行分配

六、消费者抓取数据原理

  1. 他首先先初始化消费者组和队列
  2. 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
  3. 然后抓取后拦截器开始处理数据

七、消费者组初始化

  1. 先判断coordinator不为null那就说明为消费者组
  2. 如果没有指定分区分配策略会抛出异常
  3. 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果

他这整个消费者组初始化就是判断coordinator有没有准备好

八、消费者组消费流程

  1. 他会用判断coordinator是不是空,是的话就等待
  2. 他上来先去队列拉取数据,一般是拉取不到的
  3. 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
  4. 他送后返回future,通过回调获取数据的
  5. 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
  6. 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
  7. 然后放到拦截器走加工操作

九、提交offset原理

  • 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
  • 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/98023.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

卷积层与池化层输出的尺寸的计算公式详解

用文字简单表述如下 卷积后尺寸计算公式: (图像尺寸-卷积核尺寸 2*填充值)/步长1 池化后尺寸计算公式: (图像尺寸-池化窗尺寸 2*填充值)/步长1 一、卷积中的相关函数的参数定义如下: in_channels(int) – 输入信号的通道 out_channels(int)…

SDL播放pcm无声音的原因

试过了网上各种: 要用if(SDL_Init(SDL_INIT_AUDIO | SDL_INIT_TIMER)) ,不要用if(SDL_Init(SDL_INIT_VIDEO | SDL_INIT_AUDIO | SDL_INIT_TIMER)) —NO 无论如何先SDL_memset(stream, 0, len); —NO 最后对比了一个可以出声的程序,原因如下…

Vue3 reactive和ref详解

reactive Vue3.0中的reactive reactive 是 Vue3 中提供的实现响应式数据的方法。在 Vue2 中响应式数据是通过 defineProperty 来实现的,在 Vue3 中响应式数据是通过 ES6 的 Proxy来实现的。reactive 参数必须是对象 (json / arr)如果给 reactive 传递了其它对象 默…

alsa pcm设备之硬件参数

硬件参数包含了stream描述比如格式,采样率,通道数,和ringbuffer 圆形缓存区大小等. 使用snd_pcm_hw_params_t ,ALSA pcm设备使用了参数重定义系统相关的硬件参数,应用程序首先选择全范围的配置, 然后应用程序设置单个参数,直到所有参数都是基本的(确定的). 格式: 使…

Android异步和线程

代码仓库:https://github.com/MADMAX110/Starbuzz Android应用打开数据库时首先要搜索数据库文件,如果没有找到数据库文件就要创建一个空的数据库。然后它要运行所有SQL命令,在数据库中创建数据库表和需要的所有初始数据。最后还要执行一些查…

rust变量

一 、变量定义 (一)语法格式 使用let关键字定义变量 let varname: type value; 如,let a: i32 78;也可以不显式指定类型 let varname value; 如,let a 78;一些例子 1.布尔 let t true; let f: bool false;2.整数 let a …

合宙Air780e+luatos+腾讯云物联网平台完成设备通信与控制(属性上报+4G远程点灯)

1.腾讯云物联网平台 首先需要在腾讯云物联网平台创建产品、创建设备、定义设备属性和行为,例如: (1)创建产品 (2)定义设备属性和行为 (3)创建设备 (4)准备参…

ctrl+d和ctrl+c的区别

CtrlD和CtrlC都是常用的键盘快捷键,但它们的功能不同。 CtrlD 在不同的操作系统和应用程序中可以有不同的功能。在Unix/Linux系统的命令行终端中,CtrlD的作用是发送EOF(End of File)信号,表示输入结束。在Windows系统中…

【高阶数据结构】图详解第一篇:图的基本概念及其存储结构(邻接矩阵和邻接表)

文章目录 1. 图的基本概念1.1 什么是图1.2 有向图和无向图1.3 完全图1.4 邻接顶点1.5 顶点的度1.6 路径1.7 路径长度1.8 简单路径与回路1.9 子图1.10 连通图1.11 强连通图1.12 生成树 2. 图的存储结构2.1 邻接矩阵2.2 邻接矩阵代码实现结构定义构造函数添加边打印图测试 2.3 邻…

ToBeWritten之改进威胁猎杀:自动化关键角色与成功沟通经验

也许每个人出生的时候都以为这世界都是为他一个人而存在的,当他发现自己错的时候,他便开始长大 少走了弯路,也就错过了风景,无论如何,感谢经历 转移发布平台通知:将不再在CSDN博客发布新文章,敬…

10款录屏软分析与选择使用,只看这篇文章就轻松搞定所有,高清4K无水印录屏,博主UP主轻松选择

录屏软件整理 如下为录屏软件,通过思维导图展示分析介绍: https://www.drawon.cn/template/details/6522bd5e0dad9029a0b528e1 如下为整理的录屏软件列表 名称产地价格支持的平台下载地址说明OBS国外免费开源windows/linux/machttps://obsproject.co…

linux 笔记:远程服务器登录jupyter notebook

1 生成jupyter notebook 配置文件(服务器端) jupyter notebook --generate-config #Writing default config to: /home/shuailiu/.jupyter/jupyter_notebook_config.py2 Ipython中设置密码(服务器端) 3 修改jupyter 配置文件&…

汇编语言是怎么一回事?

汇编语言基础 汇编指令和机器码的区别 数据的表示 各类汇编指令 数据传送和算法运算 位运算 条件分支指令 函数调用 字符串处理 流水线和指令调度 流水线实现指令级并行 编译器指令调度 CPU乱序与投机执行 汇编器将汇编语言翻译成 CPU 可以执行的机器码&#xff0c…

强烈推荐这5款功能强大的小软件

​ 今日的栽种,明日的果实,今天继续分享五个功能强大的小软件。 1.文本编辑——IDM UltraEdit ​ IDM UltraEdit是一款功能强大的文本编辑器,它支持多种编程语言和文件格式,可以处理大型文件,进行代码折叠&#xff0…

reactjs开发环境搭建

Reactjs是一个前端web页面应用开发框架工具集,其支持前端构建页面以及后端构建页面两种常用的开发场景,其中,支持reactjs的开发框架包括next.js、remix、gatsby以及其他,本文主要描述next.js开发环境的搭建,next.js是一…

c++中的map和set

文章目录 1. 关联式容器2. 键值对3. 树形结构的关联式容器3.1 set3.1.1 set的介绍3.1.2 set的使用 3.2 map3.2.1 map的介绍3.2.2 map的使用 3.3 multiset3.3.1 multiset的介绍3.3.2 multiset的使用 3.4 multimap3.4.1 multimap的介绍3.4.2 multimap的使用 1. 关联式容器 在初阶…

java socket实现代理Android App

实现逻辑就是转发请求和响应。 核心代码 // 启动代理服务器private void startProxyServer() {new Thread(new ProxyServer()).start();}// 代理服务器static class ProxyServer implements Runnable {Overridepublic void run() {try {// 监听指定的端口int port 8098; //一…

CSS 实现:常见布局

1 设备与视口 设备屏幕尺寸是指屏幕的对角线长度。像素是计算机屏幕能显示一种特定颜色的最小区域,分为设备像素和逻辑像素。 在 Apple 的视网膜屏(Retina)中,默认每 4 个设备像素为一组,渲染出普通屏幕中一个像素显示…

Elasticsearch:如何从 Elasticsearch 集群中删除数据节点

Elasticsearch 集群通常包含多个节点,并且可能存在需要从集群中删除节点的情况。 应谨慎执行此过程,以确保数据的完整性和可用性。 在本文中,我们将引导你完成从 Elasticsearch 集群安全删除节点的步骤。 确保集群是绿色的 在尝试从 Elastic…

【数字化转型】10大数字化转型能力成熟度模型03

一、前言 数字化转型是数据化能力建设的目标和价值,作为一个新兴的课题,目前为止并未出现一个统一的数字化转型成熟度模型。不同的企业和机构,根据自身的发展和认知,推出了自己的企业级或者准行业级标准。这些标准具有很强的参考意义,作者收集和整理了相关的标准和规范,整…