Flink 调度源码分析4:Physical Slot 分配过程

Flink 调度源码分析1:拓扑图创建与提交过程
Flink 调度源码分析2:调度过程
Flink 调度源码分析3:Shared Slot 分配策略
Flink 调度源码分析4:Physical Slot 分配过程

1 整体过程

在 SlotSharingExecutionSlotAllocator.allocateSlotsForVertices() 中,会检查共享组是否有 slot,如果没有的话,会在下一步使用 PhysicalSlotProvider 为其分配 slot。

// 检查共享组是否有 slot
Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =  tryAssignExistingSharedSlots(groupsToAssign);  
slots.putAll(assignedSlots);  
groupsToAssign.removeAll(assignedSlots.keySet());  // 对没有 slot 的共享组分配 slot
if (!groupsToAssign.isEmpty()) {  Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =  allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);  slots.putAll(allocatedSlots);  groupsToAssign.removeAll(allocatedSlots.keySet());  // 所有的共享组一定有共享 slot    Preconditions.checkState(groupsToAssign.isEmpty());  
}

接下来查看 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 函数(注意这个函数,后面会多次提到这里)。
在这里插入图片描述

2 创建 slot 请求

2.1 获取 slot 配置

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 中,会对每一个共享组执行下面代码。

// 使用 SharedSlotProfileRetriever 创建 slot 配置文件  
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);  
SlotProfile slotProfile =  sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);

会得到一个 SlotProfile。SlotProfile 是 task 希望调度的 slot 的配置文件。配置文件包含资源或位置限制等属性,其中一些可能是硬限制,也可能是软限制。它还包含 physical slot 的资源信息,当 shared slot 没有可用的 physical slot 时,可使用这些信息分配 physical slot。可以生成一个 matcher,通过在 SlotContext 中对 SlotProfile 以及其他要求进行匹配,筛选出候选 slot。
SlotProfile 包含下面这些属性。

/** This specifies the desired resource profile for the task slot. */  
private final ResourceProfile taskResourceProfile;  
/** This specifies the desired resource profile for the physical slot to host this task slot. */  
private final ResourceProfile physicalSlotResourceProfile;  
/** This specifies the preferred locations for the slot. */  
private final Collection<TaskManagerLocation> preferredLocations;  
/** This contains desired allocation ids of the slot. */  
private final Collection<AllocationID> preferredAllocations;  
/** This contains all reserved allocation ids from the whole execution graph. */  
private final Set<AllocationID> reservedAllocations;
  • taskResourceProfile 和 physicalSlotResourceProfile 是配置,两个一般是相等的。
  • preferredLocations 表示期望得到哪个 taskmanager 的 slot。
  • preferredAllocations 表示希望得到哪个 AllocationID,reservedAllocations存储了已经被分配的 reservedAllocations。
    AllocationID:JobManager 已分配 physical slot 的唯一标识符。该 ID 在 JobManager 首次请求 slot 时分配,并在重新分配时保持不变。JobManager 和 ResourceManager 使用此 ID 来跟踪和同步哪些 slot 分配给了哪个 TaskManager,哪些是空闲的。与 AllocationID 不同,SlotRequestId 用于任务从 SlotPool 请求 logical slot 时。多个 SlotRequestId 可以映射到一个 AllocationID(由于槽共享)。
    然后看看 sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile) 做了什么。
public SlotProfile getSlotProfile(  ExecutionSlotSharingGroup executionSlotSharingGroup,  ResourceProfile physicalSlotResourceProfile) {  Collection<AllocationID> priorAllocations = new HashSet<>();  Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();  for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {  priorAllocationIdRetriever.apply(execution).ifPresent(priorAllocations::add);  preferredLocations.addAll(  preferredLocationsRetriever.getPreferredLocations(  execution, producersToIgnore));  }  // 创建 SlotProfilereturn SlotProfile.priorAllocation(  physicalSlotResourceProfile,  physicalSlotResourceProfile,  preferredLocations,  // 指定 slot 位置的选择priorAllocations,  reservedAllocationIds);  
}

2.2 slot 优先位置

怎么确定 SlotProfile 中的 preferredLocations 参数的值?
位置的确定涉及两种接口:StateLocationRetriever 和 InputsLocationsRetriever。通过这两种获取优先部署位置。StateLocationRetriever 会获取每个执行节点的状态所在的位置。InputsLocationsRetriever 会获取当前节点的输入的所在位置。这两个逻辑在 SchedulerBase 构造函数中创建:

