Flink源码解析之:如何根据JobGraph生成ExecutionGraph

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。

对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会讲述JobGraph到ExecutionGraph的生成过程,而这一过程会在Flink JobManager的服务端来完成。当JobGraph从客户端提交到JobManager后,JobManager会根据JobGraph生成对应的ExecutionGraph,而ExecutionGraph就是Flink作业调度时使用的核心数据结构。 本篇将会详细介绍JobGraph转换为ExecutionGraph的流程。

主体流程梳理

Flink在将JobGraph转换成ExecutionGraph后,便可以开始执行真正的任务。这一转换流程主要在Flink源码中的DefaultExecutionGraphBuilder类中的buildGraph方法中实现的。在转换过程中,涉及到了一些新的基本概念,先来简单介绍一下这些概念,对于理解ExecutionGraph有较大的帮助:

  • ExecutionJobVertex: 在ExecutionGraph中表示执行顶点,与JobGraph中的JobVertex一一对应。实际上,每个ExecutionJobVertex也是依赖JobVertex来创建的。
  • ExecutionVertex: 在ExecutionJobVertex类中创建,每个并发度都对应了一个ExecutionVertex对象,每个ExecutionVertex都代表JobVertex在某个特定并行子任务中的执行。在实际执行时,每个ExecutionVertex实际上就是一个Task,是ExecutionJobVertex并行执行的一个子任务。
  • Execution: Execution表示ExecutionVertex的一次执行。由于ExecutionVertex可以被执行多次(用于恢复、重新计算、重新分配),这个类用于跟踪该ExecutionVertex的单个执行状态和资源。
  • IntermediateResult: 在JobGraph中用IntermediateDataSet表示上游JobVertex的输出数据流,而在ExecutionGraph中,则用IntermediateResult来表示ExecutionJobVertex的输出数据流。
  • IntermediateResultPartition:这是IntermediateResult的一部分或一个分片。由于有多个并行任务(ExecutionVertex)执行相同的操作,每个任务都会产生一部分IntermediateResult。这些结果在物理存储和计算过程中,可能会被进一步划分成多个分区,每个分区对应一个 IntermediateResultPartition对象。

从上面的基本概念也可以看出,在ExecutionGraph中:

  • 相比StreamGraph和JobGraph,ExecutionGraph是实际根据任务并行度来生成拓扑结构的,在ExecutionGraph中,每个并行子任务都对应一个ExecutionVertex顶点和IntermediateResultPartition输出数据流分区。
  • 在ExecutionGraph中,上下游节点之间的连接是通过ExecutionVertex -> IntermediateResultPartition -> ExecutionVertex 对象来完成的。

整体的执行流程图如下所示:
在这里插入图片描述

入口方法:DefaultExecutionGraphBuilder.buildGraph

ExecutionGraph的生成是在DefaultExecutionGraphBuilder类的buildGraph方法中实现的:

public class DefaultExecutionGraphBuilder {public static DefaultExecutionGraph buildGraph(JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,ClassLoader classLoader,CompletedCheckpointStore completedCheckpointStore,CheckpointsCleaner checkpointsCleaner,CheckpointIDCounter checkpointIdCounter,Time rpcTimeout,MetricGroup metrics,BlobWriter blobWriter,Logger log,ShuffleMaster<?> shuffleMaster,JobMasterPartitionTracker partitionTracker,TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,ExecutionDeploymentListener executionDeploymentListener,ExecutionStateUpdateListener executionStateUpdateListener,long initializationTimestamp,VertexAttemptNumberStore vertexAttemptNumberStore,VertexParallelismStore vertexParallelismStore)throws JobExecutionException, JobException {checkNotNull(jobGraph, "job graph cannot be null");final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();// 创建JobInformationfinal JobInformation jobInformation =new JobInformation(jobId,jobName,jobGraph.getSerializedExecutionConfig(),jobGraph.getJobConfiguration(),jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths());// Execution 保留的最大历史数final int maxPriorAttemptsHistoryLength =jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);// IntermediateResultPartitions的释放策略final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(jobManagerConfig);// create a new execution graph, if none exists so farfinal DefaultExecutionGraph executionGraph;try {// 创建默认的ExecutionGraph执行图对象,最后会返回该创建好的执行图对象executionGraph =new DefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,maxPriorAttemptsHistoryLength,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,partitionLocationConstraint,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore);} catch (IOException e) {throw new JobException("Could not create the ExecutionGraph.", e);}// set the basic propertiestry {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));} catch (Throwable t) {log.warn("Cannot create JSON plan for job", t);// give the graph an empty planexecutionGraph.setJsonPlan("{}");}// initialize the vertices that have a master initialization hook// file output formats create directories here, input formats create splitsfinal long initMasterStart = System.nanoTime();log.info("Running initialization on master for job {} ({}).", jobName, jobId);for (JobVertex vertex : jobGraph.getVertices()) {String executableClass = vertex.getInvokableClassName();if (executableClass == null || executableClass.isEmpty()) {throw new JobSubmissionException(jobId,"The vertex "+ vertex.getID()+ " ("+ vertex.getName()+ ") has no invokable class.");}try {vertex.initializeOnMaster(classLoader);} catch (Throwable t) {throw new JobExecutionException(jobId,"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),t);}}log.info("Successfully ran initialization on master in {} ms.",(System.nanoTime() - initMasterStart) / 1_000_000);// topologically sort the job vertices and attach the graph to the existing one// 这里会先做一个排序,source源节点会放在最前面,接着开始遍历// 必须保证当前添加到集合的节点的前置节点都已经添加进去了List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();if (log.isDebugEnabled()) {log.debug("Adding {} vertices from job graph {} ({}).",sortedTopology.size(),jobName,jobId);}// 构建执行图的重点方法。生成具体的ExecutionGraphexecutionGraph.attachJobGraph(sortedTopology);if (log.isDebugEnabled()) {log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);}// configure the state checkpointing// checkpoint的相关配置if (isCheckpointingEnabled(jobGraph)) {JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();// Maximum number of remembered checkpointsint historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);CheckpointStatsTracker checkpointStatsTracker =new CheckpointStatsTracker(historySize,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// load the state backend from the application settingsfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured =snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;} else {try {applicationConfiguredBackend =serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);}}// StateBackend配置final StateBackend rootBackend;try {rootBackend =StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend,snapshotSettings.isChangelogStateBackendEnabled(),jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// load the checkpoint storage from the application settingsfinal CheckpointStorage applicationConfiguredStorage;final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =snapshotSettings.getDefaultCheckpointStorage();if (serializedAppConfiguredStorage == null) {applicationConfiguredStorage = null;} else {try {applicationConfiguredStorage =serializedAppConfiguredStorage.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined checkpoint storage.",e);}}final CheckpointStorage rootStorage;try {rootStorage =CheckpointStorageLoader.load(applicationConfiguredStorage,null,rootBackend,jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);}// instantiate the user-defined checkpoint hooks// 示例化用户自定义的cp hookfinal SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;if (serializedHooks == null) {hooks = Collections.emptyList();} else {final MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}final Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}} finally {thread.setContextClassLoader(originalClassLoader);}}final CheckpointCoordinatorConfiguration chkConfig =snapshotSettings.getCheckpointCoordinatorConfiguration();// 创建CheckpointCoordinator对象executionGraph.enableCheckpointing(chkConfig,hooks,checkpointIdCounter,completedCheckpointStore,rootBackend,rootStorage,checkpointStatsTracker,checkpointsCleaner);}// create all the metrics for the Execution Graph// 添加metrics指标metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));return executionGraph;}

在这个方法里,会先创建一个 ExecutionGraph 对象,然后对 JobGraph 中的 JobVertex 列表做一下排序(先把有 source 节点的 JobVertex 放在最前面,然后开始遍历,只有当前 JobVertex 的前置节点都已经添加到集合后才能把当前 JobVertex 节点添加到集合中),最后通过过 attachJobGraph() 方法生成具体的ExecutionGraph。

在上面的代码中,最需要核心关注的方法是:executionGraph.attachJobGraph(sortedTopology);。该方法是创建ExecutionGraph的核心方法,包括了创建上面我们说的各种ExecutionGraph中涉及的对象,以及连接它们来形成ExecutionGraph拓扑结构。

接下来我们进入该方法来一探究竟。

生成ExecutionGraph:attachJobGraph

先来看下attachJobGraph方法的实现:

public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+ "vertices and {} intermediate results.",topologicallySorted.size(),tasks.size(),intermediateResults.size());final long createTimestamp = System.currentTimeMillis();// 遍历排序好的拓扑JobVertexfor (JobVertex jobVertex : topologicallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable = false;}// 获取节点并行度信息VertexParallelismInformation parallelismInfo =parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graph// 创建ExecutionJobVertexExecutionJobVertex ejv =new ExecutionJobVertex(this,jobVertex,maxPriorAttemptsHistoryLength,rpcTimeout,createTimestamp,parallelismInfo,initialAttemptCounts.getAttemptCounts(jobVertex.getID()));// 重要方法!!!// 构建ExecutionGraph,连接上下游节点ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask != null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(), ejv, previousTask));}// 遍历ExecutionJobVertex的输出IntermediateResultfor (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet =this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet != null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal += ejv.getParallelism();}//将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);// the topology assigning should happen before notifying new vertices to failoverStrategy// 转换执行拓扑executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy =// 创建部分组释放策略的方法,依赖于当前的调度的拓扑结构,这决定了当何时释放特定的中间数据结果所需的策略。
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

在上面attchGraph方法中,首先遍历输入的排序后的JobVertex列表,对每一个JobVertex:

  • 判断是否停止: 对于单个 JobVertex,如果它是一个输入顶点且不可停止,则整个 Job 不可停止。这在流处理任务中是常见的,一些输入数据源可能无法停止(如Kafka)。
  • 获取并行信息并创建执行的顶点: 根据JobVertex的ID,从parallelismStore中获取并行信息。利用这些信息创建ExecutionJobVertex实例,它代表运行在特定TaskManager上的taskId,可以是待调度、运行或已完成的。
  • 判断新添加的顶点是否已经存在: 如果试图添加一个已经存在的顶点,这意味着存在程序错误,因为每个JobVertex应当有唯一的ID。这将抛出异常。
  • 判断数据集是否已经存在: 同样。如果试图添加一个已经存在的IntermediateResult,这将抛出异常。
  • 添加执行顶点到创建顺序列表和增加总的顶点数量: 记录创建顶点的顺序能够确保在执行时能够按照正确的依赖关系进行。并同时更新总的顶点数量。

遍历完成后, 注册执行顶点和结果分区,将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。

利用DefaultExecutionTopology工具类将ExecutionGraph转换为SchedulingTopology,这样便于任务调度器进行处理。

最后,调用partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology())根据当前的调度的拓扑结构来创建组释放策略,这决定了当何时释放特定的中间数据结果所需的策略。

上面流程中,最需要关注的方法就是new ExecutionJobVertexejv.connectToPredecessors(this.intermediateResults);

接下来,我们分别对其进行探究。

创建 ExecutionJobVertex 对象

进入到该方法的源码中:

@VisibleForTesting
public ExecutionJobVertex(InternalExecutionGraphAccessor graph,JobVertex jobVertex,int maxPriorAttemptsHistoryLength,Time timeout,long createTimestamp,VertexParallelismInformation parallelismInfo,SubtaskAttemptNumberStore initialAttemptCounts)throws JobException {if (graph == null || jobVertex == null) {throw new NullPointerException();}this.graph = graph;this.jobVertex = jobVertex;this.parallelismInfo = parallelismInfo;// verify that our parallelism is not higher than the maximum parallelismif (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",jobVertex.getName(),this.parallelismInfo.getParallelism(),this.parallelismInfo.getMaxParallelism()));}this.resourceProfile =ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];this.inputs = new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());this.coLocationGroup = jobVertex.getCoLocationGroup();// create the intermediate resultsthis.producedDataSets =new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] =new IntermediateResult(result.getId(),this,this.parallelismInfo.getParallelism(),result.getResultType());}// create all task verticesfor (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {ExecutionVertex vertex =new ExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,maxPriorAttemptsHistoryLength,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i] = vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor (IntermediateResult ir : this.producedDataSets) {if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");}}final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =getJobVertex().getOperatorCoordinators();if (coordinatorProviders.isEmpty()) {this.operatorCoordinators = Collections.emptyList();} else {final ArrayList<OperatorCoordinatorHolder> coordinators =new ArrayList<>(coordinatorProviders.size());try {for (final SerializedValue<OperatorCoordinator.Provider> provider :coordinatorProviders) {coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));}} catch (Exception | LinkageError e) {IOUtils.closeAllQuietly(coordinators);throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);}this.operatorCoordinators = Collections.unmodifiableList(coordinators);}// set up the input splits, if the vertex has anytry {@SuppressWarnings("unchecked")InputSplitSource<InputSplit> splitSource =(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();if (splitSource != null) {Thread currentThread = Thread.currentThread();ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try {inputSplits =splitSource.createInputSplits(this.parallelismInfo.getParallelism());if (inputSplits != null) {splitAssigner = splitSource.getInputSplitAssigner(inputSplits);}} finally {currentThread.setContextClassLoader(oldContextClassLoader);}} else {inputSplits = null;}} catch (Throwable t) {throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);}
}

