EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework

作者:周克勇,花名一锤,阿里巴巴计算平台事业部EMR团队技术专家,大数据领域技术爱好者,对Spark有浓厚兴趣和一定的了解,目前主要专注于EMR产品中开源计算引擎的优化工作。


背景和动机

SparkSQL多年来的性能优化集中在Optimizer和Runtime两个领域。前者的目的是为了获得最优的执行计划,后者的目的是针对既定的计划尽可能执行的更快。

相比于Runtime,Optimizer是更加通用的、跟实现无关的优化。无论是Java世界(Spark, Hive)还是C++世界(Impala, MaxCompute),无论是Batch-Based(Spark, Hive)还是MPP-Based(Impala, Presto),甚至无论是大数据领域还是传统数据库领域亦或HTAP领域(HyPer, ADB),在Optimizer层面考虑的都是非常类似的问题: Stats收集,Cost评估以及计划选择;采用的优化技术也比较类似,如JoinReorder, CTE, GroupKey Elimination等。尽管因为上下文不同(如是否有索引)在Cost Model的构造上会有不同,或者特定场景下采用不同的空间搜索策略(如遗传算法 vs. 动态规划),但方法大体是相同的。

长期以来,Runtime的优化工作基本聚焦在解决当时的硬件瓶颈。如MapReduce刚出来时网络带宽是瓶颈,所以Google做了很多Locality方面的优化;Spark刚出来时解决的问题是磁盘IO,内存缓存的设计使得性能相比MapReduce有了数量级的提升;后来CPU成为了新的瓶颈[1],因此提升CPU性能成了近年来Runtime领域重要的优化方向。

提升CPU性能的两个主流技术是以MonetDB/X100[2](如今演化为VectorWise[3])为代表的向量化(Vectorized Processing)技术和以HyPer[5][6]为代表的代码生成(CodeGen)技术(其中Spark跟进的是CodeGen[9])。简单来说,向量化技术沿用了火山模型,但与其让SQL算子每次计算一条Record,向量化技术会积攒一批数据后再执行。逐批计算相比于逐条计算有了更大的优化空间,例如虚函数的开销分摊,SIMD优化,更加Cache友好等。这个技术的劣势在于算子之间传递的数据从条变成了批,因此增大了中间数据的物化开销。CodeGen技术从另外一个角度解决虚函数开销和中间数据物化问题:算子融合。简单来说,CodeGen框架通过打破算子之间的界限把火山模型“压平”了,把原来迭代器链压缩成了大的for循环,同时生成语义相同的代码(Java/C++/LLVM),紧接着用对应的工具链编译生成的代码,最后用编译后的class(Java)或so(C++,LLVM)去执行,从而把解释执行转变成了编译执行。此外,尽管还是逐条执行,由于抹去了函数调用,一条Record从(Stage内的)初始算子一直执行到结束算子都基本处于寄存器中,不会物化到内存。CodeGen技术的劣势在于难以应用SIMD等优化。

两个门派相爱相杀,在经历了互相发论文验证自家优于对方后[4][8]两家走向了合作,合作产出了一系列项目和论文,而目前学界的主流看法也是两者融合是最优解,一些采用融合做法的项目也应运而生,如进化版HyPer[6], Pelonton[7]等。

尽管学界已走到了融合,业界主流却没有很强的动力往融合的路子走,探究其主要原因一是目前融合的做法相比单独的优化并没有质的提升;二是融合技术目前没有一个广为接受的最优做法,还在探索阶段;三是业界在单一的技术上还没有发挥出最大潜力。以SparkSQL为例,从2015年SparkSQL首次露面自带的Expression级别的Codegen,到后来参考HyPer实现的WholeStage Codegen,再经过多年的打磨,SparkSQL的Codegen技术已趋成熟,性能也获得了两次数量级的跃升。然而,也许是出于可维护性或开发者接受度的考虑,SparkSQL的Codegen一直限制在生成Java代码,并没有尝试过NativeCode(C/C++, LLVM)。尽管Java的性能已经很优,但相比于Native Code还是有一定的Overhead,并缺乏SIMD(Java在做这方面feature),Prefetch等语义,更重要的是,Native Code直接操作裸金属,易于极致压榨硬件性能,对一些加速器(如GPU)或新硬件(如AEP)的支持也更方便。

基于以上动机,EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来20%左右的性能提升,为EMR再次获取世界第一立下汗马功劳,本文将详细介绍Native Codegen框架。

核心问题

做Native Codegen,核心问题有三个:
1.生成什么?
2.怎么生成?
3.如何集成到Spark?

