基于DPU和HADOS-RACE加速Spark 3.x

背景简介

Apache Spark(下文简称Spark)是一种开源集群计算引擎,支持批/流计算、SQL分析、机器学习、图计算等计算范式,以其强大的容错能力、可扩展性、函数式API、多语言支持(SQL、Python、Java、Scala、R)等特性在大数据计算领域被广泛使用。其中,Spark SQL 是 Spark 生态系统中的一个重要组件,它允许用户以结构化数据的方式进行数据处理,提供了强大的查询和分析功能。

随着SSD和万兆网卡普及以及IO技术的提升,CPU计算逐渐成为Spark 作业的瓶颈,而IO瓶颈则逐渐消失。 有以下几个原因,首先,因为 JVM 提供的 CPU 指令级的优化如 SIMD要远远少于其他 Native 语言(如C/C++,Rust)导致基于 JVM 进行 CPU 指令的优化比较困难。其次,NVMe SSD缓存技术和AQE带来的自动优化shuffle极大的减轻了IO延迟。最后,Spark的谓词下推优化跳过了不需要的数据,进一步减少了IO开销。

基于此背景,Databricks(Spark背后的商业公司)在2022年SIGMOD会议上发表论文《Photon: A Fast Query Engine for Lakehouse Systems》,其核心思想是使用C++、向量化执行等技术来执行Spark物理计划,在客户工作负载上获得了平均3倍、最大10倍的性能提升,这证明Spark向量化及本地化是后续值得优化的方向。 Spark3.0(2020年6月发布)开始支持了数据的列式处理,英伟达也提出了利用GPU加速Spark的方案,利用GPU的列式计算和并发能力加速Join、Sort、Aggregate等常见的ETL操作。

DPU(Data Processing Unit) 作为未来计算的三大支柱之一,其设计旨在提供强大的计算能力,以加速各种数据处理任务。DPU的硬件加速能力,尤其在数据计算、数据过滤等计算密集型任务上,为处理海量数据提供了新的可能。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验,从而满足现代数据处理需求的挑战。但是目前DPU对Spark生态不能兼容,Spark计算框架无法利用DPU的计算优势。

中科驭数HADOS 异构计算加速软件平台(下文简称HADOS)是一款敏捷异构软件平台,能够为网络、存储、安全、大数据计算等场景进行提速。对于大数据计算场景,HADOS可以认为是一个异构执行库,提供了数据类型、向量数据结构、表达式计算、IO和资源管理等功能。 为了发挥Spark与DPU各自的优势,基于HADOS平台,我们开发了RACE算子卸载引擎,既能够发挥Spark优秀的分布式调度能力又可以发挥DPU的向量化执行能力。

我们通过实验发现,将Spark SQL的计算任务通过RACE卸载到DPU上, 预期可以把原生SparkSQL的单表达式的执行效率提升至9.97倍,TPC-DS单Query提升最高4.56倍。本文将介绍如何基于 DPU和RACE来加速 Spark SQL的查询速度,为大规模数据分析和处理提供更可靠的解决方案。

整体架构

整个解决方案可以参考下图:

• 最底层硬件资源层是DPU硬件,是面向数据中心的专用处理器,其设计旨在提供强大的计算能力,以加速各种数据处理任务,尤其是优化Spark等大数据框架的执行效率。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验。

• DPU加速层底层是HADOS异构计算加速软件平台,是中科驭数推出的专用计算敏捷异构软件开发平台。HADOS数据查询加速库通过提供基于列式数据的查询接口,供数据查询应用。支持Java、Scala、C和C++语言的函数调用,主要包括列数据管理、数据查询运行时函数、任务调度引擎、函数运算代价评估、内存管理、存储管理、硬件管理、DMA引擎、日志引擎等模块,目前对外提供数据管理、查询函数、硬件管理、文件存储相关功能API。

• DPU加速层中的RACE层,其最核心的能力就是修改执行计划树,通过 Spark Plugin 的机制,将Spark 执行计划拦截并下发给 DPU来执行,跳过原生 Spark 不高效的执行路径。整体的执行框架仍沿用 Spark 既有实现,包括消费接口、资源和执行调度、查询计划优化、上下游集成等。

• 最上层是面向用户的原生Spark,用户可以直接使用已有的业务逻辑,无感享受DPU带来的性能提升

目前支持的算子覆盖Spark生产环境常用算子,包括Scan、Filter、Project、Union、Hash Aggregation、Sort、Join、Exchange等。表达式方面,我们开发了目前生产环境常用的布尔函数、Sum/Count/AVG/Max/Min等聚合函数。

其中RACE层的架构如下:

下面我们着重介绍RACE层的核心功能。

核心功能模块

RACE与Spark的集成

