【Spark系列1】DAG中Stage和Task的划分全流程

一、整体流程

每个Aciton操作会创建一个JOB,JOB会提交给DAGScheduler,DAGScheduler根据RDD依赖的关系划分为多个Stage,每个Stage又会创建多个TaskSet,每个TaskSet包含多个Task,这个Task就是每个分区的并行计算的任务。DAGScheduler将TaskSet按照顺序提交给TaskScheduler,TaskScheduler将每一个任务去找SchedulerBackend申请执行所需要的资源,获取到资源后,SchedulerBackend将这些Task提交给Executor,Executor负责将这些任务运行起来。

二、JOB提交

2.1、为什么需要action操作

在Spark中,分为transformation操作和action操作。执行用户程序时,transformation操作将一个RDD转换成了新的RDD,并在compute()函数中,记录了如何根据父RDD计算出当前RDD的数据、RDD如何分区等信息,并且能够得出最后一个RDD的数据。 但是RDD中的每个分区中依然是一条一条的分散的数据,那么要对最后一个RDD执行什么操作呢?这就是action操作的作用。

2.2、Job提交

每个action操作都会生成一个Job,这个Job包含了需要计算的RDD对象、需要计算的分区、需要执行什么样的计算。RDD和用户执行的计算都是可以序列化的,RDD序列化之后,在Executor中反序列化之后即可得到该RDD对象,再根据对象compute()函数就可以计算出某个分区的数据。JOB中包含的数据如下所示

2.3、分布式执行

当提交Job以后,就可以将Job划分为多个并行的任务,每个任务计算指定分区的一个分区即可。通过RDD的计算函数即可计算出该分区的数据,今儿计算出分区的结果。

三、Stage划分

3.1、宽依赖和窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

3.2、Stage划分

在用户编写的一系列转换中,多个RDD可能既形成了多次窄依赖,也形成了多次宽依赖,连续的窄依赖可以通过一个任务进行流水线处理,但是如果遇到了宽依赖,就必须先将父RDD的所有数据都进行计算并保存起来,再进行RDD的运算。在一个Job中,action操作知识定义了在最后的RDD中执行何种操作,而最后的RDD会依赖上个RDD,上个RDD又会有其他依赖,这样就形成了一系列的依赖关系。如果为宽依赖的话,就在依赖的地方进行切分,先将宽依赖的父RDD进行计算出来,再计算后续的RDD,按照快依赖被划分的过程,即为Stage划分的过程。

如上图所示,rdd1->rdd2,rdd3->rdd4是窄依赖,rdd2->rdd3,rdd4->rdd5是宽依赖。在发生shuffle的位置,Spark将计算分为两个阶段分别执行,每发生一次shuffle,Spark就将计算划分为先后的两个阶段,如下图

在划分阶段的过程中,对于某个阶段而言其并行的计算任务都完全相同,因此在Job执行的过程中,并行计算就是指每个阶段中任务并行的计算。如在Stage1中,每个分区的数据可以使用一个任务进行计算。10000个分区即可在集群中并行运行10000个任务进行计算。如果集群资源不够,可以将10000个任务依次在集群中运行,直到运行完毕,再进行Stage2的计算。Stage2也会根据分区数启动多个任务并行的加载Stage1生成的数据,完成Stage2的计算。

在一个Job的运行过程中,所有的Stage其实都是为最后一个Stage做准备,因为action操作只需要最后一个RDD的数据。因此最后一个Stage称为ResultStage,之前所有的Stage都是由Shuffle引起的中间计算过程,被称为ShuffleMapStage。其过程如下图

3.3、Spark实现

再Spark实现中,SparkContext将Job提交至DAGScheduler,DAGScheduler获取Job中执行action操作的RDD,将最后执行action操作的RDD划分到最后的ResultStage中,然后遍历该RDD的依赖和所有的父依赖,每遇到宽依赖就将两个RDD划分到两个不同的Stage中,遇到窄依赖就将窄依赖的多个RDD划分到一个Stage中,经过这次操作,一个RDD就划分为有多个依赖关系的Stage。再每个Stage中,所有的RDD之间都是窄依赖的关系,Stage之间的RDD都是宽依赖的关系。DAGScheduler将最初被依赖的Stage提交,计算该Stage中的数据,计算完成后,再将后续的Stage提交,知道最后运行的ResultStage,则整个计算Job完成。ResultStage和ShuffleMapStage结构如下图

在生成ShuffleapStage时,ShuffleDependency起到了承上启下的作用,如果两个RDD之间为宽依赖,子RDD的依赖为ShuffleDependency;在划分Stage的时候,父Stage会保存该ShuffleDependency,以便在执行父Stage的时候,根据ShuffleDependency获取Shuffle的写入器,在子Stage执行的时候,会根据RDD的依赖关系使用相同的ShuffleDependency获取Shuffle的读取器。