在上面这段代码中,主要实现了ExecutionVertex的创建和IntermediateResult对象的创建:

  • 遍历当前JobVertex的输出IntermediateDataSet列表,并根据IntermediateDataSet来创建相应的IntermediateResult对象。每个IntermediateDataSet都会对应一个IntermediateResult
  • 根据当前JobVertex的并发度,来创建相同数量的ExecutionVertex对象,每个ExecutionVertex对象代表一个并行计算任务,在实际执行时就是一个Task任务。

创建ExecutionVertex对象

进一步地,我们观察创建ExecutionVertex对象的实现逻辑如下所示:

public ExecutionVertex(ExecutionJobVertex jobVertex,int subTaskIndex,IntermediateResult[] producedDataSets,Time timeout,long createTimestamp,int maxPriorExecutionHistoryLength,int initialAttemptCount) {this.jobVertex = jobVertex;this.subTaskIndex = subTaskIndex;this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);this.taskNameWithSubtask =String.format("%s (%d/%d)",jobVertex.getJobVertex().getName(),subTaskIndex + 1,jobVertex.getParallelism());this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);// 根据IntermediateResult创建当前subTaskIndex分区下的IntermediateResultPartitonfor (IntermediateResult result : producedDataSets) {IntermediateResultPartition irp =new IntermediateResultPartition(result,this,subTaskIndex,getExecutionGraphAccessor().getEdgeManager());// 记录当前分区的irp到ir中result.setPartition(subTaskIndex, irp);// 记录分区ip与irp的对应关系resultPartitions.put(irp.getPartitionId(), irp);}this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);// 创建对应的Execution对象,初始化时initialAttempCount为0,如果后面重新调度这个task,它会自增加1this.currentExecution =new Execution(getExecutionGraphAccessor().getFutureExecutor(),this,initialAttemptCount,createTimestamp,timeout);getExecutionGraphAccessor().registerExecution(currentExecution);this.timeout = timeout;this.inputSplits = new ArrayList<>();
}

上述创建ExecutionVertex的过程主要实现了以下步骤:

  1. 生成中间结果分区IntermediateResultPartition
    中间结果分区代表一个并行任务产生的输出,同一并行任务可能会有多个输出(对应多个后续任务),也就是多个中间结果分区。
  • 基于 result,在相应的索引 subTaskIndex 上创建一个 IntermediateResultPartition 并给它赋值。IntermediateResultPartition 提供了并行任务的输出数据,对应于某个特定执行顶点 ExecutionVertex 的并行子任务。
  • 在创建过程中,需要使用 getExecutionGraphAccessor().getEdgeManager() 获取边管理器,边管理器是用于维护这个分区与其它 ExecutionVertex 之间的连接关系。
  • 记录这个 IntermediateResultPartition 到 result 中的相应索引位置,并在 resultPartitions 映射表中保存 IntermediateResultPartition。
  1. 创建执行(Execution)对象:
    这一过程是基于 Execution 的构造函数引发的。它用于代表该 ExecutionVertex 在某一特定点时间的一次尝试执行。创建 Execution 实例后,会将其注册到执行图(ExecutionGraph)中,以便于后续调度和执行任务。

通过以上流程,生成了中间结果分区,映射了每一个分区和其对应的任务关系,并且创建了 Execution 对象用于管理并跟踪任务的执行状态。

在创建好ExecutionVertex和IntermediateResultPartition后,根据上面的流程图,就是考虑如何将它们进行连接生成ExecutionGraph了。

