Kafka-RecordAccumulator分析

前面介绍过,KafkaProducer可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。

主线程调用KafkaProducer.send方法发送消息的时候,先将消息放到RecordAccumulator中暂存,然后主线程就可以从send方法中返回了,此时消息并没有真正地发送给Kafka,而是缓存在了RecordAccumulator中。

之后,业务线程通过KafkaProducer.send()方法不断向RecordAccumulator追加消息,当达到一定的条件,会唤醒Sender线程发送RecordAccumulator中的消息。

下面我们就来介绍RecordAccumulator的结构。

首先需要注意的是,RecordAccumulator至少有一个业务线程和一个Sender线程并发操作,所以必须是线程安全的。

RecordAccumulator中有一个以TopicPartition为key的ConcurrentMap,每个value是ArrayDeque(ArrayDeque并不是线程安全的集合),其中缓存了发往对应TopicPartition的消息。

每个RecordBatch拥有一个MemoryRecords对象的引用。

MemoryRecords才是消息最终存放的地方。

这三个类的依赖关系如图所示。

在这里插入图片描述

MemoryRecords

大体了解了RecordAccumulator的结构之后,我们就从最底层的MemoryRecords开始分析。

MemoryRecords表示的是多个消息的集合,其中封装了Java NIO ByteBuffer用来保存消息数据,Compressor用于对ByteBuffer中的消息进行压缩,以及其他控制字段。

在这里插入图片描述

如图(左)所示,有四个字段比较重要,简单介绍一下。

  • buffer:用于保存消息数据的Java NIO ByteBuffer。
  • writeLimit:记录buffer字段最多可以写入多少个字节的数据。
  • compressor:压缩器,对消息数据进行压缩,将压缩后的数据输出到buffer。
  • writable:此MemoryRecords对象是只读的模式,还是可写模式。在MemoryRecords发送前时,会将其设置成只读模式。

在Compressor比较重要的字段和方法如图(右)所示,有两个输出流类型的字段,分别是bufferStream和appendStream。

前者是在buffer上建立的ByteBufferOutputStream(Kafka自己提供的实现)对象,ByteBufferOutputStream继承了java.io.OutputStream,封装了ByteBuffer,当写入数据超出ByteBuffer容量时,ByteBufferOutputStream会进行自动扩容;后者是DataOutputStream类型,它对前者进行了一层装饰,为其添加了压缩的功能。

MemoryRecords中的Compressor的压缩类型是由“compression.type”配置参数指定的,即KafkaProducer.compressionType字段的值。

下面来分析一下创建压缩流的方式,目前KafkaProducer支持GZIP、SNAPPY、LZ4三种压缩方式。

Compressor提供了一系列put*()方法,向appendStream流写入数据,如图所示。很明显,这是装饰器模式的典型,通过bufferStream装饰,添加自动扩容的功能;通过appendStream装饰后,添加压缩功能。

在这里插入图片描述
了解了Compressor的实现逻辑之后,我们回到MemoryRecords继续分析。

MemoryRecords的构造方法是私有的,只能通过emptyRecords)方法得到其对象。

MemoryRecords中有四个比较重要的方法。

  • append()方法:先判断MemoryRecords是否为可写模式,然后调用Compressor.put*()方法,将消息数据写入ByteBuffer中。
  • hasRoomFor()方法:根据Compressor估算的已写字节数,估计MemoryRecords剩余空间是否足够写入指定的数据。注意,这里仅仅是估算,所以不一定准确,通过hasRoomFor()方法判断之后写入数据,也可能就会导致底层ByteBuffer出现扩容的情况。
  • close()方法:出现ByteBuffer扩容的情况时,MemoryRecords.buffer字段与ByteBufferOutputStream.buffer字段所指向的不再是同一个ByteBuffer对象,如图(左)所示。
  • 在close()方法中,会将MemoryRecords.buffer字段指向扩容后的ByteBuffer对象,如图(右)所示。同时,将writable设置为false(即只读模式)。
    在这里插入图片描述
  • sizelnBytes()方法:对于可写的MemoryRecords,返回的是ByteBufferOutputStream.buffer字段的大小;对于只读MemoryRecords,返回的是MemoryRecords.buffer的大小。

RecordBatch

了解了MemoryRecords的具体实现之后,来分析RecordBatch类的实现。

