Stage划分和Task最佳位置

目录

 

1、Job Stage划分

2、Task最佳位置

3、总结

3.1 Stage划分总结:

3.2 Task最佳位置总结:


1、Job Stage划分

Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。而Stage划分的依据就是宽依赖。下面以RDD的collect方法为例:

(1)他是一个action会触发一个具体的作业runJob

(2)runJob有很多重载方法,不断地往里调用,最后交给dagScheduler的runJob,在dagScheduler的runJob交给了submitJob,后面还有一个等待作业结果看成功还是失败,会有相应的动作。

(3)在submitJob中首先看一下分区长度,是因为要进行计算,这个肯定是RDD导致的action他要校验一下是不是在运行的时候相应的Partition存在。

eventProcessLoop调用post的时候有个Jobsubmitted的参数,他是一个case class,因为一个application中可能有很多的Job,不同的job的Jobsubmitted实例不一样所以不能用case object。他里面封装了job的id,最后一个RDD,具体对RDD操作的函数,有哪些Partition要被计算,监听作业状态等。

他的核心就是将Jobsubmitted交给eventProcessLoop。他是通过post方法post给eventProcessLoop,这个post其实就是发往EventLoop里面的eventQueue

(4)发现在EventLoop里面开辟了一个线程,他是setDaemon方式作为后台线程,因为要在后台做不断的循环(如果是前台线程的话对垃圾回收是有影响的),在run方法里面会不断的循环我们的消息队列,从eventQueue(是一个LinkedBlockingDeque,我们可以往他里面信息)中获得消息,调用了onReceive,发现在里面没有具体的实现所以在DAGSchedulerEventProcessLoop中对onReceive进行了实现,这里就收到了DAGSchedulerEvent,这里面再调用doOnReceive。doOnReceive收到信息就开始处理

(5)接下来就是HandleJobSubmited。这个时候Stage就开始了。我们知道最后一个Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。

(6)发现有个getOrCreateParentStages的方法,开始创建ResultStage的父stage,里面有多个嵌套获取shuffle依赖和循环创建shuffleMapStage,若没有shuffle,操作则返回空list

进入到创建父Stage的方法getOrCreateParentStages,这里仅仅是抽取当前RDD的shuffle依赖,shuffleMapStage,如果不是shuffleDependency就继续抽取父RDD,迭代遍历一直到抽取出为止或者没有

进入getOrCreateShuffleMapStage方法中,进行匹配能不能取到ParentStage的值,当没有parentStage的时候会返回空,能取到就返回stage,ShuffleMapStage是根据遍历出的ShuffleDependencies一次次创建出来的

进入createShuffleMapStage方法 此方法是递归循环创建shuffleMapStage的过程

这个时候ShuffleMapStage已经创建完成了,并不是一次就创建完成,而是遇见shuffle的时候会由下往上递归创建ShuffleMapStage

(7)构建完所有的ShuffleMapStage后,将其作为参数创建ResultStage

(8)最后将Stage和id关联,更新job所有的Stage,并将Stage返回出去。

(9)回到handleJobsubmited方法中,finalStage构建完之后,新建一个ActiveJob保存了当前job的一些信息,打印一堆日志之类。getMissingParentStages(finalStage)根据finalStage,刚才找父Stage的时候如果有的话直接返回,如果没有的话就会创建,所以如果曾经有就不需要再去做。listenerBus.post监听事件,最后submitStage(finalStage)。

首先获得id,如果jobId是defined的话再次getMissingParentStages(stage)获得missing的stage之后判断一下是否为空,如果为空的话就submitMissingTasks(stage, jobId.get)个就是没有前置性的Tasks,也就是没有父Stage。在这个底层其实是DAGScheduler把这个处理的过程交给具体的TaskScheduler去处理

2、Task最佳位置

(1)在handleJobsubmited方法中最后是最后调用submitStage,在他里面会调用submitMissingTasks

(2)这里面有很多代码,我们要关心Stage本身的算法以及Task任务本地性把当前的Stage加进去,然后对Stage进行判断,一种是ShuffleMapStage,一种是ResultStage。继续往下走会看到taskIdToLocations这是关键的代码,taskIdToLocations是一个Map

partitionsToCompute这里面获得是具体的要计算的PartitionID,我们我们这边看到的map里面的id是Partition的id。这里面匿名函数,产生的是tuple根据Partition的id。后面toMap就是Partition的id和TaskLocation的位置。

(3)进入到getPreferredLocs(stage.rdd, id),进来的是RDD,PartitionID返回的是一个集合。

再进入getPreferredLocsInternal

