抖音集团 FlinkSQL 性能优化探索及实践

本文作者:李精卫

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号回复【1】进入官方交流群

背景

随着抖音集团内部对流式任务的需求不断增长,Flink SQL作为一种低成本接入手段,已经在内部多个方向上得到大规模应用。目前,流式 SQL 任务的规模已经超过3万,任务资源使用量和分配量也达到了百万core。

在降本增效的大背景下,为了解决资源紧缺的问题,并同时满足业务对更高性能的需求,流式计算团队对 FlinkSQL 进行了深度优化,本文将聚焦这一实践,详解主要优化思路。

Engine优化

查询优化

View Reuse

在流式 SQL 中,为增加 SQL 代码的可读性,通常会将通用的计算逻辑放在 view 中。在这里,view 只是一个逻辑概念:在底层实现时,并没有真实存储中的 view 与之对应。

如下图所示,场景一表示任务中存在多个 sink 的表,view 中是窗口聚合运算的逻辑。场景二表示任务需要对两个流进行 union,view 中是普通聚合运算的逻辑。

在这两种场景中,用户会定义一个通用的 view 来进行计算。因为下游不同分支对 view 的查询不同,view 中的计算逻辑会在不同算子中重复计算,由此带来了重复的资源开销。那么问题就在于:为什么 view 没有被复用?

在 Calcite 的原有逻辑中,view 中包含的 Query 会被立即转化成一颗关系表达式树。如果有多条Query 访问了同一个 view,那么就会获得多颗属性完全相同,但分属于不同 Java 对象的 RelNode Tree。因此,后续所有优化都是基于不同的子树对象分别进行的,无法再重新合并成同一棵树。

  • Multi-sink

多 sink 的场景下,在生成 logical plan 阶段时,view 会被 Calcite 转换为多个 RelNode Tree。在后续 optimizer 的子图划分中,这些 RelNode Tree 不会被划分到相同子图中,从而导致 view 不能被复用。

由此可以看出,解决问题需要分别从 Calcite 和 Flink 入手。在Calcite 的 SqlToRel Convert中,不应立即将 view 中的 Query 转化成对应的 RelNode Tree,而应直接返回包含了对应 Sql CatalogView Table 的 LogicalTableScan。

在Flink中,CatalogView 的实现需要将 LogicalTableScan 对象存储下来,让下游节点都引用同一个 CatalogView。在优化之前,将 LogicalTableScan 中的 view 展开成 RelNode Tree,以便下游节点能够引用相同的 RelNode Tree 对象。

  • Union all

在 Union all 场景中,为了复用 view,可以在 view 后面增加一个虚拟的 sink 节点,将 Union all 场景转换为多 sink 场景。这可以使 view 在 logical plan 阶段时,不会提前展开成 RelNode Tree,而 union 也能够引用到相同的 View 对象。虚拟的 sink 节点则会在子图划分后被删除。

从上述两个场景中可知,在进行了 view 复用优化后,view 对应的计算逻辑只需计算一次,整体 CPU 收益为20%。

Remove Redundant Streaming Shuffle

Remove Redundant Streaming Shuffle 可以移除流式场景下不必要的数据分发开销。在批式场景中, shuffle 操作会有落盘的性能开销,这已经在社区中得到了优化。而在流式场景中,shuffle 操作则有序列化和网络传输的开销。

如下图例子所示,在计算不同品类产品价格 Top5 的平均值时,使用了排序和聚合计算。在排序和聚合前对 id 进行了 hash,这说明两个算子有相同的 hash key。数据被 rank 算子 hash 后,就不需要再进行第二次 hash 了,这说明第二个 shuffle 是多余的。

shuffle 是在生成 physical plan 的阶段中产生的。下图展示了 Sql 优化器将 SqlNode 从逻辑节点转换为物理节点的过程,在这个过程中,shuffle 也就是 exchange。转换过程是通过规则进行的,在 relRule.Convert 过程中会遍历每一个逻辑节点,判断当前节点是否满足转换规则,如果存在不满足的情况,就会增加一个 AbstractConvert。

