一、依赖与血缘关系
- 依赖:两个相邻 RDD 之间的关系
- 血缘关系:多个连续的 RDD 的依赖
- 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血缘关系println(rdd1.dependencies) // 打印依赖关系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血缘关系println(rdd2.dependencies) // 打印依赖关系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血缘关系println(rdd3.dependencies) // 打印依赖关系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血缘关系println(rdd4.dependencies) // 打印依赖关系println("----------------------")}
}
二、宽窄依赖
-
窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
-
宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false ) extends Dependency[Product2[K, V]]
三、阶段划分
- 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务
-
宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
-
阶段划分源码:
/**结论:1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)3.阶段的数量 = shuffle 依赖数量 + 1 */ // 行动算子触发作业执行 rdd.collect()// collect() 深入底层 dagScheduler.runJob()// runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted() // handleJobSubmitted() 中的阶段划分 try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch {... }// createResultStage() 方法 private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段val id = nextStageId.getAndIncrement()val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 至少存在一个 resultStage 阶段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage }// getOrCreateParentStages(),判断是否有上一阶段 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖getShuffleDependencies(rdd).map { shuffleDep =>// 为 shuffle 依赖创建 ShuffleMapStage 阶段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList }// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖 private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents }
四、任务划分
-
RDD 任务划分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application
- Job:一个 Action 算子就会生成一个 Job
- Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
-
Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系
-
任务划分源码:
val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}} }// val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()// override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions) }