生成什么

针对生成什么代码,结合调研的结果以及开发同学的技术栈,有三个候选项:C/C++, LLVM, Weld IR。C/C++的优势是实现相对简单,只需对照Spark生成的Java代码逻辑改写即可,劣势是编译时间过长,下图是HyPer的测评数据,C++的编译时间比LLVM高了一个数量级。


编译时间过长对小query很不友好,极端case编译时间比运行时间还要长。基于这个考虑,我们排除了C/C++选项。上图看上去LLVM的编译时间非常友好,而且很多Native CodeGen的引擎,如HyPer, Impala, 以及阿里云自研大数据引擎MaxCompute,ADB等,均采用了LLVM作为目标代码。LLVM对我们来说(对你们则不一定:D)最大的劣势就是过于底层,语法接近于汇编,试想用汇编重写SparkSQL算子的工作量会有多酸爽。大多数引擎也不会用LLVM写全量代码,如HyPer仅把算子核心逻辑用LLVM生成,其他通用功能(如spill,复杂数据结构管理等)用C++编写并提前编译好。即使LLVM+C++节省了不少工作量,对我们来说依然不可接受,因此我们把目光转向了第三个选项: Weld IR(Intermediate Representation)。

首先简短介绍以下Weld。Weld的作者Shoumik Palkar是 Matei Zaharia的学生,后者大家一定很熟悉,Spark的作者。Weld最初想解决的问题是不同lib之间互相调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算,对于极度优化的lib来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld开发了Common Runtime并配套提供了一组IR,再加上惰性求值的特性,只需(简单)修改lib使其符合Weld的规范,便可以做到不同lib共用Weld Runtime,Weld Runtime利用惰性求值实现跨lib的Pipeline,从而省去数据物化的开销。Weld Runtime还做了若干优化,如循环融合,循环展开,向量化,自适应执行等。此外,Weld支持调用C代码,可以方便调用三方库。

我们感兴趣的是Weld提供的IR和对应的Runtime。Weld IR面向数据分析进行设计,因此语义上跟SQL非常接近,能较好的表达算子。数据结构层面,Weld IR最核心的数据结构是vec和struct,能较好地表达SparkSQL的UnsafeRow Batch;基于struct和vec可以构造dict,能较好的表达SQL里重度使用的Hash结构。操作层面,Weld IR提供了类函数式语言的语义,如map, filter, iterator等,配合builder语义,能方便的表达Project, Filter, Agg, BroadCastJoin等算子语义。例如,以下IR表达了Filter + Project语义,具体含义是若第二列大于10,则返回第一列:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

以下IR表达了groupBy的语义,具体含义是按照第一列做groupBy来计算第二列的sum:

|v: vec[{i32,i32}]| for(v,dictmerger[i32,i32,+],|b,i,n| merge(b,{n.$0,n.$1}))

具体的语法定义请参考Weld文档(https://github.com/weld-project/weld/blob/master/docs/language.md)。
Weld 开发者API提供了两个核型接口:

  1. weld_module_compile, 把Weld IR编译成可执行模块(module)。

  2. weld_module_run, 执行编译好的模块。

基本流程如下图所示,最终也是生成LLVM代码。



由此,Weld IR的优势就显然易见了,既兼顾了性能(最终生成LLVM代码),又兼顾了易用性(CodeGen Weld IR相比LLVM, C++方便很多)。基于这些考虑,我们最终选择Weld IR作为目标代码。

怎么生成

SparkSQL原有的CodeGen框架之前简单介绍过了,详见https://developer.aliyun.com/article/727277。我们参考了Spark原有的做法,支持了表达式级别,算子级别,以及WholeStage级别的Codegen。复用Producer-Consumer框架,每个算子负责生成自己的代码,最后由WholeStageCodeGenExec负责组装。

这个过程有两个关键问题:

1.算子之间传输的介质是什么?
2.如何处理Weld不支持的算子?

传输介质

不同于Java,Weld IR不提供循环结构,取而代之的是vec结构和其上的泛迭代器操作,因此Weld IR难以借鉴Java Codegen在Stage外层套个大循环,然后每个算子处理一条Record的模式,取而代之的做法是每个算子处理一批数据,IR层面做假物化,然后依赖Weld的Loop-Fusion优化去消除物化。例如前面提到的Filter后接Project,Filter算子生成的IR如下,过滤掉第二列<=10的数据:

|v:vec[{i32,i32}]| let res_fil = for(v,appender,|b,i,n| if(n.$1>10, merge(b,n), b)

Project算子生成的IR如下,返回第一列数据:

let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0))

