阿里云RemoteShuffleService新功能:AQE和流控

简介:阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮助了诸多客户解决Spark作业的性能、稳定性问题,并使得存算分离架构得以实施。为了更方便大家使用和扩展,RSS在2022年初开源,欢迎各路开发者共建。本文将介绍RSS最新的两个重要功能:支持Adaptive Query Execution(AQE),以及流控。

作者 | 一锤、明济
来源 | 阿里开发者公众号

阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮助了诸多客户解决Spark作业的性能、稳定性问题,并使得存算分离架构得以实施。为了更方便大家使用和扩展,RSS在2022年初开源,欢迎各路开发者共建。RSS的整体架构请参考[1],本文将介绍RSS最新的两个重要功能:支持Adaptive Query Execution(AQE),以及流控。

一 RSS支持AQE

1 AQE简介

自适应执行(Adaptive Query Execution, AQE)是Spark3的重要功能[2],通过收集运行时Stats,来动态调整后续的执行计划,从而解决由于Optimizer无法准确预估Stats导致生成的执行计划不够好的问题。AQE主要有三个优化场景: Partition合并(Partition Coalescing), Join策略切换(Switch Join Strategy),以及倾斜Join优化(Optimize Skew Join)。这三个场景都对Shuffle框架的能力提出了新的需求。

Partition合并

Partition合并的目的是尽量让reducer处理的数据量适中且均匀,做法是首先Mapper按较多的Partition数目进行Shuffle Write,AQE框架统计每个Partition的Size,若连续多个Partition的数据量都比较小,则将这些Partition合并成一个,交由一个Reducer去处理。过程如下所示。

由上图可知,优化后的Reducer2需读取原属于Reducer2-4的数据,对Shuffle框架的需求是ShuffleReader需要支持范围Partition:

def getReader[K, C](handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader[K, C]

Join策略切换

Join策略切换的目的是修正由于Stats预估不准导致Optimizer把本应做的Broadcast Join错误的选择了SortMerge Join或ShuffleHash Join。具体而言,在Join的两张表做完Shuffle Write之后,AQE框架统计了实际大小,若发现小表符合Broadcast Join的条件,则将小表Broadcast出去,跟大表的本地Shuffle数据做Join。流程如下:

Join策略切换有两个优化:1. 改写成Broadcast Join; 2. 大表的数据通过LocalShuffleReader直读本地。其中第2点对Shuffle框架提的新需求是支持Local Read。

倾斜Join优化

倾斜Join优化的目的是让倾斜的Partition由更多的Reducer去处理,从而避免长尾。具体而言,在Shuffle Write结束之后,AQE框架统计每个Partition的Size,接着根据特定规则判断是否存在倾斜,若存在,则把该Partition分裂成多个Split,每个Split跟另外一张表的对应Partition做Join。如下所示。

Partiton分裂的做法是按照MapId的顺序累加他们Shuffle Output的Size,累加值超过阈值时触发分裂。对Shuffle框架的新需求是ShuffleReader要能支持范围MapId。综合Partition合并优化对范围Partition的需求,ShuffleReader的接口演化为:

def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

2 RSS架构回顾

RSS的核心设计是Push Shuffle + Partition数据聚合,即不同的Mapper把属于同一个Partition的数据推给同一个Worker做聚合,Reducer直读聚合后的文件。如下图所示。

在核心设计之外,RSS还实现了多副本,全链路容错,Master HA,磁盘容错,自适应Pusher,滚动升级等特性,详见[1]。

3 RSS支持Partition合并

Partition合并对Shuffle框架的需求是支持范围Partition,在RSS中每个Partition对应着一个文件,因此天然支持,如下图所示。

4 RSS支持Join策略切换

Join策略切换对Shuffle框架的需求是能够支持LocalShuffleReader。由于RSS的Remote属性,数据存放在RSS集群,仅当RSS和计算集群混部的场景下才会存在在本地,因此暂不支持Local Read(将来会优化混部场景并加以支持)。需要注意的是,尽管不支持Local Read,但并不影响Join的改写,RSS支持Join改写优化如下图所示。

5 RSS支持Join倾斜优化

在AQE的三个场景中,RSS支持Join倾斜优化是最为困难的一点。RSS的核心设计是Partition数据聚合,目的是把Shuffle Read的随机读转变为顺序读,从而提升性能和稳定性。多个Mapper同时推送给RSS Worker,RSS在内存聚合后刷盘,因此Partition文件中来自不同Mapper的数据是无序的,如下图所示。

Join倾斜优化需要读取范围Map,例如读Map1-2的数据,常规的做法有两种:

  1. 读取完整文件,并丢弃范围之外的数据。
  2. 引入索引文件,记录每个Block的位置及所属MapId,仅读取范围内的数据。

这两种做法的问题显而易见。方法1会导致大量冗余的磁盘读;方法2本质上回退成了随机读,丧失了RSS最核心的优势,并且创建索引文件成为通用的Overhead,即使是针对非倾斜的数据(Shuffle Write过程中难以准确预测是否存在倾斜)。

为了解决以上两个问题,我们提出了新的设计:主动Split + Sort On Read。

主动Split

倾斜的Partition大概率Size非常大,极端情况会直接打爆磁盘,即使在非倾斜场景出现大Partition的几率依然不小。因此,从磁盘负载均衡的角度,监控Partition文件的Size并做主动Split(默认阈值256m)是非常必要的。

Split发生时,RSS会为当前Partition重新分配一对Worker(主副本),后续数据将推给新的Worker。为了避免Split对正在运行的Mapper产生影响,我们提出了Soft Split的方法,即当触发Split时,RSS异步去准备新的Worker,Ready之后去热更新Mapper的PartitionLocation信息,因此不会对Mapper的PushData产生任何干扰。整体流程如下图所示。

Sort On Read

为了避免随机读的问题,RSS采用了Sort On Read的策略。具体而言,File Split的首次Range读会触发排序(非Range读不会触发),排好序的文件连同其位置索引写回磁盘。后续的Range读即可保证是顺序读取。如下图所示。

为了避免多个Sub-Reducer等待同一个File Split的排序,我们打散了各个Sub-Reducer读取Split的顺序,如下图所示。

Sort优化

Sort On Read可以有效避免冗余读和随机读,但需要对Split File(256m)做排序,本节讨论排序的实现及开销。文件排序包括3个步骤:读文件,对MapId做排序,写文件。RSS的Block默认256k,Block的数量大概是1000,因此排序的过程非常快,主要开销在文件读写。整个排序过程大致有三种方案:

  1. 预先分配文件大小的内存,文件整体读入,解析并排序MapId,按MapId顺序把Block写回磁盘。
  2. 不分配内存,Seek到每个Block的位置,解析并排序MapId,按MapId顺序把原文件的Block transferTo新文件。
  3. 分配小块内存(如256k),顺序读完整个文件并解析和排序MapId,按MapId顺序把原文件的Block transferTo新文件。

从IO的视角,乍看之下,方案1通过使用足量内存,不存在顺序读写;方案2存在随机读和随机写;方案3存在随机写;直观上方案1性能更好。然而,由于PageCache的存在,方案3在写文件时原文件大概率缓存在PageCache中,因此实测下来方案3的性能更好,如下图所示。

同时方案3无需占用进程额外内存,故RSS采用方案3的算法。我们同时还测试了Sort On Read跟上述的不排序、仅做索引的随机读方法的对比,如下图所示。

整体流程

RSS支持Join倾斜优化的整体流程如下图所示。

二 RSS流控

流控的主要目的是防止RSS Worker内存被打爆。流控通常有两种方式:

  1. Client在每次PushData前先向Worker预留内存,预留成功才触发Push。
  2. Worker端反压。

由于PushData是非常高频且性能关键的操作,若每次推送都额外进行一次RPC交互,则开销太大,因此我们采用了反压的策略。以Worker的视角,流入数据有两个源:

  1. Client推送的数据
  2. 主副本发送的数据

如下图所示,Worker2既接收来自Mapper推送的Partition3的数据,也接收Worker1发送的Partition1的副本数据,同时会把Partition3的数据发给对应的从副本。

其中,来自Mapper推送的数据,当且仅当同时满足以下条件时才会释放内存:

  1. Replication执行成功
  2. 数据写盘成功

来自主副本推送的数据,当且仅当满足以下条件时才会释放内存:

数据写盘成功

我们在设计流控策略时,不仅要考虑限流(降低流入的数据),更要考虑泄流(内存能及时释放)。具体而言,高水位我们定义了两档内存阈值(分别对应85%和95%内存使用),低水位只有一档(50%内存使用)。达到高水位一档阈值时,触发流控,暂停接收Mapper推送的数据,同时强制刷盘,从而达到泄流的目标。仅限制来自Mapper的流入并不能控制来自主副本的流量,因此我们定义了高水位第二档,达到此阈值时将同时暂停接收主副本发送的数据。当水位低于低水位后,恢复正常状态。整体流程如下图所示。

三 性能测试

我们对比了RSS和原生的External Shufle Service(ESS)在Spark3.2.0开启AQE的性能。RSS采用混部的方式,没有额外占用任何机器资源。此外,RSS所使用的内存为8g,仅占机器内存的2.3%(机器内存352g)。具体环境如下。

1 测试环境

硬件:

header 机器组 1x ecs.g5.4xlarge
worker 机器组 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB HDD。

Spark AQE相关配置:

spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

RSS相关配置:

RSS_MASTER_MEMORY=2g
RSS_WORKER_MEMORY=1g
RSS_WORKER_OFFHEAP_MEMORY=7g

2 TPCDS 10T测试集

我们测试了10T的TPCDS,E2E来看,ESS耗时11734s,RSS单副本/两副本分别耗时8971s/10110s,分别比ESS快了23.5%/13.8%,如下图所示。我们观察到RSS开启两副本时网络带宽达到上限,这也是两副本比单副本低的主要因素。

具体每个Query的时间对比如下:

相关链接

欢迎各位开发者参与讨论和共建!

github地址:

https://github.com/alibaba/RemoteShuffleService

Reference

[1]阿里云EMR Remote Shuffle Service在小米的实践,以及开源. 阿里云EMR Remote Shuffle Service在小米的实践,以及开源-阿里云开发者社区

[2]Adaptive Query Execution: Speeding Up Spark SQL at Runtime. How to Speed up SQL Queries with Adaptive Query Execution

原文链接

本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510982.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenSergo 正式开源,多家厂商共建微服务治理规范和实现

简介 OpenSergo,Open 是开放的意思,Sergo 则是取了服务治理两个英文单词 Service Governance 的前部分字母 Ser 和 Go,合起来即是一个开放的服务治理项目。 该项目由阿里云、bilibili、字节跳动,以及 Spring Cloud Alibaba、Nacos…

Linux中常见的网络端口号

在使用计算机或智能手机时,我们使用各种硬件端口,如用于音频、HDMI、Type-c 等的 3.5mm 端口,其用于与硬件外围设备通信。同样,网络端口可以让我们在一台计算机上访问各种网络服务来达到通信的目的。因此,端口被认为是…

如何在云原生混部场景下利用资源配额高效分配集群资源?

简介:由于混部是一个复杂的技术及运维体系,包括 K8s 调度、OS 隔离、可观测性等等各种技术,之前的一篇文章《历经 7 年双 11 实战,阿里巴巴是如何定义云原生混部调度优先级及服务质量的?》,主要聚焦在调度优…

稳定性与高可用保障的工作思路

简介:稳定性与高可用性是老生常谈的两个词。凭借经验和感受我们知道,提高系统的这两项指标,系统会更加健康,产品也会有更好的用户体验。但是如果要给稳定性和高可用性下一个定义该如何表述?稳定性和高可用性这二者又有…

腾讯云数据库自研内核全新升级 新架构比原先性能提升20%

9月16日,腾讯云数据库自研内核TXSQL完成了全新升级。搭载了新一代内核的云数据库在性能上较之前提升超过20%,延迟降低80%。目前,新一代内核TXSQL已应用于腾讯云数据库MySQL、分布式数据库TDSQL、云原生数据库TDSQL-C等多款主流云数据库产品中…

系统性能分析从入门到进阶

简介:本文以系统为中心, 结合日常工作和用例, 由浅入深地介绍了性能分析的一些方法和体会, 希望对想了解系统性能分析的同学有所帮助。 作者 | 勿非 本文以系统为中心, 结合日常工作和用例, 由浅入深地介绍了性能分析的一些方法和体会, 希望对想了解系统性能分析的…

宜搭小技巧|第一时间看到审批进度?消息通知来帮你

简介:「消息通知」自动发送,再也不用担心错过流程审批进度! 今天,宜小搭要申请出差,为了第一时间获取审批进度,他频繁刷新审批页面,这样既麻烦同时也浪费了大量时间,影响其他工作。…

一看即会:Serverless 应用开发的 7 个实战小技巧

简介:干货满满,马住收藏! Serverless 应用开发的 7 个经验心得 作者说:Serverless 架构下的应用开发,与传统架构的应用开发还是有比较大的区别点的,例如天然分布式架构会让很多框架丧失一定的"便利性…

如何使用 Serverless Devs 部署静态网站到函数计算

简介:手把手教你:如何使用 Serverless Devs 部署静态网站到函数计算。 前言 公司经常有一些网站需要发布上线,对比了几款不同的产品后,决定使用阿里云的函数计算(FC)来托管构建出来的静态网站。 FC 弹性实例自带的50…

一个好的科技公司logo长这样

简介:一个好的科技logo能体现出行业独有的专业性和技术优势,让你的公司科技感加满! 近年来,越来越多的初创公司崭露头角,其中科技互联网公司的比重非常高。小云也收到很多朋友的留言,询问科技类公司应该怎…

系列解读 SMC-R (二):融合 TCP 与 RDMA 的 SMC-R 通信 | 龙蜥技术

简介:本篇以 first contact (通信两端建立首个连接) 场景为例,介绍 SMC-R 通信流程。 文/龙蜥社区高性能网络SIG 一、引言 通过上一篇文章 《系列解读SMC-R:透明无感提升云上 TCP 应用网络性能(一)》我们了解到&…

北京大学、阿里巴巴成立联合实验室,聚焦人工智能理论和创新算法研究

9月17日,在北京大学智能学科建设20周年大会上,北京大学和阿里巴巴共同宣布成立“北大-阿里妈妈人工智能创新联合实验室” (以下简称实验室)。实验室将聚焦人工智能前沿领域的理论、方法与关键技术展开研究,为社会和企业…

智能开放搜索上线定制分词器

简介:智能开放搜索上线定制召回模型-定制分词器功能,满足各行业、垂类、业务特殊,对搜索有较高分词要求的客户,提升语义理解能力,精准召回用户搜索意图。 NLP算法在搜索链路中的应用 这是一个完整的从查询词到搜索结…

云端渲染时长1.58亿核小时,阿里云助力国漫巨制《新神榜:杨戬》提升视效

当前,追光动画新作《新神榜:杨戬》(以下简称“杨戬”)正在热映,制作水准再次升级。无论是“水墨特效太极图大战”,亦或神女婉罗的灵动舞姿,还是元神现身的超燃瞬间,都极具视觉震撼。…

如何开一场高效的迭代排期会 | 敏捷开发落地指南

简介:如何开一场高效的迭代排期会,高效落地敏捷开发,先从这3个关键活动着手,通过本文你将了解到什么是敏捷开发、什么是双周迭代、如何高效地开展排期会,以及如何在云效项目协作Projex 中落地排期会相关事宜。 摘要&a…

Linux 中如何获取文件的绝对路径

我们都知道,在命令行可以使用 pwd 命令来获取当前目录的完整路径(绝对路径):pwd那么,如何获取文件的绝对路径呢?有下列几种方法,可以打印文件的完整路径:readlinkrealpathfindls 和 …

EasyCV开源|开箱即用的视觉自监督+Transformer算法库

简介:EasyCV是阿里巴巴开源的基于Pytorch,以自监督学习和Transformer技术为核心的 all-in-one 视觉算法建模工具。EasyCV在阿里巴巴集团内支撑了搜索、淘系、优酷、飞猪等多个BU业务,同时也在阿里云上服务了若干企业客户,通过平台…

开源数据库为什么能捕获开发者的心?

【CSDN 编者按】开源数据库的重要性,早就不言而喻。早期的自由软件开发者和初创公司,很多都受益于开源数据库。伴随着曾经的初创公司羽翼逐渐丰满,它们的开发者文化渗透到整个生态系统中,更多的人开始关注这些初创公司采取的方法&…

“消息驱动、事件驱动、流 ”基础概念解析

简介:本文旨在帮助大家对近期消息领域的高频词“消息驱动(Message-Driven),事件驱动(Event-Driven)和流(Streaming)”有更清晰的了解和认知,其中事件驱动 EDA 作为 Gartn…

KubeVela 1.3 发布:开箱即用的可视化应用交付平台,引入插件生态、权限认证、版本化等企业级新特性

简介:得益于 KubeVela 社区上百位开发者的参与和 30 多位核心贡献者的 500 多次代码提交, KubeVela 1.3 版本正式发布。相较于三个月前发布的 v1.2 版本[1],新版本在 OAM 核心引擎(Vela Core),可视化应用交…