RACE作为Spark的一个插件,实现了SparkPlugin接口,与Spark的集成分为Driver端和Executor端。

• 在Driver端, 通过Spark Catalyst扩展点插入自定义的规则,实现对查询语句解析过程、优化过程以及物理计划转换过程的控制。

• 在Executor端, 插件在Executor的初始化过程中完成DPU设备的初始化工作。

Plan Conversion

Spark SQL在优化 Physical Plan时,会应用一批规则,RACE通过插入的自定义规则可以拦截到优化后的Physical Plan,如果发现当前算子上的所有表达式可以下推给DPU,那么替换Spark原生算子为相应的可以在DPU上执行的自定义算子,由HADOS将其下推给DPU 来执行并返回结果。

Fallback

Spark支持的Operator和Expression非常多,在RACE研发初期,无法 100% 覆盖 Spark 查询执行计划中的算子和表达式,因此 RACE必须有Fallback机制,支持Spark 查询执行计划中部分算子不运行在DPU上。

对于DPU无法执行的算子,RACE安排 Fallback 回正常的 Spark 执行路径进行计算。例如,下图中展示了插件对原生计划树的修改情况,可以下推给DPU的算子都替换成了对应的"Dpu"开头的算子,不能下推的算子仍然保留。除此之外,会自动插入行转列算子或者列转行算子来适配数据格式的变化。

当然了,不管是行转列算子还是列转行算子,都是开销比较大的算子,随着RACE支持的算子和表达式越来越多,Fallback的情况会逐渐减少。

Strategy

当查询计划中存在未卸载的算子时,因为这样引入了行列转换算子,由于其带来了额外的开销,导致即使对于卸载到DPU上的算子,其性能得到提升,而对于整个查询来说,可能会出现比原生Spark更慢的情况。 针对这种情况,最稳妥的方式就是整个Query全部回退到CPU,这至少不会比原生Spark慢,这是很重要的。

由于Spark3.0加入了AQE的支持,规则通常拦截到的是一个个QueryStage,它是Physical Plan的一部分而非完整的 Physical Plan。 RACE的策略是获取AQE规则介入之前的整个Query的 Physical Plan,然后分析该Physical Plan中的算子是否全部可卸载。如果全部可以卸载,则对QueryStage进行Plan Conversion, 如果不能全部卸载,则跳过Plan Conversion转而直接交给Spark处理。

我们在实际测试过程中发现,一些算子例如Take操作,它需要处理的数据量非常小,那么即使发生Fallback,也不会有很大的行列转换开销,通过白名单机制忽略这种算子,防止全部回退到CPU,达到加速目的。

Metrics

RACE会收集DPU执行过程中的指标统计,然后上报给Spark的Metrics System做展示,以方便Debug和系统调优。

Native Read&Write

SparkSQL的Scan算子支持列式读取,但是Spark的向量与DPU中定义的向量不兼容,需要在JVM中进行一次列转行然后拷贝到DPU中,这会造成巨大的IO开销。我们主要有以下优化:

1. 减少行列转换:对于Parquet格式等列式存储格式的文件读取,SparkSQL采用的是按列读取的方式,即Scan算子是列式算子,但是后续数据过滤等数据处理算子均是基于行的算子,SparkSQL必须把列式数据转换为行式数据,这会导致额外的计算开销。而本方案由于都是列式计算的算子,因此无需这种行列转换。

2. 减少内存拷贝: RACE卸载Scan算子到HADOS平台,HADOS平台的DPUScan算子以Native库的方式加载磁盘数据直接复制到DPU,省去了JVM到DPU的拷贝开销

3. 谓词下推支持:DPUScan也支持ColumnPruning规则,这个规则会确保只有真正使用到的字段才会从这个数据源中提取出来。支持两种Filter:PartitionFilters和PushFilters。PartitionFilters可以过滤掉无用的分区, PushFilters把字段直接下推到Parquet文件中去

4. 同时,文件的写出也进行了类似的优化

注意,这些优化仍然需要对数据进行一次复制,DPU直接读取磁盘是一个后续的优化方向。

加速效果

TPC-DS 单Query加速

单机单线程local模式场景,在1T数据集下,TPC-DS语句中有5条语句E2E时间提升比例超过2倍,最高达到4.56倍:

运算符加速效果

运算符的性能提升,DPU运算符相比Spark原生的运算符的加速比最高达到9.97。

算子加速效果

TPC-DS的测试中,向对于原生Spark解决方案,本方案Filter算子性能最高提高到了43倍,哈希聚合算子提升了13倍。这主要是因为我们节省了列式数据转换为行式数据的开销以及DPU运算的加速。

CPU资源使用情况

CPU资源从平均60%下降到5%左右

原生Spark方案CPU使用情况:

基于RACE和DPU加速后,CPU使用情况:

总结与展望

通过把Spark的计算卸载到DPU加速器上,在用户原有代码无需变更的情况下,端到端的性能可以得到2-5倍的提升,某些算子能达到43倍性能提升,同时CPU资源使用率从60%左右下降到5%左右,显著提升了原生SparkSQL的执行效率。DPU展现了强大的计算能力,对于端到端的分析,会有一些除去算子之外的因素影响整体运行时间,包括磁盘IO,网络Shuffle以及调度的Overhead。这些影响因素将来可以逐步去做特定的优化,例如:

1. 算子的Pipeline执行

原生Spark的算子Pipeline执行以及CodeGen都是Spark性能提升的关键技术,当前,我们卸载到DPU中的计算还没有支持Pipeline以及CodeGen。未来这两个技术的加入,是继续提升Spark的执行效率的一个方向。

2. 读数据部分,通过DPU卡直读磁盘数据来做优化

我们还可以通过DPU卡直接读取硬盘数据,省去主机DDR到DPU卡DDR的数据传输时间,以达到性能提升的效果,可以参考英伟达的GPU对磁盘读写的优化,官方数据CSV格式的文件读取可优化20倍左右。

3. RDMA技术继续提升Shuffle性能

对于Shuffle占比很高的作业,可以通过内存Shuffle以及RDMA技术,来提升整个Shuffle的过程,目前已经实现内存Shuffle,未来我们还可以通过RDMA技术直读远端内存数据,从而完成整个Shuffle链路的优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springsecurity+vue前后端分离适配cas认证的跨域问题

0. cas服务搭建参考:CAS 5.3服务器搭建_cas-overlay-CSDN博客 1. 参照springsecurity适配cas的方式, 一直失败, 无奈关闭springssecurity认证 2. 后端服务适配cas: 参考前后端分离项目(springbootvue)接入单点登录cas_前后端分离做cas单点登录-CSDN博客 1) 引入maven依赖 …

鸿蒙LiteOS-M 内核初始化