stateLocationRetriever =  // StateLocationRetriever 是只有一个方法的接口,所以这直接通过lambda函数创建实例executionVertexId ->  getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();  
inputsLocationsRetriever =  // 类为 ExecutionGraphToInputsLocationsRetrieverAdapternew ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);

在 SlotSharingExecutionSlotAllocatorFactory.createInstance() 中制定了优先位置检索器。

SyncPreferredLocationsRetriever preferredLocationsRetriever =  new DefaultSyncPreferredLocationsRetriever(context, context);

查看怎么决定优先位置的代码:

MergingSharedSlotProfileRetriever.getSlotProfile()
->  preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore)->  asyncPreferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore) // 这里虽然写着 async,但其实是同步的,也就是必须这个函数运行成功,才会执行下一步。也就是说这个位置必须是立即可用的,否则就不能用。->  getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore)

getPreferredLocationsBasedOnInputs() 中的代码如下:

private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(  final ExecutionVertexID executionVertexId,  final Set<ExecutionVertexID> producersToIgnore) {  CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =  CompletableFuture.completedFuture(Collections.emptyList());  final Collection<ConsumedPartitionGroup> consumedPartitionGroups =  inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);  for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {  // 为了避免太过分散,如果上游算子过多,则不获取它们的位置           if (consumedPartitionGroup.getConsumerVertexGroup().size()  > MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {  continue;  }  // 获取上游节点的位置final Collection<CompletableFuture<TaskManagerLocation>> locationsFutures =  getInputLocationFutures(  producersToIgnore,  inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(  consumedPartitionGroup));  preferredLocations = combineLocations(preferredLocations, locationsFutures);  }  return preferredLocations;  
}

这里返回的 preferredLocations 最终会传递给 SlotProfile。

2.3 slot 请求

下一步需要创建 PhysicalSlotRequest:

PhysicalSlotRequest request =  new PhysicalSlotRequest(  physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);

PhysicalSlotRequest 包含以下内容:

private final SlotRequestId slotRequestId;  
private final SlotProfile slotProfile;  
private final boolean slotWillBeOccupiedIndefinitely;  // jobType == JobType.STREAMING

3 分配 physical slot

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 通过下面的代码分配 slot。

Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =  slotProvider.allocatePhysicalSlots(slotRequests);

这里的 slotProvider 是 PhysicalSlotProvider 类。在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中创建了 PhysicalSlotProvider。可以看到它实际是个 PhysicalSlotProviderImpl。

final PhysicalSlotProvider physicalSlotProvider =  new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);

在 slotProvider.allocatePhysicalSlots() 中尝试为每一个 slot 分配请求执行如下代码:

// 尝试从可用的 slots 中,为每个请求分配一个 physicalSlotMap<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =  tryAllocateFromAvailable(physicalSlotRequestsById.values());

在 tryAllocateFromAvailable() 尝试为每一个 slot 请求分配一个 physical slot:

