Flink 调度源码分析1:拓扑图创建与提交过程
Flink 调度源码分析2:调度过程
Flink 调度源码分析3:Shared Slot 分配策略
Flink 调度源码分析4:Physical Slot 分配过程
1 整体过程
在 SlotSharingExecutionSlotAllocator.allocateSlotsForVertices() 中,会检查共享组是否有 slot,如果没有的话,会在下一步使用 PhysicalSlotProvider 为其分配 slot。
// 检查共享组是否有 slot
Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = tryAssignExistingSharedSlots(groupsToAssign);
slots.putAll(assignedSlots);
groupsToAssign.removeAll(assignedSlots.keySet()); // 对没有 slot 的共享组分配 slot
if (!groupsToAssign.isEmpty()) { Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever); slots.putAll(allocatedSlots); groupsToAssign.removeAll(allocatedSlots.keySet()); // 所有的共享组一定有共享 slot Preconditions.checkState(groupsToAssign.isEmpty());
}
接下来查看 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 函数(注意这个函数,后面会多次提到这里)。
2 创建 slot 请求
2.1 获取 slot 配置
在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 中,会对每一个共享组执行下面代码。
// 使用 SharedSlotProfileRetriever 创建 slot 配置文件
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
会得到一个 SlotProfile。SlotProfile 是 task 希望调度的 slot 的配置文件。配置文件包含资源或位置限制等属性,其中一些可能是硬限制,也可能是软限制。它还包含 physical slot 的资源信息,当 shared slot 没有可用的 physical slot 时,可使用这些信息分配 physical slot。可以生成一个 matcher,通过在 SlotContext 中对 SlotProfile 以及其他要求进行匹配,筛选出候选 slot。
SlotProfile 包含下面这些属性。
/** This specifies the desired resource profile for the task slot. */
private final ResourceProfile taskResourceProfile;
/** This specifies the desired resource profile for the physical slot to host this task slot. */
private final ResourceProfile physicalSlotResourceProfile;
/** This specifies the preferred locations for the slot. */
private final Collection<TaskManagerLocation> preferredLocations;
/** This contains desired allocation ids of the slot. */
private final Collection<AllocationID> preferredAllocations;
/** This contains all reserved allocation ids from the whole execution graph. */
private final Set<AllocationID> reservedAllocations;
- taskResourceProfile 和 physicalSlotResourceProfile 是配置,两个一般是相等的。
- preferredLocations 表示期望得到哪个 taskmanager 的 slot。
- preferredAllocations 表示希望得到哪个 AllocationID,reservedAllocations存储了已经被分配的 reservedAllocations。
AllocationID:JobManager 已分配 physical slot 的唯一标识符。该 ID 在 JobManager 首次请求 slot 时分配,并在重新分配时保持不变。JobManager 和 ResourceManager 使用此 ID 来跟踪和同步哪些 slot 分配给了哪个 TaskManager,哪些是空闲的。与 AllocationID 不同,SlotRequestId 用于任务从 SlotPool 请求 logical slot 时。多个 SlotRequestId 可以映射到一个 AllocationID(由于槽共享)。
然后看看 sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile) 做了什么。
public SlotProfile getSlotProfile( ExecutionSlotSharingGroup executionSlotSharingGroup, ResourceProfile physicalSlotResourceProfile) { Collection<AllocationID> priorAllocations = new HashSet<>(); Collection<TaskManagerLocation> preferredLocations = new ArrayList<>(); for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) { priorAllocationIdRetriever.apply(execution).ifPresent(priorAllocations::add); preferredLocations.addAll( preferredLocationsRetriever.getPreferredLocations( execution, producersToIgnore)); } // 创建 SlotProfilereturn SlotProfile.priorAllocation( physicalSlotResourceProfile, physicalSlotResourceProfile, preferredLocations, // 指定 slot 位置的选择priorAllocations, reservedAllocationIds);
}
2.2 slot 优先位置
怎么确定 SlotProfile 中的 preferredLocations 参数的值?
位置的确定涉及两种接口:StateLocationRetriever 和 InputsLocationsRetriever。通过这两种获取优先部署位置。StateLocationRetriever 会获取每个执行节点的状态所在的位置。InputsLocationsRetriever 会获取当前节点的输入的所在位置。这两个逻辑在 SchedulerBase 构造函数中创建:
stateLocationRetriever = // StateLocationRetriever 是只有一个方法的接口,所以这直接通过lambda函数创建实例executionVertexId -> getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
inputsLocationsRetriever = // 类为 ExecutionGraphToInputsLocationsRetrieverAdapternew ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
在 SlotSharingExecutionSlotAllocatorFactory.createInstance() 中制定了优先位置检索器。
SyncPreferredLocationsRetriever preferredLocationsRetriever = new DefaultSyncPreferredLocationsRetriever(context, context);
查看怎么决定优先位置的代码:
MergingSharedSlotProfileRetriever.getSlotProfile()
-> preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore)-> asyncPreferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore) // 这里虽然写着 async,但其实是同步的,也就是必须这个函数运行成功,才会执行下一步。也就是说这个位置必须是立即可用的,否则就不能用。-> getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore)
getPreferredLocationsBasedOnInputs() 中的代码如下:
private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs( final ExecutionVertexID executionVertexId, final Set<ExecutionVertexID> producersToIgnore) { CompletableFuture<Collection<TaskManagerLocation>> preferredLocations = CompletableFuture.completedFuture(Collections.emptyList()); final Collection<ConsumedPartitionGroup> consumedPartitionGroups = inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId); for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) { // 为了避免太过分散,如果上游算子过多,则不获取它们的位置 if (consumedPartitionGroup.getConsumerVertexGroup().size() > MAX_DISTINCT_CONSUMERS_TO_CONSIDER) { continue; } // 获取上游节点的位置final Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = getInputLocationFutures( producersToIgnore, inputsLocationsRetriever.getProducersOfConsumedPartitionGroup( consumedPartitionGroup)); preferredLocations = combineLocations(preferredLocations, locationsFutures); } return preferredLocations;
}
这里返回的 preferredLocations 最终会传递给 SlotProfile。
2.3 slot 请求
下一步需要创建 PhysicalSlotRequest:
PhysicalSlotRequest request = new PhysicalSlotRequest( physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
PhysicalSlotRequest 包含以下内容:
private final SlotRequestId slotRequestId;
private final SlotProfile slotProfile;
private final boolean slotWillBeOccupiedIndefinitely; // jobType == JobType.STREAMING
3 分配 physical slot
在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 通过下面的代码分配 slot。
Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult = slotProvider.allocatePhysicalSlots(slotRequests);
这里的 slotProvider 是 PhysicalSlotProvider 类。在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中创建了 PhysicalSlotProvider。可以看到它实际是个 PhysicalSlotProviderImpl。
final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
在 slotProvider.allocatePhysicalSlots() 中尝试为每一个 slot 分配请求执行如下代码:
// 尝试从可用的 slots 中,为每个请求分配一个 physicalSlotMap<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots = tryAllocateFromAvailable(physicalSlotRequestsById.values());
在 tryAllocateFromAvailable() 尝试为每一个 slot 请求分配一个 physical slot:
for (PhysicalSlotRequest request : slotRequests) { // 使用 SlotSelectionStrategy 获取 slot Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot = slotSelectionStrategy.selectBestSlotForProfile( freeSlotInfoTracker, request.getSlotProfile());
这里是根据 slotSelectionStrategy 选择 slot 的。slotSelectionStrategy 的值在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中指定:
final SlotSelectionStrategy slotSelectionStrategy = SlotSelectionStrategyUtils.selectSlotSelectionStrategy( jobType, jobMasterConfiguration);
这里是根据配置文件选择到底使用哪个策略。
cluster.evenly-spread-out-slot(EVENLY_SPREAD_OUT_SLOTS_STRATEGY) 为 True:// 默认为 falseslotSelectionStrategy = EvenlySpreadOutLocationPreferenceSlotSelectionStrategy // 均匀分布
否则:slotSelectionStrategy = DefaultLocationPreferenceSlotSelectionStrategy
- DefaultLocationPreferenceSlotSelectionStrategy
选择 slot 的代码如下:
从所有可用的 slot 里顺序选择一个,只有满足资源需求,就直接分配。这样做,容易造成分配的 slot 集中在某几个 TaskManager 上。好处是可以减少不同 TaskManager 之间的通信代价,坏处是不能平衡各个 TaskManager 之间的资源利用率。protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull ResourceProfile resourceProfile) { for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) { SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId); if (candidate.getResourceProfile().isMatching(resourceProfile)) { return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED)); } } return Optional.empty(); }
- EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
选择 slot 的代码如下:
从所有满足资源要求的 slot,找到资源利用率最小的 slot,并分配该 slot。这样 slot 分配在各个 TaskManager 之间近似平均。好处是能平衡各个 TaskManager 之间的资源利用率,坏处是不同 TaskManager 之间的通信代价可能较大。protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull ResourceProfile resourceProfile) { return freeSlotInfoTracker.getAvailableSlots().stream() .map(freeSlotInfoTracker::getSlotInfo) // 过滤掉不满足资源要求的 slot.filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile)) // 获取每个 slot 的资源利用率.map( slot -> new Tuple2<>( slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot))) // 找到资源利用率最小的 slot.min(Comparator.comparingDouble(tuple -> tuple.f1)) .map( slotInfoWithTaskExecutorUtilization -> SlotInfoAndLocality.of( slotInfoWithTaskExecutorUtilization.f0, Locality.UNCONSTRAINED)); }