在生成 Exchange 的规则中,会判断当前节点的数据分布特征是否满足需求,如果不满足,就在节点上游增加 Exchange 节点来满足数据分布的特征。最后,PhysicalExchange 会被转换为 hashShuffle,用于数据的分发。

如何移除掉多余的 Streaming Shuffle?针对该问题,主要思路是参考 Batch 对 Shuffle 的优化。在规则转换的过程中,不仅要考虑节点本身,还要考虑输入节点的特征是否满足需求,将问题往上抛。

实现针对 Physical RelNode 的规则判断方法,主要分为以下两种情况:

  • 对于本身没有数据分布特征的节(如 Calc 和 Correlate Node),判断它们能否满足一个特定数据分布的需求,只需检查自身输入中是否包含 hash key。

  • 对于本身有数据分布特征的节点(如 Aggregate 和 Rank nodes),需要确认本身的数据分布特征是否满足给定的 distribution requirements。

        如下图所示,首先要检查 aggregate 节点是否满足数据分布特征,这需要查看它的输入,即 rank 节点是否满足要求。如果 rank 节点不满足,则需要在其上游添加 exchange 节点。添加后,rank 算子满足了数据分布特征。由于 rank 和 aggregate 的 hashkey 相同,因此 aggreagte 也满足了。

该方法可为火山模型提供更优、成本更低的执行计划。火山模型最终将选择这个移除多余 Exchange 的执行计划。移除多余的 streaming shuffle 后,rank 算子和 agg 算子中的 hash 连接已经消失,并且 chain 在一起,整体 CPU 收益达到了 24%。这也为在 Streaming 场景下优化 MultipleInput 的算子提供了可能。

查询执行优化

Streaming MultipleInput Operator

基于 Remove Streaming Shuffle,在对多余的 hash shuffle 进行优化的前提下,可以在 join+join、 join+agg、join+union 中,对shuffle 进行更深层的优化。

如下图所示,因为 agg1 hash key 和 Join left key 相同;agg2 hash key 和 Join right key 相同,所以可将 Join 前的 hash 变为 forward。

当前的 OperatorChain 策略不支持多input算子的 Chain,无法避免因多余 shuffle 而导致的序列化、反序列化和可能的网络开销。因此,流式计算团队使用 MultipleInput 机制,在 Streaming 场景下,将多个 Input 的算子上下游合并为 MultipleInutOperator,从而进行优化。

具体而言,优化经历了以下几个步骤:

  • 首先,在 Planner 层构建出 MultipleInputExecNode。

    MultipleInputExecNode 是在 logical physical 计划后,当 plan 被转换为ExecNode DAG时,从 ExecNodeDAG 中推导而出。获得 ExecNodeDAG 后,先从根节点进行广度优先搜索,从而获取图的拓扑排序。构建 MultipleInputExecNode 是在 Covert ExecNode DAG 环节进行的,完成这一系列操作后,它将在 ExecNode Graph 中构建出来。

  • 在生成 StreamMultipleInputExecNode 后,被 translate 成 StreamMultipleInput transformation。

    在 transformation 中,包含了创建 MultipleInput Operator 的一些信息,通过 TableOperatorWrapper 存储 sub-op 信息。

  • 生成 Job Graph。这需要满足以下2个条件:

    StreamConfig 需要兼容 Multiple Input 从 two Input 的 TypeSerializer1,2变成 TypeSerializer[],这主要用于 state/key 数据传输。
    Stream Graph 可以添加 MultipleInputOperator 节点,通过方法 addMultipleInputOperator,将 Transformation 对应的 properties 添加到 vertex 中构成 Stream Graph 中的节点。