for (PhysicalSlotRequest request : slotRequests) {  // 使用 SlotSelectionStrategy 获取 slot    Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot =  slotSelectionStrategy.selectBestSlotForProfile(  freeSlotInfoTracker, request.getSlotProfile());

这里是根据 slotSelectionStrategy 选择 slot 的。slotSelectionStrategy 的值在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中指定:

final SlotSelectionStrategy slotSelectionStrategy =  SlotSelectionStrategyUtils.selectSlotSelectionStrategy(  jobType, jobMasterConfiguration);

这里是根据配置文件选择到底使用哪个策略。

cluster.evenly-spread-out-slot(EVENLY_SPREAD_OUT_SLOTS_STRATEGY) 为 True:// 默认为 falseslotSelectionStrategy = EvenlySpreadOutLocationPreferenceSlotSelectionStrategy  // 均匀分布
否则:slotSelectionStrategy = DefaultLocationPreferenceSlotSelectionStrategy
  1. DefaultLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  @Nonnull ResourceProfile resourceProfile) {  for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) {  SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);  if (candidate.getResourceProfile().isMatching(resourceProfile)) {  return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));  }  }  return Optional.empty();  
    }
    
    从所有可用的 slot 里顺序选择一个,只有满足资源需求,就直接分配。这样做,容易造成分配的 slot 集中在某几个 TaskManager 上。好处是可以减少不同 TaskManager 之间的通信代价,坏处是不能平衡各个 TaskManager 之间的资源利用率。
  2. EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  @Nonnull ResourceProfile resourceProfile) {  return freeSlotInfoTracker.getAvailableSlots().stream()  .map(freeSlotInfoTracker::getSlotInfo)  // 过滤掉不满足资源要求的 slot.filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile))  // 获取每个 slot 的资源利用率.map(  slot ->  new Tuple2<>(  slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot)))  // 找到资源利用率最小的 slot.min(Comparator.comparingDouble(tuple -> tuple.f1))  .map(  slotInfoWithTaskExecutorUtilization ->  SlotInfoAndLocality.of(  slotInfoWithTaskExecutorUtilization.f0,  Locality.UNCONSTRAINED));  
    }
    
    从所有满足资源要求的 slot,找到资源利用率最小的 slot,并分配该 slot。这样 slot 分配在各个 TaskManager 之间近似平均。好处是能平衡各个 TaskManager 之间的资源利用率,坏处是不同 TaskManager 之间的通信代价可能较大。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/13587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【EXCEL_VBA_实战】两组数据比对是否一致(字符串数组)

工作背景&#xff1a;比对两组数据是否一致&#xff08;位置非一一对应&#xff09; 思路构建&#xff1a;两组数据转换为两组字符串数组&#xff0c;比对所包含元素是否相同 问题点&#xff1a;A数组的第一个元素不一定与B数组的第一个元素对应&#xff0c;此时无法通过公式…

es数据备份和迁移Elasticsearch

Elasticsearch数据备份与恢复 前提 # 注意&#xff1a; 1.在进行本地备份时使用--type需要备份索引和数据&#xff08;mapping,data&#xff09; 2.在将数据备份到另外一台ES节点时需要比本地备份多备份一种数据类型&#xff08;analyzer,mapping,data,template&#xff09; …

岛屿问题刷题

200. 岛屿数量 - 力扣&#xff08;LeetCode&#xff09; class Solution {public int numIslands(char[][] grid) {int n grid.length;//grid行数int m grid[0].length;//grid列数int res 0;for(int r 0;r<n;r){for(int c0;c<m;c){if(grid[r][c]1){dfs(grid,r,c);res…

分布式异步框架celery + Redis 安装配置

引入 这里不对web框架做过多说明&#xff0c;到时候在总结一篇 python的常见web框架 django、flask、tornado、sanic、fastapi..各框架区别 - 内部集成功能的多少 django&#xff0c;内部提供了很多组件。 【相对大】flask、tornado、sanic、fastapi… 本身自己功能很少第…

java集合类详解

目录 1、数组导入&#xff1a; 2、单列集合 List接口 1、ArrayList&#xff1a;数组列表 ArrayList类中的方法 2、LinkedList&#xff1a;链表列表 3、Vector&#xff1a;数组列表 4、list集合的遍历 1、for循环遍历 2、增强for循环 3、迭代器遍历 Set接口 1、Has…

data studio连接到虚拟机上的openGauss

参考&#xff1a;使用DataStudio连接本地虚拟机中的opengauss数据库_big data_白日梦想家_胖七七-华为云开发者联盟 本实验虚拟机安装的是CentOS7 数据库版本是&#xff1a;openGauss-5.0.2-CentOS-64bit-all.tar.gz 1.配置pg_hba.conf 首先使用su - omm登录到omm用户&…

MySQL数据库,创建表及其插入数据和查询数据

首先&#xff0c;由上图创建表 mysql> create table worker( -> dept_id int(11) not null, -> emp_id int (11) not null, -> work_time date not null, -> salary float(8,2) not null, -> poli_face varchar(10) not null default 群众, -> name…

华为设备WLAN基础配置

WLAN基础配置之AP上线 配置WLAN无线网络的第一阶段&#xff0c;AP上线技术&#xff1a; 实验目标&#xff1a;使得AP能够获得来自AC的DHCP地址服务的地址&#xff0c;且是该网段地址池中的IP。 实验步骤&#xff1a; 1.把AC当作三层交换机配置虚拟网关 sys Enter system view…

安卓CardView使用

