Spark的动态资源分配算法

文章目录

  • 前言
  • 基于任务需求进行资源请求的整体过程
  • 资源申请的生成过程详解
    • 资源申请的生成过程的简单例子
    • 资源调度算法的代码解析
  • 申请资源以后的处理:Executor的启动或者结束
    • 对于新启动的Container的处理
    • 对于结束的Container的处理
  • 基于资源分配结果进行任务调度
    • PendingTask的生成:
      • TaskLocation详解
      • 根据TaskLocation信息,将Task添加到不同的pendingTask数组中
    • 可用的LocationLevel的计算
    • 基于Locality的Task调度
  • 结语

前言

在TODO这篇文章中,介绍了Spark RPC通信的基本流程。可以看到,Spark中的Driver通过Stage调度生成了物理执行计划,这个物理执行计划包括了所有需要运行的Task,以及,最关键的,这些Task希望运行的节点信息,我们叫做Locality Preference,即本地性偏好。
但是在Yarn的场景下,资源是以Executor(Container)为单位进行调度,整个Container的粒度与Task不一一对应,Container的生命周期与Task不一一对应。在这种场景下,动态资源调度的基本任务就是

  • 根据这些Task以及Task的本地性偏好,向Yarn申请Container作为Executor以执行Task;这里最重要的,是在资源申请中表达出对应的Task的位置偏好。
  • 在申请完资源以后,根据Task的本地性偏好,将Task调度到申请到的资源上面去。这里最重要的,是尽量满足Task的本地性偏好。

本文将详细讲解这个过程。
对于资源申请算法的基本流程,以及将Task和资源进行匹配的基本流程,本文都用实际例子进行讲解。

基于任务需求进行资源请求的整体过程

向Yarn请求资源是由客户端向ApplicationMaster申请,然后ApplicationMaster向Yarn发起请求的,而不是客户端直接向Yarn申请的。
资源是为了服务于Task的运行,Task的生成显然是Driver端负责的,Driver会根据物理执行计划生成的Task信息发送给ApplicationMaster,ApplicationMaster根据这些Task的相关信息进行资源申请。

ApplicationMaster启动以后,会有一个独立线程不断通过调用YarnAllocator.allocateResources()进行持续的资源更新(查看ApplicationMaster的launchReporterThread()方法)。这里叫资源更新,而不叫资源申请,因为这里的操作包括新的资源的申请,旧的无用的Container的取消,以及Blocklist Node的更新等多种操作。
总而言之,ApplicationMaster作为客户端和Yarn的中间方,其资源申请的方法allocateResource()在逻辑上的功能为:

  1. 粒度转换: 将Task级别的资源请求,转换为Container(Executor级别的资源请求)。这是一个游戏到粗的粒度的转换。
  2. 维度转换: Driver发过来的资源请求是资源的最终全局状态,而Yarn 的 API 要求的针对每一个Container进行增量请求。因此,allocateResources()会将Driver发送过来资源请求的最终状态,对比当前系统已经运行、分配未运行、已经发送请求但是还没有分配资源等等已经存在的状态,确定一个发送给Yarn的增量请求状态。这是一个全量到增量的维度的转换。
  3. 角度转换: Driver发过来的每个Task都带有各自Task的Locality,而发送给Yarn的Container请求又是带有Locality需求的Container需求。这是一个从Task到Container的角度的转换。