表面上看上去Filter算子会把中间结果做物化,实际上Weld的Loop-Fusion优化器会消除此次物化,优化后代码如下:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

尽管依赖Weld的Loop-Fusion优化可以极大简化CodeGen的逻辑,但开发中我们发现Loop-Fusion过程非常耗时,对于复杂SQL(嵌套3层以上)甚至无法在有限时间给出结果。当时面临两个选择:修改Weld的实现,或者修改CodeGen直接生成Loop-Fusion之后的代码,我们选择了后者。重构后生成的代码如下,其中1,2,11行由Scan算子生成,3,4,5,6,8,9,10行由Filter算子生成,7行由Project算子生成。

|v: vec[{i32,i32}]|for(v,appender,|b,i,n|if(n.$1 > 10,merge(b,n.$0),b))

这个优化使得编译时间重回亚秒级别。

Fallback机制

受限于Weld当前的表达能力,一些算子无法用Weld实现,例如SortMergeJoin,Rollup等。即使是原版的Java CodeGen,一些算子如Outter Join也不支持CodeGen,因此如何做好Fallback是保证正确性的前提。我们采用的策略很直观:若当前算子不支持Native CodeGen,则由Java CodeGen接管。这里涉及的关键问题是Fallback的粒度:是算子级别还是Stage级别?

抛去实现难度不谈,虽然直观上算子粒度的Fallback更加合理,但实际上却会导致更严重的问题:Stage内部Pipeline的断裂。如上文所述,CodeGen的一个优势是把整个Stage的逻辑Pipeline化,打破算子之间的界限,单条Record从初始算子执行到结束算子,整个过程不存在物化。而算子粒度的Fallback则会导致Stage内部一部分走Native Runtime,另一部分走Java Runtime,则两者连接处无可避免存在中间数据物化,这个开销通常会大于Native Runtime带来的收益。

基于以上考虑,我们选择了Stage级别的Fallback,在CodeGen阶段一旦遇到不支持的算子,则整个Stage都Fallback到Java CodeGen。统计显示,整个TPCDS Benchmark,命中Native CodeGen的Stage达到80%。

Spark集成

完成了代码生成和Fallback机制,最后的问题就是如何跟Spark集成了。Spark的WholeStageCodegenExec的执行可以理解为一个黑盒,无论上游是Table Scan,Shuffle Read,还是BroadCast,给到黑盒的输入类型只有两种: RowBatch(上游是Table Scan)或Row Iterator(上游非Table Scan),而黑盒的输出固定为Row Iterator,如下图所示:



上文介绍我们选择了Stage级别的Fallback,也就决定了黑盒要么是Java Runtime,要么是Native Runtime,不存在混合的情况,因此我们只需要关心如何把Row Batch/Row Iterator转化为Weld认识的内存布局,以及如何把Weld的输出转化成Row Iterator即可。为了进一步简化问题,我们注意到,尽管Shuffle Reader/BroadCast的输入是Row Iterator,但本质上远端序列化的数据结构是Row Batch,只不过Spark反序列化后转换成Row Iterator后再喂给CodeGen Module,RowBatch包装成Row Iterator非常简易。因此Native Runtime的输入输出可以统一成RowBatch。

解决办法呼之欲出了:把RowBatch转换成Weld vec!但我们更进了一步,何不直接把Row Batch喂给Weld从而省去内存转换呢?本质上Row Batch也是满足某种规范的字节流而已,Spark也提供了OffHeap模式把内存直接存堆外(仅针对Scan Stage。Shuffle数据和Broadcast数据需要读到堆外),Weld可以直接访问。Spark UnsafeRow的内存布局大致如下:


针对确定的schema,null bitmap和fixed-length data的结构是固定的,可以映射成struct,而针对var-length data我们的做法是把这些数据copy到连续的内存地址中。如此一来,针对无变长数据的RowBatch,我们直接把内存块喂给Weld;针对有变长部分的数据,我们也只需做大粒度的内存拷贝(把定长部分和变长部分分别拷出来),而无需做列级别的细粒度拷贝转换。

继续举前文的Filter+Project的例子,一条Record包含两个int列,其UnsafeRow的内存布局如下(为了对齐,Spark里定长部分最少使用8字节)。
 


显而易见,这个结构可以很方便映射成Weld struct:

{i64,i64,i64}

而整个Row Batch便映射成Weld vec:

vec[{i64,i64,i64}]

如此便解决了Input的问题。而Weld Output转RowBatch本质是以上过程的逆向操作,不再赘述。

