Spark Shuffle Service简介与测试

一 Dynamic Resource Allocation(动态资源分配)

了解Shuffle Service之前,我们需要先了解和Shuffle Service有关的另一个特性:动态资源分配。

Spark管理资源有两种方式:静态资源分配和动态资源分配。

  • 静态资源分配:spark提交任务前,指定固定的资源,在spark运行任务过程中,一直占用这些资源不释放,job运行结束后才会释放。
  • 动态资源分配:Spark 会根据工作负荷,动态地调整作业使用的资源。具体一点来说,当工作负荷增大,Spark 会申请更多的 Executor,当工作负荷变小,则会移除多余的 Executor。这里所指的资源,主要是指 Executor 分配的 CPU/Memory,当然也包括一个 Executor JVM 进程占用的 Disk 和 Network IO 等等,而这里所指的工作负荷是指处于 Pending 和 Running 状态的 Task。

Spark 最早是从 on Yarn 模式支持 Dynamic Resouce Allocation 的特性。至少从 Spark 1.2 开始就已经可用了。

二 External Shuffle Service

与 Dynamic Resouce Allocation 关系紧密的是 External Shuffle Service。

正常来说,Executor 退出有两种情况,一是整个 Spark 任务结束,这是属于正常结束;二是 Executor 遇到 failure,会导致异常退出。在 Dynamic Resouce Allocation 的场景下,由于 Executor 数量会根据工作负荷增加或者移除,当 Spark Job 下游 Stage 需要读取上游 Stage 的状态(一般来说是数据 ShuffleMapStage 的数据),那么由于原来的 Executor 被 如果被移除了,随着 Executor 的退出,磁盘上的临时目录也会被移除。此时相关的数据就需要通过 RDD 的血缘重新计算了,通常来说这是非常耗时。

所以 Spark 需要一个 External Shuffle Service 来管理 Shuffle 数据,External Shuffle Service 本质上是一个辅助进程,原来在读取 Shuffle 数据的时候,是每个 Executor 互相读取,现在则是直接读取 External Shuffle Service,也相当于解耦了计算和读取数据的过程

External Shuffle Service模式如下图所示:

2.1 Shuffle分类

Shuffle是分布式计算框架用来衔接上下游任务的数据重分布过程,在分布式计算中所有涉及到数据上下游衔接的过程都可以理解为shuffle。针对不同的分布式框架,shuffle有几种实现形态:

  1. 基于文件的pull based shuffle,如MapReduce、Spark。这种shuffle方式多用于类MR的框架,比如MapReduce、Spark,它的特点是具有较高的容错性,适合较大规模的批处理作业。由于实现的是基于文件的shuffle方案,因此失败重跑时只须重跑失败的task、stage,而无须重跑整个job。

  2. 基于管道的push based shuffle,比如Flink、Storm等。基于管道的push based shuffle的实现方式多用于Flink、Storm等流式框架,或是一些MPP框架,如Presto、Greenplum等,它的特点是具有较低的延迟和较高的性能,但是比较大的问题是由于没有将shuffle数据持久化下来,因此任务的失败会导致整个作业的重跑。

Shuffle是分布式框架里面最为重要的一个环节,shuffle的性能和稳定性直接影响到了整个框架的性能和稳定性,因此改进shuffle框架是非常有必要的。

2.1.1 Spark pull based shuffle

在Spark 3.2之前采用的是 pull based shuffle。

传统的Spark shuffle 是 pull based 模型,详细 shuffle 过程如下:

  • Executor启动时向当前机器的 ESS 服务进行注册,同步 shuffle 目录配置等信息。
  • Map task 执行完会将计算结果写到本地磁盘, 然后将所有 blocks 信息上报给Driver。
  • Reduce task启动时会向 Driver 获取shuffle信息,提取当前 reduce task 需要读取的 blocks信息。Shuffle 请求使用线程池异步向所有 map task 所在的ESS 服务请求 shuffle 数据,reduce task 轮询消费请求结果,执行 reduce 计算。

