Flink Keyed State的优化与实践

本期作者

1.背景

Flink SQL在业务使用中有较多的双流join场景,当左右流的流量都较大,Join的等待时间即使为1小时,Flink Keyed State(Flink State分Operator State和Keyed State,后文所有State均代表后者)的存储大小也很容易达到TB级(内部默认使用的是RocksDBStateBackend)。

在State我们内部[1]之前就做了RT和长度的metric,当State的存储达到TB级别后,会发现State的scan/next/readNull请求RT会变得较高,另外双流Join不仅流量大,Join query的字段也较多,导致State的Value长度也较大,从而使得任务在流量高峰期CPU存在明显的周期性毛刺,根因是RocksDB的compaction引发。我们下面的内容主要是从业务场景跟进到RocksDB的读写行为,来优化RT耗时高的问题,并使用优化方案缓解compaction的压力。

2.Flink Keyed State诊断

我们统计了内部的TB级别大State任务,其State均由双流Join Operator产生。Flink的双流Join有两类,一类是无时间区间限制的Regular Join,其左右流的State Key在Inner和Outer Join下均会存放单条RowData数据,在监控中会发现Key length普遍较大,这类作业在内部使用数量以及State大小均相对较小。另一类是有时间区间限制的Interval Join,以及内部基于Interval Join实现的延迟弹出数据的Latency Join,其左右流的State在大State任务中Key很小,而Value较大,如下图所示是大State任务LatencyJoin的写KV长度监控。

图片

2.1 双流Join特性

第二类双流Join在内部使用数量和State存储大小均很大,内部的商业和AI这两个业务线中使用得最多,我们着重看一下这类Join在Flink的实现。

这类双流Join Operator,在Flink中会使用两个State,分别缓存左流和右流的数据,其结构为MapState<long,list>,Key存放的是毫秒时间戳,Value存放的是当前毫秒时刻相同On表达式列下的所有RowData记录,一个RowData就是当前流在Join后需要被Project的所有字段合集。当左右流的一条记录被Project的字段越多,单个RowData存储的字段也就越多,一个RowData的长度由所有Project字段的长度之和决定,大State任务中一般左右流的Project数量和长度均较大,所以也就导致了一个时间戳下即使只有一条RowData,写入RocksDB的List序列化结果也会较长。

图片

RocksDB的写流程示意图[2]如下,一对KV从client写入RocksDB中,会先写入WAL中(Flink中已关闭WAL),然后写入Memory table中,当Memory Table写满(阈值write_buffer_size=64MB)或主动触发flush(Flink checkpoint触发RocksDB的snapshot)后,会将数据刷入磁盘写入L0层的SST中,当L0层的文件数达到阈值(level0_file_num_compaction_trigger=4)会触发compaction操作,将L0层的所有数据和L1的数据合并并写入到L1中;当L1的存储大小达到阈值(max_bytes_for_level_base=256MB)后会触发compaction,将L1的文件和L2的若干个文件合并并写入L2;在L2及其以后,LN+1的存储大小为LN的10(max_bytes_for_level_multiplier)倍时均会触发compaction向LN+1合并数据。

图片

将任务添加state.backend.rocksdb.log.level=DEBUG_LEVEL配置后会发现,TB级别的双流Join大State任务,RocksDB的SST层级会变成[2,4,41,98,0,0,0],代表着L0至L7中SST文件个数,L3的文件数会十分多,这也是由于双流Join的两个特性决定的,一个是流量特别大,需要记录的KV数量庞大,另一个是前面提到的Value较长,导致整体的存储消耗也会较大,而RocksDB的层级分布,导致较多的数据存储到了L3,层级变多会导致compaction整体耗时变得较高。

2.2 RocksDB读RT高

RocksDB的读流程示意图[2]如下,Client读取数据会先从Memory Table中查询,如果没有找到则在磁盘的L0层读取数据,由于在L0层的数据不是全局有序的,所以会依次读取L0层的所有文件。RocksDB的L1至LN每层数据在当前Level内是全局排序的,所有L1至LN中的一个K只会在一个SST文件中,如果L0层未查询到数据,则会依次在L1至LN中每层只查询一个SST文件,直到查询到数据结束。 

图片