目录 一、LiteOS-M 初始化内核二、LOS_KernelInit代码分析三、LOS_Start代码解析坚持就有收获 一、LiteOS-M 初始化内核 在LiteOS-M应用程序中,系统初始化如下: /*** brief This is the ohos entry, and you could call this in your main funciton af…

2024如何恢复旧版的Chrome的主题样式

起因 chrome 更新版本之后的主题样式变成了浅紫色的页签卡样式,感觉很不习惯,也很不喜欢 如何换回旧版主题 通过主题商店,安装旧版本的主题 主题商店搜索下面,或着直接访问下面的地址 Chrome Original White Theme https://…

vue手写卡片切换,并且点击获取到卡片信息

需求:做一个卡片样式的列表,之后有一些基本信息,之后卡片选中后样式不一样,默认选中第一个卡片,点击卡片后可以获取到卡片的信息 一、效果 二、关键代码 index默认重0开始,activeTable默认为0,0-0等于0,但…

Facebook的数字社交使命:连接世界的下一步

在数字化时代,社交媒体已成为人们生活的重要组成部分,而Facebook作为其中最具影响力的平台之一,一直以来都在努力履行着自己的使命——连接世界。然而,随着时代的变迁和技术的发展,Facebook正在不断探索着连接世界的下…

Kafka:kafka的技术架构? ①

一、Kafka的优势 Apache Kafka是一个开放源代码的分布式事件流平台,成千上万的公司使用它来实现高性 能数据管道,流分析,数据集成和关键任务等相关的应用程序。 二、技术架构 0)partition分区可以设置备份数,也可以设…

Go 如何按行读取(大)文件?尝试 bufio 包提供的几种方式

嗨,大家好!我是波罗学。本文是系列文章 Go 技巧第十七篇,系列文章查看:Go 语言技巧。 本文将介绍 Go 如何按行读取文件,基于此会逐步延伸到如何按块读取文件。 引言 我们将要介绍的按行读取文件的方式其实是非常适合…

C# 1.消息队列MQ使用场景--图文解析

为什么使用消息队列MQ(Message Queue)? 消息队列有什么优点和缺点? Kafka(大数据日志采集)、ActiveMQ(最早的MQ--目前使用较少)、RabbitMQ(开源,中小型企业使用足够)、RocketMQ(阿里开发,大型企业适用) 都…

125 Linux C++ 系统编程4 Linux 静态库制作,动态库制作,静态库和动态库对比。静态库运行时找不到库的bug fix

一 静态库 和动态库 对比 静态库的原理:假设我们有一个 静态库,大小为500M,这个静态库实现了一些打牌的逻辑算法,提供了一堆API,让开发者 可以轻松的实现 54张扑克牌的随机发牌,指定发牌等功能。 我们写了…

Python入门学习——基础语法

一、Python解释器 1. Python解释器的作用是&#xff1a; 将Python代码翻译成计算机认识的O和1并提交计算机执行在解释器环境内可以一行行的执行我们输入的代码也可以使用解释器程序&#xff0c;去执行".py"代码文件 2. Python解释器程序在&#xff1a; <Python…

google浏览器chrome无法访问localhost等本地虚拟域名的解决方法

场景一&#xff1a; 谷歌浏览器访问出现&#xff1a;forbbiden 403 问题&#xff0c;或者直接跳转到正式域名(非本地虚拟域名) 访问本地的虚拟域名http://www.hd.com/phpinfo.php?p1发生了302 条状 火狐浏览器正常访问; 解决方法&#xff1a; 方法1&#xff1a;在谷歌浏览器…

Linux的ACL权限以及特殊位和隐藏属性

前言&#xff1a; ACL是什么&#xff1f; ACL&#xff08;Access Control List&#xff09;是一种权限控制机制&#xff0c;用于在Linux系统中对文件和目录进行细粒度的访问控制。传统的Linux权限控制机制基于所有者、所属组和其他用户的三个权限类别&#xff08;读、写、执行…

转运机器人,AGV底盘小车:打造高效、精准的汽车电子生产线

为了满足日益增长的市场需求&#xff0c;保持行业领先地位&#xff0c;某汽车行业电子产品企业引入富唯智能AMR智能搬运机器人及其智能物流解决方案&#xff0c;采用自动化运输措施优化生产节拍和搬运效率&#xff0c;企业生产效率得到显著提升。 项目背景&#xff1a; 1、工厂…

milvus Delete api写s3的流程

Delete api写s3的流程 milvus版本:v2.3.2 整体架构: Delete 的数据流向 delete相关配置 dataNode:segment:insertBufSize: 16777216 # Max buffer size to flush for a single segment.deleteBufBytes: 67108864 # Max buffer size to flush del for a single channelsyncPe…

汽修专用产品---选型介绍 汽修示波器 汽车示波器 汽车电子 汽修波形 汽车传感器波形 汽车检测

为了满足汽车电子用户的测量需求&#xff0c;我司特推出汽修专用版示波器&#xff0c;一键测量&#xff0c;轻松找出汽车问题。 LOTO各种型号的示波器其实都可以用作汽车传感器信号波形的检测。汽修应用中&#xff0c;工程师对示波器的性能要求对于LOTO产品来说不算高。 在我们…

《TCP/IP详解 卷一》第6章 DHCP

目录 6.1 引言 6.2 DHCP 6.2.1 地址池和租用 6.2.2 DHCP和BOOTP消息格式 6.2.3 DHCP和BOOTP选项 6.2.4 DHCP协议操作 6.2.5 DHCPv6 6.2.6 DCHP中继 6.2.7 DHCP认证 6.2.8 重新配置扩展 6.2.9 快速确认 6.2.10 位置信息&#xff08;LCI和LoST&#xff09; 6.2.11 移…

Unity Shader ASE基础效果思路与代码(二):边缘光、扰动火焰

Unity Shader ASE基础效果思路与代码(二)&#xff1a;边缘光、扰动火焰 这里写目录标题 Unity Shader ASE基础效果思路与代码(二)&#xff1a;边缘光、扰动火焰边缘光效果展示&#xff1a;代码与思路&#xff1a; 扰动火焰效果展示&#xff1a;代码与思路&#xff1a; 边缘光 …

snmp协议开通教程

目录 一、什么是snmp协议&#xff1f; 二、snmp协议可以用来干什么&#xff1f; 三、snmp协议的开通 1、snmpv2协议开通 2、snmpv3协议开通 一、什么是snmp协议&#xff1f; SNMP&#xff08;Simple Network Management Protocol&#xff09;是一种用于网络管理的标准协议&a…

STM32物联网(封装AT指令进行TCP连接及数据的接收和发送)

文章目录 前言一、AT指令函数封装1.向ESP8266发送数据函数2.设置ESP8266工作模式3.连接WIFI函数4.查询IP地址5.连接TCP服务器6.发送数据到TCP服务器7.接收并解析来自TCP服务器的数据8.关闭TCP服务器 二、代码测试总结 前言 本篇文章将继续带大家学习STM32物联网&#xff0c;那…

Vue 2.0 中的 Vuex Store 状态管理器核心概念和组成部分

Vue 2.0 中的 Vuex Store 状态管理器核心概念和组成部分 State&#xff08;状态&#xff09;&#xff1a; Vuex Store 的核心就是集中式存储应用的所有组件的状态。它是一个单一状态树&#xff0c;所有的组件都从这个状态树中读取数据并可以响应状态的变化。 const state {c…