最近看Kafka源码,着实被它的客户端缓冲池技术优雅到了

作者 | 犀牛饲养员

 责编 | 徐威龙

封图| CSDN 下载于视觉中国

最近看kafka源码,着实被它的客户端缓冲池技术优雅到了。忍不住要写篇文章赞美一下(哈哈)。

注:本文用到的源码来自kafka2.2.2版本。

背景

当我们应用程序调用kafka客户端 producer发送消息的时候,在kafka客户端内部,会把属于同一个topic分区的消息先汇总起来,形成一个batch。真正发往kafka服务器的消息都是以batch为单位的。如下图所示:

这么做的好处显而易见。客户端和服务端通过网络通信,这样批量发送可以减少网络带来的性能开销,提高吞吐量。

这个Batch的管理就非常值得探讨了。可能有人会说,这不简单吗?用的时候分配一个块内存,发送完了释放不就行了吗。

kafka是用java语言编写的(新版本大部分都是用java实现的了),用上面的方案就是使用的时候new一个空间然后赋值给一个引用,释放的时候把引用置为null等JVM GC处理就可以了。

看起来似乎也没啥问题。但是在并发量比较高的时候就会频繁的进行GC。我们都知道GC的时候有个stop the world,尽管最新的GC技术这个时间已经非常短,依然有可能成为生产环境的性能瓶颈。

kafka的设计者当然能考虑到这一层。下面我们就来学习下kafka是如何对batch进行管理的。

缓冲池技术原理解析

kafka客户端使用了缓冲池的概念,预先分配好真实的内存块,放在池子里。

每个batch其实都对应了缓冲池中的一个内存空间,发送完消息之后,batch不再使用了,就把内存块归还给缓冲池。

听起来是不是很耳熟啊?不错,数据库连接池,线程池等池化技术其实差不多都是这样的原理。通过池化技术降低创建和销毁带来的开销,提升执行效率。

代码是最好的文档,下面我们就来撸下源码。

我们撸代码的步骤采用的是从上往下的原则,先带你看看缓冲池在哪里使用,然后再深入到缓存池内部深入分析。

下面的代码做了一些删减,值保留了跟本文相关的部分便于分析。

public class KafkaProducer<K, V> implements Producer<K, V> {private final Logger log;private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);private static final String JMX_PREFIX = "kafka.producer";public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";@Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);...}

当我们调用客户端的发送消息的时候,底层会调用doSend,然后里面使用一个记录累计器RecordAccumulator把消息append进来。我们继续往下看看。

public final class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;private final IncompleteBatches incomplete;// The following variables are only accessed by the sender thread, so we don't need to protect them.private final Map<TopicPartition, Long> muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer = null;buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);...

RecordAccumulator其实就是管理一个batch队列,我们看到append方法实现其实是调用BufferPool的free方法申请(allocate)了一块内存空间(ByteBuffer), 然后把这个内存空空间包装成batch添加到队列后面。

当消息发送完成不在使用batch的时候,RecordAccumulator会调用deallocate方法归还内存,内部其实是调用BufferPool的deallocate方法。

public void deallocate(ProducerBatch batch) {incomplete.remove(batch);// Only deallocate the batch if it is not a split batch because split batch are allocated outside the// buffer pool.if (!batch.isSplitBatch())free.deallocate(batch.buffer(), batch.initialCapacity());}

很明显,BufferPool就是缓冲池管理的类,也是我们今天要讨论的重点。我们先来看看分配内存块的方法。

public class BufferPool {static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";private final long totalMemory;private final int poolableSize;private final ReentrantLock lock;private final Deque<ByteBuffer> free;private final Deque<Condition> waiters;/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */private long nonPooledAvailableMemory;private final Metrics metrics;private final Time time;private final Sensor waitTime;public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {if (size > this.totalMemory)throw new IllegalArgumentException("Attempt to allocate " + size+ " bytes, but there is a hard limit of "+ this.totalMemory+ " on memory allocations.");ByteBuffer buffer = null;this.lock.lock();try {// check if we have a free buffer of the right size pooledif (size == poolableSize && !this.free.isEmpty())return this.free.pollFirst();// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize = freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory + freeListSize >= size) {// we have enough unallocated or pooled memory to immediately// satisfy the request, but need to allocate the bufferfreeUp(size);this.nonPooledAvailableMemory -= size;} else {// we are out of memory and will have to blockint accumulated = 0;Condition moreMemory = this.lock.newCondition();try {long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);this.waiters.addLast(moreMemory);// loop over and over until we have a buffer or have reserved// enough memory to allocate onewhile (accumulated < size) {long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} finally {long endWaitNs = time.nanoseconds();timeNs = Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if (waitingTimeElapsed) {throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");}remainingTimeToBlockNs -= timeNs;// check if we can satisfy this request from the free list,// otherwise allocate memoryif (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// just grab a buffer from the free listbuffer = this.free.pollFirst();accumulated = size;} else {// we'll need to allocate memory, but we may only get// part of what we need on this iterationfreeUp(size - accumulated);int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);this.nonPooledAvailableMemory -= got;accumulated += got;}...