在前文中我们提到过,双流Join的Key是时间戳,Value存放的是当前时间戳的数据集合List,左右流数据进入到Join Operator会在对应State中做一个Get请求,再将当前RowDate放入List中,随着时间的推移,会存在大量的Get请求击穿RocksDB,我们标记其为ReadNull,与此同时上文中提到双流Join大State的RocksDB L3层的文件数是比较多的,所有的ReadNull请求均会从L0访问至L3结束,整体读耗时是比较高的。

另外这类Join的State是MapState<long,list>结构,会有两个地方调用Map的iterator。一个是对流数据Join当前State的RowData集合,另一个是由Timer State定时触发清理当前State的时间窗口外的数据,否则State会越来越大。而State中Map的iterator调用的是RocksDB的seek和next来查询数据,seek和next操作会将所有满足currentKey的数据遍历出来,同样会读到RocksDB的最底层Level的SST文件。

我们通过监控发现大State的seek、next、readnull等读RT耗时99线比较高,达到了两位数的毫秒级,这对实时流计算来说是不期望看到的结果。

图片

3.State优化

无论是读RT耗时变高,还是compaction导致CPU毛刺,都是因为State中的Value过大,导致SST的层级变多,扫描过多SST文件,读性能也就会有所下降。一条State记录,从L0到LN的过程,不仅会在跨层中被读写,而且在同层中,因其他数据compaction到当层,也会被多次读写,在高level的compaction中会发现磁盘的读写IO之和会达到400MB/s,读写中占比较大的数据是State的Value,如果Value在compaction中不被搬迁移动,那么可以大大降低IO和CPU毛刺,在做调研中发现了RocksDB社区的BlobDB[3]方案。

RocksDB的BlobDB方案来自Wisckey[4]的KV分离,当一对KV被flush落盘时,如果Value长度大于阈值,会将Value写入Blob文件中,SST文件中仅记录Value在Blob的index,在大Value场景下,使用KV分离后SST文件的总大小和层级将会大大的降低。

图片

Flink社区中RocksDB最高版本[5]在去年调研时仅为RocksDB对应的6.20版本,而此版本还无法通过Java调用开启BlobDB,在与内部分布式存储团队沟通后,决定使用他们已线上运行的7.8.3版本来构建Flink的RocksDB,从而来使用BlobDB特性,于是我们将Flink原有的CompactionFilter等实现合并到了7.8.3的release中。

经过适配并在大State中开启KV分离后,观察RocksDB日志发现SST的文件大小急剧下降,State Key也全聚集在了L0和L1这两层中。由于SST层级的降低,ReadNull请求访问到L1层就结束了,而seek和next请求在有数据的情况下,只需要额外访问一次对应的blob文件即可,若没有数据,对blob的IO读操作也可省略。最后的效果是ReadNull耗时全降到了百微妙左右,scan和next的RT 99线也降到了1毫秒左右。

图片

另外我们通过用户Case的双跑任务对比发现,开启KV分离的任务CPU毛刺有所弱化,CPU整体使用降低了50%。任务依旧存在少量的CPU毛刺高峰现象,通过RocksDB日志分析毛刺依旧来源于compaction,虽然SST的层级降低了,但是blob文件的GC还是存在,GC会将blob文件数据做搬迁工作,整合新文件和删除老blob文件,其工作会比较消耗磁盘IO和CPU资源,而blob的GC在现实中是无法关闭的,关闭blob GC会导致存储层文件只增不减。即使compaction依旧存在,我们也是享受到了升级RocksDB版本带来的优化的,高毛刺的频率相比老版本来说少了很多了。

图片

4.总结与展望

除了对大State做了KV分离外,小State的有一些任务会存在20min一次周期性毛刺现象,我们引用了分布式存储团队的InnerCompaction[6]补丁,KV长度均很小,L1层存储大小接近max_bytes_for_level_base,四次Checkpoint生成的4个L0层SST向L1层compaction会导致CPU毛刺现象产生,而InnerCompaction的优化原理是L0层的小文件在同层预先做compaction,避免频繁的与L1的数据做合并,防止IO放大。

去年下半年在内部给用户推进Flink State的KV分离功能后,线上所有大State任务的CPU使用率都减少了20-50%。未来可能有如下几个方面会继续推进优化。

a) 将内部高版本Flink中的RocksDB完成升级。

b) Flink做Checkpoint执行到RocksDB层的snapshot是比较轻的,和compaction不强关联的,后续可以考虑降低compaction的速率来达到进一步弱化CPU毛刺现象。

