详解 Spark 编程之 RDD 依赖关系

一、依赖与血缘关系

在这里插入图片描述

  • 依赖:两个相邻 RDD 之间的关系
  • 血缘关系:多个连续的 RDD 的依赖
  • 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血缘关系println(rdd1.dependencies) // 打印依赖关系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血缘关系println(rdd2.dependencies) // 打印依赖关系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血缘关系println(rdd3.dependencies) // 打印依赖关系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血缘关系println(rdd4.dependencies) // 打印依赖关系println("----------------------")}
}

二、宽窄依赖

  • 窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
    
  • 宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false
    ) extends Dependency[Product2[K, V]] 
    

三、阶段划分

  • 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务

在这里插入图片描述

  • 宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
    在这里插入图片描述

  • 阶段划分源码:

    /**结论:1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)3.阶段的数量 = shuffle 依赖数量 + 1
    */
    // 行动算子触发作业执行
    rdd.collect()// collect() 深入底层
    dagScheduler.runJob()// runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
    // handleJobSubmitted() 中的阶段划分
    try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {...
    }// createResultStage() 方法
    private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段val id = nextStageId.getAndIncrement()val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一个 resultStage 阶段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage
    }// getOrCreateParentStages(),判断是否有上一阶段
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖getShuffleDependencies(rdd).map { shuffleDep =>// 为 shuffle 依赖创建 ShuffleMapStage 阶段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList
    }// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
    private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents
    }
    

四、任务划分

  • RDD 任务划分中间分为:Application、Job、Stage 和 Task

    • Application:初始化一个 SparkContext 即生成一个 Application
    • Job:一个 Action 算子就会生成一个 Job
    • Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
    • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
  • Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系

  • 任务划分源码:

    val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}
    }//
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()//
    override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript实现粒子数字倒计时效果附完整注释

<!DOCTYPE html> <html lang="en"><head><meta charset

随身wifi网络卡顿怎么解决?随身WiFi哪个牌子的最好用?排名第一名的随身WiFi!

对于随身wifi靠不靠谱这个问题&#xff0c;网上一直存在争议。很多人的随身wifi网速不稳定&#xff0c;信号看着满格就是上不了网。关于随身wifi卡顿到底该怎么解决呢&#xff1f; 1.如果是设备网络在一个地方上网速度很快&#xff0c;换一个地方网络就不行了&#xff0c;很可能…

股票买卖II

股票买卖II 时间限制&#xff1a;1秒 内存限制&#xff1a;128M 题目描述 给定一个长度为N的数组&#xff0c;数组中的第i个数字表示一个给定股票在第i天的价格。 设计一个算法来计算你所能获取的最大利润。你可以尽可能地完成更多的交易&#xff08;多次买卖一支股票…

解析Java中1000个常用类:Readable类,你学会了吗?

在 Java 编程中,处理输入流是一个常见的需求。Java 提供了多种方式来处理输入流,例如 InputStream、Reader 等类和接口。 而 Readable 接口是 Java 提供的一个简单而强大的接口,用于表示可读的字符序列。 本文将详细介绍 Readable 接口的用途、实现原理、应用场景,并通过…

Linux学习笔记(清晰且清爽)

本文首次发布于个人博客 想要获得最佳的阅读体验&#xff08;无广告且清爽&#xff09;&#xff0c;请访问本篇笔记 Linux安装 关于安装这里就不过多介绍了&#xff0c;安装版本是CentOS 7&#xff0c;详情安装步骤见下述博客在VMware中安装CentOS7&#xff08;超详细的图文教…

QT之全局忽略编译警告QMAKE_CXXFLAGS

全局忽略编译警告QMAKE_CXXFLAGS 这个是Qt中用来给编译器传递开关的&#xff0c;常写在’pro’文件或’pri’文件中。 将所有的警告当成错误处理 QMAKE_CXXFLAGS -Werror return-type //函数有返回值 QMAKE_CXXFLAGS -Werror return-local-addr //返回局部变量地址 QMAKE…

Dubbo架构概览:服务注册与发现、远程调用、监控与管理

Dubbo 是一个成熟的、高性能的、基于 Java 的微服务开发框架&#xff0c;它主要用于解决分布式系统中的服务治理问题&#xff0c;包括服务的注册与发现、远程过程调用&#xff08;RPC&#xff09;、服务监控与管理等多个关键环节。以下是Dubbo架构概览的详细介绍&#xff1a; …

3种使用OpenCV进行图像合成的技巧

准备好探索图像世界的魔法了吗&#xff1f;今天&#xff0c;我们将用Python和OpenCV库&#xff0c;一起解锁三种超炫的图像合成技巧&#xff0c;让你的照片变得与众不同&#xff01;&#x1f308; 1. 图像融合&#xff1a;让风景与梦境交织 想象一下&#xff0c;把日出的辉煌…

