Spark资源调度分配

1、任务调度与资源调度

任务调度:是指通过DAGScheduler,TaskScheduler,SchedulerBackend等进行的作业调度。

资源调度:是指应用程序获取资源。

任务调度是在资源调度的基础上,没有资源调度,那么任务调度就没有任何意义了。

2、分配Driver(只对cluster模式有效)

Spark的Driver的运行有2种模式,一种是Client模式(Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出),一种是cluster模式(真正的Driver就会在worker中的一台机器上,在哪台有Master决定)。

schedule的调用时机:每次有新的应用程序提交或者集群资源发生改变的时候(包括Executor的增加或减少,Worker的增加或减少)。

资源调度肯定是在master里面,因为master负责资源调度,每次资源变动或者注册程序,或新任务提交等一大堆的东西都会产生schedule()的调用,如RegisterApplication里面最后一行就是schedule()

进入schedule方法里面,为我们当前等待的应用程序调度当前可用的资源(每当一个新的应用程度进来的时候这个方法都会被调用)。或者说资源的可用状况改变,例如说那个executor挂掉了,或者worker挂掉了,或者新增加了机器,

(1)首先是判断master的状态,如果不是ALIVE的状态,这个资源调度就无从谈起,直接返回,也就是说Standby 的master不会进行application资源的调用。所以master必须是ALIVE的方式采用进行资源的调度。

(2)Random.shuffle()是将worker打乱打乱有利于负载均衡,workers是HashSet的数据结构里面是WorkInfo,WorkInfo里面的内容是注册的时候把这些信息注册进来的。shuffle方法中,打乱之前要判断一下要工作就必须要让worker的state是ALIVE的,所以就判断所有Worker中哪些是ALIVE级别的,ALIVE才能参与资源的分配工作

(3)在shuffle方法中,他的随机打乱是首先构建一个ArrayBuffer,把所有的worker都放进去,这个函数内部有定义了swap函数(这个函数是将两个索引上的位置进行交换)。然后从ArrayBuffer中最后一个元素开始一直到2就是第3个索引,每次都减1.nextInt(n)是取出0到n-1里面的一个整数。然后将取出的索引与n-1索引位置的替换,这样的话顺序就特别的乱。

(4)然后将这个shuffledAlivedWorkers的长度赋值到numWorkerAlive,并定义变量curPos

(5)遍历waitingDrivers队列,也就是等待被调度的Driver队列,使其的launched值为false,定义numWorkerVisited的值为0。waitingDrivers就是一个ArrayBuffer里面存放DriverInfo,DriverInfo里面的DriverDescription里面有supervise是因为如果是cluster模式,submit的时候有指定supervise在driver挂掉的时候会自动重启。

注意:循环遍历等待启动的driver(cluster模式才有driver等待启动,如果是client模式是不需要等待启动driver的因为你提交driver就启动了)。

(6)进行循环判断,条件为状态为Alive的worker数量大于0并且是没有被调度的launched=fasle,那么将这些worker进行shuffle,被调用的worker的数量(numWorkerVisited)的值加1,进一步判断,如果worker的剩余内存大于等于driver的进行调度所需的内存(driver.desc.mem)并且worker的剩余CPU数量大于等于driver调度所需的CPU数量(driver.desc.cores),(DriverInfo里面的DriverDescription中有当前Driver启动是需要的内存和cpu等要求的内容)。调用launchDriver方法,将这个Driver从driver的等待队列移除,并设置launched的值为true。curPos= (curPos + 1) % numWorkersAlive是将指针指向下一个worker。

(7)符合要求之后launchDriver,launch到循环时候的一个worker中去了,而这个worker是Shuffle之后的for循环随机产生的一个worker,因为每次都调用Shuffle所以顺序不一样,所以我们的driver放到这个worker上,这就保证负载均衡

launchDriver中首先先打印一个log,然后就是worker.addDrievr(driver)这个worker是当前master中对这个集群元数据的一个描述,需要保存元数据现在还是在master上。然后driver.worker = Some(worker),driver说明自己在哪个worker上,相互引用。然后就是关键点了worker.endpoint.send(LaunchDriver(driver.id, driver.desc)),这个就是发远程消息给worker,消息通信远程rpc。所以master发指令给worker,让远程的worker具体启动executor。启动之后driver的state就变成running了。

注意:所有schedule的时候首先就是进行所有driver级别的launch,这说明一件事情要现有driver才有其他的。

3、为Application分配资源

(1)在schedule中继续,最后一行是startExecutorsOnWorkers,为Application分配资源并启动Executors。

调度和启动Executor在Worker上为我们具体的当前程序。默认使用FIFO的方式,就是先满足第一个应用程序,再满足第二个应用程序...。Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础上才能满足下一个应用程序资源的分配。