在计算过程中,ShuffleMapStage会生成该Stage的结果,为下一个Stage提供数据,计算下一个Stage的RDD的时候,会拉取上一个Stage的计算结果。上一个Stage的计算保存在哪呢?答案是Spark的组件MapOutputTracker。MapOutputTracker也是主从结构,Executor端是MapOutputTrackerWroker,当ShuffleMapStage的任务运行完成后,会通过Executor上的MapOutputTrackerWroker将数据保存的位置发送到Driver上的MapOutputTrackerMaster中。在后续Stage需要上一个Stage的计算结果的时候,就通过MapOutputTrackerMaster询问计算结果的保存位置,进而加载相应的数据。

四、Task划分

DAGScheduler将Job划分为多个Stage之后,下一步就是将Stage划分为多个可以在集群中并行执行的任务,只有将任务并行执行,Stage才能更快的完成。

4.1、任务的个数

由于Stage中都是对RDD的计算,RDD又是分区的,所以在对任务进行划分的时候,每个分区可以启动一个任务进行计算。无论是ResultStage还是ShuffleMapStage,每个阶段能够并行执行的任务数量都取决于该阶段中最后一个Rdd的分区数量

上面已经介绍,在一个Stage中,RDD的依赖关系是窄依赖,所以最后一个RDD的分区数量取决于其依赖的RDD的分区数量,一直依赖到该阶段的开始的RDD的分区。对于第一阶段开始的RDD分为两种情况:

  1. 第一种为初始的RDD,即从数据源加载数据形成的初始RDD,这种情况的分区数量取决于初始RDD的形成分区方式。
  2. 第二种为该阶段的初始RDD为Shuffle阶段的Reduce任务,这种情况下,该RDD的分区数量取决于在Shuffle的Map阶段最后一个RDD的分区器设置的分区数量。

4.2、Task的生成

当确定了每个Stage的分区数量之后,就需要为每个分区生成相应的计算任务,该计算任务就是需要对该阶段的最后一个RDD执行什么操作

在ResultStage中,需要对最后一个RDD的每个分区分别执行用户自定义的action操作,所以在ResultStage中生成的每个Task都包含以下三个部分

  1. 需要对哪个RDD进行操作
  2. 需要对RDD哪个分区进行操作
  3. 需要对分区的内容执行什么样的操作

在ResultStage中划分的Task称为ResultTask,ResultTask中包含了ResultStage中最后一个RDD,即执行action操作的的RDD,需要计算的RDD分区的id和执行action操作的函数。

在ShuffleMapStage中,最终需要完成Shuffle过程中的Map阶段的操作,每个分区按照Shuffle中的Map端定义的过程执行数据的分组操作,将分组结果进行保存,并将保存结果位置通知Driver端的MapOutputTrackerMaster,MapOutputTrackerMaster保存着每一个Shuffle中Map输出的位置。在ShuffleMapStage中划分的Task称为ShuffleMapTask。ShuffleMapTask同样由三个重要的部分组成:Stage中最后的RDD、需要计算的分区的id、划分Stage的ShuffleDependency

4.3、Task的最佳运行位置

生成Task时,还会计算Task的最佳运行位置。虽然RDD包含计算RDD的所有信息,可以在任何节点上运行,但是如果通过为Task计算分配最佳的运行位置,可以将Task调度到含有该Task需要的数据的节点,从而实现移动计算而不是移动数据的目的。Spark会根据RDD可能分布的的情况,将Task的运行位置主要分为Host级别和Executor级别当一个RDD被某个Executor缓存,则对该RDD计算时,优先会把计算的Task调度到该Executor中执行。当一个RDD需要的数据存在某个host中时,则会把该Task调度到这个节点的Executor中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/660371.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数灵通外链工具如何实现回传功能?

在数字营销的世界里,了解用户行为是至关重要的。数灵通提供了一个强大的回传功能,能够让我们在用户完成某个动作后,获取到这个动作的数据反馈。利用这个功能,企业可以更好地了解用户需求和行为,从而优化营销策略&#…

Docker Compose下载

使用插件版本: 进行安装 乌班图的安装方式参考:centos 的安装方式参考; sudo yum update sudo yum install docker-compose-plugin通过检查版本来验证 Docker Compose 是否已正确安装。 docker compose version 预期输出: Dock…

编译LVGL遇到的问题及解决方式

问题1: 编译选项报错, 未识别 -Wshift-negative-value 选项 Building project file: main/src/main.c Building project file: main/src/mouse_cursor_icon.c cc: error: unrecognized command line option ‘-Wshift-negative-value’ cc: error: unrecognized command line …

【目标跟踪】3D点云跟踪

文章目录 一、前言二、代码目录三、代码解读3.1、文件描述3.2、代码框架 四、关联矩阵计算4.1、ComputeLocationDistance4.2、ComputeDirectionDistance4.3、ComputeBboxSizeDistance4.4、ComputePointNumDistance4.5、ComputePointNumDistance4.6、result_distance 五、结果 一…

Node.js 文件系统操作指南

文章目录 Node.js 文件系统操作完全指南一、引言二、基本文件操作2.1 读取文件2.2 写入文件2.3 追加内容到文件 三、文件与目录的创建与删除3.1 创建文件3.2 创建目录3.3 删除文件3.4 删除目录 四、文件与目录的信息查询4.1 检查文件或目录是否存在4.2 获取文件信息4.3 获取目录…