Pull based shuffle 模型实际只发生一次从ESS服务到 reduce task 的网络传输,设计相对简单,大部分时候表现稳定。但是如果 spark job 非常大(比如 map task 和 reduce task 都是一万个,那么理论上的请求就有一亿个)也会存在如下一些问题:

  • **磁盘 I/O 效率低。**虽然 map task 最终会将 shuffle 结果合并成一个文件,但是每个reduce task 在请求 shuffle 数据的时候,一次只会请求一个 block 数据分片;ESS 服务在接收到无序的 shuffle 数据请求时,只能重复通过随机 I/O 方式读取大小在数十 KB 的 block 数据,因此磁盘吞吐会非常低。
  • **Shuffle 网络连接可靠性问题。**Spark executor 使用连接池来管理不同机器间的网络连接,对于相同地址的请求,会复用同一个连接。当 map tasks 的 shuffle 结果存储在 S 个 ESS 服务上,reduce tasks 分布在 E 个 executors 上,那么理论上还是会建立 E * S 个网络连接,生产环境中大的Spark job,E 和 S 都可能会超过1000,那么网络出现问题的概率就会比较大。虽然在网络出现问题的时候,即使 Spark 内部通过重试,恢复网络连接,重新获取到了 shuffle 数据,但是这些 task 的执行时间也会变长,从而影响 Spark job 执行时间。
  • **Shuffle 数据的本地性不好。**现在的计算机硬件,CPU速度远大于磁盘和网络 I/O,所以对于分布式系统,将计算分配到数据所在机器,可以得到更好的性能提升。但是 pull based shuffle 模型中,reduce task 计算需要的 shuffle 数据分散在不同的节点上,虽然在 Spark 中,fetch shuffle 数据和reduce task 计算可以同时进行,但是reduce task的计算一般还是快于 shuffle 数据的获取过程,从而限制了 reduce task 计算速度。
2.1.2 Spark push based shuffle

虽然Spark已经对 shuffle 过程做了很多优化,但是当集群的规模足够大的时候,shuffle read 仍然会有很多不稳定的情况。Linkedin 向 Spark社区贡献了他们内部基于 push based shuffle 实现的框架 Magnet [2],并在社区 Spark 3.2 版本 [3]实现基本可用。在 pull based shuffle 模型中,每个 reduce task 需要主动 pull 其 map stage 中每个 map task 输出的对应的 reduce 分片数据,但是在 push based shuffle 模型中,所有 map task 都会主动将同一个 reduce task 的数据 push 到同一个 ESS 服务, reduce task 就可以到这个 ESS 服务 fetch 合并好的 shuffle 数据了。

详细 shuffle 流程如下:

  • Executor启动时候向当前机器的 ESS 服务进行注册,同步 shuffle 目录配置等信息。

  • Map 阶段在启动之前,DAG scheduler 会尝试给该Stage 选择一组ESS 服务作为 PushMergerLocations 。

  • Map task 执行完会将计算结果写到本地磁盘, 然后将所有 blocks 信息上报给Driver。最后多了一个判断,如果开启 push based shuffle, map task会启动一个线程池读取本地的 shuffle 数据,将 shuffle block 数据推送到 block 所属的 reduce task 对应的 remote ESS 服务。

  • ESS 服务接收自不同 Executor 推送过来的 shuffle blocks,相同 reduce 的 shuffle 数据会合并到同一文件中,多个 shuffle blocks 会合并成一个 chunk 进行存储,此外还会有 index 文件和 meta 文件来保证数据的可靠性。

  • Reduce 阶段在启动之前会有一个等待,让更多的 map 结果被 push 到 remote ESS,然后 Spark driver 会向所有 PushMergerLocations 的 ESS 发送一个 FinalizeShuffleMerge 请求,ESS 服务收到请求后,停止接收 pushed shuffle 数据,并持久化所有缓存数据到文件中,最后向 Driver 返回最终 merged shuffle blocks 信息。

  • Reduce task启动后获取 shuffle Map Status 元数据和 merged status 元数据。对于已经 merge 好的 shuffle 数据,reduce task 先向ESS 服务请求获取 merged shuffle blocks 的 meta 数据,然后获取对应的 merged shuffle 数据;对于还没有被 merge 的 shuffle 数据,直接从原来的map task 所在节点的 ESS 服务请求读取 shuffle 数据。获取到 shuffle 数据后,继续执行 reduce task。

2.1.3 push based shuffle优点

Push based shuffle模型相比于 pull based shuffle 模型需要额外将 shuffle 数据传输到 remote ESS服务,但是为什么开启Push based shuffle 还会更快呢?

2.1.3.1 push shuffle数据和map tasks同时进行

Spark DAG在计算调度的时候,会将Job划分为stage,然后根据依赖关系逐个调度 stage来执行,其中reduce stage一定会等待map stage的所有task执行完成。因此在 map stage执行完成之前,先执行完 map task 的 executor 就有机会将 shuffle 数据传输到 remote ESS服务,而不影响 executor 同时执行其他 job 的tasks,所以开启push based shuffle并不影响集群的计算吞吐。