我们现在是Master类中,Master直接调这个方法,但是具体在哪些Executor上启动executor还不知道。for循环waitingApps,要求app.coresLeft>0

equestedCores:总共需要的核数(--total-executor-cores指定,默认Int的最大值)

coresGranted:已经分配的值

假设整个程序要求1000个core,但是现在可用的只有100个,不能立即满足,所以可能在等待队列中,我们要看一下coresLeft(还需要的cores),如果>0就就还需要调度了,如果不大于0也就是不需要core就不会为应用程序分配executor

进入到for循环中这个是应用程序在提交的时候会配置很多参数,这里说每个executor需要的core有多少。过滤出worker是ALIVE的状态(才能分配executor),然后worker的内存要大于配置内存,然后进行排序,谁的core多就排在前面。排序后就产生可用的useableWorkers。

现在不应该考虑数据本地性,因为不是资源分配的内容,这个不是job调度,还没到计算,不考虑数据本地性。

(2)进入到scheduleExecutorsOnWorkers方法中

这个注释的意思:具体调度在这个worker上启动executor;

返回的是一个数组包含到每个worker上的赋值的具体的cores;

有2种启动worker的方式,第一个方式是spread out尝试把一个应用程序运行在尽可能多的worker上(我们把executor运行在尽可能多的node上是更符合数据本地性的表现,做基础建设的时候是这么考虑的,因为数据有可能在所有的worker上,这其实也有风险也可能有500台机器,数据存在200台机器上,但是这个是默认的情况,默认层面上从资源调度的层面上考虑最大化的数据本地性,调度层面上必须决定这个代码具体运行在哪几台机器上 。这个数据本地性只是顺便带来的,因为这样更好的响应了并发处理能力,不是考虑数据的本地性,所以是潜在的数据本地性);

第二种方式是把我们当前应用程序运行在尽可能少的worker上。在为每一个executor分配多少个core是可配置的,可以在submit的时候设置。在一个worker下面可能会有多个executor,前提是worker上有足够的cores和memory的时候。否则的话默认情况下分配一个executor把当前worker上所有的cores都拿走了。

一次在我们的executor上分配多个core是非常重要的,我们的集群现在4个worker,每个worker有16个cores。我们的用户要求3个executor,配置的时候最大可以48个,每个executor16个。

大致意思是说有两种分配模型,第一种是将executors分配到尽可能多的workers上;第二种与第一种相反。默认使用的是第一种模型,这种模型更加符合数据的本地性原则,为每个Executor分配的cores的个数是可以进行配置的(spark-submit 或者 spark-env.sh),如果设置了,多个executors可能会被分配在一个worker上(前提是该worker拥有足够的cores和memory),否则每个executor会充分利用worker上的cores,这种情况下一个executor会被分配在一个worker上。具体在集群上分配cores的时候会尽可能的满足我们的要求,如果需要的cores的个数大于workers中空闲的cores的个数,那么就先分配空闲的cores,尽可能的去满足要求。

spreadOutApps是让我们的应用程序尽可能多的运行在所有的node上

下面具体进入到scheduleExecutorsOnWorkers代码中

(3)回到startExecutorsOnWorkers方法中现在还没有真正的发生调度,在这里获得元数据信息之后,前面决定了在哪些机器上分配多少个Executor,每个Executor上分配多少个cores,下面就开始循环找出可用的workers

这里就具体直接分配了,就是先决定后分配

分配肯定是远程通信,循环遍历要分配个数的executor,把要分配的executor元数据信息交给addExecutor没后发送给worker

4、总结

(1)Master在接收搭配RegisterApplication之后创建一个applicationInfo对象 ,然后registerApplication方法中将application加入到队列,持久化application ,发送RegisteredApplication消息到ClientEndPoint,ClientPoint接收到消息后设置registered=true ,最后schedule()进行资源分配;

(2)在schedule()中,先打乱可以使用的workers,主要看在这个worker上内存和cpu的cores满不满足启动一个driver,满足的话则在这个worker上启动一个driver,直到找到一个合适的worker,然后launchDriver启动Driver,然后调用startExecutorsOnWorkers;

(3)在startExecutorsOnWorkers 中,首先筛选出可以使用的workers,主要看内存和cpu ,然后调用shceduleExecutorsOnWorkers得到要分配的信息 ,shceduleExecutorsOnWorkers方法中主要是①获取当前的app还有多少cpuCore没有被分配 ②筛选出可以用来启动executor的workers saber在筛选出来的worker上面进行executor的分配的信息的记载在assignedCores中 并返回;

(4)根据上面得到的要分配的信息,调用allocateWorkerResourceToExecutors()在每一个worker上面分配资源启动executor 。在allocateWorkerResourceToExecutors方法中主要是①计算在当前的worker上启动几个executor ②计算在当前的worker上一个executor会分配几个cpuCores ③调用launchExecutor进行启动executor

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