main函数、_tmain函数和wmain函数的区别

作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 今天碰到一个问题,算是彻底搞明白了main函数、_tmain函数和wmain函数的区别。就是使用vs2015新建一个控制台工程,如果入口函数是设…

C++ 11新特性之完美转发

概述 在C编程语言的演进过程中,C 11标准引入了一系列重大革新,其中之一便是“完美转发”机制。这一特性使得模板函数能够无损地传递任意类型的实参给其他函数或构造函数,从而极大地增强了C在泛型编程和资源管理方面的灵活性与效率。 完美转发…

探究HMAC算法:消息认证与数据完整性的完美结合

Hash-based Message Authentication Code(基于哈希的消息认证码,简称HMAC)算法作为一种广泛应用的消息认证码(MAC)算法,在现代信息安全领域起着至关重要的作用。本文将从算法原理、优缺点、实际应用等方面&…

20240128周报-网络太杂,Tomcat太难

今天来做个小总结吧,之前说想用几个月的时间将Java生态给整理一遍,该工作已经进入第三周了。先和各位老老板汇报一下上一周的工作,然后说一下本周的计划和后面的计划。 1.上周工作 上周的计划是将网络和Tomcat的内容梳理一番,但…

亚马逊要怎么运营?亚马逊运营主要运营内容有哪些?

一个店铺的成长发展少不了运营,而店铺的运营必须要有相关运营经验,才能将店铺做好,近几年亚马逊电商平台在不断的发展,亚马逊的运营模式非常独特,它借助于多种技术解决方案来提供最佳的客户体验。那么亚马逊要怎么运营…

你的MiniFilter安全吗?

简介 筛选器管理器 (FltMgr.sys)是Windows系统提供的内核模式驱动程序, 用于实现和公开文件系统筛选器驱动程序中通常所需的功能; 第三方文件系统筛选器开发人员可以使用FltMgr的功能可以更加简单的编写文件过滤驱动, 这种驱动我们通常称为MiniFilter, 下面是MiniFilter的基本…

python21-Python的字符串查找、替换相关方法

str还提供了如下常用的执行查找、替换等操作的方法。 startswith():判断字符串是否以指定子串开头。 endswith():判断字符串是否以指定子串结尾 find():查找指定子串在字符串中出现的位置,如果没有找到指定子串,则返回-1。 index():查找指定子串在字…

Java多线程--解决单例模式中的懒汉式的线程安全问题

文章目录 一、单例设计模式的线程安全问题(1)饿汉式没有线程安全问题(2)懒汉式线程安全问题1、案例2、方式1-同步方法3、方式2-同步代码块4、优化 二、代码(1)实现线程安全的懒汉式(2&#xff0…

猫什么时候发腮?公认发腮效果好的生骨肉冻干推荐

猫什么时候发腮是许多猫主人非常关心的问题。在猫咪的成长过程中,发腮是一项重要的体征,也是猫咪成熟的标志。想要让猫咪拥有可爱的肉嘟嘟脸型,主人需要在适龄的年龄段加强营养补给,不要错失最佳发腮期。那么,猫咪的最…

pytorch交换数组元素坑

写 PermutePatch 时遇到一个 bug:在试图交换 PyTorch 数组的两个元素时,两个位置都变成同一个元素!具体见测试代码。本文兼测几种交换情况: 两个 python 变量list 两个元素numpy 数组两个元素pytorch 数组两个元素 import numpy…

api接口1688商品详情接口采集商品详情数据商品价格详情页数据可支持高并发调用演示示例

接入1688商品详情API接口的步骤如下: 注册账号:首先,你需要在1688开放平台注册一个账号。 创建应用:登录后,在控制台中找到“我的应用”,点击“创建应用”。 获取API密钥:创建应用后&#xff…

【Linux】VMware Workstation16安装银河麒麟高级服务器操作系统V10 SP3 AMD64

目录 一、麒麟服务器概述 二、安装步骤 设置硬盘大小 完成配置 修改内存 处理器等设备配置 选择直接安装 配置磁盘 网络配置 设置root账号密码 开始安装 启动完成 一、麒麟服务器概述 银河麒麟高级服务器操作系统V10是针对企业级关键业务,适应虚拟化、云…

Android开发之动画

逐帧动画 通过在连续的帧之间快速切换来创建动画效果 var anim animationView.background as AnimationDrawable获取动画的Drawable资源anim.start()启动动画anim.stop()停止动画 private lateinit var animationView: ImageView private var flag: Boolean trueanimation…

C语言KR圣经笔记 6.8联合体 6.9位域

6.8 联合体(union) 联合体是一个可以(在不同时间)保存不同类型和大小的对象的变量,由编译器来跟踪大小和对齐要求。联合体提供了一种不用在程序中嵌入任何与机器相关的信息,而能够在单个存储区域内操作不同…

时间序列预测——GRU模型

时间序列预测——GRU模型 在深度学习领域,循环神经网络(RNN)是处理时间序列数据的一种常见选择。上期已介绍了LSTM的单步和多步预测。本文将深入介绍一种LSTM变体——门控循环单元(GRU)模型,包括其理论基础…