作为对比,在MapReduce 框架中有一个类似的优化。当 map tasks 和 reduce tasks 非常多的时候,一般情况下 map tasks 不会同时完成。为了优化 shuffle 过程, MapReduce 框架允许当 map tasks 完成一定的百分比后,就开始提前调度和启动部分 reduce task,这样提前启动的 reduce tasks 在仍有 map tasks 在计算的时候,就可以读取 shuffle 数据了;但是提前启动的这部分 reduce tasks还是依赖于全部map task 的输出,所以要等待所有map tasks执行完成,才能接着完成reduce task,同时还占用着集群的计算资源,所以在MapReduce框架要协调好他们之间的调度,才能更好的优化计算任务。

2.2.3.2 合并Shuffle blocks请求

Reduce shuffle wait 一般主要是因为ESS响应慢导致的。面对一个大的Spark Job时,ESS服务面临的是请求量大,请求时间比较集中,请求的数据量较小的 shuffle 数据请求。比如我们有十万个map task和一万个reduce task, 每个map task shuffle写数据是200M,如不考虑本地读 shuffle 数据的情况,则平均每个reduce 要向ESS服务发送十万个,平均大小为20k的 block请求,如果我们Spark集群有1000台ESS服务机器,则每个ESS服务要在短时间内服务一百万个平均大小为20k的shuffle block read RPC请求。

每个ESS服务使用Netty 线程池来响应这些请求,但受限于机器的CPU和磁盘资源,RPC请求量大的时候,仍会有请求等待Netty 分配线程来响应;对收到的 shuffle 数据请求,ESS服务先通过磁盘读取shuffle index文件,然后读取shuffle data 文件中对应的 shuffle block数据,最后返回结果。如果map task的 shuffle 数据是存储在 SSD磁盘上,磁盘I/O时间相对会快一些;但如果 shuffle 数据存储在HDD磁盘,shuffle block数据又非常小,频繁的随机 I/O 会导致整体磁盘吞吐下降,shuffle 请求延迟变大。

一般情况下,shuffle 服务都是和其他大数据组件部署在一起的。当 shuffle 节点上其他服务占用较多CPU或磁盘I/0资源时,也有可能会导致 ESS 服务响应比平时慢。当集群 ESS 节点较多时,这种情况发生概率更高。

开启push based shuffle 之后,ESS服务会将接收到的同一个 reduce task 的多个 shuffle block 数据合并为文件大小更大的 chunks,reduce 在请求 shuffle 数据的时候,每次返回一个chunk,,大大减轻了网络请求的压力,同时磁盘I/O从随机读取变成顺序读取,I/O效率明显提升,所以ESS服务响应更快更稳定。

2.1.3.3 Reduce本地读shuffle数据

Remote ESS 服务接收到shuffle 数据后,会将同一个 reduce task 的shuffle数据合并成同一个文件。Spark 为了避免 reduce task再通过网络读取该reduce task的 shuffle 数据,当remote ESS 服务合并了超过 REDUCER_PREF_LOCS_FRACTION (默认 20%)的 blocks 时,Spark DAG scheduler 会尽可能的把 reduce task调度到该 romote ESS 服务所在机器上执行,该reduce task的大部分shuffle数据就是本机读取,不再需要网络传输。

三 Magnet

为了解决Shuffle问题,LinkedIn的三位大佬提出、设计并实现了 Magnet,这是一种新颖的基于推送(push-based)的 shuffle 服务。Magnet 项目在今年早些时候作为 VLDB 2020 上发表的工业跟踪论文首次向社区亮相,您可以在此处阅读我们的 VLDB 论文:Magnet: Push-based Shuffle Service for Large-scale Data Processing。

Spark较低版本的shuffle和MapReduce很类似,中间的shuffle结果数据都是all-to-all的传输。也就是,所有的MapTask执行得到结果,然后需要传到所有的ReducerTask上执行。这几位LinkedIn的大佬提出了Magent(磁铁)一种全新的shuffle机制,可以扩展到具有数千个节点的Spark集群,处理PB级的shuffle数据。它的设计考虑到了本地的Spark集群、以及基于云的集群都可以使用。它会将一些小的中间shuffle结果合并到大的block来解决重要的shuffle扩展性的瓶颈。Magnet主要就是合并shuffle块,并且让合并后的block能够让Reducer任务高效读取。Magnet可以显著提高独立于底层硬件的shuffle性能,将LinkedIn生产上的Spark作业端到端运行时间减少近30%。

Spark 3.2实现了Magnet

四 Remote Shuffle Service