每个RecordBatch对象中封装了一个MemoryRecords对象,除此之外,还封装了很多控制信息和统计信息,下面简单介绍一下。

  • recordCount:记录了保存的Record的个数。
  • maxRecordSize:最大Record的字节数。
  • attempts:尝试发送当前RecordBatch的次数。
  • lastAttemptMs:最后一次尝试发送的时间戳。
  • records:指向用来存储数据的MemoryRecords对象。
  • topicParition:当前RecordBatch中缓存的消息都会发送给此TopicPartition。
  • produceFuture:ProduceRequestResult类型,标识RecordBatch状态的Future对象。
  • lastAppendTime:最后一次向RecordBatch追加消息的时间戳。
  • thunks:Thunk对象的集合,在后面会详细介绍。
  • offsetCounter:用来记录某消息在RecordBatch中的偏移量。
  • retry:是否正在重试。如果RecordBatch中的数据发送失败,则会重新尝试发送。

图中,以RecordBatch为中心,刻画了其相关类间的对应关系。

在这里插入图片描述
下面分析一下ProduceRequestResult这个类的功能。

ProduceRequestResult并未实现java.util.concurrent.Future接口,但是其通过包含一个count值为1的CountDownLatch对象,实现了类似于Future的功能(Future、CountDownLatch等工具的使用)。

当RecordBatch中全部的消息被正常响应、或超时、或关闭生产者时,会调用ProduceRequestResult.done方法,将produceFuture标记为完成并通过ProduceRequestResult.error字段区分“异常完成”还是“正常完成”,之后调用CountDownLatch对象的countDown方法。

此时,会唤醒阻塞在CountDownLatch对象的await方法的线程(这些线程通过ProduceRequestResult的await方法等待上述三个事件的发生)。

分区会为其中记录的消息分配一个offset并通过此offset维护消息顺序。

在ProduceRequestResult中还有一个需要注意的字段baseOffset,表示的是服务端为此RecordBatch中第一条消息分配的offset,这样每个消息可以根据此offset以及自身在此RecordBatch中的相对偏移量,计算出其在服务端分区中的偏移量了。

在介绍Tunk类之前,请回顾KafkaProducer.send方法的第二个参数,是一个Callback对象,它是针对单个消息的回调函数(每个消息都会有一个对应的Callback对象作为回调)。

RecordBatch.thunks字段可以理解为消息的回调对象队列,Thunk中的callback字段就指向对应消息的Callback对象,其另一个字段future是FutureRecordMetadata类型。

FutureRecordMetadata类有两个关键字段。

  • result:ProduceRequestResult类型,指向对应消息所在RecordBatch的produceFuture字段。
  • relativeOffset:long类型,记录了对应消息在RecordBatch中的偏移量。

FutureRecordMetadata实现了java.util.concurrent.Future接口,但其实现基本都是委托给了ProduceRequestResult对应的方法,由此可以看出,消息应该是按照RecordBatch进行发送和确认的。

当生产者已经收到某消息的响应时,FutureRecordMetadata.get方法就会返回RecordMetadata对象,其中包含消息在Partition中的offset等其他元数据,可供用户自定义Callback使用。

分析完RecordBatch依赖的组件,现在回来看看RecordBatch类的核心方法。tryAppend方法是最核心的方法,其功能是尝试将消息添加到当前的RecordBatch中缓存。

在这里插入图片描述
当RecordBatch成功收到正常响应、或超时、或关闭生产者时,都会调用RecordBatch的done()方法。

在done()方法中,会回调RecordBatch中全部消息的Callback回调,并调用其produceFuture字段的done()方法。RecordBatch.done()方法的调用关系如图所示。

在这里插入图片描述

RufferPool

ByteBuffer的创建和释放是比较消耗资源的,为了实现内存的高效利用,基本上每个成熟的框架或工具都有一套内存管理机制。

Kafka客户端使用BufferPool来实现ByteBuffer的复用。

图展示了BufferPool的核心字段。

在这里插入图片描述
首先需要了解的是,每个BufferPool对象只针对特定大小(由poolableSize字段指定)的ByteBuffer进行管理,对于其他大小的ByteBuffer并不会缓存进BufferPool。

一般情况下,我们会调整MemoryRecords的大小(RecordAccumulator.batchSize字段指定),使每个MemoryRecords可以缓存多条消息。

但也有例外情况,当一条消息的字节数大于MemoryRecords时,就不会复用BufferPool中缓存的ByteBuffer,而是额外分配ByteBuffer,在它被使用完后也不会放入BufferPool进行管理,而是直接丢弃由GC回收。