【前端每日基础】day33——响应式布局

响应式布局是一种网页设计的方法&#xff0c;它可以使网站在不同的设备上&#xff08;如桌面电脑、平板电脑、手机等&#xff09;以及不同的屏幕尺寸上呈现出最佳的显示效果。响应式布局的目标是使用户在任何设备上都能够方便地访问和浏览网站&#xff0c;而不需要使用不同版本…

ios v品会 api-sign算法

vip品会 api-sign算法还原 ios入门案例 视频系列 IOS逆向合集-前言哔哩哔哩bilibili 一、ios难度与安卓对比 这里直接复制 杨如画大佬的文章的内容&#xff1a; ios难度与安卓对比 很多人说ios逆向比安卓简单&#xff0c;有以下几个原因 1 首先就是闭源&#xff0c;安卓开源…

PH编程入门:从基础到实践的全方位解析

PH编程入门&#xff1a;从基础到实践的全方位解析 PH编程&#xff0c;作为一种独特而强大的编程语言&#xff0c;正逐渐在各个领域展现其巨大的潜力。对于初学者来说&#xff0c;如何快速入门并掌握PH编程的精髓&#xff0c;是一个既充满挑战又充满机遇的过程。本文将从四个方…

vscode过滤器@modified(查看配置了哪些设置)

文档 visualstudio•docs•getstarted•settingshttps://code.visualstudio.com/docs/getstarted/settings 说明 使用modified可以过滤出&#xff1a; 配置过的设置&#xff08;和默认值不同&#xff09;&#xff1b; 在 settings.json 文件中配置了值的设置 步骤 1.打开…

Ubuntu Linux 24.04 使用certbot生成ssl证书

设置域名 1. 将需要生成SSL证书的域名解析到IP地址 idealand.xyz <> 64.176.82.190 检查防火墙的设置 1. 首先查看防火墙的状态&#xff1a; # ufw status 2. 如果防火墙开启了&#xff0c;要开放80和443端口用于certbot验证 # ufw allow 80 # ufw allow 443 生…

Vue3实战笔记(53)—奇怪+1,VUE3实战模拟股票大盘工作台

文章目录 前言一、实战模拟股票大盘工作台二、使用步骤总结 前言 实战模拟股票大盘工作台 一、实战模拟股票大盘工作台 接上文&#xff0c;这两天封装好的组件直接应用,上源码&#xff1a; <template><div class"smart_house pb-5"><v-row ><…

JS对象由浅入深

对象 对象&#xff08;Object&#xff09;&#xff1a;JavaScript里的一种数据类型&#xff08;引用类型&#xff09;&#xff0c;也是用于存储数据的 好处&#xff1a;可以用来详细的描述某个事物&#xff0c;是用键值对形式存储语义更明了 特点&#xff1a;对象数据是无序的&…

模型 FABE(特性 优势 好处 证据)法则

说明&#xff1a;系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。特性、优势、好处、证据&#xff0c;一气呵成。 1 FABE法则的应用 1.1 FABE法则营销商用跑步机 一家高端健身器材公司的销售代表正在向一家新开的健身房推销他们的商用跑步机。以下…

数控切割编程:探索精密制造的奥秘与挑战

数控切割编程&#xff1a;探索精密制造的奥秘与挑战 在现代化制造领域&#xff0c;数控切割编程以其高精度、高效率的特性&#xff0c;成为众多行业不可或缺的工艺手段。然而&#xff0c;对于初学者或外行人来说&#xff0c;数控切割编程往往显得神秘且复杂。本文将从四个方面…

【数据分享】中国电力年鉴(2004-2022)

大家好&#xff01;今天我要向大家介绍一份重要的中国电力统计数据资源——《中国电力年鉴》。这份年鉴涵盖了从2004年到2022年中国电力统计全面数据&#xff0c;并提供限时免费下载。&#xff08;无需分享朋友圈即可获取&#xff09; 数据介绍 自1993年首次出版以来&#xf…

【数据结构】链表与顺序表的比较

不同点&#xff1a; 顺序表和链表是两种常见的数据结构&#xff0c;他们的不同点在于存储方式和插入、删除操作、随机访问、cpu缓存利用率等方面。 一、存储方式不同: 顺序表&#xff1a; 顺序表的存储方式是顺序存储&#xff0c;在内存中申请一块连续的空间&#xff0c;通…

解决OpenCV读取目标图像,cv2.imshow出现闪退的问题

前言 本文是该专栏的第17篇,后面将持续分享OpenCV计算机视觉的干货知识,记得关注。 最近有粉丝朋友询问到OpenCV读取目标图像出现的一个问题,在基于python语言“使用OpenCV读取目标图像的时候,利用cv2.imshow函数出现闪退”的情况。 而本文,笔者将详细介绍针对上述问题,…