TaskScheduler相关

目录

 

1、DAGScheduler与TaskScheduler

2、TaskScheduler与SchedulerBackend

3、任务调度过程总结


1、DAGScheduler与TaskScheduler

DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stage内部有一系列任务,Stage里面的任务是并行计算的,这些并行计算的任务的逻辑是完全相同的,只不过是处理的数据不同而已。DAGScheduler会以TaskSet的方式以一个DAG构造的Stage中所有的任务提交给底层调度器TaskScheduler,TaskScheduler是一个接口(做接口的好处就是跟具体的任务调度解耦合,这样Spark就可以运行在不同的资源调度模式上Standalone,yarn,mesos等)这符合面向对象中依赖抽象而不依赖具体的原则,带来了底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上。

DAGScheduler是高层调度器,TaskScheduler是底层调度器。高层调度器本身是属于Spark自己内核的实现,通过划分不同的Stage,这个是优化的核心。底层调度器做了一个接口,面向接口了解这个接口可以自己定义底层的具体调度器。

2、TaskScheduler与SchedulerBackend

DAGScheduler完成了面向Stage的划分之后,会按照顺序逐个将我们的Stage通过TaskSchedulerImpl的submitTaskScheduler提交给底层调度器,调度过程如下:

(1)DAG构建完毕,提交stage给taskScheduler,taskScheduler执行submitTasks开始调起具体的Task,他主要的作用是将TaskSet加到我们的TaskSetManager中,这里new了一个TaskSet。

(2)因为TaskScheduler是个接口,进入到他的具体实现,也就是TaskSchedulerImpl。在这里任务集合中的任务可能是ShuffleMapTask也可能是ResultTask。我们把任务集合赋值给变量tasks,然后在同步代码块中创建了一个createTaskSetManager,在其内部路由了TaskSetManager。构建TaskSetManager的参数包括this(TaskSchedulerImpl本身),taskSet,还有最大失败重试次数(是构建TaskSchedulerImpl的时候传进来的,如果没设置的话默认最大为4)

(3)在创建了TaskSetManager后比较关键的一点就是调用了SchedulableBuilder.addTaskSetmanager,SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个task具体运行在哪个ExecutorBackend中

schedulableBuilder他是SchedulableBuilder类型的,是整个应用程序级别调度器,我们是在TaskSchedulerImpl创建的时候就对他实例化。他本身是我们应用程序级别的调度器,他自己支持2种调度模式有FIFO(先进先出)和FAIR(公平调度)。我们的调度策略可以通过spark-env.sh中的spark.scheduler.mode进行具体的配置,默认是FIFO。

(4)在SchedulableBuilder.addTaskSetmanager之后有一个关键的步骤backend.reviveOffers()他的作用就是给DriverEndPoint发ReviveOffers消息。

运行时调用的是CoarseGrainedSchedulerbackend的reviveOffers方法。ReviveOffers是个case Object,啥也没有,因为不需要封装数据,相当于一个触发器。因为资源调度肯定是统一进行调度算法的,这里只不过相当于一个信号灯。所以只要资源变化的时候ReviveOffers就会被触发,因为不是通过ReviveOffers本身去调资源的。只不过是触发调资源的过程,为什么要触发资源,原因很简单提交新的任务过来了

(5)DriverEndPoint这个驱动程序的调度器收到ReviveOffers消息后调用makeOffers,makeOffers的目的是找出可以利用的executors。

创建WorkerOffer,WorkerOffer是个很简单的数据结构,是个case class,在这边会告诉我们具体的Executor可用的资源,可用资源是是cpu的core(为什么不考虑内存只考虑core?因为已经分配好了,基本都符合要求(内存),现在要计算,计算的核心考虑对象是cpu)

launchTasks参数中的Scheduler.resourceOffers是根据已经可用的计算资源,为每个task具体分配计算资源。这里的Scheduler就是TaskSchedulerImpl。输入是ExecutorBackend及其上可用的Cores,输出是TaskDescription的二维数组,在其中确定了每个Task具体运行在哪个ExecutorBackend。

(6)resourceOffers是如何确定task具体运行在哪个ExecutorBackend上的呢?作为参数传进来的WorkerOffer是个一维数据,里面装了所有可计算的资源,返回的是个二维数据每个任务他的数据本地性。

①首先开始循环遍历可用资源(o <- offers),然后判断如果不包含这个executor,要把executor加进去,刚才是最新请求了一下机器有哪些可用资源,executorIdToHost这是个内存数据结构

这个是数据本地性,默认情况下在一个rack,实际生产环境肯定是若干个rack。

