The JobManager, acting as an Akka actor, handles the SubmitJob message:
case SubmitJob(jobGraph, listeningBehaviour) =>
  val client = sender()
  val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
    jobGraph.getSessionTimeout)

  submitJob(jobGraph, jobInfo)
submitJob does three things:
Build an ExecutionGraph from the JobGraph
Restore state from CheckpointedState, or from a savepoint
Hand the ExecutionGraph to the Scheduler for scheduling
ExecutionGraph
executionGraph = ExecutionGraphBuilder.buildGraph(
    executionGraph,             // currentJobs.get(jobGraph.getJobID): an existing ExecutionGraph for this job id, if any
    jobGraph,
    flinkConfiguration,         // configuration
    futureExecutor,             // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")), a thread pool sized by the number of CPU cores
    ioExecutor,                 // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-io-", "-thread-"))
    userCodeLoader,             // libraryCacheManager.getClassLoader(jobGraph.getJobID), loads classes from the user jar
    checkpointRecoveryFactory,  // used for createCheckpointStore and createCheckpointIDCounter; standalone and ZooKeeper variants
    Time.of(timeout.length, timeout.unit),
    restartStrategy,            // the job restart strategy
    jobMetrics,
    numSlots,                   // scheduler.getTotalNumberOfSlots(), the total number of slots across the instances registered with this JM
    log.logger)
ExecutionGraphBuilder.buildGraph
Create a new ExecutionGraph, if none exists yet
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;

try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            futureExecutor,
            ioExecutor,
            jobId,
            jobName,
            jobGraph.getJobConfiguration(),
            jobGraph.getSerializedExecutionConfig(),
            timeout,
            restartStrategy,
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths(),
            classLoader,
            metrics);
} catch (IOException e) {
    throw new JobException("Could not create the execution graph.", e);
}
attachJobGraph generates the graph's vertices and edges:
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology);
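For intuition, getVerticesSortedTopologicallyFromSources orders the JobVertices so that every vertex comes after all of its inputs, starting from the source vertices. A minimal, self-contained sketch of such a from-the-sources topological sort (Kahn's algorithm over a toy edge list; this is an illustration, not Flink's actual implementation):

import java.util.*;

public class TopoSortSketch {
    // vertices are 0..n-1; each edge is {from, to}
    static List<Integer> sortFromSources(int n, int[][] edges) {
        List<List<Integer>> out = new ArrayList<>();
        int[] inDegree = new int[n];
        for (int i = 0; i < n; i++) out.add(new ArrayList<>());
        for (int[] e : edges) { out.get(e[0]).add(e[1]); inDegree[e[1]]++; }

        // start from the sources, i.e. vertices without inputs
        Deque<Integer> ready = new ArrayDeque<>();
        for (int v = 0; v < n; v++) if (inDegree[v] == 0) ready.add(v);

        List<Integer> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            int v = ready.poll();
            sorted.add(v);
            // a consumer becomes ready once all of its producers have been emitted
            for (int w : out.get(v)) if (--inDegree[w] == 0) ready.add(w);
        }
        return sorted;   // a result smaller than n would indicate a cycle
    }

    public static void main(String[] args) {
        // source -> map -> sink
        System.out.println(sortFromSources(3, new int[][] {{0, 1}, {1, 2}}));  // [0, 1, 2]
    }
}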
ExecutionGraph.attachJobGraph
for (JobVertex jobVertex : topologiallySorted) {
    // create the execution job vertex and attach it to the graph
    ExecutionJobVertex ejv =
        new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
    ejv.connectToPredecessors(this.intermediateResults);

    // all job vertices that are part of this graph:
    // ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks
    ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

    for (IntermediateResult res : ejv.getProducedDataSets()) {
        // all intermediate results that are part of this graph:
        // ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults
        IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
    }

    // all vertices, in the order in which they were created:
    // List<ExecutionJobVertex> verticesInCreationOrder
    this.verticesInCreationOrder.add(ejv);
}
Each JobVertex is wrapped into an ExecutionJobVertex.
This creates, in turn, ExecutionJobVertex, ExecutionVertex, and Execution, as well as IntermediateResult and IntermediateResultPartition (see the sketch below).
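To make those 1-to-N relationships concrete, here is an illustrative mini model with hypothetical stand-in classes (not the Flink ones) showing what gets created for a JobVertex with parallelism 3 and one produced data set:

import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {
    record Execution(int attemptNumber) {}                       // the current attempt of one subtask
    record ExecutionVertex(int subTaskIndex, Execution current) {}
    record IntermediateResultPartition(int partitionIndex) {}    // one per producing subtask

    public static void main(String[] args) {
        int parallelism = 3;   // jobVertex.getParallelism()

        // one ExecutionJobVertex fans out into `parallelism` ExecutionVertices,
        // each owning one Execution (the current attempt)
        List<ExecutionVertex> taskVertices = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            taskVertices.add(new ExecutionVertex(i, new Execution(0)));
        }

        // one IntermediateDataSet becomes one IntermediateResult with
        // `parallelism` IntermediateResultPartitions (one per producer subtask)
        List<IntermediateResultPartition> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new IntermediateResultPartition(i));
        }

        System.out.println(taskVertices.size() + " ExecutionVertices, "
            + partitions.size() + " IntermediateResultPartitions");   // 3 and 3
    }
}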
ExecutionJobVertex
public ExecutionJobVertex(
        ExecutionGraph graph,
        JobVertex jobVertex,
        int defaultParallelism,
        Time timeout,
        long createTimestamp) throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    // the parallelism determines how many ExecutionVertices are created
    int vertexParallelism = jobVertex.getParallelism();
    int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

    // the ExecutionVertices, one per parallel subtask
    this.taskVertices = new ExecutionVertex[numTaskVertices];
    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // take the sharing group
    this.slotSharingGroup = jobVertex.getSlotSharingGroup();
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    // create the IntermediateResults that will hold the intermediate data
    this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        // wrap each IntermediateDataSet of the JobGraph into an IntermediateResult
        this.producedDataSets[i] = new IntermediateResult(
            result.getId(),
            this,
            numTaskVertices,
            result.getResultType());
    }

    // create all task vertices
    for (int i = 0; i < numTaskVertices; i++) {
        ExecutionVertex vertex = new ExecutionVertex(   // initialize the ExecutionVertex
            this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
        this.taskVertices[i] = vertex;
    }

    finishedSubtasks = new boolean[parallelism];
}
ExecutionVertex
public ExecutionVertex(
        ExecutionJobVertex jobVertex,
        int subTaskIndex,   // the subtask index; one subtask corresponds to one ExecutionVertex
        IntermediateResult[] producedDataSets,
        Time timeout,
        long createTimestamp,
        int maxPriorExecutionHistoryLength) {

    this.jobVertex = jobVertex;
    this.subTaskIndex = subTaskIndex;
    this.taskNameWithSubtask = String.format("%s (%d/%d)",
        jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());

    // records this vertex's IntermediateResultPartitions
    this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(
        producedDataSets.length, 1);

    for (IntermediateResult result : producedDataSets) {
        // initialize an IntermediateResultPartition for each produced result
        IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
        result.setPartition(subTaskIndex, irp);
        resultPartitions.put(irp.getPartitionId(), irp);
    }

    this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
    this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

    this.currentExecution = new Execution(   // create the Execution
        getExecutionGraph().getFutureExecutor(),
        this,
        0,
        createTimestamp,
        timeout);

    this.timeout = timeout;
}
connectToPredecessors connects the vertices with edges:
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
    List<JobEdge> inputs = jobVertex.getInputs();   // the inputs of the JobVertex

    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);   // the corresponding JobEdge

        // fetch the source IntermediateResult of this JobEdge
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        this.inputs.add(ires);   // List<IntermediateResult> inputs

        // register the current vertex as a consumer with each IntermediateResultPartition of the IntermediateResult
        int consumerIndex = ires.registerConsumer();

        for (int i = 0; i < parallelism; i++) {
            ExecutionVertex ev = taskVertices[i];

            // create the ExecutionEdges from each ExecutionVertex to the concrete IntermediateResultPartitions
            ev.connectSource(num, ires, edge, consumerIndex);
        }
    }
}
connectSource
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
    final DistributionPattern pattern = edge.getDistributionPattern();   // the edge's distribution pattern
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions();   // the source's partitions

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    this.inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
Let's look at connectPointwise:
private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    final int numSources = sourcePartitions.length;              // the number of source partitions
    final int parallelism = getTotalNumberOfParallelSubtasks();  // the parallelism of the consuming subtasks

    // simple case: same number of sources as targets
    if (numSources == parallelism) {
        // one to one: take the source partition whose index matches this subTaskIndex
        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
    }
    else if (numSources < parallelism) {
        // higher consumer parallelism: one source feeds several tasks
        int sourcePartition;

        // check if the pattern is regular or irregular
        // we use int arithmetics for regular, and floating point with rounding for irregular
        if (parallelism % numSources == 0) {
            // evenly divisible, e.g. 2 sources and 6 tasks: the 3rd task maps to the 1st source
            // same number of targets per source
            int factor = parallelism / numSources;
            sourcePartition = subTaskIndex / factor;
        }
        else {
            // different number of targets per source
            float factor = ((float) parallelism) / numSources;
            sourcePartition = (int) (subTaskIndex / factor);
        }

        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
    }
    else {
        //......
    }
}
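The index arithmetic is easy to verify with a standalone snippet. The following demo mirrors the logic above for both the evenly divisible case (2 sources, 6 tasks) and the irregular case (3 sources, 7 tasks):

public class PointwiseDemo {
    // which source partition does subtask `subTaskIndex` read from?
    static int sourcePartition(int numSources, int parallelism, int subTaskIndex) {
        if (parallelism % numSources == 0) {
            int factor = parallelism / numSources;   // targets per source
            return subTaskIndex / factor;
        } else {
            float factor = ((float) parallelism) / numSources;
            return (int) (subTaskIndex / factor);
        }
    }

    public static void main(String[] args) {
        // 2 sources, 6 tasks: tasks 0-2 read source 0, tasks 3-5 read source 1
        for (int i = 0; i < 6; i++) System.out.print(sourcePartition(2, 6, i) + " ");  // 0 0 0 1 1 1
        System.out.println();

        // 3 sources, 7 tasks: irregular, floating point with rounding
        for (int i = 0; i < 7; i++) System.out.print(sourcePartition(3, 7, i) + " ");  // 0 0 0 1 1 2 2
        System.out.println();
    }
}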
Configure checkpointing
executionGraph.enableSnapshotCheckpointing(
    snapshotSettings.getCheckpointInterval(),
    snapshotSettings.getCheckpointTimeout(),
    snapshotSettings.getMinPauseBetweenCheckpoints(),
    snapshotSettings.getMaxConcurrentCheckpoints(),
    snapshotSettings.getExternalizedCheckpointSettings(),
    triggerVertices,
    ackVertices,
    confirmVertices,
    checkpointIdCounter,
    completedCheckpoints,
    externalizedCheckpointsDir,
    checkpointStatsTracker);
This starts the CheckpointCoordinator; see the blog post dedicated to the checkpoint mechanism.
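For reference, the values carried by snapshotSettings originate from the user-facing CheckpointConfig. A sketch of where they would be set in a streaming job, assuming the Flink 1.2-era CheckpointConfig API:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(10_000);   // checkpoint interval -> getCheckpointInterval()

        CheckpointConfig conf = env.getCheckpointConfig();
        conf.setCheckpointTimeout(60_000);           // -> getCheckpointTimeout()
        conf.setMinPauseBetweenCheckpoints(5_000);   // -> getMinPauseBetweenCheckpoints()
        conf.setMaxConcurrentCheckpoints(1);         // -> getMaxConcurrentCheckpoints()

        // externalized checkpoints -> getExternalizedCheckpointSettings()
        conf.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}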
Scheduler
Next, let's look at how the built ExecutionGraph is scheduled.
future {   // asynchronously
  try {
    submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))   // record in submittedJobGraphs

    jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))   // notify the client that the submission succeeded

    if (leaderElectionService.hasLeadership) {
      executionGraph.scheduleForExecution(scheduler)   // schedule
    }
  } catch {
    // ...
  }
}(context.dispatcher)
executionGraph.scheduleForExecution
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    switch (scheduleMode) {

        case LAZY_FROM_SOURCES:
            // simply take the vertices without inputs
            // (ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks -- "tasks" is a poorly chosen name here)
            for (ExecutionJobVertex ejv : this.tasks.values()) {
                if (ejv.getJobVertex().isInputVertex()) {
                    ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                }
            }
            break;

        case EAGER:
            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
            }
            break;

        default:
            throw new JobException("Schedule mode is invalid.");
    }
}
For streaming jobs the default is EAGER:
public JobGraph createJobGraph() {
    jobGraph = new JobGraph(streamGraph.getJobName());

    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(ScheduleMode.EAGER);
ExecutionJobVertex.scheduleAll
public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}
ExecutionVertex.scheduleForExecution
// schedule the current or latest execution attempt of this vertex's task
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
Execution.scheduleForExecution
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?   // build the ScheduledUnit
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // ask the slotProvider for a slot
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);

        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {   // slot allocation succeeded
                    try {
                        deployToSlot(simpleSlot);   // deploy
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });
    }
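Flink's own Future and BiFunction types are used here, but the pattern is the same as java.util.concurrent.CompletableFuture.handle: a single callback receives either the slot or the failure. A minimal standalone sketch of the allocate-then-deploy pattern, with hypothetical Slot, deployToSlot, and markFailed stand-ins:

import java.util.concurrent.CompletableFuture;

public class AllocateThenDeploy {
    record Slot(String id) {}

    static void deployToSlot(Slot slot) { System.out.println("deploying to " + slot.id()); }
    static void markFailed(Throwable t) { System.out.println("failed: " + t.getMessage()); }

    public static void main(String[] args) {
        // stand-in for slotProvider.allocateSlot(...): may complete with a slot or exceptionally
        CompletableFuture<Slot> slotAllocation =
            CompletableFuture.supplyAsync(() -> new Slot("slot-0"));

        // one callback sees either the result or the throwable, like Flink's handle(...)
        CompletableFuture<Void> deployment = slotAllocation.handle((slot, throwable) -> {
            if (slot != null) {
                try {
                    deployToSlot(slot);
                } catch (Throwable t) {
                    markFailed(t);
                }
            } else {
                markFailed(throwable);
            }
            return null;
        });

        deployment.join();
    }
}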
For the slotProvider, see the Flink - Scheduler post.
deployToSlot: at its core, this sends a submitTask request to the TaskManager.
public void deployToSlot(final SimpleSlot slot) throws JobException {
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {   // transition the state to DEPLOYING
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }
    }

    try {
        // good, we are allowed to deploy
        if (!slot.setExecutedVertex(this)) {   // associate the slot with the ExecutionVertex
            throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
        }
        this.assignedResource = slot;

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(   // create the TaskDeploymentDescriptor
            attemptId,
            slot,
            taskState,
            attemptNumber);

        // register this execution at the execution graph, to receive call backs
        vertex.getExecutionGraph().registerExecution(this);

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // send the submitTask request to the TaskManager's actor
        final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

        submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
            ......
        }
    }