目前各大厂商实现了 Remote Shuffle Service,其实就是Spark push based shuffle service的一种变种。将spark shuffle的计算与存储分离,来适配云原生环境下的Spark。

  • 使用Push-Style Shuffle代替Pull-Style,减少Mapper的内存压力。
  • 支持IO聚合,Shuffle Read的连接数从M*N降到N,同时更改随机读为顺序读。
  • 支持两副本机制,降低Fetch Fail概率。
  • 支持计算与存储分离架构,可以部署Shuffle Service至特殊硬件环境中,与计算集群分离。
  • 解决Spark on Kubernetes时对本地磁盘的依赖。

本质是Spark push based shuffle serviced的一种变种,解决spark on k8s shuffle数据无法本地化问题

故,spark on yarn(spark 3.2+以上版本)无需Remote Shuffle Service.

五 Spark On Yarn ESS和RSS对比测试

ESS(Spark 3.2 External Shuffle Service)和 RSS(celeborn: 阿里开源的 Remote Shuffle service)

采用数仓用户资料加工流程进行验证,通过结果我们可以看出,在Spark on Yarn下 ESS和RSS性能基本相同。因为Spark 3.2的 ESS和 RSS本质上都是基于push based shuffle service的一个变种。

因好多云环境(k8s),无法(或者不允许)本地化数据,此时,ESS无法使用,RSS因此场景而产生。


欢迎关注微信公众号:大数据AI

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/669012.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java并发基础:Phaser全面解析!

内容概要 Phaser是Java中一个灵活的同步工具,其优点在于支持多阶段的任务拆分与同步,并且能够动态地注册与注销参与者,它提供了丰富的等待与推进机制,使得开发者能够更细粒度地控制线程的协调行为,实现复杂的并行任务…

Redis核心技术与实战【学习笔记】 - 25.Redis 支撑秒杀场景的关键技术

简述 秒杀是一个非常经典的活动场景,比如,在双 11、618 等电商促销活动中,都会有秒杀场景。秒杀场景的业务特点是限时限量,业务系统要处理瞬时的大量高并发请求,而 Redis 就经常被用来支撑秒杀活动。 秒杀场景包含多…

2.4日总结

第一题&#xff1a;选数 题解&#xff1a;思路还是很简单的&#xff0c;只需要想清楚dfs里的函数都是什么就可以了&#xff0c;还有一个简单的判断素数的函数&#xff0c;这题真没啥难度&#xff0c;就是属于基础题吧&#xff0c;请看AC代码 #include <stdio.h> #includ…

【c/python】GtkGrid

一、GtkGrid GtkGrid 是 GTK (GIMP Toolkit) 中的一个基础容器构件&#xff08;widget&#xff09;&#xff0c;它可以用来安排其他构件在一个灵活的多行多列的网格中。每个加入网格的构件都可以占据一个或多个行和列。由于 GtkGrid 提供了在二维空间中安排构件的方式&#xf…

YOLOv5独家改进:上采样算子 | 超轻量高效动态上采样DySample,效果秒杀CAFFE,助力小目标检测

💡💡💡本文独家改进:一种超轻量高效动态上采样DySample, 具有更少的参数、FLOPs,效果秒杀CAFFE和YOLOv5网络中的nn.Upsample 💡💡💡在多个数据集下验证能够涨点,尤其在小目标检测领域涨点显著。 收录 YOLOv5原创自研 https://blog.csdn.net/m0_63774211/cate…

多输入多输出 | Matlab实现PSO-LSTM粒子群优化长短期记忆神经网络多输入多输出预测

多输入多输出 | Matlab实现PSO-LSTM粒子群优化长短期记忆神经网络多输入多输出预测 目录 多输入多输出 | Matlab实现PSO-LSTM粒子群优化长短期记忆神经网络多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现PSO-LSTM粒子群优化长短期记忆神经网络…

2 物理层(一):数据通信的基本概念

目录 目标1 数据通信的基本概念1.1 数据通信的基本概念1、数据2、信息3、信号4、信道5、通信和数据通信6、数据通信网7、码元和码字8、数据分组9、基带传输、频带传输和宽带传输基带传输频带传输宽带传输 1.2 数据通信的主要技术指标1.3 数据通信系统1、数据通信系统基本模型编…

【ARM 嵌入式 编译系列 2.7 -- GCC 编译优化参数详细介绍】

文章目录 GCC 编译优化概述常用优化等级-O1 打开的优化选项-O2 打开的优化选项-O3 打开的优化选项-Os 打开的优化选项优化技术使用优化选项的注意事项GCC 编译优化概述 GCC(GNU Compiler Collection)包含了用于C、C++、Objective-C、Fortran、Ada和Go等语言的编译器。在编译…