②shuffledOffers把我们所有可用计算资源打散,以寻求计算的负载均衡。tasks中对每个worker new了一个ArrayBuffer里面是TaskDescription,每个executor可用放几个TaskDescription就可以运行多少个任务,他有多少个core就可以立即分配多少个任务。所以new一个一维数组他的长度是当前机器的cores的个数决定的。(现在创建的一维数组是说当前这个ExecutorBackend可以分配多少个task也就是可以并行运行多少个task)。

sortedTaskSets根据调度顺序,返回的是TaskSet集体的计算集合。然后遍历所有排序后的TaskSet集合,如果有可用的新的executor,我们会把executorAdded交给taskSet。

如果有新的ExecutorBackend分配给我们的job,此时会调用executorAdded来获得最新的完整的可用计算资源

③回到resourceOffers方法中(这个是TaskSchedulerImpl的),接下来这里是两层循环,追求最高最优先级本地性(首先是PROCESS_LOCAL,不行就NODE_LOCAL,不行的话就...按优先级下去)。

在resourceOfferSingleTaskSet中,获得executorId,根据他的id获得host,然后判断一下每台机器可用的计算资源,PUC_PER_TASK默认为1(就是每个任务默认用一条线程)。然后就就开始进行总路规则的计算

如果符合要求的话就执行,所以最后算的还是要靠taskSetManager的resourceOffer,他获得就是每个任务具体运行在哪里,返回的就是TaskDescription

④到taskSetManager的resourceOffer方法中

因为实质来讲DAGScheduler已经告诉我们具体任务运行在哪台机器上。DAGScheduler是从数据层面考虑preferedLocation的(从RDD的层面确定),而TaskScheduler是从具体计算Task的角度考虑计算的本地性。他们并没有冲突而且是配合在一块的,所以说TaskScheduler是更底层的调度DAGScheduler是高层的调度。DAGScheduler确定数据的本地性就是确定数据在哪台机器上,根据这个数据本地性确认计算要发生在哪台机器上,TaskScheduler有5个数据本地性原则,肯定追求PROCESS_LOCAL(考虑数据是否直接在内存中)。一个是高层,一个是底层,高层就是preferedLocation确定数据具体在哪台机器上,机器上有executor,底层就是知道数据本地性的情况下任务的本地性,所以底层是更具体的。TaskScheduler确定executor上具体的task运行的数据具体在内存中还是节点上,还是在这台机器上,这完全是2码事

(7)回到makeOffers方法中,本地两个层面,一个数据本地性,一个是计算的本地性,确定好后就launchTasks,把任务发送给executorBackend去执行。首先将二维数组进行序列化。

任务要大于128M就把任务abort丢弃

如果任务小于128M。那么启动任务,启动具体任务的是CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上。TaskSetManager是跟踪任务的,不是发送任务的

(8)CoarseGrainedExecutorBackend接收到消息之后,就反序列化交给executor

3、任务调度过程总结

(1)DAG构建完毕,提交stage给taskScheduler,taskScheduler的实例TaskSchedulerImpl执行submitTasks方法,将TaskSet加入到TaskSetManager中进行管理;

(2)SchedulerBuilder.addTaskSetManager方法SchedulerBuilder确定在哪个ExecutorBackend中(按照TaskSetManager的locality aware来确定每个task具体运行在哪个ExecutorBackend中),SchedulableBuilder是整个应用程序级别调度器有FIFO和FAIR两种调度模式;

(3)CoarseGrainedSchedulerBackend.reviveOffers给DriverEndPoint发送ReviveOffers,ReviveOffers本身是一个空的case Object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发生ReviveOffers这个消息作为触发器;

(4)DriverEndPoint收到ReviveOffers消息,然后路由到makeOffers具体的方法中,在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息);

(5)调用TaskSchedulerImpl的resourceOffers,作为参数执行launchTasks发送出去。TaskSchedulerImpl为每一个Task具体分配计算资源,输入是ExecutorBackend及其上可用的Cores,输出TaskDescription二维数组,在其中为每个Task具体运行在哪个ExecutorBackend。

 

resourceOffers是如何确定Task运行在哪个ExecutorBackend上的算法实现如下:

(1)通过Random.shuffle方法重新洗牌所有的计算资源,用于负载均衡;

(2)根据每个ExecutorBackend的Cores个数,申明声明类型为TaskDescription的ArrayBuffer数组;

(3)如果新的ExecutorBackend分配给我们的Job,此时会回调用executorAdded来获得最新的完整的可用计算资源

(4)求的最高级别的优先级本地性;

(5)通过TaskSetManager的resourceOffers最终确定每个Task具体运行在哪个ExecutorBackend的具体的本地性;

(6)通过launchTasks把任务发送给ExecutorBackend执行。

 

