详解 Spark 编程之 RDD 依赖关系

一、依赖与血缘关系

在这里插入图片描述

  • 依赖:两个相邻 RDD 之间的关系
  • 血缘关系:多个连续的 RDD 的依赖
  • 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血缘关系println(rdd1.dependencies) // 打印依赖关系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血缘关系println(rdd2.dependencies) // 打印依赖关系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血缘关系println(rdd3.dependencies) // 打印依赖关系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血缘关系println(rdd4.dependencies) // 打印依赖关系println("----------------------")}
}

二、宽窄依赖

  • 窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
    
  • 宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false
    ) extends Dependency[Product2[K, V]] 
    

三、阶段划分

  • 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务

在这里插入图片描述

  • 宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
    在这里插入图片描述

  • 阶段划分源码:

    /**结论:1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)3.阶段的数量 = shuffle 依赖数量 + 1
    */
    // 行动算子触发作业执行
    rdd.collect()// collect() 深入底层
    dagScheduler.runJob()// runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
    // handleJobSubmitted() 中的阶段划分
    try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {...
    }// createResultStage() 方法
    private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段val id = nextStageId.getAndIncrement()val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一个 resultStage 阶段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage
    }// getOrCreateParentStages(),判断是否有上一阶段
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖getShuffleDependencies(rdd).map { shuffleDep =>// 为 shuffle 依赖创建 ShuffleMapStage 阶段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList
    }// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
    private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents
    }
    

四、任务划分

  • RDD 任务划分中间分为:Application、Job、Stage 和 Task

    • Application:初始化一个 SparkContext 即生成一个 Application
    • Job:一个 Action 算子就会生成一个 Job
    • Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
    • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
  • Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系

  • 任务划分源码:

    val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}
    }//
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()//
    override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

随身wifi网络卡顿怎么解决?随身WiFi哪个牌子的最好用?排名第一名的随身WiFi!

对于随身wifi靠不靠谱这个问题&#xff0c;网上一直存在争议。很多人的随身wifi网速不稳定&#xff0c;信号看着满格就是上不了网。关于随身wifi卡顿到底该怎么解决呢&#xff1f; 1.如果是设备网络在一个地方上网速度很快&#xff0c;换一个地方网络就不行了&#xff0c;很可能…

Linux学习笔记(清晰且清爽)

本文首次发布于个人博客 想要获得最佳的阅读体验&#xff08;无广告且清爽&#xff09;&#xff0c;请访问本篇笔记 Linux安装 关于安装这里就不过多介绍了&#xff0c;安装版本是CentOS 7&#xff0c;详情安装步骤见下述博客在VMware中安装CentOS7&#xff08;超详细的图文教…

ios v品会 api-sign算法

vip品会 api-sign算法还原 ios入门案例 视频系列 IOS逆向合集-前言哔哩哔哩bilibili 一、ios难度与安卓对比 这里直接复制 杨如画大佬的文章的内容&#xff1a; ios难度与安卓对比 很多人说ios逆向比安卓简单&#xff0c;有以下几个原因 1 首先就是闭源&#xff0c;安卓开源…

vscode过滤器@modified(查看配置了哪些设置)

文档 visualstudio•docs•getstarted•settingshttps://code.visualstudio.com/docs/getstarted/settings 说明 使用modified可以过滤出&#xff1a; 配置过的设置&#xff08;和默认值不同&#xff09;&#xff1b; 在 settings.json 文件中配置了值的设置 步骤 1.打开…

Vue3实战笔记(53)—奇怪+1,VUE3实战模拟股票大盘工作台

文章目录 前言一、实战模拟股票大盘工作台二、使用步骤总结 前言 实战模拟股票大盘工作台 一、实战模拟股票大盘工作台 接上文&#xff0c;这两天封装好的组件直接应用,上源码&#xff1a; <template><div class"smart_house pb-5"><v-row ><…

JS对象由浅入深

对象 对象&#xff08;Object&#xff09;&#xff1a;JavaScript里的一种数据类型&#xff08;引用类型&#xff09;&#xff0c;也是用于存储数据的 好处&#xff1a;可以用来详细的描述某个事物&#xff0c;是用键值对形式存储语义更明了 特点&#xff1a;对象数据是无序的&…

模型 FABE(特性 优势 好处 证据)法则

说明&#xff1a;系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。特性、优势、好处、证据&#xff0c;一气呵成。 1 FABE法则的应用 1.1 FABE法则营销商用跑步机 一家高端健身器材公司的销售代表正在向一家新开的健身房推销他们的商用跑步机。以下…

【数据分享】中国电力年鉴(2004-2022)