解决了Java和Native之间的数据转换问题,剩下的就是如何执行了。首先我们根据当前Stage的Mode来决定走Java Runtime还是Native Runtime。在Native分支,首先会执行StageInit做Stage级别的初始化工作,包括初始化Weld,加载编译好的Weld Module,拉取Broadcast数据(若有)等;接着是一个循环,每个循环读取一个RowBatch(来自Scan或Shuffle Reader)喂给Native Runtime执行,Output转换并喂给Shuffle Writer。如下图所示:
 

总结

本文介绍了EMR团队在Spark Native Codegen方向的探索实践,限于篇幅若干技术点和优化没有展开,后续可另开文详解,例如:

1.极致Native算子优化
2.数据转换详解
3.Weld Dict优化

大家感兴趣的任何内容欢迎沟通: )

[1] Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout
[2] MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz
[3] Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski
[4] Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann
[5] HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper
[6] Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang
[7] Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon
[8] Vectorization vs. Compilation in Query Execution. Juliusz Sompolski
[9] https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html

查看更多内容,欢迎访问天池技术圈官方地址:EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework_天池技术圈-阿里云天池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/53696.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

StarRocks Lakehouse 快速入门——Apache Iceberg

导读&#xff1a; StarRocks Lakehouse 快速入门旨在帮助大家快速了解湖仓相关技术&#xff0c;内容涵盖关键特性介绍、独特的优势、使用场景和如何与 StarRocks 快速构建一套解决方案。最后大家也可以通过用户真实的使用场景来了解 StarRocks Lakehouse 的最佳实践&#xff01…

2024国赛数学建模备赛|30种常用的算法模型之最优算法-层次分析法

层次分析法&#xff08;Analytic Hierarchy Process&#xff0c;简称 AHP&#xff09;是对一些较为复杂、较为模 糊的问题作出决策的简易方法&#xff0c;它特别适用于那些难于完全定量分析的问题。它是美 国运筹学家 T. L. Saaty 教授于上世纪 70 年代初期提出的一种简便、灵活…

网络安全服务基础Windows--第13节-加密技术

基本保密通信模型 密码学发展 1. 古典密码学&#xff08;1949年之前&#xff09; 主要特点&#xff1a;数据的安全基于算法的保密 ● 在古典密码学中&#xff0c;密码算法通常是通过⼿⼯或机械装置实现的。 ● 数据的安全性主要依赖于算法本身的保密性&#xff0c;即“安…

Return arguments from function calling with OpenAI API when streaming?

题意&#xff1a;在使用OpenAI API进行流式传输时&#xff0c;如何返回函数调用的参数&#xff1f; 问题背景&#xff1a; Ive made a simple OpenAI API example with function calling. Im only using function calling to format the response, Im not calling multiple fu…

一个vue前端的例子(六)如何获取table一行的id

比如我们要删除列表一行 vue中template中的scope到底是个什么&#xff1f;_vue template scope-CSDN博客 <el-button click"edit_tool(scope.$index)" type"warning" icon"el-icon-edit">编辑</el-button> 获取列表下标

Brave编译指南2024 Windows篇:Brave简介(一)

1.引言 随着互联网技术的不断发展&#xff0c;用户对隐私保护和安全性的需求日益增加。传统浏览器在这方面存在诸多不足&#xff0c;而Brave浏览器则通过一系列创新技术和功能&#xff0c;致力于为用户提供更好的隐私保护和浏览体验。Brave不仅屏蔽广告和跟踪器&#xff0c;还…

web项目如何部署到服务器上呢?——麻烦的方法

只需关注web项目如何部署到服务器上&#xff0c;因为服务器运行时就可以访问web项目了。 一、麻烦的方法 1、首先启动服务器 &#xff08;1&#xff09;找到bin文件夹 &#xff08;2&#xff09;双击运行startup.bat文件 &#xff08;3&#xff09;运行之后的界面如下&#…

Dart 3.5更新对普通开发者有哪些影响?

哈喽&#xff0c;我是老刘 Flutter 3.24以及Dart 3.5不久前发布了。 突然觉得时间过得好快。六年前刚开始使用Flutter 1.0的场景还在眼前。 之前写了一篇文章盘点Flutter 3.24的新功能对普通开发者有哪些影响。Flutter 3.24 对普通开发者有哪些影响&#xff1f;https://mp.wei…

vivado 设置物理约束

设置物理约束 在本实验中&#xff0c;您将为CPU网表设计创建物理约束&#xff0c;观察中的操作 GUI转换为Tcl命令。使用Tcl命令&#xff0c;可以轻松编写复杂的操作脚本 用于在流动的不同阶段重复使用。 注意&#xff1a;如果您从实验1继续&#xff0c;并且您的设计已打开&…

