Flink+Paimon多流拼接性能优化实战

目录

(零)本文简介

意外收获:

(一)背景

(二)探索梳理过程

(三)源码改造

(四)修改效果

1、JOB状态

2、Level5的dataFile总大小

3、数据延迟

4、关联率

(五)未来展望:异步Compact


(零)本文简介

Paimon多流拼接/合并性能优化;

        为解决离线T+1多流拼接数据时效性Flink实时状态太大任务稳定性问题,这里基于数据湖工具Apache Paimon进行近实时的多流拼接。

        使用Flink+Paimon基于ParmaryKey TablePartialUpdate)进行多流拼接的时候,跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况,本文通过剖析源码逻辑、修改源码,在一定程度上解决了这个问题。

        note:下文对源码的修改可能需要了解一点paimon的实现原理比如:LSM Tree(level DB)

可参考:LSM树详解 - 知乎

        LSM(Log-Structured Merge Tree)_lsm tree_一介草民kk的博客-CSDN博客

Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章:

新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客

基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客

意外收获:

        本文通过修改源码还意外解决了【跨分区关联率偏低】的问题(详见下文)。

(一)背景

       这里使用 Flink 1.14 + Apache Paimon 0.5 snapshot 进行多流拼接(前端埋点流 + 服务端埋点流);

        当前情况是一天一个分区,一个分区100个bucket;就会出现如下情况:分区/bucket中的数据越来越多,到达下午或者傍晚的时候就会出现 paimon 作业周期性背压(因为mergeTree中维护的数据越来越多,tree越来越大),checkpoint时间也会比较长;于是决定将mergeTree中的过期数据删除,即让其不进入tree中,减少计算量;

        这里的“过期”按需自定义,比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上,那就根据时间戳与当前时间戳(假设没有很严重的消费积压)相比,时间差超过3小时的数据就将其丢弃;

具体细节涉及到(这里先将结论给出):

    1. data文件创建后是否还会修改?(不会)
    2. 根据时间排序的data数据文件是增量还是全量?(几个最新文件加起来就是全量)
    3. 应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期?(通过record)

(二)探索梳理过程

1、首先观察hdfs文件之后发现,dataFile只保留最近一个小时的文件,超过一小时的文件就会被删除,这里应该对应参数 partition.expiration-check-interval = 1h,由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】(那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤);

2、观察flink log发现,每次compaction都只读几个文件,如下所示:

        每次其实只读取一个level0的file,再加上几个level5的file(level5这里file就是之前的全部数据,包含多个流的),最后将compact之后的文件再命名为新的名字写到level5;

        随着分区数据量的增多,参与compact的file也会越来越多(这也是会导致tree偏大,出现周期性背压的原因);

另外,dataFile命名呈现如下规律:

        level5的第二个文件总是跟第一个中间隔一个(这个跟改源码没有关系,只是适合观察规律);

到晚间的时候参与compact的file更多了:

3、观察每次level5生成的dataFile(理论上level5的dataFile会越来越大/多,当单个文件大小超过128M *(1+rate)时,会生成新文件);

        所有level5的文件大小加起来会越来越大,即永远是呈增长趋势;

        如下每一层的总大小在不断增大,同时当文件到一定程度之后,每层2个文件变成3个文件;

4、【以上3点均为原始实现思路,从这里开始改造】思考:既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据(dataFile不是增量的),那么就不能通过文件最新修改时间来判断数据是否过期,只能从最新的几个dataFile的每条记录来进行判断了,即原本每次参与合并的record是从这个partition+bucket建立开始的全部数据,那么是否可以通过修改源码判断每条record是否过期,从而不参与mergeTree,在compact完成之后也不会再次写入新的dataFile(如果还是写进来,每次读进tree时都需要判断是否过期,是否进入tree)?【答案当然是可以的!】

(三)源码改造

1、首先说明一下,在源码中有这么一段

// IntervalPartition.partition()
public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;
}

        此处为了将文件排序、再将有overlap的放在一个list里边,一但产生gap(即没有overlap),那么就创建新的list,最终将这些 list 再放到List>中:

示意图如下:

2、后续通过一些处理变成 List> 的格式,这里的KeyValue就包含我们想要去操纵的record!

源码是这样的:

public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {readers.add(supplier.get());} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers, keyComparator, mergeFunction, sortEngine);
}

        这里的return就会创建sortMergeReader了,我们可以在将数据传入这里之前,先进行过滤(通过判断每一条record是否超过过期时间),修改如下:

public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {// 过滤掉过期数据RecordReader<KeyValue> filterSupplier =supplier.get().filter((KeyValue keyValue) ->isNotExpiredRecord(keyValue.value(), expireTimeMillis));readers.add(filterSupplier);} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers,keyComparator,mergeFunction,sortEngine,keyType.getFieldTypes(),valueType.getFieldTypes());
}// 判断这条数据是否过期
public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {if (expireTimeMillis <= 0) {return true;}// 只要有一个字段不为空,且大于0,且过期时间大于expireTimeMillis,就判断为过期for (Integer pos : expireFieldsPosSet) {if ((!row.isNullAt(pos))&& row.getLong(pos) > 0&& (System.currentTimeMillis() - row.getLong(pos)) > expireTimeMillis) {return false;}}return true;
}

与此同时,将相关参数暴露出来,可以在建表时进行自定义配置:

public static final ConfigOption<Integer> RECORDS_EXPIRED_HOUR =key("record.expired-hour").intType().defaultValue(-1).withDescription("Records in streams WON'T be offered into MergeTree when they are expired."+ " (Inorder to avoid too large MergeTree; -1 means never expired). ");public static final ConfigOption<String> RECORDS_EXPIRED_FIELDS =key("record.expired-fields").stringType().noDefaultValue().withDescription("Records in streams WON'T be offered into MergeTree when they are judged as [expired] according to these fields."+ "If you specify multiple fields, delimiter is ','.");

使用方法:

val createPaimonJoinTable = (s"CREATE TABLE IF NOT EXISTS ${paimonTable}(\n"+ " uuid STRING,\n"+ " metaid STRING,\n"+ " cid STRING,\n"+ " area STRING,\n"+ " ts1 bigint,\n"+ " ts2 bigint,\n"+ " d STRING, \n"+ " PRIMARY KEY (d, uuid) NOT ENFORCED \n"+ ") PARTITIONED BY (d) \n"+ " WITH (\n" +"    'merge-engine' = 'partial-update',\n" +"    'changelog-producer' = 'full-compaction', \n" +"    'file.format' = 'orc', \n" +s"    'sink.managed.writer-buffer-memory' = '${sinkWriterBuffer}', \n" +s"    'full-compaction.delta-commits' = '${fullCompactionCommits}', \n" +s"    'scan.mode' = '${scanMode}', \n" +s"    'bucket' = '${bucketNum}', \n" +s"    'sink.parallelism' = '${sinkTaskNum}', \n" +s"    'record.expired-hour' = '3' , \n" +   // user defined para"     'record.expired-fileds' = '4,5' , \n" +   // user defined para"     'sequence.field' = 'ts1' \n" +")")
tableEnv.executeSql(createPaimonJoinTable)

(四)修改效果

1、JOB状态

运行到晚上20点尚未出现背压:

checkpoint时间也没有过长(如果不剔除过期数据,到这个时间cp时长应该在3分钟左右):

生产到Kafka的消息也没有严重的断流或者锯齿现象:

还是有可能出现exception如下(但对数据量没有任何影响):

2、Level5的dataFile总大小

        上边只是现象,最终还是要数据说话。

        修改源码之后,观察dataFile,理论上每一层的size总大小可能会出现减小的情况 (因为过期数据就不会再写入到 level5 新的data文件中了)

        如下图:levelSize diff(下一次level总size - 上一次level总size),确实出现了“有正有负”的情况,于是验证源码修改生效(即每次进行compact只会读取近 n 个小时的数据进行合并)!

3、数据延迟

有意思的是,当我们修改源码(将过期的数据丢弃)之后,数据延迟也变小了。

数据延迟计算方法:paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳;

修改前:

修改后:

4、关联率

        意外收获:

        经过上述过程改造源码,还可以解决“跨分区关联率偏低”的问题!!!

        既然是多个流相关联,那么就必然存在一个关联率的问题(一定会有部分数据因为埋点上报缺失/延迟导致关联不上)。于是就会存在如下问题:如果数据按“天”进行分区,那么在跨分区时刻也就必然会存在更多的数据关联不上(因为两个流的时间不是完全同步的,一条流可能落到前一天分区,另一条流可能落在第二天分区;数据不在同一个分区,就不会进入同一个mergeTree,也就关联不上)。

那么修改了源码之后是如何解决上述问题的呢?

        如前文所述,我们修改源码的目的是“使参与compact的数据不会持续增加”,于是修改代码使部分数据过期,最终level5(LSM tree的最深一层)的数据总量不持续增加。那么,既然数据不会持续增加,我们就可以将所有的数据全部放在一个分区中(或者理解为不设分区,一直在一个hdfs路径下;此时只有一开始跑的时候前一少部分数据关联率偏低,后边会维持在一个稳定水平),也就没有过跨分区一说了。

(五)未来展望:异步Compact

官方提供的paimon源码,里边的compaction是 sync 模式的,我尝试改成过 async 的,但是时不时会出现很少量的数据丢失(感觉可能是因为同一时刻有多个compact任务在进行),后续有机会可以再继续尝试一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32定时器定时及其应用

STM32定时器定时及其应用 定时器概述☆定时器相关配置CubeMX工程配置及程序实现固件库程序设计及实现 定时器概述 1. 工作原理 使用精准的时基&#xff0c;通过硬件的方式&#xff0c;实现定时功能。定时器核心就是计数器 2. 定时器分类   基本定时器&#xff08;TIM6~TIM7…

Redis 事务

1. 是什么 1. 官网 https://redis.io/dosc/manual/transactions/ 2. 可以一次执行多个命令&#xff0c;本质是一组命令的集合。一个事务中的所有命令都会序列化&#xff0c;按顺序地串行化执行而不会被其它命令插入&#xff0c;不许加塞 2. 能干啥 一个队列中&#xff0c;一次…

2022年03月 C/C++(七级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C++编程(1~8级)全部真题・点这里 第1题:红与黑 有一间长方形的房子, 地上铺了红色、 黑色两种颜色的正方形瓷砖。你站在其中一块黑色的瓷砖上, 只能向相邻的黑色瓷砖移动。 请写一个程序, 计算你总共能够到达多少块黑色的瓷砖。 时间限制: 1000 内存限制: 65536 输入…

C语言----详解socket通信

一&#xff1a;什么是socket 刚接触socket的同学想必也知道socket的中文名&#xff0c;套接字&#xff0c;与其说是中文名倒不如说这是什么玩意&#xff0c;我们先不要管中文名的实际意义&#xff0c;我们先来了解一下什么是socket。 我们上网产生的数据都是经过协议栈一层一层…

Ubuntu18.04安装cuDNN

注册账号 https://developer.nvidia.com/rdp/cudnn-archive 该网站下载安装包需要先进行注册。登录成功后&#xff0c;找到与CUDA对应的版本。 选择Linux版本进行下载。 下载后的格式为.tar.xz 解压 tar xvJf cudnn-linux-x86_64-8.9.3.28_cuda12-archive.tar.xz配置环境 su…

【广州华锐互动】AR远程连接专家进行协同管理,解放双手让协同更便捷

AR远程协同系统是一种基于AR技术&#xff0c;实现远程设备维修和技术支持的系统。该系统通过将虚拟信息叠加在现实世界中&#xff0c;实现对设备的全方位监控和管理&#xff0c;并可以通过AR眼镜等终端设备&#xff0c;实时查看设备的各项数据和信息&#xff0c;为设备维修提供…

Python小知识 - 1. Python装饰器(decorator)

Python装饰器&#xff08;decorator&#xff09; Python装饰器是一个很有用的功能&#xff0c;它可以让我们在不修改原有代码的情况下&#xff0c;为已有的函数或类添加额外的功能。 常见的使用场景有&#xff1a; a. 函数缓存&#xff1a;对于一些计算量较大的函数&#xff0c…

2023.9.2 关于 JVM 垃圾回收机制(GC)

目录 为什么要有垃圾回收机制? STW&#xff08;Stop The World&#xff09;问题 垃圾回收机制主要回收哪个内存区域? 垃圾对象判断算法 引用计数算法 可达性分析算法 垃圾对象回收算法 标记清除算法 复制算法 标记整理算法 分代算法 为什么要有垃圾回收机制? 自动…

Navicat16连接Oracle报错:Oracle library is not loaded

1、有时候我们在用navicat的时候连接oracle的时候&#xff0c;它会提示我们Oracle library is not loaded&#xff0c;这时候我们要首先验证本机上是否已安装oracle的客户端&#xff0c;如果已安装客户段&#xff0c;navicat中的oci.dll选择我们安装的客户段的oci.dll文件 2、…

MATLAB中编译器中的变量联系到Simulink

MATLAB中编译器中的变量联系到Simulink 现在编译器中创建变量&#xff0c;进行编译&#xff0c;使其生成在工作区。 然后在Simulink中国使用变量即可。

opencv入门-Opencv原理以及Opencv-Python安装

图像的表示 1&#xff0c;位数 计算机采用0/1编码的系统&#xff0c;数字图像也是0/1来记录信息&#xff0c;图像都是8位数图像&#xff0c;包含0~255灰度&#xff0c; 其中0代表最黑&#xff0c;1代表最白 3&#xff0c; 4&#xff0c;OpenCV部署方法 安装OpenCV之前…

Hadoop 集群小文件归档 HAR、小文件优化 Uber 模式

文章目录 小文件归档 HAR小文件优化 Uber 模式 小文件归档 HAR 小文件归档是指将大量小文件合并成较大的文件&#xff0c;从而减少存储开销、元数据管理的开销以及处理时的任务调度开销。 这里我们通过 Hadoop Archive (HAR) 来进行实现&#xff0c;它是一种归档格式&#xf…

使用Docker配置深度学习的运行环境

文章目录 推荐实验环境前言docker安装docker操作docker配置常见方法&#xff08;安装包、联网、程序管理器&#xff09;安装驱动的前提要求传统方法安装驱动程序程序管理器安装联网安装deb包安装 安装完成后的设置非传统方法安装-通过容器安装驱动的前提要求安装NVIDIA-Contain…

Scala集合继承体系图

Scala集合简介 1&#xff09; Scala 的集合有三大类&#xff1a;序列 Seq、集Set、映射 Map&#xff0c;所有的集合都扩展自 Iterable特质。 2&#xff09; 对于几乎所有的集合类&#xff0c;Scala 都同时提供了可变和不可变的版本&#xff0c;分别位于以下两个包 不可变集合…

Orangepi安装外设库 wiringPi

注意&#xff1a;mobaXterm传送文件要在SSH登陆环境下才可以。 同时电脑和orangepi都在同一个wifi下。

unittest框架的使用

先简单介绍一下unittest的核心组成部分&#xff1a; 测试夹具&#xff1a;Test Fixture 一般用于执行测试用例的准备或者清理工作&#xff0c;比如测试开始前的数据准备或者测试结束的数据清理等。通过setUp()、tearDown()、setUpClass()、tearDownClass()这四个钩子函数实现了…

tableau基础学习2:时间序列数据预处理与绘图

文章目录 数据预处理1. 原始数据2. 合并数据集2. 创建计算字段 绘图分析1. 趋势分析2. 计算字段趋势分析 这一部分&#xff0c;我们记录一些分析时序趋势的分析步骤 数据预处理 1. 原始数据 原始数据是excel表格&#xff0c;其中包含三个Sheet页&#xff0c; 这里我们选择两…

ModaHub魔搭社区专访百度智能云李莅:以后所有的数据库它都会原生地支持用向量?

ModaHub魔搭社区&#xff1a;您是否认为&#xff0c;以后所有的数据库它都会原生地支持用向量&#xff1f; 李莅&#xff1a;传统数据库广义上也分好几类&#xff1a;一类是关系型的&#xff0c;一类是 NoSQL 类的&#xff0c;还有一类是分析型的数据库。我觉得关系型的这种数据…

Super Resolve Dynamic Scene from Continuous Spike Streams论文笔记

摘要 近期&#xff0c;脉冲相机在记录高动态场景中展示了其优越的潜力。不像传统相机将一个曝光时间内的视觉信息进行压缩成像&#xff0c;脉冲相机连续地输出二的脉冲流来记录动态场景&#xff0c;因此拥有极高的时间分辨率。而现有的脉冲相机重建方法主要集中在重建和脉冲相…