能和ai聊天的软件有吗?分享4款智能软件!

随着科技的飞速发展&#xff0c;人工智能&#xff08;AI&#xff09;已经渗透到我们生活的方方面面。如今&#xff0c;我们只需一款软件&#xff0c;就能与AI进行流畅的对话。今天&#xff0c;就让我们一起揭开这些神秘软件的神秘面纱&#xff0c;看看它们如何让我们的沟通变得…

ONLYOFFICE:一站式办公,探索高效办公新境界

写在前面ONLYOFFICE 介绍ONLYOFFICE 有哪些优势ONLYOFFICE 文档 8.0 发布如何体验 ONLYOFFICEONLYOFFICE 文档部分页面截图 写在前面 在当今这样一个数字化时代&#xff0c;办公软件已经成为我们日常工作中不可或缺的一部分&#xff0c;熟练使用 Office、WPS、腾讯文档、金山文…

搜大学英语题,用哪个软件好?哪款大学搜题工具好用? #媒体#职场发展#媒体

大学生除了学习专业知识外&#xff0c;还应该关注和学习一些软技能&#xff0c;如沟通能力、团队合作和领导力等&#xff0c;以提升自己的综合素质。 1.好大学在线 好大学在线是上海交通大学拥有的中国顶尖慕课平台。 依托该平台&#xff0c;上海交通大学与百度及金智教育实…

机器学习 | 基于网格搜索的SVM超参数调节

机器学习模型被定义为一个数学模型&#xff0c;其中包含许多需要从数据中学习的参数。然而&#xff0c;有一些参数&#xff0c;称为超参数&#xff0c;这些参数不能直接学习。它们通常是由人类在实际训练开始前根据直觉或经验和试验选择的。这些参数通过提高模型的性能&#xf…

网络协议梳理

1 引言 在计算机网络中要做到有条不紊地交换数据&#xff0c;就必须遵守一些事先约定好的规则。这些规则明确规定了所交换的数据的格式以及有关的同步问题。这里所说的同步不是狭义的&#xff08;即同频或同频同相&#xff09;而是广义的&#xff0c;即在一定的条件下应当发生什…

每日一题——LeetCode1403.非递增顺序的最小子序列

方法一 个人方法&#xff1a; 按题目要求&#xff0c;尽可能先取出nums里最大的值&#xff0c;这样才能满足子序列尽可能短且元素之和最大 var minSubsequence function(nums) {nums.sort((a,b)>a-b)let sum1nums.reduce((a,b)>ab,0),sum20,res[]while(sum1>sum2){…

【并发编程】原子累加器

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳重求进&#xff0c;晒太阳 JDK8之后有专门做累加的类&#xff0c;效率比自己做快数倍以上 累加器性能比较 参数是方法 // supplier 提供者 无中生有 ()->结果// func…

golang并发安全-sync.Once

什么是sync.Once sync.Once 是 Go 语言中的一种同步原语&#xff0c;用于确保某个操作或函数在并发环境下只被执行一次。它只有一个导出的方法&#xff0c;即 Do&#xff0c;该方法接收一个函数参数。在 Do 方法被调用后&#xff0c;该函数将被执行&#xff0c;而且只会执行一…

Excel——高级筛选匹配条件提取数据

一、筛选多条件 Q&#xff1a;筛选多个条件&#xff0c;并将筛选出的内容复制到其他区域 点击任意一个单元格 点击【数据】——【筛选】——【高级筛选】 选择【将筛选结果复制到其他位置】——在【列表区域】 鼠标选择对应的区域位置&#xff0c;条件区域一定要单独写出来&a…

政安晨:示例演绎Python的函数与获取帮助的方法

调用函数和定义我们自己的函数&#xff0c;并使用Python内置的文档&#xff0c;是成为一位Pythoner的开始。 通过我的上篇文章&#xff0c;相信您已经看过并使用了print和abs等函数。但是Python还有许多其他函数&#xff0c;并且定义自己的函数是Python编程的重要部分。 在本…

【51单片机】LED的三个基本项目(LED点亮&LED闪烁&LED流水灯)(3)

前言 大家好吖&#xff0c;欢迎来到 YY 滴单片机系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过单片机的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

将xyz格式的GRACE数据转成geotiff格式

我们需要将xyz格式的文件转成geotiff便于成图&#xff0c;或者geotiff转成xyz用于数据运算&#xff0c;下面介绍如何实现这一操作&#xff0c;采用GMT和matlab两种方法。 1.GMT转换 我们先准备一个xyz文件&#xff0c;这里是一个降水文件。在gmt中采用以下的语句实现xyz转grd…