这部分的实现逻辑就在attachJobGraph方法的ejv.connectToPredecessors(this.intermediateResults);方法中实现的。

生成ExecutionGraph

同样地,我们进入源码来深入观察一下实现逻辑:

public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)throws JobException {List<JobEdge> inputs = jobVertex.getInputs();if (LOG.isDebugEnabled()) {LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.",jobVertex.getID(), jobVertex.getName(), inputs.size()));}for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);if (LOG.isDebugEnabled()) {if (edge.getSource() == null) {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",num,jobVertex.getID(),jobVertex.getName(),edge.getSourceId()));} else {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",num,jobVertex.getID(),jobVertex.getName(),edge.getSource().getProducer().getID(),edge.getSource().getProducer().getName()));}}// fetch the intermediate result via ID. if it does not exist, then it either has not// been created, or the order// in which this method is called for the job vertices is not a topological orderIntermediateResult ires = intermediateDataSets.get(edge.getSourceId());if (ires == null) {throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "+ edge.getSourceId());}this.inputs.add(ires);EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());}
}

这段代码主要完成了将当前的ExecutionJobVertex与其前置任务(predecessors)连接的流程。传入的参数intermediateDatasets包含了JobGraph中所有的中间计算结果,这些结果是由上游前置任务产生的。

需要注意的是,该过程要求连接操作的执行顺序应遵循任务的拓扑顺序。Flink的计算任务通常由多个阶段组成,每个阶段的输出是下一个阶段的输入,每个阶段(JobVertex)都处理一种类型的计算,例如map或reduce。

流程大致如下:

  • 获取输入: 首先获取jobVertex的输入,输入是JobEdge列表,每一条JobEdge都代表一个上游产生的中间数据集和连接上下游的方式(例如HASH, BROADCAST)。
  • 循环处理每个输入: 然后遍历这些inputs,对于每一条JobEdge:
    • 基于edge.getSourceId()从intermediateDatasets获取IntermediateResult,这是一个中间计算结果。
    • 检查该中间结果是否存在,如果不存在,则表示这不是一个拓扑排序,因为预期的情况是当你尝试访问一个中间结果时,它应该已经被创建了。如果找不到,抛出一个异常。
    • 如果存在(没有异常被抛出),将找到的IntermediateResult添加到ExecutionJobVertex的inputs(List类型)中,这样当前任务就知道它的输入来自哪些中间结果。
    • 调用EdgeManagerBuildUtil.connectVertexToResult方法来建立当前ExecutionJobVertex与找到的IntermediateResult之间的连接。 EdgeManager是Flink中负责管理输入输出边的组件,它显示地记录了发送端的分区和接收端的分区对应关系。

这个流程重要的是建立了Job中每个任务的执行依赖关系,并明确了数据传输的方式,让任务在执行时清楚自己的输入来自哪里,当任务执行完成后,它产生的输出会通过何种方式被发送到哪些任务。

具体的连接方式,我们需要继续进入到EdgeManagerBuildUtil.connectVertexToResult方法中。其源码如下所示:

/*** Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult} ** based on the {@link DistributionPattern}.** @param vertex the downstream consumer {@link ExecutionJobVertex}* @param intermediateResult the upstream consumed {@link IntermediateResult}* @param distributionPattern the {@link DistributionPattern} of the edge that connects the*     upstream {@link IntermediateResult} and the downstream {@link IntermediateResult}*/
static void connectVertexToResult(ExecutionJobVertex vertex,IntermediateResult intermediateResult,DistributionPattern distributionPattern) {switch (distributionPattern) {// 点对点的连接方式case POINTWISE:connectPointwise(vertex.getTaskVertices(), intermediateResult);break;// 全连接的方式case ALL_TO_ALL:connectAllToAll(vertex.getTaskVertices(), intermediateResult);break;default:throw new IllegalArgumentException("Unrecognized distribution pattern.");}
}

会根据DistributionPattern选择不同的连接方式,这里主要分两种情况,DistributionPattern是跟Partitioner的配置有关。

这里以POINTWISE的连接方式来举例,看一下其是如何在构造ExecutionGraph时连接上下游节点的。

private static void connectPointwise(ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {final int sourceCount = intermediateResult.getPartitions().length;final int targetCount = taskVertices.length;if (sourceCount == targetCount) {for (int i = 0; i < sourceCount; i++) {ExecutionVertex executionVertex = taskVertices[i];IntermediateResultPartition partition = intermediateResult.getPartitions()[i];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());partition.addConsumers(consumerVertexGroup);ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else if (sourceCount > targetCount) {for (int index = 0; index < targetCount; index++) {ExecutionVertex executionVertex = taskVertices[index];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());int start = index * sourceCount / targetCount;int end = (index + 1) * sourceCount / targetCount;List<IntermediateResultPartitionID> consumedPartitions =new ArrayList<>(end - start);for (int i = start; i < end; i++) {IntermediateResultPartition partition = intermediateResult.getPartitions()[i];partition.addConsumers(consumerVertexGroup);consumedPartitions.add(partition.getPartitionId());}ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(consumedPartitions, intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else {for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {IntermediateResultPartition partition =intermediateResult.getPartitions()[partitionNum];ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;List<ExecutionVertexID> consumers = new ArrayList<>(end - start);for (int i = start; i < end; i++) {ExecutionVertex executionVertex = taskVertices[i];executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);consumers.add(executionVertex.getID());}ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromMultipleVertices(consumers);partition.addConsumers(consumerVertexGroup);}}
}

上面这段代码的目的是通过“点对点”的方式(即每个生产者产生的数据只被一个消费者消费)建立任务节点(ExecutionVertex)与中间结果集(IntermediateResultPartition)之间的连接关系。

这个方法的逻辑主要是根据上游任务产生的IntermediateResultPartition的数量(源)和下游ExecutionVertex节点数量(目标)的比例关系,做不同的操作:

  • 源和目标数量相等:方法会将每个源中间结果分区与对应的下游ExecutionVertex节点连接。这种情况下,每个任务都完全独立,只会消费一个特定的上游中间结果分区
  • 源数量大于目标数量:源中间结果分区会被尽可能平均地分配给下游ExecutionVertex节点,即每个ExecutionVertex可能会消费多个源中间结果分区数据。
  • 源数量小于目标数量:每个源中间结果分区可能会被分配给多个下游ExecutionVertex节点消费,即多个ExecutionVertex节点可能消费同一个源中间结果分区数据。

⠀在执行连接的过程中,会创建ConsumerVertexGroup和ConsumedPartitionGroup:

  • ConsumerVertexGroup包含一组接收同一个中间结果分区(IntermediateResultPartition)的顶点集合。
  • ConsumedPartitionGroup包含顶点要消费的一组中间结果分区。

⠀注意,当源数量小于目标数量时,会有多个任务消费同一个源数据,所以需要使用ConsumerVertexGroup.fromMultipleVertices(consumers)来创建ConsumerVertexGroup。

几种连接情况的示例图如下所示:
在这里插入图片描述

到这里,这个作业的 ExecutionGraph 就创建完成了,有了 ExecutionGraph,JobManager 才能对这个作业做相应的调度。

总结

本文详细介绍了JobGraph生成ExecutionGraph的流程,介绍了ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition相关概念的原理和生成过程。最后我们介绍了Flink在生成ExecutionGraph时是如何实现IntermediateResultPartition和ExecutionVertex的连接的。

到这里,StreamGraph、JobGraph和Execution的生成过程,在最近的三篇文章中都已经详细讲解完成了,当然除了我们介绍的内容外,还有更多的实现细节没有介绍,有兴趣的读者可以参考文本来阅读源码,以此加深自己的理解和对更多实现细节的挖掘。

最后,再对StreamGraph、JobGraph和ExecutionGraph做一个总结:

  • StreamGraph. StreamGraph 是表示 Flink 流计算的图模型,它是用户定义的计算逻辑的内部表示形式,是最原始的用户逻辑,一个没有做任何优化的DataFlow;
  • JobGraph. JobGraph 由一个或多个 JobVertex 对象和它们之间的 JobEdge 对象组成,包含并行任务的信息。在JobGraph中对StreamGraph进行了优化,将能够合并在同个算子链中的操作符进行合并,以此减少任务执行时的上下文切换,提任务执行性能。
  • ExecutionGraph. ExecutionGraph是 JobGraph 的并发执行版本,由 ExecutionVertex 和 IntermediateResultPartition 组成。每个 JobVertex 会被转换为一个或多个 ExecutionVertex,ExecutorGraph 包含了每个任务的全部实例,包含它们的状态、位置、输入输出结果。ExecutionGraph 是 Flink 中最核心的部分,它用于任务的调度、失败恢复等。

参考:
https://matt33.com/2019/12/20/flink-execution-graph-4/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/65872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生成式AI与RAG架构:如何选择合适的向量数据库?

大规模语言模型和情境感知的AI应用程序推动了检索增强生成&#xff08;RAG&#xff09;架构的发展&#xff0c;使其成为关注的焦点。RAG将生成模型的力量与外部知识相结合&#xff0c;允许系统生成更加具体且与情境相关的回应。 向量数据库构成了RAG系统的基石。选择正确的向量…

电脑找不到mfc110.dll文件要如何解决?Windows缺失mfc110.dll文件快速解决方法

一、mfc110.dll文件的重要性 mfc110.dll&#xff0c;全称Microsoft Foundation Class Library 110&#xff0c;是Microsoft Visual C Redistributable for Visual Studio 2012的一部分。这个动态链接库&#xff08;DLL&#xff09;文件对于支持基于MFC&#xff08;Microsoft F…

大模型Weekly 03|OpenAI o3发布;DeepSeek-V3上线即开源!

大模型Weekly 03&#xff5c;OpenAI o3发布&#xff1b;DeepSeek-V3上线即开源&#xff01;DeepSeek-V3上线即开源&#xff1b;OpenAI 发布高级推理模型 o3https://mp.weixin.qq.com/s/9qU_zzIv9ibFdJZ5cTocOw?token47960959&langzh_CN 「青稞大模型Weekly」&#xff0c;持…

4、上一个接口返回值,作为下一个方法(接口)的变量

import requestsclass TestCase:# 设置1个类变量B "初始值"def test1(self):url "**这里是接口url**"params {"type": "json"}resp1 requests.get(urlurl, paramsparams)# .json()用于将服务器返回的 JSON 格式的响应内容解析为 P…

USB 中断传输的 PID 序列

中断传输的 PID 序列 端点在初始化后&#xff0c;从 DATA0 开始&#xff0c;每成功执行一个事务&#xff0c;数据包序列翻转一次&#xff08;从 DATA0 变为DATA1 或从 DATA1 变为 DATA0)。 数据翻转和传输的个数没有直接关系&#xff0c;只由端点在初始化后处理的总数决定。 …

SAP财务凭证的更改、冲销的方式

文章目录 一、财务凭证更改二、财务凭证冲销 【SAP系统研究】 #SAP #FICO #SAP财务 一、财务凭证更改 &#xff08;1&#xff09;已经过账的财务凭证 FB02&#xff1a;过完帐的允许更改的地方有限&#xff0c;只有凭证抬头文本、参照、分配、文本、原因代码等。 &#xff0…

OpenCV的人脸检测模型FaceDetectorYN

OpenCV的人脸检测模型FaceDetectorYN 1. 官网地址2. 如何使用2.1.到opencv_zoo下载模型文件和代码2.2. 下载文件展示2.3. 修改了demo支持读取视频文件&#xff0c;默认是图片和摄像头## 2.4 效果展示 1. 官网地址 https://docs.opencv.org/4.x/df/d20/classcv_1_1FaceDetector…

vue使用el-select下拉框自定义复选框

在 Vue 开发中&#xff0c;高效且美观的组件能极大地提升用户体验和开发效率。在vue中使用elementplus 的 el-select下拉框实现了一个自定义的多选下拉框组件。 一、代码功能概述 这段代码创建了一个可多选的下拉框组件&#xff0c;通过el-select和el-checkbox-group结合的方…

01-英语准备

首先是自我介绍&#xff0c;中英文都可以&#xff0c;建议提前打好草稿然后开始背&#xff0c;模板网上有很多&#xff0c;可以自行查找&#xff0c;主要就是个人的一些基本情况&#xff0c;竞赛获奖经历&#xff0c;感兴趣的方向等等。接下来就是老师问的一些问题了。 做个英文…

亚信科技研发智能化实践之路

作者&#xff1a;亚信科技高级研发经理史伟星 亚信科技是一家专注于 To B 业务的公司。公司 1993 年成立&#xff0c;于 2000 年成为纳斯达克首批上市的高科技企业。2010 年&#xff0c;通过持续深耕&#xff0c;成为中国领先的通信软件产品服务商。2014 年&#xff0c;完成私…

==和===的区别,被坑的一天

在 JavaScript 中&#xff0c; 和 都用于比较两个值&#xff0c;但它们有一个重要的区别&#xff1a; 1. (宽松相等运算符) 进行比较时&#xff0c;会 自动类型转换&#xff08;也叫做强制类型转换&#xff09;&#xff0c;即如果比较的两个值的类型不同&#xff0c;JavaScr…

如何不修改模型参数来强化大语言模型 (LLM) 能力?

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 大语言模型 (Large Language Model, LLM, e.g. ChatGPT) 的参数量少则几十亿&#xff0c;多则上千亿&#xff0c;对其的训…

使用Python和OpenCV进行视觉图像分割

简介&#x1f381; 在图像处理领域&#xff0c;图像分割是一项基础且关键的技术&#xff0c;它涉及到将图像划分为若干个具有特定属性的区域。本文将通过一个实践项目&#xff0c;展示如何使用Python编程语言&#xff0c;结合OpenCV库&#xff0c;对一张玫瑰花的图片进行图像分…

代码解析:安卓VHAL的AIDL参考实现

以下内容基于安卓14的VHAL代码。 总体架构 参考实现采用双层架构。上层是 DefaultVehicleHal&#xff0c;实现了 VHAL AIDL 接口&#xff0c;并提供适用于所有硬件设备的通用 VHAL 逻辑。下层是 FakeVehicleHardware&#xff0c;实现了 IVehicleHardware 接口。此类可模拟与实…

vLLM结构化输出(Guided Decoding)

简介 vLLM 的结构化输出特性是通过“引导式解码”&#xff08;Guided Decoding&#xff09;实现的&#xff0c;这一功能允许模型在生成文本时遵循特定的格式约束&#xff0c;例如 JSON 模式或正则表达式&#xff0c;从而确保生成的内容符合预期的结构化要求。 后端引擎 启动…

部署SenseVoice

依赖 Conda cuda pythor 查看GPU版本-CSDN博客 创建虚拟conda环境 conda create --name deeplearn python3.10 conda activate deeplearn git clone https://github.com/FunAudioLLM/SenseVoice.git cd SenseVoice pip install -r requirements.txt pip install gradio pip …

基于51单片机(STC32G12K128)和8X8彩色点阵屏(WS2812B驱动)的小游戏《贪吃蛇》

目录 系列文章目录前言一、效果展示二、原理分析三、各模块代码1、定时器02、矩阵按键模块3、8X8彩色点阵屏 四、主函数总结 系列文章目录 前言 《贪吃蛇》&#xff0c;一款经典的、怀旧的小游戏&#xff0c;单片机入门必写程序。 以《贪吃蛇》为载体&#xff0c;熟悉各种屏幕…

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时&#xff0c;datetime类型自动转换成时间戳类型 问题解决1.自定义转换器类2.代码引用 结果 问题 flink版本&#xff1a;1.18.1&#xff0c;mysql版本&#xff1a;8.0.40 使用FlinkCDC的MySqlSource 连接mysql&#xff0c;对于datetime 类型字段&…

SwiftUI 撸码常见错误 2 例漫谈

概述 在 SwiftUI 日常撸码过程中&#xff0c;头发尚且还算茂盛的小码农们经常会犯这样那样的错误。虽然犯这些错的原因都很简单&#xff0c;但有时想要快速准确的定位它们却并不容易。 况且这些错误还可能在模拟器和 Xcode 预览&#xff08;Preview&#xff09;表现的行为不甚…

【Unity】 HTFramework框架(五十八)【进阶篇】资源及代码热更新实战演示(Deployment + HybridCLR)

更新日期&#xff1a;2025年1月2日。 Github源码&#xff1a;[点我获取源码] 索引 资源及代码热更新实战演示运行演示Demo1.克隆项目工程2.更新子模块3.打开项目4.打开入口场景5.设置远端资源服务器地址6.导入HybridCLR7.初始化HybridCLR8.发布项目9.部署资源版本10.运行Exe11.…