c) 目前内部的KV分离还需要使用一个参数控制开启,后期期望能默认全局开启,KV分离对Value长度大小的阈值也期望能自适应,无需用户感知。

参考:

[1]https://mp.weixin.qq.com/s/E23JO7YvzJrocbOIGO5X-Q

[2]https://blog.csdn.net/microGP/article/details/120416193

[3]https://github.com/facebook/rocksdb/wiki/BlobDB

[4]https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf

[5]https://github.com/ververica/frocksdb

[6] https://github.com/bilibili/rocksdb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库(mysql)-基本查询语句(DQL)

查询语句 这边查询是给予一定表格,这边先做个解释 教师表包括(name(姓名),gender(性别),salary(工资),title(职位),subject_id(课程的编号),comm(奖金)) 学生表包括(姓名(name),gender(性别),job(职位),生日(birth)) 模版 SELECT 字段名 FROM 查询表 WHERE 查询语句 或与非…

k8s_入门_命令详解

命令详解 kubectl是官方的CLI命令行工具&#xff0c;用于与 apiserver进行通信&#xff0c;将用户在命令行输入的命令&#xff0c;组织并转化为 apiserver能识别的信息&#xff0c;进而实现管理k8s各种资源的一种有效途径 1. 帮助 2. 查看版本信息 3. 查看资源对象等 查看No…

Linux应用开发(3):Linux时间操作(time、mktime、localtime等)

1. 简述 在Linux系统中&#xff0c;时间操作函数是编程中经常使用的一部分&#xff0c;它们允许程序获取和设置系统时间&#xff0c;以及对时间进行各种处理。以下是一些常用的时间操作函数的详细介绍。 2. 时间操作 &#xff08;1&#xff09;time(): 获取1970年1月1日以来的…

爬虫入门教程(一)

爬虫入门教程 1.什么是爬虫 爬虫是一种自动获取网站数据的程序或脚本。它可以自动模拟人类访问网站,获取网页源代码,解析并提取出所需的数据。 爬虫的工作原理类似于搜索引擎的索引程序&#xff0c;它们会按照预定的规则和算法在互联网上不断地爬取网页&#xff0c;收集信息…

k8s知识

k8s是用于容器编排和管理的&#xff0c;docker或者ctr是k8s的运行时&#xff0c;k8s通过容器运行时来启动容器&#xff0c;容器启动需要镜像&#xff0c;镜像可以用docker构建&#xff0c;dockerfile就是用于自定义如何构建镜像&#xff0c;所以上面那套流水线就是先用dockerfi…

Linux|从 STDIN 读取 Awk 输入

简介 在之前关于 Awk 工具的系列文章中&#xff0c;主要探讨了如何从文件中读取数据。但如果你希望从标准输入&#xff08;STDIN&#xff09;中读取数据&#xff0c;又该如何操作呢&#xff1f; 在本文中&#xff0c;将介绍几个示例&#xff0c;展示如何使用 Awk 来过滤其他命令…

即插即用篇 | YOLOv8引入Haar小波下采样 | 一种简单而有效的语义分割下采样模块

本改进已集成到 YOLOv8-Magic 框架。 下采样操作如最大池化或步幅卷积在卷积神经网络(CNNs)中被广泛应用,用于聚合局部特征、扩大感受野并减少计算负担。然而,对于语义分割任务,对局部邻域的特征进行池化可能导致重要的空间信息丢失,这有助于逐像素预测。为了解决这个问题…

mysql 查询变量@i:=@i+1

学习完mysql的查询&#xff1a;基本查询&#xff0c;连接查询和子查询和mysql 正则表达式查询&#xff0c;接下来先学习下变量查询。 mysql中没有oracle序列号那一列。mysql可以使用查询变量的方式去处理。我们先了解下查询变量&#xff0c;后面应用起来就更清晰。 1&#xff0…

“弱智吧”才是人类面对AI的最后一道堡垒

在 AI 的研究领域中&#xff0c;语言模型的训练数据选择一直是一个关键问题。传统的智慧告诉我们&#xff0c;高质量的数据集应该是由专家精心挑选和校对的文本组成&#xff0c;以确保模型学习到的语言是规范、准确、有文化内涵的。 然而&#xff0c;最近的一项研究颠覆了这一观…