两个栈实现队列与两个队列实现栈

1. 两个栈实现队列 实现一 思路 s1是入栈的,s2是出栈的。 入队列,直接压到s1是就行了出队列,先把s1中的元素全部出栈压入到s2中,弹出s2中的栈顶元素;再把s2的所有元素全部压回s1中 实现二 思路 s1是入栈的&#xff0c…

ACwing 5. 多重背包问题 II(二进制拆分+DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件,每件体积是 vi,价值是 wi。 求解将哪些物品装入背包,可使物品体积总和不超过背包容量,且价值总和最大。 输出最大价值。 输入格式…

排序:冒泡排序与选择排序

冒泡排序 冒泡排序(英语:Bubble Sort)是一种简单的排序算法。它重复地遍历要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再需要交换,也就是说该…

Spark Master的注册机制与状态管理

目录 1、Master接收注册的主要对象 2、Master接收Worker的注册 3、Master接收Driver的注册 4、Master处理Driver状态变化 5、Master接收Application的注册 6、Master处理Executor状态变化 1、Master接收注册的主要对象 Master主要接受注册的对象是:Applicatio…

排序:插入排序与希尔排序

插入排序 插入排序(英语:Insertion Sort)是一种简单直观的排序算法。它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。插入排序在实现上,在从…

jieba分词提取小说人名

文章目录1. 读入文本2. 分词3. 计数4. 排序5. 添加用户字典以《神雕侠侣》为例: 使用 jieba.posseg获取词性,人名的词性为 nr 1. 读入文本 import jieba.posseg as psg with open(shendiaoxialv.txt,encodingutf-8) as f:text f.readlines()print(te…

Spark Worker源码

目录 1、概述 2、LaunchDriver 3、LaunchDriver 4、总结 1、概述 worker肯定是实现RPC通信的,否则别人没法给你发消息。他继承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是线程安全的,意味着处理一条消息完成后再处理下一个消息。换…

排序:快速排序与归并排序

快速排序 快速排序(英语:Quicksort),又称划分交换排序(partition-exchange sort),通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都…

LeetCode 764. 最大加号标志(DP)

文章目录1. 题目2. 解题1. 题目 在一个大小在 (0, 0) 到 (N-1, N-1) 的2D网格 grid 中,除了在 mines 中给出的单元为 0,其他每个单元都是 1。网格中包含 1 的最大的轴对齐加号标志是多少阶?返回加号标志的阶数。如果未找到加号标志&#xff…

机器学习基础—Kaggle泰坦尼克预测(完整分析)

1.引言 我们先找个简单的实际例子,来看看,所谓的数据挖掘或者机器学习实际应用到底是怎么样一个过程。 2.背景 2.1 关于Kaggle Kaggle是一个数据分析建模的应用竞赛平台,有点类似KDD-CUP(国际知识发现和数据挖掘竞赛)&…

Spark Executor解析

目录 1、Spark Executor如何工作 2、Spark Executor工作源码 1、Spark Executor如何工作 当Driver发送过来Task的时候,其实是发送给CoarseGrainedExecutorBackend这个RPCEndpoint,而不是直接发送给Executor(Executor由于不是消息循环体永远…

LeetCode 381. O(1) 时间插入、删除和获取随机元素 - 允许重复(vector + 哈希)

文章目录1. 题目2. 解题1. 题目 设计一个支持在平均 时间复杂度 O(1) 下, 执行以下操作的数据结构。 注意: 允许出现重复元素。 insert(val):向集合中插入元素 val。remove(val):当 val 存在时,从集合中移除一个 val。getRando…

Stage划分和Task最佳位置

目录 1、Job Stage划分 2、Task最佳位置 3、总结 3.1 Stage划分总结: 3.2 Task最佳位置总结: 1、Job Stage划分 Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是…

LeetCode 1636. 按照频率将数组升序排序(哈希+排序)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,请你将数组按照每个值的频率 升序 排序。 如果有多个值的频率相同,请你按照数值本身将它们 降序 排序。 请你返回排序后的数组。 示例 1: 输入:nums [1,1,2,2,2,3] 输出…

TaskScheduler相关

目录 1、DAGScheduler与TaskScheduler 2、TaskScheduler与SchedulerBackend 3、任务调度过程总结 1、DAGScheduler与TaskScheduler DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stag…

LeetCode 1637. 两点之间不包含任何点的最宽垂直面积

文章目录1. 题目2. 解题1. 题目 给你 n 个二维平面上的点 points ,其中 points[i] [xi, yi] ,请你返回两点之间内部不包含任何点的 最宽垂直面积 的宽度。 垂直面积 的定义是固定宽度,而 y 轴上无限延伸的一块区域(也就是高度为…

Task执行流程

1、源码走读 (1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,Coarse…

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…