大家好&#xff01;今天我要向大家介绍一份重要的中国电力统计数据资源——《中国电力年鉴》。这份年鉴涵盖了从2004年到2022年中国电力统计全面数据&#xff0c;并提供限时免费下载。&#xff08;无需分享朋友圈即可获取&#xff09; 数据介绍 自1993年首次出版以来&#xf…

【数据结构】链表与顺序表的比较

不同点&#xff1a; 顺序表和链表是两种常见的数据结构&#xff0c;他们的不同点在于存储方式和插入、删除操作、随机访问、cpu缓存利用率等方面。 一、存储方式不同: 顺序表&#xff1a; 顺序表的存储方式是顺序存储&#xff0c;在内存中申请一块连续的空间&#xff0c;通…

Java运算符及程序逻辑控制

&#x1f389;welcome to my blog 请留下你宝贵的足迹吧(点赞&#x1f44d;评论&#x1f4dd;收藏⭐&#xff09; &#x1f493;期待你的一键三连&#xff0c;你的鼓励是我创作的动力之源&#x1f493; &#x1f423;目录 &#x1f340;运算符&#x1f4da;1.算术运算符&#x…

python判断文件是否存在

import os test_path "/Users/yxk/Desktop/test/GrayScale.tif" if(os.path.exists(test_path)):print(文件存在&#xff01;&#xff01;&#xff01;&#xff01;) else:print("文件不存在&#xff01;&#xff01;&#xff01;&#xff01;")结果如下 …

RabbitMQ(四)事务消息,惰性队列,优先队列

文章目录 事务消息概念配置 惰性队列概念应用场景 优先队列概念配置 事务消息 仅在生产者端有效&#xff0c;消费端无效 概念 总结&#xff1a; 在生产者端使用事务消息和消费端没有关系在生产者端使用事务消息仅仅是控制事务内的消息是否发送提交事务就把事务内所有消息都发送…

Java面试——专业技能

优质博文&#xff1a;IT-BLOG-CN 一、简单讲下 Java 的跨平台原理 由于各个操作系统&#xff08;Windows&#xff0c;Linux等&#xff09;支持的指令集不是完全一致的。就会让我们程序在不同的操作系统上要执行不同的程序代码。Java 开发了适用于不同操作系统及位数的 Java 虚拟…

【教程】自监督 对比学习,代码,爽学一波

from&#xff1a; https://docs.lightly.ai/self-supervised-learning/examples/simclr.html

代码随想录第22天|回溯part2 组合总和III电话号码的字母组合

216.组合总和III 当组合的数量为k就判断和&#xff0c;并且返回。 在枚举的时候可以进行剪枝&#xff0c;如果总和已经超过了n&#xff0c;那么就没必要继续递归下去了 class Solution { public:vector<int> path;vector<vector<int>> res;void backTrackin…

Java版本家政上门系统源码,自主研发、安全可控,支持任意二次开发

家政上门系统源码&#xff0c;Java版本&#xff0c;自主研发、安全可控。支持任意二次开发、有丰富合作案例。多端管理&#xff1a;管理端、用户端、服务端。 技术参数&#xff1a; 技术架构&#xff1a;springboot、mysql 、Thymeleaf 开发语言&#xff1a;java1.8、vue 开…

软件开发步骤详解

一、引言 随着信息技术的迅猛发展&#xff0c;软件已成为现代社会不可或缺的一部分。无论是企业运营、个人生活还是科学研究&#xff0c;都离不开各种软件的支持。因此&#xff0c;掌握软件开发的步骤和技巧对于IT从业者来说至关重要。本文旨在详细介绍软件开发的整个流程&…

计算机网络期末复习(1)计算机网络在信息时代对的作用 计算机网络的定义和分类 三种交换方法

计算机网络在信息时代扮演着至关重要的角色&#xff0c;它极大地改变了我们生活、工作和学习的方式。 计算机网络在信息时代的作用 信息共享与传播&#xff1a;计算机网络使全球范围内的信息快速共享成为可能&#xff0c;无论是新闻、学术研究还是娱乐内容&#xff0c;都可以…

初识 JavaScript

目录 1. 什么是 JavaScript2. JS 引入方式2.1 内部引入方式2.2 外部引入方式 3. JS 中的注释4. JS 中的结束符5. 输入和输出5.1 输出5.2 输入 6. 变量与常量6.1 变量的声明6.2 变量的赋值6.3 常量 7. JS 中的数据类型8. JS 中的类型转换8.1 隐式转换8.2 显式转换 正文开始 1. …

钣金件设计规范

(一&#xff09; 钣金 1、钣金的概念 钣金&#xff08;sheet metal&#xff09;是针对金属薄板&#xff08;厚度通常在6mm以下&#xff09;的 一种综合冷加工工艺&#xff0c;包括冲裁、折弯、拉深、成形、锻压、铆合等&#xff0c; 其显著的特征是同一零件厚度一致。 2、钣…