运行时实现了 StreamingMultipleInputOperator ,且需要考虑算子的创建,算子的数据处理,状态,Timer&&Watermark,Barrier,Checkpoint 等问题。

  • Operator initialization:

    • 不只要创建 StreamingMultipleInputOperator,也要创建对应的 sub-op;

    • sub-op 本质上是 Abstract StreamOperator,sub-op id = op id + index;

    • 在 createAllOperator 创建每个 sub-op 对象,并构建 DAG 的输入输出。

  • ProcessElement :

    • 处理数据过程中要保证 key 的传递。

  • State

    • MultipleInputStreamOperato 和 sub-op 分享state handler;

    • 创建新的 API stateNameContext 来解决状态名字冲突。

  • Timer && Watermark

    • MultipleInputStreamOperator和sub-op 分享 timeServiceManager;

    • 创建新的 api TimerNameContext来解决状态名字冲突;

    • timeServiceManager 以 sub-op粒度管理 timer;

    • 使用Combindedwatermark 来保证 Watermark 对齐。

  • barrier:

      此处无需过多考虑,MultipleInput Operator 内部没有 buffer 中的数据,因此按照拓扑顺序进行 checkpoint 不会丢失数据。但需要注意的是,需要将 prepareSnapshotPreBarrier 从 MultipleInputStreamOperator 传播到所有子算子。

经过优化后, agg+join 操作会被合并到 MultipleInput 算子中,这将带来10%的 Cpu 收益,同时也会解决网络内存不足导致任务无法启动的问题

Optimization of Long Sliding Windows
  • 长滑动窗口及其底层实现逻辑

在 Flink SQL 中,长滑动窗口的具体写法是 Hop(table, slide, size)。其中,size 表示窗口的大小,slide 表示窗口移动的步长。在滑动窗口中,如果步长小于窗口大小,那么会有元素属于不同的窗口。

在滑动窗口计算中,如果窗口时间周期长,在大流量场景下计算7天、30天等时间段的uv并进行去重的操作时,会出现计算中数据延迟特别严重,甚至数据无法推动的问题,即便增加资源也无法解决这一问题。

经过对滑动窗口底层实现逻辑的分析,可知滑动窗口计算的主要性能瓶颈在于窗口计算最小的单位——窗格(pane)的合并操作。pane 是窗口大小和步长的最大公约数,大多数时候,pane 的大小都是 1。每次滑动窗口触发计算时,均需要把当前窗口下对应的所有窗格数据重新合并一遍。由于长窗口下其窗格数量很多,所以性能开销很大。

  • 长滑动窗口优化思路

对此,主要的优化思路是以空间换时间:

  1. 在窗口算子中定义全局状态,存储当前窗口的计算结果;

  2. 在聚合函数中新增 retractMerge 方法,窗口向后滑动时,移除被划走窗口的数据;

  3. 触发下一次计算时,合并新增窗口的数据。

如下图所示:在窗口向后滑动 3 个窗格时,移除 pane1-pane3 的结果,再合并进来 pane 11-pane13 的结果。总共需要计算6个窗格,优化了4个窗格的计算。

因此,当窗口大小和滑动步长的比值越大,优化效果就越明显。优化后,整体 CPU 收益达到了 60%。

数据处理(Format侧)

Native Json Format

目前,抖音集团公司内部约有1.3万个任务使用 Json Format,占用资源近 70 万core。如果按照 5%的占比进行保守估计,线上约有3.5万core用于 Json 的反序列化,因此该部分有较大优化空间。

下图展示了数据从消息队列(MQ)中读取,并最终传递给下游运算符的主要流程。其中,Json 反序列化和将 GeneralRowData 序列化为字节,是两个重要的开销。

针对上述两项重要的资源消耗,主要从以下两个方面进行优化:

  • 针对 Json 反序列化开销

      使用支持向量化编程的 c++ json 解析库, 选择字节内部自研的 sonic-cpp,来提高性能。

  • 针对序列化为 binaryRowData 的开销

      使用 native 方法直接产出 BinaryRowData 所需要的二进制表示,再使用 BinaryRowData 指向这一部分数据,从而免去序列化对应的开销。

在测试集中,native Json 的 CPU 收益能够达到 57%。

优化实践

为了确保引擎优化能够给业务方带来实际的优化效果,流式计算团队在内部做了大量工作,以确保优化项能够稳定上线,以下将对此展开详细介绍。

  • 工具层

如上框架图所示,最下层是工具层,具备以下5项能力:

  • 支持 SQL 任务元信息实时上报;

  • 算子粒度离线数仓,提供算子粒度的任务监控;

  • Commits 粒度 DAG 兼容性检查 :可以提前发现哪些优化项会影响任务状态恢复;

  • 优化项分优先级灰度:可以限制风险暴露范围;

  • 数据准确性链路构建:保证了上线优化项不会导致数据准确性发现问题。