目录 前言一、基础使用1.1 依赖导入1.2 CardView的常用属性1.3 CardView继承关系 二、关于Z轴的概念三、CardView效果3.1 圆角 CardView3.2 阴影 CardView3.3 设置卡片背景3.4 设置卡片背景&#xff08;内部颜色&#xff09;3.5 同时设置背景颜色 前言 CardView是Android支持库…

WXML模板语法-数据绑定

1.数据绑定的基本原则 (1)在data中定义数据 (2)在WXML中使用数据 2.在data页面中定义数据&#xff1a;在页面对应的.js文件中&#xff0c;把数据定义在data对象中即可 &#xff08;这里打错了 应该是数组类型的数据... 报意思啊&#xff09; 3.Mustache语法的格式 把data中的…

低代码开发平台:开启企业数字化转型的快捷通道

低代码开发平台&#xff08;Low-Code Development Platform&#xff09;是近年来企业数字化转型中备受瞩目的技术工具&#xff0c;其被誉为加速业务上线的利器。随着信息技术的迅猛发展&#xff0c;企业对于数字化的需求与日俱增&#xff0c;但传统的软件研发流程往往耗时耗力&…

MATLAB|【免费】融合正余弦和柯西变异的麻雀优化算法SCSSA-CNN-BiLSTM双向长短期记忆网络预测模型

目录 主要内容 部分代码 部分结果一览 下载链接 主要内容 该程序实现多输入单输出预测&#xff0c;通过融合正余弦和柯西变异改进麻雀搜索算法&#xff0c;对CNN-BiLSTM的学习率、正则化参数以及BiLSTM隐含层神经元个数等进行优化&#xff0c;并对比了该改进算法…

摄像头应用测试

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…

Linux 信号捕捉与处理

&#x1f493;博主CSDN主页:麻辣韭菜&#x1f493;   ⏩专栏分类&#xff1a;Linux知识分享⏪   &#x1f69a;代码仓库:Linux代码练习&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多Linux知识   &#x1f51d; ​ 目录 前言 1. 信号的处理时机 1.1用户…

【排版问题解决】word加入公式时字间距突然变大

出现以下问题 解决方案 第一步:选择段落 第二步 段落括起来后右键选择“段落”- 第三步 “换行和分页”-在换行里打勾“允许西文在单词中间换行”。 恢复格式

vue.js状态管理和服务端渲染

状态管理 vuejs状态管理的几种方式 组件内管理状态&#xff1a;通过data&#xff0c;computed等属性管理组件内部状态 父子组件通信&#xff1a;通过props和自定义事件实现父子组件状态的通信和传递 事件总线eventBus&#xff1a;通过new Vue()实例&#xff0c;实现跨组件通…

LP-MSPM03507学习资料汇总

(因对MSPM0研究不够深入,故暂不开启浏览权限,权当记录学习。但愿尽快掌握供大家免费阅读。有意者可私信我共同学习) 一、延时函数 1、滴答定时器SYSTICK 1.1 SysConfig配置 配置1ms延时函数,并开启中断 1.2 编写延时函数delay_ms unsigned int utick = 0;//滴答定时器中…

57. UE5 RPG 处理AI敌人转向以及拾取物品的问题

在上一篇文章中&#xff0c;我们实现了使用AI行为树控制敌人进行移动&#xff0c;它们可以一直跟随玩家&#xff0c;虽然现在还未实现攻击。但在移动过程中&#xff0c;我发现了有两个问题&#xff0c;第一个是敌人转向的时候很僵硬&#xff0c;可以说是瞬间转向的&#xff0c;…

Vue3实战笔记(39)—封装页脚组件,附源码

文章目录 前言一、封装页脚组件二、使用组件总结 前言 在Web开发中&#xff0c;页脚组件是一个重要的部分&#xff0c;它为用户提供关于网站的信息、导航链接以及版权声明等。而封装页脚组件则是一种高效的方法&#xff0c;可以提高代码的可重用性和可维护性。 一、封装页脚组…

重生之我要精通JAVA--第五周笔记

文章目录 APIJDK7时间Date时间类CalendarSimpleDateFormat 类SimpleDateFormat 类作用 JDK8时间Zoneld时区 包装类Integer成员方法 Arrays Lambda表达式标准格式注意点好处省略写法 集合进阶Collection迭代器遍历Collection集合获取迭代器Iterator中的常用方法细节注意点 增强f…