visited: HashSet[(RDD[_], Int)]这个HashSet开始是空,所以直接传进来一个new HashSet,然后判断visited如果已经有的话,那么添加就不成功,那么就是已经计算了数据本地性了,就返回Nil。

下面的cached就是已经在DAGScheduler的内存数据结构中了。进入getCacheLocs,这边返回的是序列,cacheLocs是一个HashMap,这包含了每个RDD的Partition的id以及id对应的taskLocation,这个包含了Stage本身也包含了Stage内部任务的本地性

(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有没有缓存根据Partition而保存的数据本地性的内容,如果不为空的话就把内容返回。然后调用下面的getpreferdLocations(如果自定义一个RDD的话是一定要写这个方法的)

(5)最后判断一下如果是窄依赖的话就自己调用自己

3、总结

3.1 Stage划分总结:

(1)Action触发Job,开始逆向分析job执行过程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分区数,其他),提交Job作业;

(2)DAGScheduler的runJob中调用submitJob并返回监听waiter,生命周期内监听Job状态;

(3)在submitJob内部,将该获取到的Job(已有JobId),插入到名为eventProcessLoop的LinkedBlockingDeque结构的事件处理队列中;

(4)eventProcessLoop放入新事件后,调起底层的DAGSchedulerEventProcessLoop的onReceive方法;

(5)执行doOnReceive,根据DAGSchedulerEvent的具体类型如JobSubmitted事件或者MapStageSubmitted事件,调取具体的Submitted handle函数提交具体的Job;

(6)以JobSubmitted为例,在handleJobSubmitted内部,返回从ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同时开始逆向构建缺失的stage;

(7)DAG构建完毕,提交stage,submitStage(finalStage),submitStage中stage提交为tasks,submitMissingTasks(),submitMissingTasks,根据ShuffleMapStage还是ResultStage创建 ShuffleMapTask 或 ResultTask。

(7)taskScheduler.submitTasks()开始调起具体的task

3.2 Task最佳位置总结:

(1)在划分Stage的时候submitMissingTasks方法中会有一个taskIdToLocations的属性,他的结构为 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其对应的最佳位置

(2)在对taskIdToLocations赋值的时候会调用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]

(3)在getPreferredLocsInternal方法中

①判断rdd的partition是否被访问过,如果被访问过,则什么都不做

②然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回

③如果没有cache,则调用rdd.getPreferredLocations方法,获取RDD partition的最佳位置

④遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法

即从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列

注意:DAGScheduler计算数据本地性的时候借助了RDD自身的getPreferredLocations中的数据,因为getPreferredLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferredLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473634.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1636. 按照频率将数组升序排序(哈希+排序)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,请你将数组按照每个值的频率 升序 排序。 如果有多个值的频率相同,请你按照数值本身将它们 降序 排序。 请你返回排序后的数组。 示例 1: 输入:nums [1,1,2,2,2,3] 输出…

TaskScheduler相关

目录 1、DAGScheduler与TaskScheduler 2、TaskScheduler与SchedulerBackend 3、任务调度过程总结 1、DAGScheduler与TaskScheduler DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stag…

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points ,其中 points[i] [xi, yi] ,请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度,而 y 轴上无限延伸的一块区域(也就是高度为…

Task执行流程

1、源码走读 (1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr ,数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces,其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是,不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”,或者,也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n,请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足:对于所有有效的 i,s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights ,表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程,不断向后面的建筑物移动,期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置,每个分片只有一份,这种多分片单副本集群,挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群,对于每个分片具有2个或2个以上的副本,当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销,推广品牌的方式。不仅可以给平台带来用户量,还可以提高平台知名度。一个好的秒杀系统,可以提高平台系统的稳定性和公平性,获得更好的用户体验,提升平台的口碑,从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在,我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下: 对于三元组(A, B, C) ,“C”为顶层方块,方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架,主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识,通过完成作者-读书功能,先来熟悉Flask框架的完整使用步骤。 操作步骤为: 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316: 给你一个字符串 s ,请你去除字符串中重复的字母,使得每个字母只出现一次。需保证 返回结果的字典序最小(要求不能打乱其他字符的相对位置)。 示例 1: 输入&#…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受,比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组,例如:&qu…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署,主要包括两个方面: 1.nginx安装与配置: 1、Nginx 安装 系统平台:CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number,并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口,从左到右遍历该矩阵 number, 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的,监控是故障诊断和分析的重要辅助利器,在发生事故之前就能预警,最大限度降低系统故障率。   监控系统我们可以分为业务层面,应用层面,系统层面 1.1 业务层面 业务系…