如果经常出现这种例外情况,就需要考虑调整batchSize的配置了。

下面介绍BufferPool的关键字段:

  • free:是一个ArayDeque队列,其中缓存了指定大小的ByteBuffer对象。
  • ReentrantLock:因为有多线程并发分配和回收ByteBuffer,所以使用锁控制并发,保证线程安全。
  • waiters:记录因申请不到足够空间而阻塞的线程,此队列中实际记录的是阻塞线程对应的Condition对象。
  • totalMemory:记录了整个Pool的大小。
  • availableMemory:记录了可用的空间大小,这个空间是totalMemory减去free列表中全部ByteBuffer的大小。
    BufferPool.allocate()方法负责从缓冲池中申请ByteBuffer,当缓冲池中空间不足时,就会阻塞调用线程。

下面简单分析一下allocate()方法申请空间的过程:
在这里插入图片描述
继续分析deallocate()方法的实现:
在这里插入图片描述

RecordAccumulator

介绍完了MemoryRecord、RecordBatch以及BufferPool的工作机制,再来看RecordAccumulator的实现就比较简单了。

下面来看RecordAccumulator中的关键字段和方法,如图所示。

在这里插入图片描述

  • batches:TopicPartition与RecordBatch集合的映射关系,类型是CopyOnWriteMap,是线程安全的集合,但其中的Deque是ArayDeque类型,是非线程安全的集合。在后面的介绍中可以看到,追加新消息或发送RecordBatch的时候,需要加锁同步。

每个Deque中都保存了发往对应TopicPartition的RecordBatch集合。

  • batchSize:指定每个RecordBatch底层ByteBuffer的大小。
  • Compression:压缩类型,参考MemoryRecords。
  • incomplete:未发送完成的RecordBatch集合,底层通过Set集合实现。
  • free:BufferPool对象,参考BufferPool。
  • drainlndex:使用drain方法批量导出RecordBatch时,为了防止饥饿,使用drainIndex记录上次发送停止时的位置,下次继续从此位置开始发送。

KafkaProducer.send方法最终会调用RecordAccumulator.append方法将消息追加到RecordAccumulator中,其代码比较长,先来看其主要逻辑:

  1. 首先在batches集合中查找TopicPartition对应的Deque,查找不到,则创建新的Deque,并添加到batches集合中。
  2. 对Deque加锁(使用synchronized关键字加锁)。
  3. 调用tryAppendO方法,尝试向Deque中最后一个RecordBatch追加Record。
  4. synchronized块结束,自动解锁。
  5. 追加成功,则返回RecordAppendResult(其中封装了ProduceRequestResult)。
  6. 追加失败,则尝试从BufferPool中申请新的ByteBuffer。
  7. 对Deque加锁(使用synchronized关键字加锁),再次尝试第3步。
  8. 追加成功,则返回;失败,则使用第5步得到的ByteBuffer创建RecordBatch。
  9. 将Record追加到新建的RecordBatch中,并将新建的RecordBatch追加到对应的Deque尾部。
  10. 将新建的RecordBatch追加到incomplete集合。
  11. synchronized块结束,自动解锁。
  12. 返回RecordAppendResult,RecordAppendResult会中的字段会作为唤醒Sender线程的条件。

下面是RecordAccumulator.append方法的具体实现:

在这里插入图片描述
在这里插入图片描述现在回到KafkaProducer.doSend方法,doSend方法的最后一步就是判断此次向RecordAccumulator中追加消息后是否满足唤醒Sender线程条件,这里唤醒Sender线程的条件是消息所在队列的最后一个RecordBatch满了或此队列中不止一个RecordBatch。

在客户端将消息发送给服务端之前,会调用RecordAccumulator.ready方法获取集群中符合发送消息条件的节点集合。

这些条件是站在RecordAccumulator的角度对集群中的Node进行筛选的,具体的条件如下:

  1. Deque中有多个RecordBatch或是第一个RecordBatch是否满了。
  2. 是否超时了。
  3. 是否有其他线程在等待BufferPool释放空间(即BufferPool的空间耗尽了)。
  4. 是否有线程正在等待flush操作完成。
  5. Sender线程准备关闭。

下面来看一下ready方法的代码,它会遍历batches集合中每个分区,首先查找当前分区Leader副本所在的Node,如果满足上述五个条件,则将此Node信息记录到readyNodes集合中。

