[Big Data Interview Knowledge Points] Spark's DAGScheduler

At which point does Spark compute the preferred locations for data locality?

Start with the DAGScheduler's doc comment: besides splitting work into Stages and Tasks, the DAGScheduler also tracks which RDDs are cached and computes the preferred locations to run each task.

The DAGScheduler doc comment reads:

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs.
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.
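The "Preferred locations" point above describes a recursive search: cached copies first, then the RDD's own preferences, then narrow parents. Below is a minimal Scala sketch of that recursion, not the real Spark source; lookupCacheLocs is a hypothetical stand-in for the DAGScheduler's internal cache-location bookkeeping (getCacheLocs).

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the DAGScheduler's cache-location tracking.
def lookupCacheLocs(rdd: RDD[_], partition: Int): Seq[String] = Seq.empty

def preferredLocs(rdd: RDD[_], partition: Int): Seq[String] = {
  // 1. A cached copy of this partition wins outright.
  val cached = lookupCacheLocs(rdd, partition)
  if (cached.nonEmpty) return cached
  // 2. Otherwise ask the RDD itself, e.g. HDFS block hosts for a HadoopRDD.
  val own = rdd.preferredLocations(rdd.partitions(partition))
  if (own.nonEmpty) return own
  // 3. Otherwise walk up narrow dependencies, so that a whole pipeline of
  //    map/filter operations stays close to its input data.
  for (dep <- rdd.dependencies) dep match {
    case narrow: NarrowDependency[_] =>
      for (parent <- narrow.getParents(partition)) {
        val locs = preferredLocs(narrow.rdd, parent)
        if (locs.nonEmpty) return locs
      }
    case _ => // shuffle boundary: the next stage handles its own locality
  }
  Seq.empty
}

This is also the answer to the opening question: preferred locations are computed in the DAGScheduler, before TaskSets are handed to the TaskScheduler.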

When the DAGScheduler is created

The DAGScheduler is created on the driver when the SparkContext is initialized: it is declared and instantiated in SparkContext, the entry point of the whole Spark application. Before the DAGScheduler is instantiated, the SchedulerBackend and the low-level scheduler, the TaskScheduler, have already been instantiated.
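This ordering is visible in SparkContext's initialization. A simplified, paraphrased excerpt (not the verbatim source):

// Inside SparkContext's constructor (paraphrased):
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched                 // SchedulerBackend created first
_taskScheduler = ts                       // then the low-level TaskScheduler
_dagScheduler = new DAGScheduler(this)    // DAGScheduler instantiated last
_taskScheduler.start()                    // and the TaskScheduler is started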

For SQL workloads, Spark SQL first uses Catalyst (the Catalyst optimizer is the core of Spark SQL) to translate the SQL into a logical plan, then into a physical plan, and finally into RDD operations; at runtime the DAGScheduler then splits this RDD job into stages and schedules them.
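You can watch Catalyst do this by printing the plans before the job runs. A quick sketch in spark-shell (where the SparkSession `spark` is predefined; the view `t` is hypothetical):

scala> spark.sql("SELECT word, count(*) FROM t GROUP BY word").explain(true)
// Prints four sections:
// == Parsed Logical Plan ==
// == Analyzed Logical Plan ==
// == Optimized Logical Plan ==
// == Physical Plan ==
// Only at execution time is the physical plan turned into RDD operations and
// handed to the DAGScheduler.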

How does the DAGScheduler split Stages?

A user-submitted computation is a DAG built from RDD dependencies. Spark splits this DAG into multiple Stages, using shuffle dependencies as the boundaries; the Stages in turn depend on each other, forming a DAG of Stages, which the DAGScheduler then executes in dependency order.
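Internally, finding a stage's parents means walking the RDD graph backwards until the first ShuffleDependency in each branch. A minimal Scala sketch of that traversal, paraphrased from DAGScheduler.getShuffleDependencies (not the verbatim source; shuffleBoundaries is a name chosen here):

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Collect the *nearest* shuffle dependencies of an RDD; each one marks the
// boundary to a parent ShuffleMapStage.
def shuffleBoundaries(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = {
  val found = mutable.Set[ShuffleDependency[_, _, _]]()
  val visited = mutable.Set[RDD[_]]()
  val toVisit = mutable.Stack[RDD[_]](rdd)
  while (toVisit.nonEmpty) {
    val r = toVisit.pop()
    if (visited.add(r)) {
      r.dependencies.foreach {
        case shuffle: ShuffleDependency[_, _, _] => found += shuffle // stop here
        case narrow => toVisit.push(narrow.rdd) // keep walking narrow deps
      }
    }
  }
  found.toSet
}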

If the DAG of RDD dependencies is viewed as the logical plan, then the Stages can be viewed as the physical plan. To make the idea concrete, let's look at an example.

The following code counts the words in README.md that contain a digit, then prints the dependency chain (lineage) between the RDDs:

scala> val counts = sc.textFile("README.md").flatMap(x=>x.split("\\W+")).filter(_.matches(".*\\d.*")).map(x=>(x,1)).reduceByKey(_+_)

// Call an action to trigger job submission and execution
scala> counts.collect()

// Print the RDD dependency chain (lineage)
scala> counts.toDebugString
res7: String =
(2) ShuffledRDD[17] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[16] at map at <console>:24 []
    |  MapPartitionsRDD[15] at filter at <console>:24 []
    |  MapPartitionsRDD[14] at flatMap at <console>:24 []
    |  README.md MapPartitionsRDD[13] at textFile at <console>:24 []
    |  README.md HadoopRDD[12] at textFile at <console>:24 []

Based on this shuffle, the DAGScheduler splits the computation into two kinds of Stage: a ShuffleMapStage and a ResultStage. In the example above, everything up to and including the map side of reduceByKey forms a ShuffleMapStage, and the ResultStage reads the shuffle output to compute collect().

ShuffleMapStage

First look at the ShuffleMapStage doc comment. The core point is that a ShuffleMapStage is the Stage that performs the shuffle write, and that the operators inside a Stage form a pipeline.

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle. They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.

ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage. For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.

In other words: ShuffleMapStages are intermediate Stages produced while executing the DAG, and they generate the data for a shuffle. A ShuffleMapStage sits right before each shuffle operation, and before the shuffle there may be several narrow transformations, such as map and filter, which are pipelined together. When a ShuffleMapStage executes, it produces map output files, which the subsequent reduce tasks consume.

A ShuffleMapStage can also be submitted on its own as a Job, via the DAGScheduler.submitMapStage function. For such Stages, the ActiveJobs that submitted them are tracked in the mapStageJobs variable; note that several ActiveJobs may try to compute the same ShuffleMapStage at the same time.

In short, it produces the map-side output files for one shuffle. It may also be the final stage in an adaptive query planning / adaptive scheduling job.

ResultStage

Now look at the ResultStage doc comment:

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

The ResultStage is the last Stage of a Job. It is created from the RDD on which the action function is executed, and it computes the result of that action. The class is declared as follows:

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],   // the parent Stages this Stage depends on
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

To compute the result of the action, the ResultStage applies the function func to one or more partitions of the target RDD. The set of partition IDs to compute is kept in the member variable partitions. For some actions, such as first() and take(), func may not be applied on every partition.
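Internally, actions like first() and take() pass an explicit subset of partition IDs to SparkContext.runJob, which is public and can be tried directly. A small illustration (the result shown is what one would expect with the default range partitioning, where partition 0 holds 1..25):

scala> val rdd = sc.parallelize(1 to 100, 4)
scala> sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0))
res0: Array[Int] = Array(325)   // only partition 0 was computed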

Also note the order of events: when a Job is submitted, the ResultStage is created first; but when a Stage is submitted, the scheduler first recursively finds the parent Stages it depends on and submits those parents first, as sketched below.
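A minimal sketch of that recursion, paraphrased from DAGScheduler.submitStage (not the verbatim source, which also checks for an active job and already-running stages):

// Submit a stage, but recursively submit any missing parent stages first.
private def submitStage(stage: Stage): Unit = {
  val missing = getMissingParentStages(stage).sortBy(_.id)
  if (missing.isEmpty) {
    submitMissingTasks(stage)      // all parents available: run this stage now
  } else {
    missing.foreach(submitStage)   // parent stages go first
    waitingStages += stage         // park this stage until its parents finish
  }
}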


Exercise

For the RDD computation below, why does Spark end up creating only 3 Stages?

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1

scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1

scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1

scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1

scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))

scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
 |  MapPartitionsRDD[12] at join at <console>:1 []
 |  MapPartitionsRDD[11] at join at <console>:1 []
 |  CoGroupedRDD[10] at join at <console>:1 []
 |  ShuffledRDD[4] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[3] at map at <console>:1 []
 |  MapPartitionsRDD[2] at flatMap at <console>:1 []
 |  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
 |  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
 |  ShuffledRDD[9] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[8] at map at <console>:1 []
 |  MapPartitionsRDD[7] at flatMap at <console>:1 []
 |  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
 |  /root/t...
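One way to read the lineage above: each reduceByKey introduces a shuffle, giving two ShuffleMapStages (the two "+-(3)" branches). The join and groupByKey, by contrast, produce MapPartitionsRDDs rather than ShuffledRDDs: rdd1 and rdd2 are already hash-partitioned into 3 partitions by reduceByKey, so the CoGroupedRDD's dependencies on them are narrow and no further shuffle is needed, and groupByKey likewise reuses the existing partitioner. That leaves 2 ShuffleMapStages plus the final ResultStage for collect(), i.e. 3 Stages in total.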