Centos7.9 安装Elasticsearch 8.15.1(图文教程)

本章教程,主要记录在Centos7.9 安装Elasticsearch 8.15.1的整个安装过程。 一、下载安装包 下载地址: https://www.elastic.co/cn/downloads/past-releases/elasticsearch-8-15-1 你可以通过手动下载然后上传到服务器,也可以直接使用在线下载的方式。 wget https://artifacts…

【学术会议征稿】2024年智能驾驶与智慧交通国际学术会议(IDST 2024)

2024年智能驾驶与智慧交通国际学术会议(IDST 2024) 2024 International Conference on Intelligent Driving and Smart Transportation 智能驾驶和智慧交通利用新兴技术&#xff0c;使城市出行更加方便、更具成本效益且更安全。在此背景下&#xff0c;由中南大学主办的2024年…

LLMs技术 | 整合Ollama实现本地LLMs调用

前言 近两年AIGC发展的非常迅速&#xff0c;从刚开始的只有ChatGPT到现在的很百家争鸣。从开始的大参数模型&#xff0c;再到后来的小参数模型&#xff0c;从一开始单一的文本模型到现在的多模态模型等等。随着一起进步的不仅仅是模型的多样化&#xff0c;还有模型的使用方式。…

65、Python之函数高级:装饰器实战,通用日志记录功能的动态添加

引言 从系统开发的规范性来说&#xff0c;日志的记录是一个规范化的要求&#xff0c;但是&#xff0c;有些程序员会觉得麻烦&#xff0c;反而不愿意记录日志&#xff0c;还是太年轻了…… 其实&#xff0c;如果个人保护意识稍微强一些&#xff0c;一定会主动进行日志的记录的…

python_openCV_计算图片中的区域的黑色比例

希望对原始图片进行处理&#xff0c;然后计算图片上的黑色和白色的占比 上图&#xff0c; 原始图片 import numpy as np import cv2 import matplotlib.pyplot as pltdef cal_black(img_file):#功能&#xff1a; 计算图片中的区域的黑色比例#取图片中不同的位置进行计算&…

关于武汉芯景科技有限公司的IIC缓冲器芯片XJ4307开发指南(兼容LTC4307)

一、芯片引脚介绍 1.芯片引脚 2.引脚描述 二、系统结构图 三、功能描述 1.总线超时&#xff0c;自动断开连接 当 SDAOUT 或 SCLOUT 为低电平时&#xff0c;将启动内部定时器。定时器仅在相应输入变为高电平时重置。如果在 30ms &#xff08;典型值&#xff09; 内没有变为高…

国产芯片LT9211D:MIPI转LVDS转换器,分辨率高达3840x2160 30Hz,碾压其它同功能芯片

以下为LT9211D&#xff1a;MIPI TO LVDS的芯片简单介绍&#xff0c;供各位参考 Lontium LT9211D是一款高性能MIPI DSI/CSI-2到双端口LVDS转换器。LT9211D反序列化 输入MIPI视频数据&#xff0c;解码数据包&#xff0c;转换格式化的视频数据流到LVDS发射机输出AP与移动显示面板或…

ppt模板简约下载哪个?这些模板简约又大气

中秋节&#xff0c;作为中国传统节日中最具诗意的一个&#xff0c;月圆人团圆的美好寓意总是让人心生向往。 想在国际网站上宣传这一传统节日的独特魅力&#xff0c;却担心自己的PPT不够吸引人&#xff1f;别急&#xff0c;使用精美免费的ppt模板&#xff0c;可以让你的演示瞬…

Python Flask_APScheduler定时任务的正确(最佳)使用

描述 APScheduler基于Quartz的一个Python定时任务框架&#xff0c;实现了Quartz的所有功能。最近使用Flask框架使用Flask_APScheduler来做定时任务&#xff0c;在使用过程当中也遇到很多问题&#xff0c;例如在定时任务调用的方法中需要用到flask的app.app_context()时&#…

【Canvas与艺术】菊花孔雀螺旋

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>菊花孔雀螺旋</title><style type"text/css">…

.net MAUI应用生命周期

.NET Multi-platform App UI (.NET MAUI) 应用通常有四种执行状态&#xff1a;“未运行”、“运行中”、“已停用”和“已停止”。 当应用从未运行状态转换为运行状态、从运行状态转换为已停用状态、从已停用状态转换为已停止状态、从已停止状态转换为运行状态&#xff0c;以及…