[Big Data Interview Notes] Spark's DAGScheduler

In which phase does Spark compute the preferred locations for data locality?

Start with the DAGScheduler's doc comment. It shows that besides splitting the graph into Stages and Tasks, the DAGScheduler also tracks cached data and computes the preferred locations to run tasks.

The DAGScheduler doc comment:

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs.
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.
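The "Preferred locations" bullet above answers the opening question: locality is computed in the DAGScheduler, right before a stage's tasks are handed to the TaskScheduler. Below is a minimal sketch of that lookup, modeled on getPreferredLocsInternal in the Spark source (simplified: the cacheLocs parameter stands in for the scheduler's real cache-location bookkeeping, and the visited-set guard is omitted):

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Order of preference: cached blocks, then the RDD's own locations (e.g.
// HDFS block hosts for a HadoopRDD), then locations inherited recursively
// through narrow dependencies. A shuffle boundary stops the search.
def preferredHosts(rdd: RDD[_], partition: Int,
    cacheLocs: (RDD[_], Int) => Seq[String]): Seq[String] = {
  val cached = cacheLocs(rdd, partition)
  if (cached.nonEmpty) cached
  else {
    val own = rdd.preferredLocations(rdd.partitions(partition))
    if (own.nonEmpty) own
    else rdd.dependencies.iterator.flatMap {
      case n: NarrowDependency[_] =>
        n.getParents(partition).iterator.map(p => preferredHosts(n.rdd, p, cacheLocs))
      case _ => Iterator.empty // ShuffleDependency: locality not inherited
    }.find(_.nonEmpty).getOrElse(Nil)
  }
}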

When does the DAGScheduler run?

The DAGScheduler comes into play when the driver initializes the SparkContext: it is declared and instantiated in SparkContext, the entry point of every Spark application. By the time the DAGScheduler is instantiated, the SchedulerBackend and the low-level TaskScheduler have already been created, as the abridged snippet below shows.
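The relevant lines of SparkContext's initialization look roughly like this (abridged from the Spark source; surrounding setup and error handling are omitted, and field names vary slightly across versions):

// Inside SparkContext initialization: the TaskScheduler and its backend
// are created first, then the DAGScheduler is built on top of them.
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)

// The TaskScheduler is started only after the DAGScheduler exists,
// since the DAGScheduler registers itself with the TaskScheduler.
_taskScheduler.start()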

For SQL workloads, Spark SQL uses Catalyst (the Catalyst optimizer is the core of Spark SQL) to translate the SQL first into a logical plan, then into a physical plan, and finally into operations on RDDs. At runtime the DAGScheduler then splits and schedules the resulting RDD tasks.
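You can watch that translation happen in spark-shell. A small illustration (the dataset and names here are arbitrary; the printed plans vary by Spark version):

// Build a DataFrame and inspect each layer Catalyst produces:
val df = spark.range(100).selectExpr("id % 3 AS k").groupBy("k").count()

// Prints the parsed/analyzed/optimized logical plans and the physical plan:
df.explain(true)

// The physical plan is ultimately executed as an RDD of InternalRow;
// it is this RDD graph that the DAGScheduler splits into stages:
val rdd = df.queryExecution.toRdd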

How does the DAGScheduler split Stages?

A user-submitted computation is a DAG built from RDD dependencies. Spark breaks this graph at shuffle-dependency boundaries into multiple Stages; the Stages in turn depend on each other, forming a DAG of Stages, which the DAGScheduler executes in dependency order. The boundary search is sketched right below.
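A sketch of that boundary-finding step, modeled on DAGScheduler.getShuffleDependencies in the Spark source (simplified): the walk stops at every ShuffleDependency, and everything reachable through narrow dependencies stays pipelined in one stage.

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk the dependency graph of `rdd`. Narrow dependencies are traversed
// (their RDDs stay in the same stage); each ShuffleDependency found is a
// stage boundary and becomes a parent ShuffleMapStage.
def shuffleBoundaries(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = {
  val boundaries = mutable.Set[ShuffleDependency[_, _, _]]()
  val visited = mutable.Set[RDD[_]]()
  val toVisit = mutable.Stack[RDD[_]](rdd)
  while (toVisit.nonEmpty) {
    val r = toVisit.pop()
    if (visited.add(r)) {
      r.dependencies.foreach {
        case shuffle: ShuffleDependency[_, _, _] => boundaries += shuffle
        case narrow => toVisit.push(narrow.rdd)
      }
    }
  }
  boundaries.toSet
}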

If you think of the DAG of RDD dependencies as the logical plan, then the Stages can be seen as the physical plan. To make the idea concrete, let's walk through an example.

The following code counts the words in README.md that contain a digit, then prints the dependencies (lineage) between the RDDs:

scala> val counts = sc.textFile("README.md").flatMap(x=>x.split("\\W+")).filter(_.matches(".*\\d.*")).map(x=>(x,1)).reduceByKey(_+_)

// Call an action to trigger job submission and execution
scala> counts.collect()

// Print the RDD dependencies (lineage)
scala> counts.toDebugString
res7: String =
(2) ShuffledRDD[17] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[16] at map at <console>:24 []
 |  MapPartitionsRDD[15] at filter at <console>:24 []
 |  MapPartitionsRDD[14] at flatMap at <console>:24 []
 |  README.md MapPartitionsRDD[13] at textFile at <console>:24 []
 |  README.md HadoopRDD[12] at textFile at <console>:24 []

Around the shuffle, the DAGScheduler splits the graph into a Stage before and a Stage after it: a ShuffleMapStage and a ResultStage. In the lineage above, the `+-(2)` line marks the shuffle boundary: the pipelined map/filter/flatMap/textFile operations below it form the ShuffleMapStage, while the ShuffledRDD above it is computed by the ResultStage.

ShuffleMapStage

First look at the ShuffleMapStage doc comment. The gist: a ShuffleMapStage is the Stage that performs the shuffle write, and inside the Stage the operators form a pipeline.

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle. They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.

ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage. For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.

In other words, ShuffleMapStages are intermediate Stages produced while executing the DAG; they generate the data for a shuffle. A ShuffleMapStage sits immediately before each shuffle operation, and ahead of the shuffle there may be several narrow transformations, such as map and filter, which are pipelined together. When a ShuffleMapStage executes, it writes map output files that the subsequent reduce tasks fetch.

ShuffleMapStages can also be submitted on their own as Jobs, via DAGScheduler.submitMapStage. For such Stages, the ActiveJobs that submitted them are tracked in the mapStageJobs variable. Note that multiple ActiveJobs may try to compute the same ShuffleMapStage.

A ShuffleMapStage produces the map-side output files for one shuffle. It may also be the final stage in an adaptive query planning / adaptive scheduling job.
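For comparison with the ResultStage declaration shown later, ShuffleMapStage's declaration looks roughly like this (abridged from the Spark source; the exact constructor parameters differ somewhat across Spark versions):

private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],   // the shuffle this stage writes for
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {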

ResultStage

Now look at the ResultStage doc comment:

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

The ResultStage is the last Stage of a Job. It is created from the RDD on which the action was called, and it computes the result of that action. The class is declared as follows:

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],   // the parent Stages this Stage depends on
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

To compute the result of the action, the ResultStage applies the function func to one or more partitions of the target RDD. The IDs of the partitions to compute are kept in the member variable partitions. For some actions, such as first() and take(), func may not be applied to every partition.

Also note the submission order: when a Job is submitted, its ResultStage is created first; but when Stages are submitted, the scheduler recursively finds the parent Stages the current Stage depends on and submits those parents first, as the sketch below illustrates.
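A minimal sketch of that recursion, modeled on DAGScheduler.submitStage in the Spark source (simplified: job-id checks, abort paths, and the surrounding event loop are omitted):

// Submit `stage`, but only after every missing parent stage has been
// submitted. Stages whose parents are still running are parked in
// waitingStages and resubmitted when a parent finishes.
private def submitStage(stage: Stage): Unit = {
  val missing = getMissingParentStages(stage).sortBy(_.id)
  if (missing.isEmpty) {
    submitMissingTasks(stage)    // all parents available: run this stage now
  } else {
    missing.foreach(submitStage) // recurse into unfinished parents first
    waitingStages += stage       // revisit once the parents complete
  }
}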


Exercise

Why does the following RDD computation end up with only 3 Stages?

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1

scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1

scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1

scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1

scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))

scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
 |  MapPartitionsRDD[12] at join at <console>:1 []
 |  MapPartitionsRDD[11] at join at <console>:1 []
 |  CoGroupedRDD[10] at join at <console>:1 []
 |  ShuffledRDD[4] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[3] at map at <console>:1 []
 |  MapPartitionsRDD[2] at flatMap at <console>:1 []
 |  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
 |  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
 |  ShuffledRDD[9] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[8] at map at <console>:1 []
 |  MapPartitionsRDD[7] at flatMap at <console>:1 []
 |  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
 |  /root/t...
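The key is the partitioner. reduceByKey leaves both rdd1 and rdd2 hash-partitioned into 3 partitions; because the two sides of the join share a compatible partitioner, the join cogroups with narrow dependencies, and the joined RDD keeps that partitioner, so groupByKey needs no shuffle either. Only the two reduceByKey calls create shuffle boundaries, giving two ShuffleMapStages plus the final ResultStage: 3 Stages in total. You can verify this in the same shell (a sketch; comments show the expected results):

// Both sides of the join were already partitioned by reduceByKey with the
// default HashPartitioner(3), so they agree on partitioning:
scala> rdd1.partitioner == rdd2.partitioner   // expected: true

// join reuses that partitioner (narrow cogroup, no new shuffle), and the
// joined RDD keeps it, so groupByKey is narrow as well:
scala> rdd3.partitioner == rdd1.partitioner   // expected: true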