基于上述能力,工具层实现了算子粒度的任务监控,同时保证了任务稳定性和数据准确性

  • 优化层

在优化项这层,对存量优化进行推广上量或全量,同时也对很多新增优化项展开探索和推广。

  • 引擎&平台层

在引擎&平台层,与业务方协作,推动存量任务治理。通过在平台侧进行优化项配置,使新增作业能够直接应用某些优化项。同时,经过校验的优化项将在引擎侧中默认开启。

经过优化,最终获得了 10w core+ 的性能收益。

未来展望

在未来,流式计算团队将持续优化 FlinkSQL,探索 Join 中状态的最佳使用方式。同时,也会在流批融合 native Engine 等方向上持续探索发力。

点击跳转 火山引擎Flink流式计算 了解更多

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52173.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K8S系列——(二)、K8S部署RocketMQ集群

1、环境准备 要将RocketMQ部署到K8S上,首先你需要提前准备一个K8S集群环境,如图我已经准备好了一个版本为 v1.28.13 的 K8S 集群(其他版本也没问题): 角色IPMaster192.168.6.220Node-1192.168.6.221Node-2192.168.6.…

React 学习——Class类组件的基本结构

老版本的react的写法&#xff1a;是通过class类组件的 import { Component } from react; class Counter extends Component{state {count: 0,}setCount ()>{this.setState({ count: this.state.count 1 })}render(){return <button onClick{this.setCount}>{thi…

【Qt】输入类控件QLineEdit

目录 输入类控件QLineEdit 例子&#xff1a;录入个人信息 例子&#xff1a;使用正则表达式验证输入框的数据 例子&#xff1a;验证俩次输入密码一致 例子&#xff1a;切换显示代码 输入类控件QLineEdit QLineEdit 用来表示单行输入框&#xff0c;可以输入一段文本&#xf…

Go 文件操作基本方法大全

前言 在Go语言中&#xff0c;操作文件主要依赖于标准库中的os和io/ioutil&#xff08;注意&#xff1a;io/ioutil在Go 1.16及以后版本中被逐步弃用&#xff0c;推荐使用io和os包中的函数进行替代&#xff09;以及io和bufio等包。以下是一些基于这些基本库操作文件的方法大全&a…

Python和MATLAB谐波生成导图

&#x1f3af;要点 绘制三次谐波生成透射功率谱、对数对数图表示半导体曲面二次谐波生成&#xff0c;分析判断材料特性谐波均值估计计算边际似然&#xff08;贝叶斯统计&#xff09;二次谐波散射分析胶体染料分子结构交流电谐波波形傅立叶分析分析旋转各向异性谐波高次谐波非线…

appium下载及安装

下载地址&#xff1a;https://github.com/appium/appium-desktop/releases 双击安装就可以

深入学习SQL优化的第三天

目录 聚合函数 排序和分组 聚合函数 1251. 平均售价 表&#xff1a;Prices------------------------ | Column Name | Type | ------------------------ | product_id | int | | start_date | date | | end_date | date | | price | int …

桌球厅助教陪练系统源码开发和行业市场分析

台球助教陪练系统&#xff1a;引领智能化运动体验 作为一款专为台球爱好者设计的智能陪练系统&#xff0c;我们的目标是通过技术创新&#xff0c;让每位用户都能享受到个性化、高效的学习体验。无论是初学者还是寻求突破的高手&#xff0c;都能在我们的平台上找到适合自己的陪…

基于YOLO V8的PCB缺陷检测识别系统(python源码+Pyqt5界面+数据集+训练代码)

数据集准备&#xff1a;收集并标注PCB缺陷的图像。模型训练&#xff1a;使用YOLO v8框架训练一个模型来识别这些缺陷。GUI开发&#xff1a;利用PyQt5创建一个用户友好的图形界面。模型部署&#xff1a;在GUI中集成训练好的模型&#xff0c;使用户能够上传PCB图像并得到缺陷检测…

IOS 11 通用Base控制器封装