ApplicationMaster端的allocateResources()方法的基本流程在代码YarnAllocator.allocateResources()中:

   ---------------------------------- YarnAllocator ------------------------------------def allocateResources(): Unit = synchronized {updateResourceRequests() // 与Yarn进行资源交互val allocateResponse = amClient.allocate(progressIndicator) // 从Yarn端获取资源结果,包括新分配的、已经结束的等等val allocatedContainers = allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala) // 处理新分配的Containerval completedContainers = allocateResponse.getCompletedContainersStatuses() // 处理已经结束的ContainerprocessCompletedContainers(completedContainers.asScala)}}

ApplicationMaster端的allocateResources()的基本流程如下图所示:
在这里插入图片描述

  1. 生成资源请求,即将Driver发送过来的全量的、Task粒度的资源请求和Host偏好信息,转换为对Yarn的、以Executor为粒度的资源请求
    updateResourceRequests()
    
  2. 将资源请求发送给Yarn并从Yarn上获取分配结果(基于Yarn的异步调度策略,这次获取的记过并非本次资源请求的分配结果)以进行后续处理:
    val allocatedContainers = allocateResponse.getAllocatedContainers()
    handleAllocatedContainers(allocatedContainers.asScala)
    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    processCompletedContainers(completedContainers.asScala)
    

可以看到,updateResourceRequests()是资源请求的核心方法,它会负责同Yarn进行通信以进行资源请求。
在TODO中,我们也介绍过,生成资源请求,其决策过程发生在方法updateResourceRequests()中。我们主要来看updateResourceRequests()方法:

   ---------------------------------- YarnAllocator ------------------------------------def updateResourceRequests(): Unit = {// 获取已经发送给Yarn但是待分配的ContainerRequest,计算待分配容器请求的数量// 这些ContainerRequest是之前通过调用amClient.addContainerRequest 发送出去的val pendingAllocate = getPendingAllocateval numPendingAllocate = pendingAllocate.size// 还没有发送请求的executor的数量val missing = targetNumExecutors - numPendingAllocate -numExecutorsStarting.get - numExecutorsRunning.get// 还没有发送给Yarn的资源请求if (missing > 0) {      /*** 将待处理的container请求分为三组:本地匹配列表、本地不匹配列表和非本地列表。*/val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(hostToLocalTaskCounts, pendingAllocate)// staleRequests 的意思是,ApplicationMaster已经请求了这个Container,// 但是这个ContainerRequest所要求的hosts里面没有一个是在 hostToLocalTaskCounts (即task所倾向于)中的,因此,需要取消这个Container Request,因为已经没有意义了// cancel "stale" requests for locations that are no longer neededstaleRequests.foreach { stale =>amClient.removeContainerRequest(stale)}val cancelledContainers = staleRequests.size// consider the number of new containers and cancelled stale containers available// 将新的container请求,以及刚刚取消的container,作为available containerval availableContainers = missing + cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers = availableContainers + anyHostRequests.size// LocalityPreferredContainerPlacementStrategy,计算每一个Container 的Node locality和 Rack localityval containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,allocatedHostToContainersMap, localRequests)val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]// 遍历ContainerLocalityPreferences数组中的每一个ContainerLocalityPreferencescontainerLocalityPreferences.foreach {case ContainerLocalityPreferences(nodes, racks) if nodes != null =>newLocalityRequests += createContainerRequest(resource, nodes, racks)// 根据获取的locality,重新创建ContainerRequest请求}// 除了有locality需求的container以外,还有更多的available container需要被请求,因此对这些container请求也发送出去if (availableContainers >= newLocalityRequests.size) {// more containers are available than needed for locality, fill in requests for any hostfor (i <- 0 until (availableContainers - newLocalityRequests.size)) {newLocalityRequests += createContainerRequest(resource, null, null) // 构造ContainerRequest对象}} else {val numToCancel = newLocalityRequests.size - availableContainers// cancel some requests without locality preferences to schedule more local containersanyHostRequests.slice(0, numToCancel).foreach { nonLocal =>amClient.removeContainerRequest(nonLocal)}}} else if (numPendingAllocate > 0 && missing < 0) {val numToCancel = math.min(numPendingAllocate, -missing)val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)matchingRequests.iterator().next().asScala.take(numToCancel).foreach(amClient.removeContainerRequest)}}

其基本过程为:

  1. 获取当前Pending的request(已经发送给Yarn但是还没有分配Container的请求),并将这些Pending的请求按照本地性的需求进行切分。这里的基本意图是,当前收到了来自Driver的全局的资源状态信息,而在Yarn上还有一部分之前的资源请求还没有分配Container,那么,会不会这些Pending Requewt中有些Request已经不需要了(满足不了任何一个task的locality需求)

          val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(hostToLocalTaskCounts, pendingAllocate)
    

    切分的过程,就是查看当前在Yarn这一端pending的所有的Container的locality与我们目前需求的所有(全局)的Task的locality的交集:

    -------------------------------------- YarnAllocator ----------------------------------private def splitPendingAllocationsByLocality(hostToLocalTaskCount: Map[String, Int], // 每一个host到希望分配上去的task的数量pendingAllocations: Seq[ContainerRequest] // 还没有分配出去的ContainerRequest): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {val localityMatched = ArrayBuffer[ContainerRequest]()val localityUnMatched = ArrayBuffer[ContainerRequest]()val localityFree = ArrayBuffer[ContainerRequest]()val preferredHosts = hostToLocalTaskCount.keySet// 将当前已经发送给Yarn但是还没有分配的Container的请求进行切分pendingAllocations.foreach { cr =>val nodes = cr.getNodes // 这个 ContainerRequest 对节点的要求if (nodes == null) {localityFree += cr // 这个ContainerRequest对nodes没有要求,那么就是对本地性没有要求的Container请求} else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集localityMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去} else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集localityUnMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}}// 切分结果 (localRequests, staleRequests, anyHostRequests)(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)}}
    

    切分过程的具体流程如下图所示:
    请添加图片描述

    切分的具体流程为 :

    • 如果这个Pending Container没有任何的locality要求,那么就是localityFree Container,即,其实际分配的位置有可能是当前所有tasks所希望的位置,也可能不是,那么这个container就是localityFree container
      if (nodes == null) {localityFree += cr // 这个ContainerRequest对nodes没有要求,那么就是对本地性没有要求的Container请求} 
      
    • 如果这个Pending Container有locality 要求,并且这个locality的nodes与当前所有tasks有交集,那么这个Pending Container就被划分为localityMatched,显然,这个Pending Container是不应该被取消的;
      else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集localityMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}
      
    • 如果这个Pending Container有locality要求,但是这个locality中的nodes不在当前所有tasks的locality中的任何一个节点,即这个Pending Container实际分配的位置不可能是任何一个task所倾向于的位置,那么这个Pending Container就是localityUnMatched,显然,localityUnMatched container目前无法放置任何一个task,需要取消掉;
      else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集localityUnMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}
      
  2. 对于localityUnmatched container,向Yarn发送请求,取消这种Container,这些被取消的Container在后面会重新申请,以便在申请的资源总量不变的情况下增强资源的本地特性:

     staleRequests.foreach { stale =>amClient.removeContainerRequest(stale)}
    
  3. 计算总的Container的数量,包括:

    • Pending Container中刚刚cancel的container的数量,这些Container刚刚取消了,我们可以再次申请这些Container,但是肯定会增强这些新的资源请求的locality,以最大化我们的Task的locality
    • Pending Container中的locality free的Container数量,这些Container可能分配在集群中的任何地方
    • 新增(missing)的Container请求,即当前的总的container请求中除去正在运行(已经有task在运行,numExecutorsRunning)和正在启动(已经分配但是还没分配task,numExecutorsStarting)的,再除去所有的pending的container(numPendingAllocate,是从Yarn的API中获取的数量,已经请求但是还没有分配成功的资源),多出来的Container:
    	// 还没有发送请求的executor的数量val missing = targetNumExecutors - numPendingAllocate -numExecutorsStarting.get - numExecutorsRunning.get .....// consider the number of new containers and cancelled stale containers available// 将新的container请求,以及刚刚取消的container,作为available containerval availableContainers = missing + cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers = availableContainers + anyHostRequests.size
    

    在这里,val availableContainers = missing + cancelledContainers,即available container代表这次可以增量申请的最大的container数量,包括了这次的额外需求,以及刚刚取消的container(取消的container可以重新申请)

  4. 构建Container请求。这里会根据LocalityPreferredContainerPlacementStrategy的localityOfRequestedContainers来构建Container请求,返回Array[ContainerLocalityPreferences],每一个ContainerLocalityPreferences代表了一个带有对应host和rack信息的Container请求:

       val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,allocatedHostToContainersMap, localRequests)
    
  5. 根据ContainerLocalityPreferences,转换成Yarn的 ContainerRequest

       containerLocalityPreferences.foreach {case ContainerLocalityPreferences(nodes, racks) if nodes != null =>newLocalityRequests += createContainerRequest(resource, nodes, racks)// 根据获取的locality,重新创建ContainerRequest请求case _ =>}
    
  6. 如果可以申请的Container(available container)的数量大于刚刚计算完locality的Container数量,那么,为了将申请配额用尽,就再申请其相差的部分Container, 保证申请的Container的数量不小于Available Container的数量。

     if (availableContainers >= newLocalityRequests.size) {// more containers are available than needed for locality, fill in requests for any hostfor (i <- 0 until (availableContainers - newLocalityRequests.size)) {newLocalityRequests += createContainerRequest(resource, null, null) // 构造ContainerRequest对象}}
    
  7. 如果可以申请的Container(available container)的数量小于刚刚计算完locality的Container数量,那么需要取消一部分container:

    else {val numToCancel = newLocalityRequests.size - availableContainers// cancel some requests without locality preferences to schedule more local containersanyHostRequests.slice(0, numToCancel).foreach { nonLocal =>amClient.removeContainerRequest(nonLocal)}
    
  8. 调用Yarn的标准接口addContainerRequest(),将ContainerRequest发送给Yarn(其实这个接口并不会真正将请求发送出去,只会存放在RMAMClient端,真正发送是通过allocate()接口):

    newLocalityRequests.foreach { request =>amClient.addContainerRequest(request)
    } // 在这里发送container的请求,从日志来看,资源请求已经发出来了,Yarn已经处理了
    

所以,从上面可以看到,最关键的方法是LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers()方法,它根据当前的已有信息(总共的Container需求,有locality需求的task的数量,这些locality分布在每一个task上的数量等),生成一个Array[ContainerLocalityPreferences]数组,数组中的每一个元素代表了一个Container的需求,并包含了其locality的要求信息,然后基于生成的ContainerLocalityPreferences经过转换成ContainerRequest,发送给Yarn。

资源申请的生成过程详解

资源申请的生成,就是根据当前集群运行的基本情况,Task的基本需求,生成Yarn上的资源请求的过程。

资源申请的生成过程的简单例子

在了解其具体实现以前,我们以具体例子的方式,看一下localityOfRequestedContainers()方法的基本实现逻辑,从而对其动机和达成的效果有一个很好的理解,然后,我们再看其实现细节。

  1. 从任务调度去看,看到的是Task以及每个 Task的Locality倾向。比如,现在我们一共需要为30个Task分配资源,其中,20个Task的locality倾向为Host1,Host2,Host3,10个Task的Locality倾向为Host1, Host2, Host4, 因此,对应到每个Host上的Task权重如下表所示:

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010

    即,20个Task希望分配在Host1,Host2,Host3中的任何一个,有10个Task希望分配在Host1, Host2, Host4中的任何一个。如上表所示,综合来看,所有Task在四台机器上分配的权重是(30, 30, 20,10)

  2. 假设一个Task需要的vCore是1,而一个Container(Executor)有2个vCore,因此,转换成Container以后的结果如下表所示:

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515105

    上面的Sum of Containers的数字只是表示一个比例值,并不表示对应的Host上实际需要申请的Container的数量,我们实际需要的总的Container数量才15个。那么,这15个Container需求平均到每台Host上是多少呢?

  3. 比如Host 1的Sum of Container 为15, 所有Host的Sum of Container 是45,因此占比是1/3,所以平均下来分配到Host1上的Container数量应该是 15 * 1/3 = 5。经过向上取整(宁可稍微多分配也不要少分配)以后,每台机器所平均到的15个Container需求是:

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515105
    Allocated Container Target5542
  4. 在这里,计算完总的Allocated Container Target以后,需要减去当前已经在该Host上已经存在(正在运行或者在这个Host上pending的Container),因为我们最终发送给Yarn的Container请求是增量请求。假设现在在每一个Host上已经存在的Container数量都是1,即15个Container中有4个Container是已经分配的,那么,减去已经存在的Container数量以后的结果如下表所示,所以,我们需要新申请12个Container:

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515126
    Allocated Container Target5542
    Newly Allocated Container4431
  5. 将每个Host的Newly Allocated Container按照比例进行缩放,保证比例最大的那个Host(这里是Host1 和 Host 2)的比例值是需要新申请的Container的数量。在这里,扩大因子应该是 12(Container的总数量)/4(比例最大的Host的Average Allocated Container) = 3 :

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515105
    Allocated Container Target5542
    Newly Allocated Container4431
    Round Up121293
  6. 开始发起资源请求。每一个Container请求的Locality中包含的Host如下表所示:

    Host 1Host 2Host 3Host 3
    3 Containers
    6 Containers
    3 Containers

    其含义是:

    • 3个Container请求的Locality是[Host1, Host2, Host3, Host4],即请求Yarn分配3个Container,并且尽量将它们分配在这4个Hosts中。此时剩下的Host比例为[9:9:6:0]
    • 6个Container请求的Locality是[Host1, Host2, Host3],即请求Yarn分配6个Container,并且尽量将它们分配在这3个Hosts中,此时剩下的Host中container比例为[3:3:0]
    • 3个Container请求的Locality是[Host1, Host2],即请求Yarn分配3个Container,并且尽量将它们分配在这2个Hosts中

    这样,所有Host的Container比例就是12:12:9:3,平均到12个需分配的Container以后的比例是4:4:3:1,再加上已经分配在每个host上的1个Container,那么总的Container在每个Host上的比例就是5:5:4:2,这个比例和我们直接根据每个Host的task比例折算成的Container的比例15:15:10:5是大致相近的。
    到了这里,我们可以理解了,为什么我们需要在 步骤5 做Round Up操作,并且Round Up的目标是将目前比例值最大的Host的比例值扩大为当前Container需求的最大值? 因为在步骤6中生成Container请求的时候,比例值最大的Host的比例值肯定是等于需要申请的Container数量的。

资源调度算法的代码解析

上面以实际例子解释了Spark将当前的Task的Locality需求信息转换成Yarn的资源请求的细节。下面,我们结合代码,详细看一下localityOfRequestedContainers()方法的实现细节:

  def localityOfRequestedContainers(numContainer: Int, // 需要进行计算的container的数量,包括missing的,cancel掉的(本地性不符合任何task要求的pending container),以及对本地性没有要求的pending的containernumLocalityAwareTasks: Int, // 对locality有要求的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的hostToLocalTaskCount: Map[String, Int], // 在Stage提交了以后,这个map里面保存了从host到期望分配到这个host的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints传递过来的allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], // 已经launch起来的host -> container的映射关系localityMatchedPendingAllocations: Seq[ContainerRequest] // 对本地性有要求的pending的container): Array[ContainerLocalityPreferences] = {//  预期的从host到期望在上面再launch的新的container数量的映射关系val updatedHostToContainerCount = expectedHostToContainerCount(numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,localityMatchedPendingAllocations)// 希望再launch的所有Host上的container的数量之和,在这里的例子中,是15val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum// The number of containers to allocate, divided into two groups, one with preferred locality,// and the other without locality preference.//  没有locality需求的container的数量val requiredLocalityFreeContainerNum =math.max(0, numContainer - updatedLocalityAwareContainerNum)//  有locality需求的container的数量val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNumval containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()if (requiredLocalityFreeContainerNum > 0) { // 如果有container是没有locality需求的for (i <- 0 until requiredLocalityFreeContainerNum) {containerLocalityPreferences += ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])}}if (requiredLocalityAwareContainerNum > 0) { // 如果有container有locality需求val largestRatio = updatedHostToContainerCount.values.max // 全局的所有host中最大的container数量// Round the ratio of preferred locality to the number of locality required container// number, which is used for locality preferred host calculating.var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio(k, adjustedRatio.ceil.toInt) // 往上取整}// 每个有locality需求的的Container request,为他们确定对应的hosts和rackfor (i <- 0 until requiredLocalityAwareContainerNum) {// Only filter out the ratio which is larger than 0, which means the current host can// still be allocated with new container request.val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray // 还有container可以分配的一个或者多个hostsval racks = hosts.map { h =>resolver.resolve(yarnConf, h) // 解析这些host所在的rack}.toSet// 每一个ContainerLocalityPreferences代表一个ContainercontainerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)// Minus 1 each time when the host is used. When the current ratio is 0,// which means all the required ratio is satisfied, this host will not be allocated again.preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }}}// containerLocalityPreferences中的每一项都会变成一个新的Container RequestcontainerLocalityPreferences.toArray}

其参数的基本含义是:

  • numContainer: Int 需要进行计算的Container的数量,即可能进行分配的Container数量,包括Miss的container(还没有申请的Container),Cancel掉的(本地性不符合任何task要求,因此已经从Yarn上取消的pending container)。同时,还包括Pending Container中对本地性没有要求的Container,这一部分Container也是我们重新申请的对象,以最大化Locality。上文讲到过的updateResourceRequests()方法中的potentialContainers就是传入到该方法的numContainers参数:

       // 将新的container请求,以及刚刚取消的container,作为available containerval availableContainers = missing + cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers = availableContainers + anyHostRequests.size
    
  • numLocalityAwareTasks: Int 对locality有要求的task的数量,这个是Driver端通过对stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的数值。已经说过,这是此时的全局状态量,而不是一个增量;

  • hostToLocalTaskCount: Map[String, Int] 在Stage提交了以后,这个map里面保存了从host到期望分配到这个host的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints传递过来的,具体过程是:

    • 在Driver端,ExecutorAllocationManager的onStageSubmitted回调中,会将这个Stage的task preference存放在stageIdToExecutorPlacementHints中。

      ----------------------------------------- ExecutorAllocationManager ----------------------------------------
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {.....// 计算这个stage在每一个host上的task数量// Compute the number of tasks requested by the stage on each hostvar numTasksPending = 0val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>// 对于每一个task的prefered location的listnumTasksPending += 1// 对于这个task的每一个 preferred locationlocality.foreach { location => // 对于这个locality中的每一个location// 这个host上的task的数量+1val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1hostToLocalTaskCountPerStage(location.host) = count}}// 这个map的key是stage id,value是一个元组,记录了这个stage的pending的task的数量,以及从host到task count的map信息stageIdToExecutorPlacementHints.put(stageId,(numTasksPending, hostToLocalTaskCountPerStage.toMap))updateExecutorPlacementHints() 
      
    • 随后,ExecutorAllocationManager会有线程不断将这些信息通过RequestExecutors发送给远程的ApplicationMaster:

        def start(): Unit = {listenerBus.addToManagementQueue(listener)val scheduleTask = new Runnable() {override def run(): Unit = {schedule() // 这里会根据需要更新numExecutorsTarget的数量,也会调用}}executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)}
      
  • allocatedHostToContainersMap: HashMap[String, Set[ContainerId]] 已经launch起来的host -> container的映射关系。这是updateResource()方法每次通过Yarn的标准API allocate()向Yarn询问以后获取的结果。我们说过,allocate()接口用来向Yarn发送本次的资源请求,并返回当前Yarn为这个Application分配的Container的结果。由于Yarn端的资源分配是异步分配,因此allocate()返回的结果并非是这次请求的资源的分配结果,而是两次相邻的allocate()请求发生之间的新产生的资源分配结果

  • localityMatchedPendingAllocations: Seq[ContainerRequest] 对本地性有要求的pending的container,其在方法splitPendingAllocationsByLocality()中对Pending的Container的Locality状态进行切分后,那些与当前请求的Task的Locality有交集的Pending Container将作为已经存在的Container,整个资源请求的目标,是使得新申请的Container和已经分配的Container加起来,其资源倾向和所有Task的统计倾向尽量匹配,从而最大程度满足Task的本地性需求。

localityOfRequestedContainers()算法的基本过程为:

  1. 计算每一个Host上应该新分配的Container的数量的预期值。由于是新分配的Container的预期值,因此需要先根据每个Host上的预期存在的Container的总的数量,减去该Host上已经存在的Container:

    val updatedHostToContainerCount = expectedHostToContainerCount(numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,localityMatchedPendingAllocations)
    

    这里的计算,就是完成下表中从Sum of Tasks(每个机器上分配到的Task的比例) 到 Sum of Containers (每个机器上分配的Container的比例)的转换,然后根据Sum of Containers 减去每台机器上已经分配的Container,就得到了Average Allocated Container Total(每台机器上应该新分配的Container的数量):

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515126
    Allocated Container Target5542
    Newly-Allocated Container4431
  2. 根据上面计算的分配结果,统计没有locality需求的Container的总数量和有locality需求的Container数量:

    // 希望再launch的所有Host上的container的数量之和,在这里的例子中,是15
    val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum// The number of containers to allocate, divided into two groups, one with preferred locality,
    // and the other without locality preference.
    //  没有locality需求的container的数量
    val requiredLocalityFreeContainerNum =math.max(0, numContainer - updatedLocalityAwareContainerNum)//  有locality需求的container的数量
    val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
    
  3. 先为 没有locality需求的Container 构造ContainerLocalityPreferences,每一个ContainerLocalityPreferences对象对应了一个Container请求和这个请求的Locality需求。可以看到,这种没有Locality需求的Container的Host 偏好和Rack 偏好都是空的:

    val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
    if (requiredLocalityFreeContainerNum > 0) { // 如果有container是没有locality需求的for (i <- 0 until requiredLocalityFreeContainerNum) {containerLocalityPreferences += ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])}
    }
    
  4. 为 有locality需求的Container 构造ContainerLocalityPreferences对象,每一个ContainerLocalityPreferences对象封装了这个Container请求和这个请求的Locality需求:

    4.1 这里需要完成比较难以理解的Container请求的比例放大,保证比例最大的那个Host(这里是Host1 和 Host 2)的比例值是需要申请的Container的数量,以满足随后为每一个Container构造其Locality信息的过程:

     var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio(k, adjustedRatio.ceil.toInt) // 往上取整}
    

    如下图所示,这里就是完成Round Up 这一步骤,将需要新分配的Container数量成比例放大,保证Container比例最大的Host(这里是Host1 和 Host 2)放大以后的值刚好等于需要分配的有Locality Preference 的 Container的总数量。

    Host 1Host 2Host 3Host 4
    20 Tasks202020
    10 Tasks101010
    Sum of Tasks30302010
    Sum of Containers1515105
    Allocated Container Target5542
    Newly Allocated Container4431
    Round Up121293

    4.2 放大完成以后,开始进行分配。
    当前有12 个Container需要分配,每一个Host的分配比例为(12,12,9,3)。分配过程上文已经经过,其代码如下:

      // 每个有locality需求的的Container request,为他们确定对应的hosts和rackfor (i <- 0 until requiredLocalityAwareContainerNum) {// Only filter out the ratio which is larger than 0, which means the current host can// still be allocated with new container request.val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray // 还有container可以分配的一个或者多个hostsval racks = hosts.map { h =>resolver.resolve(yarnConf, h) // 解析这些host所在的rack}.toSet// 每一个ContainerLocalityPreferences代表一个ContainercontainerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)// Minus 1 each time when the host is used. When the current ratio is 0,// which means all the required ratio is satisfied, this host will not be allocated again.preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }}
    

申请资源以后的处理:Executor的启动或者结束

上面讲过,资源调度的入口方法allocateResources()会通过updateResourceRequests()来计算所需资源并向Yarn进行资源的更新,包括申请新的资源、释放无用的资源等:

  def allocateResources(): Unit = synchronized {updateResourceRequests() // val allocateResponse = amClient.allocate(progressIndicator)val allocatedContainers = allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala)val completedContainers = allocateResponse.getCompletedContainersStatuses()processCompletedContainers(completedContainers.asScala)}}

通过调用Yarn的标准API allocate(),获取了资源分配的结果。再次强调,Yarn这一端的资源调度是异步调度,因此这个资源分配的结果并不是刚刚通过addContainerRequest()进行资源申请的结果,只是调用者在两次调用allocate() API的之间Yarn对于这个Application的新的资源分配结果。拿到了分配的Container,Spark就可以将Executor启动起来了(注意,是启动一个空的Executor,不是启动Task)。启动起来的Executor随后就会向DriverEndpoint注册自己,通信的详细过程参考TODO。这里不再赘述。

对分配结果的处理,主要是处理已经分配的Container以及已经运行结束的Container:

    val allocatedContainers = allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala)val completedContainers = allocateResponse.getCompletedContainersStatuses()processCompletedContainers(completedContainers.asScala)
  1. 对于已经分配的Container,需要从Yarn的AMRMClient中将对应的资源请求删除,避免对同一个资源进行多次重复申请,然后启动对应的Executor。
  2. 对于已经完成的Container,需要根据Container的退出状态,记录相关日志,同时,需要向Driver发送RemoveExecutor消息告知Driver这个Container的结束,Driver端会进行相关状态的维护。

对于新启动的Container的处理

对于一个刚刚分配成功的Container,其处理工作主要包括两个

  • 一是从AMRMClient中将对应的资源请求删除,避免同一资源请求的Container被重复申请;
  • 然后,在远程的NodeManager节点上启动Container。

这些过程在方法handleAllocatedContainers()中进行:

--------------------------------- YarnAllocator --------------------------------------
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)// 先处理Host Match的Containerval remainingAfterHostMatches = new ArrayBuffer[Container]for (allocatedContainer <- allocatedContainers) {matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,containersToUse, remainingAfterHostMatches)}// 处理Host Match以后剩余的Containerval remainingAfterRackMatches = new ArrayBuffer[Container]for (allocatedContainer <- remainingAfterHostMatches) {val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)matchContainerToRequest(allocatedContainer, rack, containersToUse,remainingAfterRackMatches)}// 处理Host Match和Rack Match以后剩余的Containerval remainingAfterOffRackMatches = new ArrayBuffer[Container]for (allocatedContainer <- remainingAfterRackMatches) {matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,remainingAfterOffRackMatches)}// 在Host Match,Rack Match,以及ANY_HOST Match以后,依然还有剩余的Container,这只能是Bugif (!remainingAfterOffRackMatches.isEmpty) {for (container <- remainingAfterOffRackMatches) {internalReleaseContainer(container)}}/*** 在这里会打印 Launching container container_1714042499037_5294_01_000002 on host*/runAllocatedContainers(containersToUse)}
  1. 遍历每一个分配的Container,在AMRMClient端找到跟这个Container所在的机器相匹配的资源请求,将这个资源请求AMRMClient中删除。这个删除操作并不会和远程的ResourceManager通信,因为这些资源请求都通过addContainerRequest() API被AMRMClient保存在本地然后通过allocate() API发送给远程的RM的。因此对应请求的删除是在AMRMClient的本地进行的:
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- allocatedContainers) {matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,containersToUse, remainingAfterHostMatches)
    }
    
    其中,matchContainerToRequest()方法就是根据这个分配成功的Container的特性(Priority,VCore, CPU,Host Location),从AMRMClient中删除对应的ResourceRequest。未删除的Container保存在参数remainingAfterHostMatches中:
    --------------------------------- YarnAllocator --------------------------------------private def matchContainerToRequest(allocatedContainer: Container,location: String,containersToUse: ArrayBuffer[Container],remaining: ArrayBuffer[Container]): Unit = {// 这个Container的资源特性val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,resource.getVirtualCores)// 以Priority,Resource(VCore, Memory),location作为ID,删除这个Container对应的资源请求val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,matchingResource)if (!matchingRequests.isEmpty) { // 匹配成功val containerRequest = matchingRequests.get(0).iterator.nextamClient.removeContainerRequest(containerRequest) // 从AMRMClient中删除containersToUse += allocatedContainer} else {remaining += allocatedContainer // 未匹配的Container放入remaining,接着进行其他匹配}}
    
  2. 遍历剩下的在Host级别没有匹配成功的剩余的Container,在Rack级别进行Container和ResourceRequest 的匹配,并将匹配不成功的Container保存在remainingAfterRackMatches中:
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterHostMatches) {/*** SparkRackResolver.*/val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)matchContainerToRequest(allocatedContainer, rack, containersToUse,remainingAfterRackMatches)
    }
    
  3. 遍历剩下的在Rack级别没有匹配成功的剩余的Container,在ANY_HOST(忽略本地偏好)级别进行Container和ResourceRequest 的匹配(这种ResourceRequest是那种没有Locality需求的ResourceRequest),并将匹配不成功的Container保存在remainingAfterRackMatches
    // Assign remaining that are neither node-local nor rack-local
    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterRackMatches) {matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,remainingAfterOffRackMatches)
    }
    
  4. 如果有的已经分配的Container无论是Host、Rack还是ANY_HOST偏好都没有在AMRMClient本地找到他们匹配的资源请求(注释中说,这是由于Yarn本身的竞争导致的bug),那么释放这些Container:
    if (!remainingAfterOffRackMatches.isEmpty) {for (container <- remainingAfterOffRackMatches) {internalReleaseContainer(container)}
    }
    
  5. 资源请求释放完毕以后,通过方法runAllocatedContainers()逐个启动每一个Container。
    ---------------------------------- YarnAllocator ----------------------------------------private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {for (container <- containersToUse) { // 对于已经allocate并且资源已经匹配的containerexecutorIdCounter += 1val executorHostname = container.getNodeId.getHostval containerId = container.getIdval executorId = executorIdCounter.toString // 分配executorIddef updateInternalState(): Unit = synchronized {numExecutorsRunning.incrementAndGet()numExecutorsStarting.decrementAndGet()executorIdToContainer(executorId) = container // executor 和 container的映射关系containerIdToExecutorId(container.getId) = executorId // container 和 executor 的映射关系/*** Container launch起来以后,更新allocatedHostToContainersMap*/val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,new HashSet[ContainerId])containerSet += containerIdallocatedContainerToHostMap.put(containerId, executorHostname)}numExecutorsStarting.incrementAndGet()launcherPool.execute(new Runnable {override def run(): Unit = {try {new ExecutorRunnable(........).run() // 运行ExecutorRunnable,用来和NodeManager通信来启动ContainerupdateInternalState()}})}}
    
    5.1 在线程池中启动一个ExecutorRunnable。ExecutorRunnable会负责和NodeManager进行通信,在对应节点上将Container启动起来;
    launcherPool.execute(new Runnable {override def run(): Unit = {try {new ExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,executorHostname,executorMemory,executorCores,appAttemptId.getApplicationId.toString,securityMgr,localResources).run()
    
    5.2 在ExecutorRunnable启动以后,由于新的Container的加入,更新相关元数据信息,包括executor -> container,container -> executor, container -> host, host -> container的映射关系,这是通过内部方法updateInternalState()来负责的:
    ---------------------------------- YarnAllocator ----------------------------------------def updateInternalState(): Unit = synchronized {numExecutorsRunning.incrementAndGet()numExecutorsStarting.decrementAndGet()executorIdToContainer(executorId) = container // executor 和 container的映射关系containerIdToExecutorId(container.getId) = executorId // container 和 executor 的映射关系/*** Container launch起来以后,更新allocatedHostToContainersMap*/val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,new HashSet[ContainerId])containerSet += containerIdallocatedContainerToHostMap.put(containerId, executorHostname)}
    

对于结束的Container的处理

对于结束的Container的处理在方法processCompletedContainers()中进行:

---------------------------------- YarnAllocator ----------------------------------------
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {for (completedContainer <- completedContainers) {val containerId = completedContainer.getContainerIdval alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放val hostOpt = allocatedContainerToHostMap.get(containerId)val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")val exitReason = if (!alreadyReleased) { // 这个Container还没有释放,那么走释放流程// Decrement the number of executors running. The next iteration of// the ApplicationMaster's reporting thread will take care of allocating.numExecutorsRunning.decrementAndGet()// Hadoop 2.2.X added a ContainerExitStatus we should switch to use// there are some exit status' we shouldn't necessarily count against us, but for// now I think its ok as none of the containers are expected to exit.val exitStatus = completedContainer.getExitStatusval (exitCausedByApp, containerExitReason) = exitStatus match {case ContainerExitStatus.SUCCESS =>.....case ContainerExitStatus.PREEMPTED =>....case VMEM_EXCEEDED_EXIT_CODE =>....case PMEM_EXCEEDED_EXIT_CODE =>....}if (exitCausedByApp) {logWarning(containerExitReason)} else {logInfo(containerExitReason)}ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)} else {// 如果我们释放了这个Container,那么说明一定是Driver直接通过 killExecutor// 释放掉了这个Container,而不是它自行结束ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,s"Container $containerId exited from explicit termination request.")}// 解除host -> container 以及 container -> host mappingfor {host <- hostOpt // 这个Container对应的HostcontainerSet <- allocatedHostToContainersMap.get(host) // 这个Container对应的Host上的所有container} {containerSet.remove(containerId) // 删除这个containerif (containerSet.isEmpty) { // 这个container是这个host上的最后一个containerallocatedHostToContainersMap.remove(host) // 删除host} else {allocatedHostToContainersMap.update(host, containerSet)}allocatedContainerToHostMap.remove(containerId) // 解除container -> host map}// 解除container -> executor mappingcontainerIdToExecutorId.remove(containerId).foreach { eid =>executorIdToContainer.remove(eid)....if (!alreadyReleased) {// The executor could have gone away (like no route to host, node failure, etc)// Notify backend about the failure of the executornumUnexpectedContainerRelease += 1driverRef.send(RemoveExecutor(eid, exitReason))}}}
}

可以看到,方法processCompletedContainers()会遍历Yarn返回的每一个Completed(注意,Completed只是代表Container运行结束,但是运行结果可能是Succeed可能是Fail),然后逐个处理:

  1. 如果Container此时并没有被释放,说明Container是自行结束,而不是Driver所杀死的。根据Container的退出状态和退出原因,打印日志:

    val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放
    val exitReason = if (!alreadyReleased) {val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放val exitStatus = completedContainer.getExitStatusval (exitCausedByApp, containerExitReason) = exitStatus match {case ContainerExitStatus.SUCCESS =>.....case ContainerExitStatus.PREEMPTED =>....case VMEM_EXCEEDED_EXIT_CODE =>....case PMEM_EXCEEDED_EXIT_CODE =>....}if (exitCausedByApp) {logWarning(containerExitReason)} else {logInfo(containerExitReason)}}
    
  2. 如果我们发现这个Container已经在ReleasedContainer中存在,说明只能是Driver通过KillExecutor的方式将Container给Release了,而不是Container自行退出:

            // 如果我们释放了这个Container,那么说明一定是Driver直接通过 killExecutor// 释放掉了这个Container,而不是它自行结束ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,s"Container $containerId exited from explicit termination request.")
    

    如果是Driver杀死了Executor,Driver会向AMEndpoint发送KillExecutor消息,AMEndpoint会将这个Executor从其维护的元数据信息中删除,将这个Kill掉的Executor的Container添加到releasedContainers中,同时通过AMRMClient向Yarn发送释放container的请求:

      def killExecutor(executorId: String): Unit = synchronized {val container = executorIdToContainer.get(executorId).getinternalReleaseContainer(container)numExecutorsRunning.decrementAndGet()}private def internalReleaseContainer(container: Container): Unit = {releasedContainers.add(container.getId()) // 将这个Container从releasedContainer中删除amClient.releaseAssignedContainer(container.getId()) // 向Yarn发送释放Container的请求}
    
  3. 删除这个Container和Host之间的映射关系,包括Host -> Container的映射关系以及反向的Container-> Host的映射关系

       for {host <- hostOpt // 这个Container所在的HostscontainerSet <- allocatedHostToContainersMap.get(host) // 这个Host上的所有Container} {containerSet.remove(containerId)if (containerSet.isEmpty) {allocatedHostToContainersMap.remove(host)} else {allocatedHostToContainersMap.update(host, containerSet)}allocatedContainerToHostMap.remove(containerId)}
    
  4. 删除Container和Executor之间的映射关系,同时,如果不是Driver主动release的这个container,那么会向Driver发送RemoveExecutor消息

       containerIdToExecutorId.remove(containerId).foreach { eid =>executorIdToContainer.remove(eid)........if (!alreadyReleased) {// 这个Container不是Driver自行释放的,那么需要像Driver汇报一个RemoveExecutor消息driverRef.send(RemoveExecutor(eid, exitReason))}
    

基于资源分配结果进行任务调度

上面讲到对于新分配的Container的处理,在收到Yarn返回的分配的Container以后,ApplicationMaster会启动对应的Executor。这些Executor启动以后,会向Driver注册自己以告知Driver自己的存在,Driver进而将Task调度到Executor中。
其实,Task的调度的触发不仅仅是新分配了Container或者新Launch了Executor,基本上在集群的资源可能发生变化的情况下,都会触发Task的调度,因此,Task的调度是一个不断将Pending Task与可用资源进行匹配然后调度出去的过程

我列举了下面四种可以触发Driver端的CoarseGrainedSchedulerBackend通过运行makeOffers(或者只针对某一个Executor的makeOffer)来进行任务调度:

  1. 来自Executor的注册:上面说过Executor启动以后会向Driver注册自己。此时,Driver认为集群中有了新的可用资源,因此尝试进行Task到Executor的调度。Executor的注册是通过Executor启动的时候向DriverEndpoint发送RegisterExecutor消息来触发的。

    Driver将Executor能够提供的可用资源(Memory , CPU)叫做Resource Offer。因此,当收到了新的Executor的注册,Driver端会调用makeOffers()方法,为这个Executor生成对应的WorkerOffer,代表这个Executor剩余可用的CPU和Memory:

    ------------------------------------ DriverEndpoint -------------------------------
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {//  收到 RegisterExecutor 请求,这个请求发生在Executor启动以后,向Driver发送的信息case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>......executorRef.send(RegisteredExecutor).....makeOffers()  // 尝试进行task的调度,针对所有可用的executor}
    
  2. 来自Executor的StatusUpdate:在Executor上Task的运行状态发生变化,都会告知Driver,Driver认为此时集群的资源状态发生了变化,因此尝试进行一次task的调度。但是这次的Task的调度是针对这个Executor的,即只会调度适合运行在这个Executor上的Pending Task到这个Exeuctor上:

    ------------------------------------ DriverEndpoint -----------------------------------
    override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId) // 尝试进行Task的调度,但是只针对这一个Executor...}
    
  3. 来自TaskScheduler中的任务的相关变化,比如,Task的提交,Executor的丢失,Task的失败,推测执行等等,由于系统的可用资源发生了变化,因此TaskScheduler都会通过向DriverEndpoint发送ReviveOffers消息,以触发新一轮的Pending Task的调度。

    ----------------------------------- DriverEndpoint --------------------------------override def receive: PartialFunction[Any, Unit] = {.....case ReviveOffers =>makeOffers()
    
  4. 来自SchedulerBackend的自我触发:在DriverEndpoint作为一个RpcEndpoint启动的时候,会启动一个ReviveThread,以固定频率,向自己发送ReviveOffers的本地消息(发送给自己),以触发Pending 的 Task到Container的调度:

    -------------------------------- DriverEndpoint ---------------------------------------override def onStart() {// 定期恢复offer,以允许延迟调度工作// Periodically revive offers to allow delay scheduling to workval reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")reviveThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReviveOffers))}}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)}
    

在以Locality为考虑重点的Task的调度,就是根据locality从高到低(executor-local 优先级最高),参照当前允许的优先级,取出对应的Task进行调度。如果当前优先级的Task已经调度完毕,或者当前locality的一部分或者全部Task经过了很久还没有完成调度(即当前的系统资源无法完全满足当前的locality需求),那么就需要降低locality再次尝试进行调度。
下文会详细讲解makeOffers()的具体流程。

PendingTask的生成:

Task的调度的重要逻辑是满足Task 的Location Preference, 即每一个Task对运行位置 (Executor, Host, Rack等等)上的偏好。我们先看看TaskLocation的含义和生成过程,然后看看Spark的Driver是怎样通过Locality-Aware的调度方式,尽最大可能满足每一个Task的本地性需求。

TaskLocation详解

在Spark中,一个Task的Location归根结底是由这个Task的Split决定的。在Hadoop上,一个Split的位置信息表示为SplitLocationInfo类,包含了这个Split是否在对应的Host上缓存了,以及,对应的Host(DataNode) 信息:

------------------------------------------ SplitLocationInfo -----------------------------------------
public class SplitLocationInfo {private boolean inMemory;private String location;public SplitLocationInfo(String location, boolean inMemory) {this.location = location;this.inMemory = inMemory;}

我们从SplitLocationInfo的构造过程中可以看到其具体含义。一个HDFS文件的一个Split是由FileSplit对象表达,对象中hostInfos存放了一个SplitLocationInfo数组,每一个SplitLocationInfo对象存放了一个这个Split的一个Replica的Location信息(因为在HDFS上文件是多副本的),包括具体的Hostname(即DataNode所在的节点)以及这个Host是否在内存中缓存了这个Split的标记位。
当然,在HDFS的维度,我们说一个Split被缓存,其实缓存的是这个Split对应的Replica。在Hadoop中一个文件的Split对应一个FileSplit对象,代码如下:

--------------------------------------- FileSplit ------------------------------------------------public FileSplit(Path file, long start, long length, String[] hosts,String[] inMemoryHosts) {this(file, start, length, hosts);hostInfos = new SplitLocationInfo[hosts.length];for (int i = 0; i < hosts.length; i++) {// because N will be tiny, scanning is probably faster than a HashSetboolean inMemory = false;for (String inMemoryHost : inMemoryHosts) {if (inMemoryHost.equals(hosts[i])) {inMemory = true;break;}}hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);}}

其中,一个Replica的Location信息对应了一个SplitLocationInfo对象:

--------------------------------- SplitLocationInfo ------------------------------------
public class SplitLocationInfo {private boolean inMemory;private String location;public SplitLocationInfo(String location, boolean inMemory) {this.location = location;this.inMemory = inMemory;}
  ----------------------------------- NewHadoopRDD ---------------------------------------override def getPreferredLocations(hsplit: Partition): Seq[String] = {// 返回这个 NewHadoopPartition对应的Hadoop层面的Split信息,比如,一个文件的FileSplit,// 包含了这个File中这个split的起始位置,长度,replica的位置val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value// 提取这个FileSplit的多个Replica位置信息val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)locs.getOrElse(split.getLocations.filter(_ != "localhost"))}

在Spark端,在生成每一个RDD中的Partition的信息的时候,其映射关系其实从底向上为 HDFS Split -> Partition -> Task 的关系,因此,在生成RDD Partition的时候,会将对应的Split的Location信息转换成Partition的Location信息,这个Partition的Location信息使用TaskLocation对象的具体实现类来表达的。这个转换是由方法convertSplitLocationInfo()负责的,即:

  • 如果Split的是内存缓存,则构造HDFSCacheTaskLocation的location对象,代表这个partition已经被HDFS的对应Host缓存了。很显然,对应的Task如果直接调度到这台机器,则会提高读取效率;
  • 如果Split不是内存缓存,则构造HostTaskLocation,显然,Task调度到这条机器上,读取效率会更高(没有跨机器的网络带宽,如果打开了短路读,还可以利用HDFS的短路读特性)
    ----------------------------------  HadoopRDD ---------------------------------------private[spark] def convertSplitLocationInfo(infos: Array[SplitLocationInfo]): Option[Seq[String]] = {Option(infos).map(_.flatMap { loc =>val locationStr = loc.getLocationif (loc.isInMemory) {Some(HDFSCacheTaskLocation(locationStr).toString)} else {Some(HostTaskLocation(locationStr).toString)}})}
    

TaskLocation的实现类除了HDFSCacheTaskLocation和HostTaskLocation,还有ExecutorCacheTaskLocation,即这个Task希望运行在这个Executor上,比如,下面两种情况,这个Partition的Locality Preference是希望精确到对应的Executor的:

  1. 在Streaming的Task的调度中,对于Receiver的调度,为了均匀调度,会首先将所有的Task均匀调度到Host上,剩下的Executor均匀调度到Executor上。这不不在赘述,详细逻辑可以查看ReceiverSchedulingPolicy的scheduleReceivers()方法;
  2. 这个Partition在这个Executor上被缓存,因此,对应的Task肯定需要精确运行在对应的Executor上。这不不在赘述,详细逻辑可以查看DAGScheduler的getCacheLocs()方法。

根据TaskLocation信息,将Task添加到不同的pendingTask数组中

所有需要运行因此需要相应资源的Task,Driver都会将对应的Task添加到pendingTask中,这些pendingTask是由TaskSetManager维护的,一个TaskSetManager对象是一个stage的任务集合,主要负责这个Stage的Task的管理和调度。
根据每一个Task的资源本地性需求的不同,pendingTask分别维护在下面的Map中,代码如下:

  // 希望在某一个Executor上运行的pendingTasks,Key是Executor ID,Value是希望在这个Executor上运行的TaskID的列表private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]// 希望在某一个Host上运行的pendingTasks,Key是对应的Host,Value是希望在这个Host上运行的TaskID的列表private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]// 希望在某个一Rack上运行的pendingTasks,Key是对应的Rack,Value是希望在这个Rack上运行的TaskID的列表private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]// 没有任何的本地性需求的pendingTasks的列表private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

向PendingTask中添加任务的时候,会根据每一个task的本地性需求,将对应的pendingTask放入到上面不同的任务集合中,

  /** Add a task to all the pending-task lists that it should be on. */private[spark] def addPendingTask(index: Int) {for (loc <- tasks(index).preferredLocations) {loc match {case e: ExecutorCacheTaskLocation =>pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += indexcase e: HDFSCacheTaskLocation =>val exe = sched.getExecutorsAliveOnHost(loc.host)exe match {case Some(set) =>for (e <- set) {pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index}}case _ =>}pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += indexfor (rack <- sched.getRackForHost(loc.host)) {pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index}}if (tasks(index).preferredLocations == Nil) {pendingTasksWithNoPrefs += index}allPendingTasks += index  // No point scanning this whole list to find the old task there}

其基本逻辑为:

  1. 遍历所有的Task,根据对应Task的location preference,将对应的Task添加到对应的pendingLocation数组中:

  2. 如果是ExecutorCacheTaskLocation,那么就将Task添加到pendingTasksForExecutor中

            case e: ExecutorCacheTaskLocation =>pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
    
  3. 如果是HDFSCacheTaskLocation,并且这个Host上的确有active的Executor,那么就将这个task添加到这个Host上的每一个Executor上,即将这个Task添加到pendingTasksForExecutor中。但是,显然,这个Task最终只会在其中的某个Executor上运行一次,而不会重复运行。

              val exe = sched.getExecutorsAliveOnHost(loc.host)exe match {case Some(set) =>for (e <- set) {pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index}
    
  4. 无论是ExecutorCacheTaskLocation,还是HDFSCacheTaskLocation,都将这个Task添加到pendingTasksForHost中,因为这个Task肯定会运行着这个Host上的。

    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    
  5. 获取这个Host所在的Rack,然后添加到这个对应的pendingTasksForRack中:

          for (rack <- sched.getRackForHost(loc.host)) {pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index}
    
  6. 如果这个Task没有任何location preference,那么就添加到pendingTasksWithNoPrefs中:

        if (tasks(index).preferredLocations == Nil) {pendingTasksWithNoPrefs += index}
    
  7. 无论怎样,将这个task添加到allPendingTasks中,这是当所有的locality都无法满足要求一户最坏的Locality选择。

总之,我们需要注意:

  • 区分pendingTasksWithNoPrefs和allPendingTasks。前者意味着Task本身对locality没有任何要求,因此可以随意将Task进行分配。而allPendingTasks中存放了所有的task,意味着当我们无法满足task的localtiy要求以后的不得已的选择。比如一个在pendingTasksForRack中的task的locality要求无法满足,只能退化到选择其他Rack来运行该Task
  • pendingTasksForExecutor、pendingTasksForHost、pendingTasksForRack之间存在的包含关系,即,如果一个Task是pendingTasksForExecutor,那么也会放入到这个Executor所在的host(pendingTasksForHost)和所在的rack(pendingTasksForRack)中,同理,如果一个Task的pendingTasksForHost,也会放入到这个Host所在的Rack(pendingTasksForRack)中。而且,所有的Task都会放入到allPendingTasks中。我们从后面的locality退化可以看到原因,即如果某个locality要求下无法分配完所有的task,那么会退化到低一级的优先级,而低一级的优先级列表中肯定包含了高一级优先级列表的Task,因此这个Task可以在低一级进行调度。
  • 我们可以看到,pendingTasksForRack中的task并不是这个task的locality preference直接就是就是这个rack,而是这个task的locatiion是对应的某个host,因此将这个host所在的rack添加为这个task的rack locality。后面会看到,rack locality是比no preference更低一级的locality,其实就是这个原因,尽管表面上看起来比较疑惑,也与比如Yarn的任务调度逻辑似乎不同。

可用的LocationLevel的计算

Spark用一个枚举类型TaskLocality来表达不同的本地级别:

object TaskLocality extends Enumeration {// Process local is expected to be used ONLY within TaskSetManager for now.val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

PROCESS_LOCAL: 进程本地,即这个Task希望调度到某一个特定的Executor上
NODE_LOCAL: 节点本地,即这个Task希望调度到某一个特定的节点
NO_PREF: 没有本地倾向,即这个Task可以任意调度到集群中的任何节点
RACK_LOCAL: 机架本地,即这个Task希望调度到某一个特定的rack上
ANY: 无法满足要求,是前面四种本地性倾向都无法满足要求时的最后的选择策略,将有节点倾向的Task任意调度到集群中无法满足其本地性倾向的位置上。
这里需要注意的是:

  1. 显然,越靠前,数字越小,Locality级别越高
  2. RACK_LOCAL并不指的是这些任务的locality preference中本来就指明了运行在这个Rack上,而是NODE_LOCAL或者PROCESS_LOCAL的对应退化调度策略,即,如果对应的节点或者Executor由于资源问题无法满足其要求,就会退化到使用RACK_LOCAL,并不是Task本身声明调度到rack上。
  3. NO_PREF的优先级比RACK_LOCAL的优先级更高,这是因为NO_PREF指的是Task本身明确表明自己没有本地性偏好,因此不是一种退化策略,而是对其locality的一种完全的满足,而RACK_LOCAL其实是一种退化策略,因此RACK_LOCAL的优先级低于NO_PREF
  4. ANY的优先级最低,和RACK_LOCAL一样,也是一种退化策略。

在一个TaskSet的TaskSetManager构造的时候,以及后来一个新的Executor加入或者丢失的时候,都会重新计算这个TaskSet的Valid Locality Levels。所以,应该注意到:

  • Valid Locality Levels 指的是根据当前的Pending Task的不同LOCALITY需求(比如,不同locality需求的pending task分别放在了pendingTasksForExecutor、pendingTasksForHost、pendingTasksWithNoPrefs、pendingTasksForRack和allTasks中),计算我们目前可能需要的是哪些TaskLocality。这样,在对Task进行基于Locality 的分配(下文会讲到)的时候,只需要考虑这些valid locality levels。
  • 必须注意,computeValidLocalityLevels是TaskSetManager的成员方法,即这里是为某一个TaskSet计算对应的Valid Locality Levels

下面的方法显示了计算valid locality level的基本过程:

------------------------------------- TaskSetManager --------------------------------------private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}val levels = new ArrayBuffer[TaskLocality.TaskLocality]// 如果有任务在等待执行器(executor),并且这些执行器中有活着的执行器,那么就将PROCESS_LOCAL添加到有效级别列表中。if (!pendingTasksForExecutor.isEmpty &&pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {levels += PROCESS_LOCAL}// 如果有任务在等待主机(host),并且这些主机上有活着的执行器,那么就将NODE_LOCAL添加到有效级别列表中。if (!pendingTasksForHost.isEmpty &&pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {levels += NODE_LOCAL}if (!pendingTasksWithNoPrefs.isEmpty) {levels += NO_PREF}// 如果有任务在等待机架(rack),并且这些机架上有活着的主机,那么就将RACK_LOCAL添加到有效级别列表中。if (!pendingTasksForRack.isEmpty &&pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {levels += RACK_LOCAL}levels += ANYlevels.toArray}

该方法返回一个TaskLocality数组,即如果对应的pendingTask数组中有Task,那么就讲对应的TaskLocality添加到数组中。这里需要注意到往数组中添加TaskLocality是按照TaskLocality的值从低到高(Locality优先级从高到低)的顺序添加的,从下文介绍基于Locality的Task调度可以看到,调度时会遍历这个返回的TaskLocality数组,即调度时按照Locality优先级从高到低进行的。最后的TaskLocality.ANY一定会最后添加到结果中,作为一个优先级的保底操作。

基于Locality的Task调度

上文讲过,Task的调度的触发,以及其通过makeOffers()方法进行调度。在这里,我们详细讲解这个调度的基本细节。
makeOffers()的基本功能,就是为当前的Executor的资源剩余情况,生成对应的WorkerOffer,代表这些Executor可提供的运行资源,其中包含了对应的executorId,所在的host信息以及可用的vCore信息:

private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)

然后TaskScheduler会根据这些剩余资源,将对应的pendingTask调度出去,当然,调度过程中需要依赖对应的Task的locality信息。

    private def makeOffers() {// 根据集群当前的可用资源状况,生成Task的调度结果val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map {case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeq // 构造每一个Executor上的可用资源scheduler.resourceOffers(workOffers)  // 在这里依赖于TaskScheduler来调度对应的task到对应的worker上}launchTasks(taskDescs) // 启动这些tasks}

resourceOffers()方法是TaskSchedulerImpl的成员方法,其输入是一系列的WorkerOffer,返回可以进行调度的所有Task(每一个Task由一个TaskDescription表示)。我们后面会看到,TaskSchedulerImpl会遍历当前需要调度的所有TaskSet尝试进行调度,而不是某一个TaskSet。
resourceOffers()代码如下所示:

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail = falsefor (o <- offers) { // 对于每一个WorkerOfferif (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) = new HashSet[String]() // 构建这个Host的Executor的map}// 是否有新增的executor进来。如果有,则维护executor相关的map信息if (!executorIdToRunningTaskIds.contains(o.executorId)) {hostToExecutors(o.host) += o.executorId // 将这个executor加到这个host -> executors 的map中executorAdded(o.executorId, o.host)executorIdToHost(o.executorId) = o.host // 将这个host加到executor -> host 的毛重executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() // 构建这个executor -> tasks的mapnewExecAvail = true}for (rack <- getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host // 构建rack -> hosts的map}}// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do// this here to avoid a separate thread and added synchronization overhead, and also because// updating the blacklist is only relevant when task offers are being made.blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>offers.filter { offer =>!blacklistTracker.isNodeBlacklisted(offer.host) &&!blacklistTracker.isExecutorBlacklisted(offer.executorId)}}.getOrElse(offers)// 将offer进行随机shuffle,返回打乱顺序以后的IndexedSeq[WorkerOffer]val shuffledOffers = shuffleOffers(filteredOffers)// Build a list of tasks to assign to each worker.// 根据当前的WorkerOffer,预构建一个TaskDescription的二维数组val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))val availableCpus = shuffledOffers.map(o => o.cores).toArray // 构建WorkerOffer -> 可用CPU的对应关系val sortedTaskSets = rootPool.getSortedTaskSetQueuefor (taskSet <- sortedTaskSets) {if (newExecAvail) { // 如果有新的Executor加入进来taskSet.executorAdded() // 重新计算这个TaskSet的locality的相关信息}}// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYfor (taskSet <- sortedTaskSets) { // 按照调度顺序,取出每一个TaskSetvar launchedAnyTask = falsevar launchedTaskAtCurrentMaxLocality = false// locality从低到高遍历这个TaskSet中的每一个可用的localityLevels,locality越低代表本地性越好for (currentMaxLocality <- taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask |= launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) { // 没有launch任何一个tasktaskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}if (tasks.size > 0) {hasLaunchedTask = true}return tasks}

makeOffers()的基本流程如下图所示:
在这里插入图片描述

其基本流程为:

  1. 根据生成的每一个WorkerOffer,TaskScheduler会维护executor <-> host <-> rack之间的映射关系。刚刚说过,resourceOffers()调用有可能来自于Executor的注册,因此有必要根据WorkerOffer来更新这个新的Executor和Host以及Rack之间的映射关系
        for (o <- offers) { // 对于每一个WorkerOfferif (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) = new HashSet[String]() // 构建这个Host的Executor的map}// 是否有新增的executor进来。如果有,则维护executor相关的map信息if (!executorIdToRunningTaskIds.contains(o.executorId)) {hostToExecutors(o.host) += o.executorId // 将这个executor加到这个host -> executors 的map中executorAdded(o.executorId, o.host)executorIdToHost(o.executorId) = o.host // 将这个host加到executor -> host 的毛重executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() // 构建这个executor -> tasks的mapnewExecAvail = true}for (rack <- getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host // 构建rack -> hosts的map}}
    
  2. 过滤掉部分不考虑的WorkerOffer,比如,这个WorkerOffer的Node或者Executor在黑名单中
        val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>offers.filter { offer =>!blacklistTracker.isNodeBlacklisted(offer.host) &&!blacklistTracker.isExecutorBlacklisted(offer.executorId)}}.getOrElse(offers)
    
  3. 为了避免每次都将Task调度到某一个WorkerOffer上(假如这个WorkerOffer资源足够多),每次调用resourceOffers()的时候都会对WorkerOffer进行一次重新随机排序。注意,这个重新排序只是将WorkerOffer重新排序,没有将Locality重新排序,locality永远从高到低(值从小到大)进行考虑。
    // 将offer进行随机shuffle,返回打乱顺序以后的IndexedSeq[WorkerOffer]val shuffledOffers = shuffleOffers(filteredOffers)// Build a list of tasks to assign to each worker.// 根据当前的WorkerOffer,预构建一个TaskDescription的二维数组val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))val availableCpus = shuffledOffers.map(o => o.cores).toArray // 构建WorkerOffer -> 可用CPU的对应关系
  1. 根据TaskSet的调度策略,获取排序以后的TaskSet列表。对于列表中的每一个TaskSet,如果这个TaskSet刚刚有新的Host加入,需要通过调用TaskSet的executorAdded()方法,这个方法主要是重新计算locality信息。上文已经介绍过TaskSet的locality信息的计算。 TaskSet的调度策略不在本文介绍范围内,感兴趣的读者可以自行了解
    val sortedTaskSets = rootPool.getSortedTaskSetQueue // 根据TaskSet调度策略,返回排序以后的TaskSet数组
    for (taskSet <- sortedTaskSets) {if (newExecAvail) { // 如果有新的Executor加入进来taskSet.executorAdded() // 重新计算这个TaskSet的locality的相关信息}
    }
    
  2. 根据排序以后的TaskSet,遍历每一个TaskSet,基于这个TaskSet的locality数组,对这个TaskSet的task进行调度。
        for (taskSet <- sortedTaskSets) { // 按照调度顺序,取出每一个TaskSetvar launchedAnyTask = falsevar launchedTaskAtCurrentMaxLocality = false// locality从低到高遍历这个TaskSet中的每一个可用的localityLevels,locality越低代表本地性越好for (currentMaxLocality <- taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask |= launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) { // 没有launch任何一个tasktaskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}
    
    上文讲过locality数组的生成过程,可以看到locality数组从前到手的locality从高到低,因此,这里的逻辑是,对于每一个TaskSet,尝试先调度locality最高的Task,并且,如果当前要求的locality在指定的超时时间内无法将所有task调度完毕,将尝试更低一级的locality进行调度。到最后,locality会降低到TaskLocality.ANY,即进行任意调度。

所以,对于一个TaskSet中的task根据当前要求的TaskLocality进行任务调度,发生在方法resourceOfferSingleTaskSet()中。这个方法根据允许的最大的locality(currentMaxLocality, 这里的最大指的是最低要求,即,不可以比这个locality更宽松了),当前可用资源(shuffledOffers),需要调度的TaskSet,返回成功调度的这个TaskSet中的task:

  private def resourceOfferSingleTaskSet(taskSet: TaskSetManager, // 当前的TaskSetmaxLocality: TaskLocality, // 当前最大的locality,最大的意思是最优的localityshuffledOffers: Seq[WorkerOffer], // 每一个WorkerOffer代表了一个可用资源availableCpus: Array[Int], // 这个shuffledOffers中的每一个WorkerOffer所代表的可用的VCores// 一个WorkerOffer按照可用cpu以及每个task的cpu,算出Task的数量tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {var launchedTask = false// nodes and executors that are blacklisted for the entire application have already been// filtered out by this pointfor (i <- 0 until shuffledOffers.size) { // 遍历每一个WorkerOffer(每一个WorkerOffer对应了一个executor)val execId = shuffledOffers(i).executorIdval host = shuffledOffers(i).hostfor (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // 往这个Offer上调度一个Tasktasks(i) += task // 保存调度结果val tid = task.taskId....launchedTask = true}}return launchedTask}

从上面的代码可以看到,resourceOfferSingleTaskSet()方法会遍历当前所有的可用资源WorkerOffer,尝试按照当前的maxLocality,调用resourceOffer()方法,往这个Offer上面调度一个task。
resourceOffer()方法是TaskSet的成员方法,其根据executorID, hostname以及最大允许的locality(maxLocality, 即locality不可以再差了),尝试从pendingTask中选出一个满足条件的task调度:

  def resourceOffer(execId: String, // executor idhost: String, // executor所在的hostmaxLocality: TaskLocality.TaskLocality) // 所容许的locality,不能比这个locality更宽松: Option[TaskDescription] ={ ....val curTime = clock.getTimeMillis()// allowedLocality 代表当前最宽松的locality是什么,显然,在开始的时候,我们希望allowedLocality严格一点儿,// 后面如果分配失败了,再逐渐放松要求var allowedLocality = maxLocality//  如果 maxLocality == TaskLocality.NO_PREF,那么allowedLocality = maxLocality,// 进入TaskLocality.NO_PREF本来就是对调度没有任何要求if (maxLocality != TaskLocality.NO_PREF) {allowedLocality = getAllowedLocalityLevel(curTime) // 根据当前的时间,更新当前时间节点下的allowedLocalityif (allowedLocality > maxLocality) { // allowedLocality比maxLocality更松弛// We're not allowed to search for farther-away tasksallowedLocality = maxLocality}}// 根据当前计算得到的locality 弹出对应的tasks,然后调度起来dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>// Found a task; do some bookkeeping and return a task description.....currentLocalityIndex = getLocalityIndex(taskLocality)lastLaunchTime = curTime // 更新这个TaskSetManager的最后一个task的启动时间.........addRunningTask(taskId)sched.dagScheduler.taskStarted(task, info)new TaskDescription(.....)}}

可以看到,getAllowedLocalityLevel()方法根据当前传入的最大允许的locality和当前时间,计算真正的允许的locality,然后从当前的pendingTask中取出满足要求的Task返回。

  1. 根据传入的最大允许的locality(maxLocality),计算实际使用的allowedLocality。之所以实际使用的allowedLocality可能与maxLocality不同,原因是maxLocality代表了这个TaskSet的locality数组的外层循环(resourceOffers()方法会按照locality从第到高进行Task的分配),但是在延迟调度的场景下,在某一个locality level没有合适资源的情形下,在配置的退化时间到来之前,不急于将locality进行退化处理,即myLocalityLevels(currentLocalityIndex)不变,依然为NODE_LOCAL。在下一轮调度重新到来的时候(resourceOffers()方法重新运行),在尝试到maxLocality = ALL的时候,突然有了可以满足locality = NODE_LOCAL的资源,这时候getAllowedLocalityLevel()就会返回myLocalityLevels(currentLocalityIndex) = NODE_LOCAL,因为这个locality level的超时时间还没到,进而maxLocality更新为NODE_LOCAL,从而在最大运行的locality为NODE_LOCAL的约束下进行任务调度
  2. 根据当前的host, executorID和maxLocality,尝试从pendingTask的队列中取出一个符合要求的Task。这里的符合要求指的是,这个Task的locality偏好是允许在这个host+executorID上运行的,并且如果在这个host + executorID上运行,其locality是不会比maxLocality更差的。
    比如:

dequeueTask的逻辑如下所示。它根据当前的WorkerOffer(Executor ID + hostname)和允许的最大的locality(maxLocaltiy),返回一个locality满足要求的Task准备运行。这里的满足要求指的是,这个Task如果运行在这个WorkerOffer上,能够满足所允许的最差的locality(maxLocality)的要求。

  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] ={// TaskLocality.isAllowed(maxLocality, TaskLocality.PROCESS_LOCAL) 永远返回true,因此不做判断for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {// Look for noPref tasks after NODE_LOCAL for minimize cross-rack trafficfor (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {for {rack <- sched.getRackForHost(host)index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))} {return Some((index, TaskLocality.RACK_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {return Some((index, TaskLocality.ANY, false))}}// find a speculative task if all others tasks have been scheduleddequeueSpeculativeTask(execId, host, maxLocality).map {case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}}

示例1:
当前的WorkerOffer信息如下:

  • Hostname: testhost1.corp.com
  • ExecutorID: 25536
    当前的maxLocality = NODE_LOCAL, testhost1-rack1.corp.com所在的rack为rack1

在当前的TaskSet中pendingTasksForExecutor中有一个Task A,其preference正好是 Executor 25535。因此,根据上文所讲解的pendingTask的添加过程,Task A会同时存在于pendingTasksForExecutor,pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时候dequeueTask()的运行过程为:

  • 由于pendingTasksForExecutor中没有针对Executor 25536的Task A,因此返回Task A,Task A的实际locality是更好的PROCESS_LOCAL,满足了maxLocality = NODE_LOCAL的要求

示例2:
当前的WorkerOffer信息如下:

  • Hostname: testhost1.corp.com
  • ExecutorID: 25536
    当前的maxLocality = NODE_LOCAL, testhost1-rack1.corp.com所在的rack为rack1

在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536
的pending Task。但是在pendingTasksForHost中有一个Task B, 其 locality preference正好是Hostname: testhost1.corp.com。 根据上文所讲解的pendingTask的添加过程,Task B会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时候dequeueTask()的运行过程为:

  • 由于pendingTasksForExecutor中没有针对Hostname: testhost1.corp.com,因此跳过
  • 由于pendingTasksForHost中有针对Hostname testhost1.corp.com的Task,因此取出并返回Task B,此时的实际locality正好等于maxLocality,为 NODE_LOCAL

示例3:
当前的WorkerOffer信息如下:

  • Hostname: testhost1.corp.com
  • ExecutorID: 25536
    当前的maxLocality = RACK_LOCAL, testhost1-rack1.corp.com所在的rack为rack1

在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536
的pending Task。但是在pendingTasksForHost中有一个Task B, 其 locality preference是Hostname: testhost2-rack1.corp.com,即它的locality preference同当前的WorkerOffer不在一个节点但是在一个机架上。 根据上文所讲解的pendingTask的添加过程,Task B会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时候dequeueTask()的运行过程为:

  • 由于pendingTasksForExecutor中没有针对Hostname: testhost1.corp.com,因此跳过
  • 由于pendingTasksForHost中有针对Hostname testhost1.corp.com的Task,因此跳过
  • 由于pendingTasksForHost中有针对rack1的Task C,因此取出并返回Task C,此时的实际locality是RACK_LOCAL

示例4:
当前的WorkerOffer信息如下:

  • Hostname: testhost1.corp.com
  • ExecutorID: 25536
    当前的maxLocality = ANY, testhost1-rack1.corp.com所在的rack为rack1
    在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536的的pending Task。pendingTasksWithNoPrefs也没有Task。但是在pendingTasksForHost中有一个Task D, 其 locality preference是另外一个Hostname: testhost2-rack2.corp.com(这个testhost2所在的rack为rack2)。 根据上文所讲解的pendingTask的添加过程,Task D会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
    这时候dequeueTask()的运行过程为:
  • 由于pendingTasksForExecutor中没有针对ExecutorID: 25536,因此跳过
  • 由于pendingTasksForHost中没有针对Hostname testhost2.corp.com的Task,因此跳过
  • 由于pendingTasksWithNoPrefs中没有任何Task,因此跳过
  • 由于getPendingTasksForRack中没有针对rack1的任何task,因此跳过
  • 在allPendingTasks中取出Task D,满足maxLocality = NODE_LOCAL 要求,因此取出并返回Task D,此时实际的locality正好等于maxLocality,为ANY。

结语

Spark在Yarn上的资源管理是粗粒度的资源管理(Coarse Grained),即资源和Task并不一一对应。ApplicationMaster作为资源请求的代理,充当了细粒度的Task和粗粒度的Yarn Container之间的桥梁,即,根据细粒度的、全量的Task资源需求状态不断生成增量的、粗粒度的资源请求,并将Yarn不断异步返回的资源和当前的Task进行匹配,以最优化Task的放置。
本文全面地讲解了整个资源请求和调度过程。读者从中可以看到基于Yarn的整个资源调度过程,Spark的不同角色之间的通信过程,整个过程虽然是以Yarn为基础,但是其反映的是一个通用的资源调度决策的基本思路,因此很有参考意义。
我觉得,感兴趣的读者,可以和Spark on K8S这种细粒度的资源调度过程作比较,将会有更多的收益。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47090.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win10删除鼠标右键选项

鼠标右键菜单时&#xff0c;发现里面的选项特别多&#xff0c;找一下属性&#xff0c;半天找不到。删除一些不常用的选项&#xff0c;让右键菜单变得干净整洁。 1、按下键盘上的“winR”组合按键&#xff0c;调出“运行”对话框&#xff0c;输入“regedit”命令&#xff0c;点击…

linux后门教程

linux后门教程 alias 用法 系统默认别名&#xff1a;alias 设置别名&#xff1a;alias lsls -laih 删除别名&#xff1a;unalias ls **加参数&#xff1a;**alias ls‘ls -laih;pwd’ 注意 系统启动默认加载的配置文件 /etc/profile 切换用户就会执行/etc/profile /etc/bash…

Python 实验五 高级数据结构

一、实验目的 &#xff08;1&#xff09;掌握序列的基本操作 &#xff08;2&#xff09;掌握集合、字典的基本操作 二、实验环境 联网计算机一台/每人&#xff0c;内装Windows 7以上操作系统和安装Python 3.7集成开发环境IDLE。 三、实验内容 Sy5-1 列表实现。编写一个…

minIO集成springboot

问题 minIO与spring集成。 步骤 创建桶 创建key 找到创建账号页面&#xff0c;如下图&#xff1a; 点击创建&#xff0c;如下图&#xff1a; 设置如下权限&#xff1a; {"Version": "2012-10-17","Statement": [{"Effect": &q…

codeforces round 948 div2(a,b,c)

题目链接 A #include<bits/stdc.h>using namespace std;#define int long long #define PII pair<int,int>void solve() {int n,m;cin>>n>>m;if(n&1){if((m&1)&&m>1&&m<n)cout<<"YES"<<\n;else…

python的异常

异常 定义 异常是程序执行中发生的错误事件&#xff0c;它可以打断正常的指令流。Python提供了强大的异常处理机制&#xff0c;允许程序在发生错误时执行某些替代指令&#xff0c;而不是直接崩溃。 类型 TypeError&#xff1a;类型错误&#xff0c;比如尝试将字符串和整数相加。…

Jenkins安装nodeJs环境

首先插件市场安装nodeJS插件&#xff0c;我这里已经安装了&#xff0c;没安装的话在 Available plugins 中搜索安装 安装完成后需要下载需要的nodejs版本 新增完成就可以在构建的时候选择当前版本号了

JMeter接口测试之文件上传(参数提取与传递)

参考文档&#xff1a; Jmeter接口测试-文件上传&#xff08;全网最详细的教程&#xff09;_jmeter 文件上传-CSDN博客 1、首先通过fiddler抓取文件上传接口&#xff0c;在Raw的tab页中查看默认请求头以及请求参数 如图所示 2、在jmeter中导入抓取的接口&#xff0c;首先需要配…

新书速览|深入理解Hive:从基础到高阶:视频教学版

《深入理解Hive&#xff1a;从基础到高阶&#xff1a;视频教学版》 本书内容 《深入理解Hive:从基础到高阶:视频教学版》采用“理论实战”的形式编写&#xff0c;通过大量的实例&#xff0c;结合作者多年一线开发实战经验&#xff0c;全面地介绍Hive的使用方法。《深入理解Hiv…

AI算法18-最小角回归算法Least Angle Regression | LARS

​​​ 最小角回归算法简介 最小角回归&#xff08;Least Angle Regression, LAR&#xff09;是一种用于回归分析的统计方法&#xff0c;它在某些方面类似于最小二乘回归&#xff0c;但提供了一些额外的优点。最小角回归由Bradley Efron等人提出&#xff0c;主要用于处理具有…

【Linux】安装PHP扩展-redis

说明 本文档是在centos7.6的环境下&#xff0c;安装PHP7.4之后&#xff0c;安装对应的PHP扩展包redis。 一、下载redis扩展 pecl官方地址:PECL :: The PHP Extension Community Library 下载的版本是&#xff1a;redis-5.3.7.tgz 二、安装redis扩展 1.上传 redis 压缩包到…

【嵌入式DIY实例-ESP8266篇】-LCD ST7789显示DS1307 RTC时间数据

LCD ST7789显示DS1307 RTC时间数据 文章目录 LCD ST7789显示DS1307 RTC时间数据1、硬件准备与接线2、代码实现本文将介绍如何使用 ESP8266 NodeMCU 板和 DS1307 RTC 集成电路构建简单的实时时钟和日历 (RTCC),其中时间和日期打印在 ST7789 TFT 显示模块上。 ST7789 TFT 模块包…

【海外云手机】静态住宅IP集成解决方案

航海大背景下&#xff0c;企业和个人用户对于网络隐私、稳定性以及跨国业务的需求日益增加。静态住宅IP与海外云手机的结合&#xff0c;提供了一种创新的集成解决方案&#xff0c;能够有效应对这些需求。 本篇文章分为三个部分&#xff1b;静态住宅优势、云手机优势、集成解决…

gemini-pro-vision 看图说话

一、安装 pip install -U langchain-google-vertexai 二、设置访问权限 申请服务账号json格式key 三、完整代码 import gradio as gr import json import base64 from pathlib import Path import os import time import requests from fastapi import FastAPI, UploadFile,…

K8S私有云裸金属服务器负载均衡器OpenELB——筑梦之路

OpenELB介绍 OpenELB 是一个专为裸机 Kubernetes 集群设计的开源负载均衡器实现。 在云服务环境中的 Kubernetes 集群里&#xff0c;通常可以用云服务提供商提供的负载均衡服务来暴露 Service&#xff0c;但是在本地没办法这样操作。而 OpenELB 可以让用户在裸金属服务器、边缘…

RocketMQ~架构与工作流程了解

简介 RocketMQ 具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统&#xff0c;由阿里巴巴团队开发&#xff0c;在 2016 年底贡献给 Apache&#xff0c;成为了 Apache 的一个顶级项目。 在阿里内部&#xff0c;RocketMQ 很好地服务了集…

怎么关闭Windows安全中心?

Windows安全中心是Windows操作系统中的一项重要功能&#xff0c;系统提供这个功能的目的是保护电脑免受各种安全威胁。尽管如此&#xff0c;有时候我们可能出于某些原因需要关闭它。本文将详细介绍如何关闭Windows安全中心&#xff0c;以及需要注意的事项。 重要提醒&#xff1…

搞清c++中的队列(queue)以及双端队列(deque),以及常用的接口!

1. 队列 概念&#xff1a;Queue是一种先进先出(First In First Out,FIFO)的数据结构&#xff0c;它有两个出口 特征&#xff1a; 队列容器允许从一端新增元素&#xff0c;从另一端移除元素 队列中只有队头和队尾才可以被外界使用&#xff0c;因此队列不允许有遍历行为 队列…

这个工具居然能让你的微信暴露得一览无余!!

今天在github看到一个不错的项目&#xff0c;叫做wx-dump-4j&#xff0c;不看不知道&#xff0c;一看吓一跳&#xff0c;这个工具完全的解析了我的微信&#xff01;这个工具准确显示好友数、群聊数和当日消息总量&#xff0c;并且&#xff01;&#xff01;这个工具提供过去15天…

第59期|GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以找…