遍历完成后返回ReadyCheckResult对象,其中记录了满足发送条件的Node集合、在遍历过程中是否有找不到Leader副本的分区(也可以认为是Metadata中当前的元数据过时了)、下次调用ready方法进行检查的时间间隔。

在这里插入图片描述

调用RecordAccumulator.ready)方法得到readyNodes集合后,此集合还要经过NetworkClient的过滤(在介绍Sender线程的时候再详细介绍)之后,才能得到最终能够发送消息的Node集合。

RecordAccumulator.drain方法会根据上述Node集合获取要发送的消息,返回Map<Integer,List>集合,key是Nodeld,value是待发送的RecordBatch集合。

drain方法也是由Sender线程调用的。drain方法的核心逻辑是进行映射的转换:将RecordAccumulator记录的TopicPartition>RecordBatch集合的映射,转换成了Nodeld->RecordBatch集合的映射。

为什么需要这次转换呢?在网络I/O层面,生产者是面向Node节点发送消息数据,它只建立到Node的连接并发送数据,并不关心这些数据属于哪个TopicPartition;而在调用KafkaProducer的上层业务逻辑中,则是按照TopicPartition的方式产生数据,它只关心发送到哪个TopicPartition,并不关心这些TopicPartition在哪个Node节点上。

在下文介绍到Sender线程的时候会发现,它每次向每个Node节点至多发送一个ClientRequest请求,其中封装了追加到此Node节点上多个分区的消息,待请求到达服务端后,由Kafka对请求记性解析。

下面来看看drain方法的代码:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM实战(23)——内存碎片优化

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…

I2C总线和通信协议详解 (超详细配42张高清图+万字长文)

I2C总线和通信协议详解 (超详细配42张高清图万字长文) I2C&#xff08;Inter-Integrated Circuit&#xff09;通信总线&#xff0c;作为嵌入式系统设计中的一个关键组成部分&#xff0c;其灵活性和高效率使其在高级应用中备受青睐。本文旨在提供关于I2C通信总线的深度解析&…

认识并使用JWT