整体规划 BaseController&#xff1a;把viewDidLoad逻辑拆分为三个方法&#xff0c;方便管理。 BaseCommonController&#xff1a;不同项目可以复用的逻辑&#xff0c;例如&#xff1a;设置背景颜色方法等 BaseLogicController&#xff1a;本项目的通用逻辑&#xff0c;主要…

实现 FastCGI

CGI的由来&#xff1a; 最早的 Web 服务器只能简单地响应浏览器发来的 HTTP 请求&#xff0c;并将存储在服务器上的 HTML 文件返回给浏 览器&#xff0c;也就是静态 html 文件&#xff0c;但是后期随着网站功能增多网站开发也越来越复杂&#xff0c;以至于出现动态技 术&…

【Pyhthon读取 PDF文件表格 ,转为 CSV/TSV/JSON文件】

tabula-py tabula-py 是一个将 PDF 表格转换为 pandas DataFrame 的工具。 tabula-py 是 tabula-java 的包装器&#xff0c;需要您的机器上有 java。 tabula-py 还允许您将 PDF 中的表格转换为 CSV/TSV 文件。 tabula-py 的 PDF 提取准确度与 tabula-java 或 tabula app 相…

8月21日笔记

Frp Frp(Fast e Reverse ) Proxy) 是一款简单&#xff0c;好用&#xff0c;稳定的隧道工具。Frp 使用 Go语言开发&#xff0c;支持跨平台&#xff0c;仅需下载对应平台的二进制文件即可执行&#xff0c;没有额外依赖。它是一款高性能的反向代理应用&#xff0c;可以轻松地进行…

Spring DI 数据类型—— set 方法注入

首先新建项目&#xff0c;可参考 初识IDEA、模拟三层--控制层、业务层和数据访问层 一、spring 环境搭建 &#xff08;一&#xff09;pom.xml 导相关坐标 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.or…

http连接未释放导致生产故障

凌晨4点运维老大收到报警&#xff08;公司官网页面超时&#xff0c;上次故障因为运维修改nginx导致官网域名下某些接口不可用后&#xff0c;运维在2台nginx服务器上放了检测程序&#xff0c;检测官网页面&#xff09;&#xff0c;运维自己先看了看服务器相关配置&#xff0c;后…

Java实现STL中的全排列函数next_permutation()

目录 一、引言 二、全排列函数next_permutation() 三、next_permutation()的使用 四、Java实现next_permutation() 五、使用next_permutation()实现全排列 一、引言 相信很多小伙伴们都做过全排列的算法题&#xff0c;输入一个n&#xff0c;输出1~n的全排列。对于这个问题…

JVM 有哪些垃圾回收器?

JVM 有哪些垃圾回收器&#xff1f; 图中展示了7种作用于不同分代的收集器&#xff0c;如果两个收集器之间存在连线&#xff0c;则说明它们可以搭配使用。虚拟机所处的区域则表示它是属于新生代还是老年代收集器。 新生代收集器&#xff08;全部的都是复制算法&#xff09;&…

【安全靶场】-DC-7

❤️博客主页&#xff1a; iknow181 &#x1f525;系列专栏&#xff1a; 网络安全、 Python、JavaSE、JavaWeb、CCNP &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐评论✍ 一、收集信息 1.查看主机是否存活 nmap -T4 -sP 192.168.216.149 2.主动扫描 看开放了哪些端口和功能 n…

【网络】UDP和TCP之间的差别和回显服务器

文章目录 UDP 和 TCP 之间的差别有连接/无连接可靠传输/不可靠传输面向字节流/面向数据报全双工/半双工 UDP/TCP API 的使用UDP APIDatagramSocket构造方法方法 DatagramPacket构造方法方法 回显服务器&#xff08;Echo Server&#xff09;1. 接收请求2. 根据请求计算响应3. 将…

黑马头条vue2.0项目实战(十一)——功能优化(组件缓存、响应拦截器、路由跳转与权限管理)

1. 组件缓存 1.1 介绍 先来看一个问题&#xff1f; 从首页切换到我的&#xff0c;再从我的回到首页&#xff0c;我们发现首页重新渲染原来的状态没有了。 首先&#xff0c;这是正常的状态&#xff0c;并非问题&#xff0c;路由在切换的时候会销毁切出去的页面组件&#xff…