目录
1、Job Stage划分
2、Task最佳位置
3、总结
3.1 Stage划分总结:
3.2 Task最佳位置总结:
1、Job Stage划分
Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。而Stage划分的依据就是宽依赖。下面以RDD的collect方法为例:
(1)他是一个action会触发一个具体的作业runJob
(2)runJob有很多重载方法,不断地往里调用,最后交给dagScheduler的runJob,在dagScheduler的runJob交给了submitJob,后面还有一个等待作业结果看成功还是失败,会有相应的动作。
(3)在submitJob中首先看一下分区长度,是因为要进行计算,这个肯定是RDD导致的action他要校验一下是不是在运行的时候相应的Partition存在。
eventProcessLoop调用post的时候有个Jobsubmitted的参数,他是一个case class,因为一个application中可能有很多的Job,不同的job的Jobsubmitted实例不一样所以不能用case object。他里面封装了job的id,最后一个RDD,具体对RDD操作的函数,有哪些Partition要被计算,监听作业状态等。
他的核心就是将Jobsubmitted交给eventProcessLoop。他是通过post方法post给eventProcessLoop,这个post其实就是发往EventLoop里面的eventQueue
(4)发现在EventLoop里面开辟了一个线程,他是setDaemon方式作为后台线程,因为要在后台做不断的循环(如果是前台线程的话对垃圾回收是有影响的),在run方法里面会不断的循环我们的消息队列,从eventQueue(是一个LinkedBlockingDeque,我们可以往他里面信息)中获得消息,调用了onReceive,发现在里面没有具体的实现所以在DAGSchedulerEventProcessLoop中对onReceive进行了实现,这里就收到了DAGSchedulerEvent,这里面再调用doOnReceive。doOnReceive收到信息就开始处理
(5)接下来就是HandleJobSubmited。这个时候Stage就开始了。我们知道最后一个Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。
(6)发现有个getOrCreateParentStages的方法,开始创建ResultStage的父stage,里面有多个嵌套获取shuffle依赖和循环创建shuffleMapStage,若没有shuffle,操作则返回空list
进入到创建父Stage的方法getOrCreateParentStages,这里仅仅是抽取当前RDD的shuffle依赖,shuffleMapStage,如果不是shuffleDependency就继续抽取父RDD,迭代遍历一直到抽取出为止或者没有
进入getOrCreateShuffleMapStage方法中,进行匹配能不能取到ParentStage的值,当没有parentStage的时候会返回空,能取到就返回stage,ShuffleMapStage是根据遍历出的ShuffleDependencies一次次创建出来的
进入createShuffleMapStage方法 此方法是递归循环创建shuffleMapStage的过程
这个时候ShuffleMapStage已经创建完成了,并不是一次就创建完成,而是遇见shuffle的时候会由下往上递归创建ShuffleMapStage
(7)构建完所有的ShuffleMapStage后,将其作为参数创建ResultStage
(8)最后将Stage和id关联,更新job所有的Stage,并将Stage返回出去。
(9)回到handleJobsubmited方法中,finalStage构建完之后,新建一个ActiveJob保存了当前job的一些信息,打印一堆日志之类。getMissingParentStages(finalStage)根据finalStage,刚才找父Stage的时候如果有的话直接返回,如果没有的话就会创建,所以如果曾经有就不需要再去做。listenerBus.post监听事件,最后submitStage(finalStage)。
首先获得id,如果jobId是defined的话再次getMissingParentStages(stage)获得missing的stage之后判断一下是否为空,如果为空的话就submitMissingTasks(stage, jobId.get)个就是没有前置性的Tasks,也就是没有父Stage。在这个底层其实是DAGScheduler把这个处理的过程交给具体的TaskScheduler去处理
2、Task最佳位置
(1)在handleJobsubmited方法中最后是最后调用submitStage,在他里面会调用submitMissingTasks
(2)这里面有很多代码,我们要关心Stage本身的算法以及Task任务本地性把当前的Stage加进去,然后对Stage进行判断,一种是ShuffleMapStage,一种是ResultStage。继续往下走会看到taskIdToLocations这是关键的代码,taskIdToLocations是一个Map
partitionsToCompute这里面获得是具体的要计算的PartitionID,我们我们这边看到的map里面的id是Partition的id。这里面匿名函数,产生的是tuple根据Partition的id。后面toMap就是Partition的id和TaskLocation的位置。
(3)进入到getPreferredLocs(stage.rdd, id),进来的是RDD,PartitionID返回的是一个集合。
再进入getPreferredLocsInternal
visited: HashSet[(RDD[_], Int)]这个HashSet开始是空,所以直接传进来一个new HashSet,然后判断visited如果已经有的话,那么添加就不成功,那么就是已经计算了数据本地性了,就返回Nil。
下面的cached就是已经在DAGScheduler的内存数据结构中了。进入getCacheLocs,这边返回的是序列,cacheLocs是一个HashMap,这包含了每个RDD的Partition的id以及id对应的taskLocation,这个包含了Stage本身也包含了Stage内部任务的本地性
(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有没有缓存根据Partition而保存的数据本地性的内容,如果不为空的话就把内容返回。然后调用下面的getpreferdLocations(如果自定义一个RDD的话是一定要写这个方法的)
(5)最后判断一下如果是窄依赖的话就自己调用自己
3、总结
3.1 Stage划分总结:
(1)Action触发Job,开始逆向分析job执行过程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分区数,其他),提交Job作业;
(2)DAGScheduler的runJob中调用submitJob并返回监听waiter,生命周期内监听Job状态;
(3)在submitJob内部,将该获取到的Job(已有JobId),插入到名为eventProcessLoop的LinkedBlockingDeque结构的事件处理队列中;
(4)eventProcessLoop放入新事件后,调起底层的DAGSchedulerEventProcessLoop的onReceive方法;
(5)执行doOnReceive,根据DAGSchedulerEvent的具体类型如JobSubmitted事件或者MapStageSubmitted事件,调取具体的Submitted handle函数提交具体的Job;
(6)以JobSubmitted为例,在handleJobSubmitted内部,返回从ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同时开始逆向构建缺失的stage;
(7)DAG构建完毕,提交stage,submitStage(finalStage),submitStage中stage提交为tasks,submitMissingTasks(),submitMissingTasks,根据ShuffleMapStage还是ResultStage创建 ShuffleMapTask 或 ResultTask。
(7)taskScheduler.submitTasks()开始调起具体的task
3.2 Task最佳位置总结:
(1)在划分Stage的时候submitMissingTasks方法中会有一个taskIdToLocations的属性,他的结构为 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其对应的最佳位置
(2)在对taskIdToLocations赋值的时候会调用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]
(3)在getPreferredLocsInternal方法中
①判断rdd的partition是否被访问过,如果被访问过,则什么都不做
②然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回
③如果没有cache,则调用rdd.getPreferredLocations方法,获取RDD partition的最佳位置
④遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法
即从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列
注意:DAGScheduler计算数据本地性的时候借助了RDD自身的getPreferredLocations中的数据,因为getPreferredLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferredLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化。