首先整个方法是加锁操作的,所以支持并发分配内存。

逻辑是这样的,当申请的内存大小等于poolableSize,则从缓存池中获取。这个poolableSize可以理解成是缓冲池的页大小,作为缓冲池分配的基本单位。从缓存池获取其实就是从ByteBuffer队列取出一个元素返回。

如果申请的内存不等于特定的数值,则向非缓存池申请。同时会从缓冲池中取一些内存并入到非缓冲池中。这个nonPooledAvailableMemory指的就是非缓冲池的可用内存大小。非缓冲池分配内存,其实就是调用ByteBuffer.allocat分配真实的JVM内存。


缓存池的内存一般都很少回收。而非缓存池的内存是使用后丢弃,然后等待GC回收。

继续来看看batch释放的代码,

public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.nonPooledAvailableMemory += size;}Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();}}

很简单,也是分为两种情况。要么直接归还缓冲池,要么就是更新非缓冲池部分的可以内存。然后通知等待队列里的第一个元素。

推荐阅读:Docker 概念很难理解?一文搞定 Docker 端口绑定DevOps 转型时如何安全融入?对企业产出有何影响?2019年 DevOps 最新现状研究报告解读 | 原力计划
十分钟上手 React+MirrorX,从此前端大神代码不再难懂 | 原力计划
第一批复工的人,都栽在了公司的厕所......
如何用Jupyter Notebook制作新冠病毒疫情追踪器?
比特币最主流,以太坊大跌,区块链技术“万金油”红利已结束 | 区块链开发者年度报告
真香,朕在看了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

相信坚持的力量,我的程序员打怪升级之路

我是来自阿里云-云通信技术团队的玄照&#xff0c;2015年底进入阿里&#xff0c;刚渡过三年醇&#xff0c;正迈向五年陈的路上。个人兴趣爱好广泛&#xff0c;静的、动的&#xff0c;音乐、游戏、电影、运动都比较喜欢。 玄照&#xff0c;阿里云高级技术专家 程序生涯启航 第…

牛客网SQL篇刷题篇(32-37)

https://www.nowcoder.com/ta/sql 1.sql group_contact()、concat()、concat_ws函数 https://blog.csdn.net/qq_36250202/article/details/99308824 eg:将employees表的所有员工的last_name和first_name拼接起来作为Name&#xff0c;中间以一个空格区分 SELECT CONCAT(las…

支付宝工程师如何搞定关系数据库的“大脑”——查询优化器

前言 查询优化器是关系数据库系统的核心模块&#xff0c;是数据库内核开发的重点和难点&#xff0c;也是衡量整个数据库系统成熟度的“试金石”。 查询优化理论诞生距今已有四十来年&#xff0c;学术界和工业界其实已经形成了一套比较完善的查询优化框架(System-R 的 Bottom-…

SpringBoot2 集成 xxl-job任务调度中心

接上一篇&#xff1a; 搭建xxl-job任务调度中心 https://gblfy.blog.csdn.net/article/details/113809843 文章目录一、SpringBoot 配置1. maven依赖2. 执行器配置 application.yml3. 执行器组件配置4. 部署执行器项目二、xxl-job任务调度中心2.1. 执行器管理2.2. 任务管理三、…

HelloWorld

HelloWorld 创建一个Java文件 文件后缀名为.javaHello.java 编写代码 public class Hello{public static void main(String[] args){System.out.print("Hello, world!");} }编译java文件 javac Hello.java会多出一个Hello.class 文件 运行class文件 java Hell…

运行Java程序时 Tomcat出错 显示端口被占用

解决方法&#xff1a;命令提示符&#xff08;管理员&#xff09; 输入netstat -ano | findstr 8080 检查8080端口有哪些进程 输入taskkill -pid 11728 -f 关闭11728的进程

从开源小白到 Apache Member,我的成长之路

我们走过的每一步路&#xff0c;都会留下印记&#xff0c;越坚实&#xff0c;越清晰。 近日&#xff0c;Apache 软件基金会&#xff08;ASF&#xff09;官方 Blog 宣布全球新增 40 位 Apache Member&#xff0c;张乎兴有幸成为其中一位。 目前&#xff0c;全球共有771位 ASF …

当你打开天猫的那一刻,推荐系统做了哪些工作?

当年打开天猫的那一刻&#xff0c;它为你完成了华丽的变身&#xff0c;成为世上独一无二的“天猫”&#xff0c;这就是智能推荐的力量。今天&#xff0c;来自阿里巴巴搜索推荐事业部的算法工程师陈启伟为你介绍天猫如何玩转首页个性化推荐&#xff0c;揭开搜索推荐的神秘面纱。…