【Java】Java中类的初始化顺序(静态方法,静态块,非静态块,最后有流程图)

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 在日常使用Java的时候&#xff0c;我们都接触过new这个关键字&#xff0c;那你是否知道在我们的对象真正创建出来之前都做了哪些事情呢&#xff1f; 实际上要去判断一个类的初始化的顺序&#xff0c;需要分一下情况&…

Bean的默认名称

1.使用spring的注解 Component、Repository、Service、Controller 等注解去把一个类配置为bean时&#xff0c;如果不指定bean的名称&#xff0c;那么bean的名称的默认规则是&#xff1a; ①类名的首字母小写&#xff0c;例如&#xff1a;类名称 UserDao &#xff0c;那么默认的…

C++笔记:STL容器库的使用

前置&#xff1a; 对于stl容器库&#xff0c;我只做了一些常用的笔记&#xff0c;关于更详细的使用可以参考:https://cppreference.com/https://cppreference.com/ 一.string--字符串 对于C中string字符串会比C语言的字符数组使用起来会顺手许多。 命名空间&#xff1a;std 关于…

使用串口给ESP8266发送AT指令无反应解决

解决方法&#xff1a;重新烧录固件 设置PC机波特率 设置PC机上的端口波特率和flash下载工具中的波特率一致&#xff0c;否则flash下载工具会一直提示串口连接失败。我这里将PC机上的串口波特率设置为115200&#xff0c;然后flash下载工具波特率也设置为115200 Windows系统上…

电脑远程控制esp32上的LED

1、思路整理 首先esp32需要连接上wifi 然后创建udp socket 接受udp数据 最后解析数据&#xff0c;控制LED 2、micropython代码实现 import network from socket import * from machine import Pin p2Pin(2,Pin.OUT)def do_connect(): #连接wifi wlan network.WLAN(network.…

芒果YOLOv8改进组合157:动态标签分配ATSS+新颖高效AsDDet检测头组合改进,共同助力VisDrone涨点1.8%,小目标高效涨点

💡本篇内容:【芒果YOLOv8改进ATSS标签分配策略|第三集】芒果YOLOv8改进组合157:动态标签分配ATSS+新颖高效AsDDet检测头组合改进,共同助力VisDrone涨点1.8%,小目标高效涨点 💡🚀🚀🚀本博客 标签分配策略ATSS改进+ 新颖高效AsDDet检测头组合改进,适用于 YOLOv8 …

超详细解读Transformer框架

Transformer是由谷歌大脑2017年在论文《Attention is All You Need》中提出的一种序列到序列(Seq2Seq)模型。自提出伊始&#xff0c;该模型便在NLP和CV界大杀四方&#xff0c;多次达到SOTA效果。NLP领域中&#xff0c;我们所熟知的BERT和GPT就是从Transformer中衍生出来的预训练…

使用Ollama在本地运行AI大模型gemma

1.下载&#xff1a; https://github.com/ollama/ollama/releases 2.配置环境变量 我的电脑-右键-属性-系统-高级系统设置-环境变量-【系统环境变量】新建 变量名&#xff1a;OLLAMA_MODELS &#xff08;固定变量名&#xff09; 变量值&#xff1a;E:\Ollama\Lib &#xff0…

iOS 开发中上传 IPA 文件的方法(无需 Mac 电脑)

引言 在 iOS 开发中&#xff0c;将 IPA 文件上传到苹果开发者中心是一个重要的步骤。通常情况下&#xff0c;我们需要使用 Mac 电脑上的 Xcode 或 Application Loader 工具来完成这个任务。然而&#xff0c;如果你没有 Mac 电脑&#xff0c;也没有关系&#xff0c;本文将介绍一…

express里面的鉴权及express-session中间件的使用总结

了解 HTTP 协议的无状态性是进一步学习 Session 认证机制的必要前提。http 协议的无状态性&#xff0c;指的是客户端的每次 http 请求都是独立的&#xff0c;连续多个请求之间没有直接的关系&#xff0c;服务器不会主动保留每次 http 请求的状态。 什么是 Cookie Cookie 是存储…

如何在CentOS安装Nexus容器无公网IP远程管理本地仓库

文章目录 1. Docker安装Nexus2. 本地访问Nexus3. Linux安装Cpolar4. 配置Nexus界面公网地址5. 远程访问 Nexus界面6. 固定Nexus公网地址7. 固定地址访问Nexus Nexus是一个仓库管理工具&#xff0c;用于管理和组织软件构建过程中的依赖项和构件。它与Maven密切相关&#xff0c;可…