注意:

(1)默认Task的最大重试次数是4;

(2)spark目前支持FIFO的FAIR两种调度器;

(3)TaskScheduler中要负责为Task分配计算资源,此时程序已经具备集群中你的计算资源了,根据计算本地性原则确定Task具体运行在哪个ExecutorBackend中,每个Task默认是采用一条线程计算的;

(4)TaskDescription中已经确定好Task运行在哪个ExecutorBackend;

(5)数据本地优先级由高到底为:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY;

(6)DAGScheduler是从数据层面考虑数据本地性,而TaskScheduler是从具体计算Task角度考虑计算本地性的。也就是DAGScheduler通过preferedLocation确定数据的本地性就是确定数据在哪台机器上,根据这个数据本地性确认计算要发生在哪台机器上,TaskScheduler就是知道数据本地性的情况下任务的本地性,所以底层是更具体的;

(7)Task进行广播的时候,如果任务大于128M的话,则Task会被直接丢弃,如果小于128M的话会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points &#xff0c;其中 points[i] [xi, yi] &#xff0c;请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度&#xff0c;而 y 轴上无限延伸的一块区域&#xff08;也就是高度为…

Task执行流程

1、源码走读 &#xff08;1&#xff09;当Driver中的SchedulerBackend&#xff08;Standalone模式为CoarseGrainedSchedulerBackend&#xff09;给ExecutorBackend&#xff08;Standalone模式为CoarseGrainedExecutorBackend&#xff09;发送LaunchTask之后&#xff0c;Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t &#xff0c;请你找出 s 中的非空子串的数目&#xff0c;这些子串满足替换 一个不同字符 以后&#xff0c;是 t 串的子串。 换言之&#xff0c;请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目&#xff0c;使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统&#xff0c;可以简单理解为是高级版的crontab&#xff0c;但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年&#xff0c;是Armin ronacher&#xff08;人名&#xff09;用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核&#xff0c;其他几乎所有的功能都要用到扩展&#xff08;邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr &#xff0c;数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces&#xff0c;其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是&#xff0c;不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”&#xff0c;或者&#xff0c;也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n&#xff0c;请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足&#xff1a;对于所有有效的 i&#xff0c;s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights &#xff0c;表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程&#xff0c;不断向后面的建筑物移动&#xff0c;期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置&#xff0c;每个分片只有一份&#xff0c;这种多分片单副本集群&#xff0c;挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群&#xff0c;对于每个分片具有2个或2个以上的副本&#xff0c;当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销&#xff0c;推广品牌的方式。不仅可以给平台带来用户量&#xff0c;还可以提高平台知名度。一个好的秒杀系统&#xff0c;可以提高平台系统的稳定性和公平性&#xff0c;获得更好的用户体验&#xff0c;提升平台的口碑&#xff0c;从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在&#xff0c;我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下&#xff1a; 对于三元组(A, B, C) &#xff0c;“C”为顶层方块&#xff0c;方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架&#xff0c;主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识&#xff0c;通过完成作者-读书功能&#xff0c;先来熟悉Flask框架的完整使用步骤。 操作步骤为&#xff1a; 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316&#xff1a; 给你一个字符串 s &#xff0c;请你去除字符串中重复的字母&#xff0c;使得每个字母只出现一次。需保证 返回结果的字典序最小&#xff08;要求不能打乱其他字符的相对位置&#xff09;。 示例 1&#xff1a; 输入&#…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受&#xff0c;比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组&#xff0c;例如&#xff1a;&qu…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署&#xff0c;主要包括两个方面&#xff1a; 1.nginx安装与配置&#xff1a; 1、Nginx 安装 系统平台&#xff1a;CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number&#xff0c;并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口&#xff0c;从左到右遍历该矩阵 number&#xff0c; 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的&#xff0c;监控是故障诊断和分析的重要辅助利器&#xff0c;在发生事故之前就能预警&#xff0c;最大限度降低系统故障率。   监控系统我们可以分为业务层面&#xff0c;应用层面&#xff0c;系统层面 1.1 业务层面 业务系…

天池 在线编程 队列检查(排序)

文章目录1. 题目2. 解题1. 题目 描述 班上的学生根据他们的年级照片的身高升序排列&#xff0c;确定当前未站在正确位置的学生人数 数组长度 < 10^5示例 输入: heights [1,1,3,3,4,1]输出: 3解释: 经过排序后 heights变成了[1,1,1,3,3,4]&#xff0c;有三个学生不在应在…

celery异步执行任务在Django中的应用实例

1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp demo2.在celery_demo模块中创建celery.py模块, 文件目录为: celery.py模块内容为: from celery import Celery from django.conf import settings import os#…