百万人学AI:CSDN重磅共建人工智能技术新生态

站在AI发展的新十年起点上&#xff0c;CSDN将发挥开发者优势&#xff0c;与中国AI各行业和企业共建“百万人学AI”新技术生态。作者 | CSDN新媒体事业部8年前&#xff0c;现图灵奖得主Hinton团队在ImageNet竞赛中首次使用深度学习完胜Google等其它团队&#xff0c;顿时让工业界…

牛客网SQL篇刷题篇(38-47)

1.视图&#xff1a;视图是可视化的表。 视图的作用&#xff1a; 第一点&#xff1a;使用视图&#xff0c;可以定制用户数据&#xff0c;聚焦特定的数据。 解释&#xff1a; 在实际过程中&#xff0c;公司有不同角色的工作人员&#xff0c;我们以销售公司为例的话&#xff0…

SpringBoot2 集成 xxl-job任务调度中心_参数传递

文章目录一、xxl-job任务调度中心1. 调度中心创建任务2. 调度中心创建执行器二、执行器任务编码2.1. 单参数2.2. 多参数三、调度中心参数传递测试3.1. 单个参数传递3.2. 多个参数传递前提&#xff1a;执行器和xxl-job任务调度中心启动完毕 一、xxl-job任务调度中心 1. 调度中心…

Java-用IDEA创建Java项目

1. 创建项目 2. 创建空项目 3. 输入项目名 &#xff14;.配置JDK 点击Project Structure 配置JDK 点击Apply->OK 5. 新建模块 https://www.bilibili.com/video/BV12J41137hu?p21&spm_id_frompageDriver

DevOps:从「蒸汽时代」到「高铁时代」,SUNMI DevOps转型之路 | 原力计划

作者 | 文振熙、刘文沣责编 | 徐威龙封图| CSDN 下载于视觉中国商米科技成立于 2013 年&#xff0c;总部位于上海市杨浦区创智天地&#xff0c;是一家具有产品创新基因和互联网基因的公司。商米在短时间内迅速成长为一家近1000人的企业&#xff0c;产品研发人数占比一度超过70%…

SpringBoot2 集成 xxl-job任务调度中心_路由策略

文章目录一、简述二、故障转移演示2.1. 启动2个执行器2.2. 添加执行器ip2.3. 故障转移策略2.4. 启动任务2.5. 模拟8081执行器宕机2.6. 结论三、轮训策略演示3.1. 启动2个执行器3.2. 添加执行器ip3.3. 轮训策略3.4. 启动任务3.5. 日志分析3.6. 故障转移3.7. 重新启动8082执行器四…

Uniapp组件之间传参

1.父组件内引入子组件&#xff0c;并且子组件使用父组件内的数据 将子组件引入到父组件&#xff1a; <uni-pop :opts"defaultOptions"></uni-pop> import uniPop from /components/uniPop/uniPop.vue 子组件使用父组件内的数据&#xff1a; 2------创建…

基于大数据的舆情分析系统架构 - 架构篇

前言 互联网的飞速发展促进了很多新媒体的发展&#xff0c;不论是知名的大V&#xff0c;明星还是围观群众都可以通过手机在微博&#xff0c;朋友圈或者点评网站上发表状态&#xff0c;分享自己的所见所想&#xff0c;使得“人人都有了麦克风”。不论是热点新闻还是娱乐八卦&am…

Java-标识符和关键字

关键字 标识符 https://www.bilibili.com/video/BV12J41137hu?p22&spm_id_frompageDriver

SpringBoot2 集成 xxl-job任务调度中心_阻塞策略

阻塞处理策略&#xff1a;调度过于密集执行器来不及处理时的处理策略&#xff0c;策略包括&#xff1a;单机串行&#xff08;默认&#xff09;、丢弃后续调度、覆盖之前调度 阻塞处理策略说明单机串行&#xff08;默认&#xff09;任务依次排队执行丢弃后续调度当上一个任务没…

反转!Python再次卫冕2020年编程榜,Java和C回落,你怎么看?​

2020年转眼Q1季度快要结束&#xff0c;在近几个月的榜单中&#xff0c;Python持续19年的火爆&#xff0c;走在在卫冕的道路&#xff0c;并且与老牌语言Java、C的差距拉得更远了一些。近期Udemy 制作了一份《2020 年职场学习趋势报告》&#xff0c;指出了哪些技能最受职场人关注…

HTTP状态码415 springboot项目

1.415报错&#xff0c;有可能是parameter写错了&#xff0c;前台不接收这种形式 controller写RequestBody&#xff0c;前台url写&#xff1f;name1&number1就会报错415