阿里云RemoteShuffleService 新功能:AQE 和流控

简介:阿里云EMR 自2020年推出 Remote Shuffle Service(RSS)以来,帮助了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算分离架构得以实施。为了更方便大家使用和扩展,RSS 在2022年初开源(https://github.com/alibaba/RemoteShuffleService),欢迎各路开发者共建: )

阿里云RemoteShuffleService 新功能:AQE 和流控

阿里云EMR 自2020年推出 Remote Shuffle Service(RSS) 以来,帮助了诸多客户解决 Spark 作业的性能、稳定性问题,并使得存算分离架构得以实施。为了更方便大家使用和扩展,RSS 在2022年初开源(https://github.com/alibaba/RemoteShuffleService),欢迎各路开发者共建: ) RSS的整体架构请参考[1],本文将介绍 RSS 最新的两个重要功能:支持 Adaptive Query Execution(AQE),以及流控。

RSS 支持 AQE

AQE 简介

自适应执行(Adaptive Query Execution, AQE)是 Spark3 的重要功能[2],通过收集运行时 Stats,来动态调整后续的执行计划,从而解决由于 Optimizer 无法准确预估 Stats导致生成的执行计划不够好的问题。AQE 主要有三个优化场景: Partition 合并(Partition Coalescing), Join 策略切换(Switch Join Strategy),以及倾斜 Join 优化(Optimize Skew Join)。这三个场景都对 Shuffle 框架的能力提出了新的需求。

Partition 合并

Partition 合并的目的是尽量让 reducer 处理的数据量适中且均匀,做法是首先 Mapper按较多的 Partition 数目进行 Shuffle Write,AQE 框架统计每个 Partition 的 Size,若连续多个 Partition 的数据量都比较小,则将这些 Partition 合并成一个,交由一个 Reducer 去处理。过程如下所示。

由上图可知,优化后的 Reducer2 需读取原属于 Reducer2-4 的数据,对 Shuffle 框架的需求是 ShuffleReader 需要支持范围 Partition:

def getReader[K, C](handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader[K, C]

Join 策略切换

Join 策略切换的目的是修正由于 Stats 预估不准导致 Optimizer 把本应做的 Broadcast Join 错误的选择了 SortMerge Join 或 ShuffleHash Join。具体而言,在 Join 的两张表做完 Shuffle Write 之后,AQE 框架统计了实际大小,若发现小表符合 Broadcast Join 的条件,则将小表 Broadcast 出去,跟大表的本地 Shuffle 数据做 Join。流程如下:

Join 策略切换有两个优化:1. 改写成 Broadcast Join; 2. 大表的数据通过LocalShuffleReader 直读本地。其中第2点对 Shuffle 框架提的新需求是支持 Local Read。

倾斜Join优化

倾斜Join优化的目的是让倾斜的 Partition 由更多的 Reducer 去处理,从而避免长尾。具体而言,在 Shuffle Write 结束之后,AQE 框架统计每个 Partition 的 Size,接着根据特定规则判断是否存在倾斜,若存在,则把该 Partition 分裂成多个 Split,每个 Split 跟另外一张表的对应 Partition 做 Join。如下所示。

Partiton 分裂的做法是按照 MapId 的顺序累加他们 Shuffle Output 的 Size,累加值超过阈值时触发分裂。对 Shuffle 框架的新需求是 ShuffleReader 要能支持范围 MapId。综合 Partition 合并优化对范围 Partition 的需求,ShuffleReader 的接口演化为:

def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

RSS 架构回顾

RSS 的核心设计是 Push Shuffle + Partition 数据聚合,即不同的 Mapper 把属于同一个 Partition 的数据推给同一个 Worker 做聚合,Reducer 直读聚合后的文件。如下图所示。

在核心设计之外,RSS 还实现了多副本,全链路容错,Master HA,磁盘容错,自适应Pusher,滚动升级等特性,详见[1]。

RSS 支持 Partition 合并

Partition 合并对 Shuffle 框架的需求是支持范围 Partition,在 RSS 中每个 Partition 对应着一个文件,因此天然支持,如下图所示。

RSS 支持 Join 策略切换

Join 策略切换对 Shuffle 框架的需求是能够支持 LocalShuffleReader。由于 RSS 的 Remote 属性,数据存放在 RSS 集群,仅当 RSS 和计算集群混部的场景下才会存在在本地,因此暂不支持 Local Read(将来会优化混部场景并加以支持)。需要注意的是,尽管不支持 Local Read,但并不影响 Join 的改写,RSS 支持 Join 改写优化如下图所示。

RSS 支持 Join 倾斜优化

在 AQE 的三个场景中,RSS 支持 Join 倾斜优化是最为困难的一点。RSS 的核心设计是 Partition 数据聚合,目的是把 Shuffle Read 的随机读转变为顺序读,从而提升性能和稳定性。多个 Mapper 同时推送给 RSS Worker,RSS 在内存聚合后刷盘,因此 Partition 文件中来自不同 Mapper 的数据是无序的,如下图所示。

Join 倾斜优化需要读取范围 Map,例如读 Map1-2的数据,常规的做法有两种:

  1. 读取完整文件,并丢弃范围之外的数据。
  2. 引入索引文件,记录每个 Block 的位置及所属 MapId,仅读取范围内的数据。

这两种做法的问题显而易见。方法1会导致大量冗余的磁盘读;方法2本质上回退成了随机读,丧失了 RSS 最核心的优势,并且创建索引文件成为通用的 Overhead,即使是针对非倾斜的数据( Shuffle Write 过程中难以准确预测是否存在倾斜)。

为了解决以上两个问题,我们提出了新的设计:主动 Split + Sort On Read。

主动Split

倾斜的 Partition 大概率 Size 非常大,极端情况会直接打爆磁盘,即使在非倾斜场景出现大 Partition 的几率依然不小。因此,从磁盘负载均衡的角度,监控 Partition 文件的 Size 并做主动 Split (默认阈值256m)是非常必要的。

Split 发生时,RSS 会为当前 Partition 重新分配一对 Worker(主副本),后续数据将推给新的 Worker。为了避免 Split 对正在运行的 Mapper 产生影响,我们提出了 Soft Split 的方法,即当触发 Split 时,RSS 异步去准备新的 Worker,Ready 之后去热更新 Mapper 的 PartitionLocation 信息,因此不会对 Mapper 的 PushData 产生任何干扰。整体流程如下图所示。

Sort On Read

为了避免随机读的问题,RSS 采用了 Sort On Read 的策略。具体而言,File Split 的首次 Range 读会触发排序(非 Range 读不会触发),排好序的文件连同其位置索引写回磁盘。后续的 Range 读即可保证是顺序读取。如下图所示。

为了避免多个 Sub-Reducer 等待同一个 File Split 的排序,我们打散了各个 Sub-Reducer 读取 Split 的顺序,如下图所示。

Sort 优化

Sort On Read 可以有效避免冗余读和随机读,但需要对 Split File(256m)做排序,本节讨论排序的实现及开销。文件排序包括3个步骤:读文件,对 MapId 做排序,写文件。RSS 的 Block 默认256k,Block 的数量大概是1000,因此排序的过程非常快,主要开销在文件读写。整个排序过程大致有三种方案:

  1. 预先分配文件大小的内存,文件整体读入,解析并排序 MapId,按 MapId 顺序把 Block 写回磁盘。
  2. 不分配内存,Seek 到每个 Block 的位置,解析并排序 MapId,按 MapId 顺序把原文件的 Block transferTo 新文件。
  3. 分配小块内存(如256k),顺序读完整个文件并解析和排序MapId,按MapId顺序把原文件的Block transferTo新文件。

从 IO 的视角,乍看之下,方案1通过使用足量内存,不存在顺序读写;方案2存在随机读和随机写;方案3存在随机写;直观上方案1性能更好。然而,由于 PageCache 的存在,方案3在写文件时原文件大概率缓存在 PageCache 中,因此实测下来方案3的性能更好,如下图所示。

同时方案3无需占用进程额外内存,故 RSS 采用方案3的算法。我们同时还测试了 Sort On Read 跟上述的不排序、仅做索引的随机读方法的对比,如下图所示。

整体流程

RSS 支持 Join 倾斜优化的整体流程如下图所示。

RSS流控

流控的主要目的是防止 RSS Worker 内存被打爆。流控通常有两种方式:

  1. Client 在每次 PushData 前先向 Worker 预留内存,预留成功才触发 Push。
  2. Worker 端反压。

由于 PushData 是非常高频且性能关键的操作,若每次推送都额外进行一次 RPC 交互,则开销太大,因此我们采用了反压的策略。以 Worker 的视角,流入数据有两个源:

  1. Client 推送的数据
  2. 主副本发送的数据

如下图所示,Worker2 既接收来自 Mapper 推送的 Partition3 的数据,也接收 Worker1发送的 Partition1 的副本数据,同时会把 Partition3 的数据发给对应的从副本。

其中,来自 Mapper 推送的数据,当且仅当同时满足以下条件时才会释放内存:

  1. Replication 执行成功
  2. 数据写盘成功

来自主副本推送的数据,当且仅当满足以下条件时才会释放内存:

  1. 数据写盘成功

我们在设计流控策略时,不仅要考虑限流(降低流入的数据),更要考虑泄流(内存能及时释放)。具体而言,高水位我们定义了两档内存阈值(分别对应85%和95%内存使用),低水位只有一档(50%内存使用)。达到高水位一档阈值时,触发流控,暂停接收 Mapper 推送的数据,同时强制刷盘,从而达到泄流的目标。仅限制来自 Mapper 的流入并不能控制来自主副本的流量,因此我们定义了高水位第二档,达到此阈值时将同时暂停接收主副本发送的数据。当水位低于低水位后,恢复正常状态。整体流程如下图所示。

性能测试

我们对比了 RSS 和原生的 External Shufle Service(ESS) 在 Spark3.2.0 开启 AQE 的性能。RSS 采用混部的方式,没有额外占用任何机器资源。此外,RSS 所使用的内存为8g,仅占机器内存的2.3%(机器内存352g)。具体环境如下。

测试环境

硬件:

header 机器组 1x ecs.g5.4xlarge

worker 机器组 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB  HDD。

Spark AQE 相关配置:

spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

RSS 相关配置:

RSS_MASTER_MEMORY=2g
RSS_WORKER_MEMORY=1g
RSS_WORKER_OFFHEAP_MEMORY=7g

TPCDS 10T测试集

我们测试了10T的 TPCDS,E2E 来看,ESS 耗时11734s,RSS 单副本/两副本分别耗时8971s/10110s,分别比 ESS 快了23.5%/13.8%,如下图所示。我们观察到 RSS 开启两副本时网络带宽达到上限,这也是两副本比单副本低的主要因素。

具体每个 Query 的时间对比如下:

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用Delta Lake构建批流一体数据仓库

简介:Delta Lake是一个开源存储层,它为数据湖带来了可靠性。Delta Lake提供了ACID事务、可扩展的元数据处理,并统一了流式处理和批处理数据处理。Delta-Lake运行在现有数据湖之上,并且与Apache Spark API完全兼容。希望本篇能让大…

中国峰会|下一代云基础架构,赋能企业云上发展

点击上方入口立即【自由构建 探索无限】一起共赴年度科技盛宴!马上点击“阅读原文”了解更多亚马逊云科技中国峰会让我们共同见证亚马逊的一小步云计算的一大步扫码【立即报名】直通大咖云集的亚马逊云科技中国峰会!

Delta Lake基础介绍(商业版)

简介:介绍 Lakehouse 搜索引擎的设计思想,探讨其如何使用缓存,辅助数据结构,存储格式,动态文件剪枝,以及 vectorized execution 达到优越的处理性能。 作者:李洁杏,Databrick资深软…

云原生数仓如何破解大规模集群的关联查询性能问题?

简介:AnalyticDB for PostgreSQL(以下简称ADB PG)是一款PB级的MPP架构云原生数据仓库。本文从ADB PG架构设计的角度出发,探讨Runtime Filter在ADB PG中的实现方案,并介绍了基于Bloom Filter的ADB PG Dynamic Join Filter功能技术细节。 作者 …

独家对话Python之父:人类大脑才是软件开发效率的天花板

【CSDN 编者按】十五年前,《程序员》杂志曾专访过 Python 之父 Guido van Rossum,一起探讨了 Python 3.0 的较为明显的新特性,即增加了对中文( Unicode )的支持。十五年过去,Python 的版本号只前进了一个数字,但是 Pyt…

淘系用户平台技术团队单元测试建设

简介:单元测试是工程交付前质量保障的第一环,也无疑是软件工程质量保障的重要基石,有效的单元测试能够提前发现90%以上的代码Bug问题,同时也能防止代码的腐化,在工程重构演进时起到至关重要的作用。 作者 | 问元 来源 …

阿里云弹性计算对视觉计算的思考与实践

简介:利用人类已有和将有的技术加之商业手段,实现对人类感官体验进行全方位升级。 4月21日,“2022英伟达数字孪生技术应用论坛”上,阿里云弹性计算产品专家张新涛为大家带来了题为《阿里云弹性计算在XR业务上的应用实践》的主题分…

游戏行业弹性计算最佳实践

简介:本篇主要介绍三大游戏场景:游戏服务、大数据运营、云游戏的架构特点,以及基于这些场景下的阿里云游戏行业计算基础设施选型与部署方案。 文丨寻野,阿里云弹性计算产品解决方案架构师 摘要:游戏一直以来是互联网…

三大特性,多个场景,Serverless 应用引擎 SAE 全面升级

简介:Serverless 应用引擎 SAE 凭借着天然技术优势,已经帮助成千上万家企业实现容器和微服务技术转型。近日,SAE不仅进一步提供了全套微服务能力,更为传统 Job 和 PHP 用户提供了全新的,更高效、更经济且可平滑迁移的解…

代码覆盖率在性能优化上的一种可行应用

简介:JavaScript 是前端应用主要语言,相较于其他平台编程语言,JS资源多数情况下要通过网络进行加载,那么代码的体积直接影响了页面加载执行时间。“无效的代码”的多寡直接影响到了我们的代码质量,所以度量代码的执行覆…

MaxCompute湖仓一体介绍

简介:本篇内容分享了MaxCompute湖仓一体介绍。 分享人:孟硕 阿里云 MaxCompute产品专家 视频链接:数据智能实战营-北京站 专题回顾 正文: 本篇内容将通过两个部分来介绍MaxCompute湖仓一体。 一、什么是 MaxCompute 湖仓一体…

云原生离线实时一体化数仓建设与实践

简介:本篇内容分享了云原生离线实时一体化数仓建设与实践。 分享人:刘一鸣 Hologres 产品经理 视频链接:数据智能实战营-北京站 专题回顾 正文: 本篇内容将通过五个部分来介绍云原生离线实时一体化数仓建设与实践。 一、离线实…

议题征集|Flink Forward Asia 2022 正式启动

在这数据量爆炸性增长的时代,开源软件如雨后春笋般出现在开发者的视野中,数据的价值被重新定义。同时,越来越多的企业开启实时化道路,数据的实时分析与计算需求与日俱增。作为主打流处理的计算引擎 Apache Flink 于 2014 年正式开…

龙蜥正式开源 SysOM:百万级实战经验打造,一站式运维管理平台 | 龙蜥技术

简介:SysOM集监控、告警、诊断、修复、安全能力于一体的操作系统运维平台。 文/系统运维 SIG 如果你被突如其来的 OOPS 和满屏奇怪的函数弄得满头问号?机器内存明明很大,却申请不出来内存?业务周期抖动,ping 命令偶尔…

微软在华商业应用战略全面升级,首次推出面向医疗和生命科学的云行业套件

2022年9月29日,微软宣布进一步升级在华商业应用战略,落地一系列智能商业应用(Biz App)功能的同时,以Dynamics 365和Power Platform为基础,进一步完善商业应用战略与价值定位,助力更多客户和合作…

使用Databricks进行零售业需求预测的应用实践

简介:本文从零售业需求预测痛点、商店商品模型预测的实践演示,介绍Databricks如何助力零售商进行需求、库存预测,实现成本把控和营收增长。 作者:李锦桂 阿里云开源大数据平台开发工程师 本文从零售业需求预测痛点、商店商品模型…

龙蜥开源内核追踪利器 Surftrace:协议包解析效率提升 10 倍 | 龙蜥技术

简介:如何将网络报文与内核协议栈清晰关联起来精准追踪到关注的报文行进路径呢? 文/系统运维 SIG Surftrace 是由系统运维 SIG 推出的一个 ftrace 封装器和开发编译平台,让用户既能基于 libbpf 快速构建工程进行开发,也能作为 ft…

开源要正式写进法律了?

作者 | 何苗 出品 | CSDN(ID:CSDNnews)去年,当大家还在为开源的快速发展而欢呼之际,影响了全球数百万台计算机Log4j 漏洞事件给开源软件开发者与使用者敲响了一记警钟。因而今年,开源软件及其供应链安全…

阿里云软著申请|这项保护,让我得到了10万赔偿

简介:对于企业来说,申请软件著作权是证明自己和保护自己的强力护盾。除此之外,它还有着很多不可忽视的意义与价值。阿里云软著申请,一站式智能服务,助力企业和开发者高效发展,省时省力更省心。 前几日&…

宜搭小技巧|海量数据管理难?这招帮你事半功倍

简介:一键生成数据管理页,海量数据随心管理! 话接上回,宜小搭组织大家团建,当收集完大家的报名信息后,有小伙伴想要修改已提交的信息,面对海量的数据,整理查找太费时间。 如何快速…