认识并使用JWT 一、互联网世界的用户认证二、对JWT的基本认知三、JWT的原理1 Header2 Payload3 Signature4 [参考资料](https://www.ruanyifeng.com/blog/2018/07/json_web_token-tutorial.html) 四、使用JWT1、引入依赖2、jwt的生成与解析3、测试3.1 生成jwt3.2 解析jwt 一、互…

DataXCloud部署与配置[智数通]

静态IP设置 # 修改网卡配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33# 修改文件内容 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic IPADDR192.168.18.130 NETMASK255.255.255.0 GATEWAY192.168.18.2 DEFROUTEyes IPV4_FAILURE_FATALno IPV6INIT…

Pytorch各种Dropout层应用于详解

目录 torch框架Dropout functions详解 dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 alpha_dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 feature_alpha_dropout 用途 用法 使用技巧 参数 数学理论 代码示例 dropout1d 用途 用…

SQL实践:利用tag检索文件的多种情况讨论(二)

在上一篇文章SQL实践&#xff1a;利用tag检索文件的多种情况讨论中&#xff0c;我们介绍了在使用外键的方式为数据关联tag后&#xff0c;如何筛选&#xff1a; 如何筛选包含某一个tag的数据如何筛选包含且只包含某一个tag的数据如何筛选包含多个指定tag的数据 这篇文章主要是…

eNSP学习——终端直连三层网关设备进行通信

VLAN 配置 一 . 功能简介 将设备中的某些接口定义为一个单独的区域&#xff0c;将指定接口加入到指定 VLAN 中之后&#xff0c;接口就可以转发 指定 VLAN 报文。从而实现 VLAN 内的主机可以直接通信&#xff0c;而 VLAN 间的主机不能直接互通&#xff0c;将广播报文 …

element-ui tree树形结构全选、取消全选,展开收起

控制树形结构全选、取消全选&#xff0c;展开收起 <template><div><!-- 添加 ref"tree" 属性--><el-tree:data"data"show-checkboxdefault-expand-allnode-key"id"ref"tree"highlight-current:props"defa…

详解SpringCloud微服务技术栈:Feign远程调用、最佳实践、错误排查

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;详解SpringCloud微服务技术栈&#xff1a;Nacos配置管理 &#x1f4da;订阅专栏&#xff1a;微服务技术全家桶 希望文章对你们有…

蓝桥杯青少年创意编程大赛:激发少儿编程潜能,培养未来科技之星

随着科技的飞速发展&#xff0c;编程已经成为了当今世界的一项重要技能。为了培养更多的编程人才&#xff0c;蓝桥杯官网显示&#xff0c;蓝桥杯青少年创意编程大赛应运而生。作为国内有影响力的少儿编程赛事之一&#xff0c;蓝桥杯青少年创意编程大赛旨在激发青少年对编程的兴…

​Portkey AI网关:一个用来连接多种人工智能模型的开源工具

简介 它允许开发者通过一个简单的API接口来访问超过100种不同的大语言模型。包括OpenAI、Anthropic、Mistral、LLama2、Anyscale、Google Gemini等。安装体积只有45kb&#xff0c;处理速度提升了9.9倍&#xff0c;可以在多个不同的AI模型中来回切换。可以根据自己的需要进行灵…

找不到mfc100.dll的解决方法,怎么修复mfc100.dll文件

当我们在使用电脑时&#xff0c;时常可能会遇到各类系统提示的错误信息。"找不到mfc100.dll" 就是这些错误之一&#xff0c;该错误提示会妨碍我们执行一些应用程序或特定代码。为了帮助读者克服这个技术障碍&#xff0c;本篇文章将详尽阐明导致该问题的根本原因&…

【Flutter 问题系列第 80 篇】TextField 输入框组件限制可输入的最大长度后,输入的内容中包含表情符号时,获取输入的内容数还是会超出限制的问题

这是【Flutter 问题系列第 80 篇】&#xff0c;如果觉得有用的话&#xff0c;欢迎关注专栏。 博文当前所用 Flutter SDK&#xff1a;3.10.5、Dart SDK&#xff1a;3.0.5 一&#xff1a;问题描述 在输入用户名称、简介等内容时&#xff0c;一般我们都会限制输入框内最大可输入…

FFMPEG解码实时流,支持cpu、gpu解码

官网下载的ffmpeg目前只能下载到X64版本的库&#xff0c;具体编译请参考windows编译ffmpeg源码&#xff08;32位库&#xff09;_windows 32位ffmpeg动态库-CSDN博客 直接上代码 int VideoDecodeModule::Open(std::string strUrl) {AVFormatContext *pFormatCtx nullptr;AVCo…

电脑本地连接不见了怎么恢复?5个方法轻松解决问题!

“我在使用电脑时&#xff0c;突然发现我的本地连接不见了&#xff0c;这是怎么回事呢&#xff1f;有什么方法可以解决这个问题吗&#xff1f;” 电脑的本地连接是一种将电脑与局域网连接的方式。局域网是一种小型的网络&#xff0c;通常在建筑物内或地理位置相近的少量计算机之…

Python数据分析案例33——新闻文本主题多分类(Transformer, 组合模型) 模型保存

案例背景 对于海量的新闻&#xff0c;我们可能需要进行文本的分类。模型构建很重要&#xff0c;现在对于自然语言处理基本都是神经网络的方法了。 本次这里正好有一组质量特别高的新闻数据&#xff0c;涉及 教育 科技 社会 时政 财经 房产 家居 七大主题&#xff0c;基本涵盖…

Grafana(三)Grafana 免密登录-隐藏导航栏-主题变换

一. 免密登录 Grafana 的常用方式&#xff1a; 将配置好的Grafana图嵌入到系统页面中 为了实现可免登录访问&#xff0c;可以通过如下方式进行设置&#xff1a; 1. 修改Grafana配置文件 在Grafana的配置文件 /etc/grafana/grafana.ini 中&#xff0c;找到 [auth.anonymous] 配…

P9842 [ICPC2021 Nanjing R] Klee in Solitary Confinement 题解(SPJ!!!)

[ICPC2021 Nanjing R] Klee in Solitary Confinement 题面翻译 给定 n , k n,k n,k 和一个长为 n n n 的序列&#xff0c;你可以选择对区间 [ l , r ] [l, r] [l,r] 的数整体加上 k k k&#xff0c;也可以不加。最大化众数出现次数并输出。 题目描述 Since the travele…

MySQL命令大全和实例

文章目录 1. 数据库管理2. 表操作3. 数据操作&#xff08;CRUD&#xff09;4. 条件查询与排序5. 聚合函数和分组6. 用户权限管理7. 其他操作8. 视图操作9. 索引操作10. 子查询与连接查询11. 插入多行数据12. 删除满足特定条件的表中所有数据13. 清空表&#xff08;保留表结构&a…

rust跟我学五:是否安装双系统

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么得到检测双系统的。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[我的Ru…