Flink – JobManager.submitJob

JobManager作为actor,

  case SubmitJob(jobGraph, listeningBehaviour) =>val client = sender()val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),jobGraph.getSessionTimeout)submitJob(jobGraph, jobInfo)

 

submitJob,做3件事、

根据JobGraph生成ExecuteGraph
恢复状态CheckpointedState,或者Savepoint
提交ExecuteGraph给Scheduler进行调度

 

ExecuteGraph

executionGraph = ExecutionGraphBuilder.buildGraph(executionGraph, //currentJobs.get(jobGraph.getJobID),对应的jobid是否有现存的ExecuteGraph
  jobGraph,flinkConfiguration, //配置futureExecutor, //Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")),根据cpu核数创建的线程池ioExecutor, // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-io-", "-thread-"))userCodeLoader,  //libraryCacheManager.getClassLoader(jobGraph.getJobID),从jar中加载checkpointRecoveryFactory, //用于createCheckpointStore和createCheckpointIDCounter,standalone和zk两种
  Time.of(timeout.length, timeout.unit),restartStrategy, //job重启策略
  jobMetrics,numSlots, //scheduler.getTotalNumberOfSlots(),注册到该JM上的instances一共有多少slotslog.logger)

 

ExecutionGraphBuilder.buildGraph

 

New

        // create a new execution graph, if none exists so farfinal ExecutionGraph executionGraph;try {executionGraph = (prior != null) ? prior :new ExecutionGraph(futureExecutor,ioExecutor,jobId,jobName,jobGraph.getJobConfiguration(),jobGraph.getSerializedExecutionConfig(),timeout,restartStrategy,jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths(),classLoader,metrics);} catch (IOException e) {throw new JobException("Could not create the execution graph.", e);}

 

attachJobGraph,生成Graph的节点和边

        // topologically sort the job vertices and attach the graph to the existing oneList<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();executionGraph.attachJobGraph(sortedTopology);

 

ExecutionGraph.attachJobGraph

       for (JobVertex jobVertex : topologiallySorted) {// create the execution job vertex and attach it to the graphExecutionJobVertex ejv =new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);ejv.connectToPredecessors(this.intermediateResults);//All job vertices that are part of this graph, ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasksExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);for (IntermediateResult res : ejv.getProducedDataSets()) {//All intermediate results that are part of this graph//ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResultsIntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);}//All vertices, in the order in which they were created//List<ExecutionJobVertex> verticesInCreationOrderthis.verticesInCreationOrder.add(ejv);}

将JobVertex封装成ExecutionJobVertex

会依次创建出ExecutionJobVertex,ExecutionVertex, Execution; IntermediateResult, IntermediateResultPartition

 

ExecutionJobVertex

public ExecutionJobVertex(ExecutionGraph graph,JobVertex jobVertex,int defaultParallelism,Time timeout,long createTimestamp) throws JobException {if (graph == null || jobVertex == null) {throw new NullPointerException();}//并发度,决定有多少ExecutionVertexint vertexParallelism = jobVertex.getParallelism();int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;//产生ExecutionVertexthis.taskVertices = new ExecutionVertex[numTaskVertices];this.inputs = new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup = jobVertex.getSlotSharingGroup();this.coLocationGroup = jobVertex.getCoLocationGroup();// create the intermediate resultsthis.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()]; //创建用于存放中间结果的IntermediateResultfor (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] = new IntermediateResult( //将JobGraph中的IntermediateDataSet封装成IntermediateResult
                    result.getId(),this,numTaskVertices,result.getResultType());}// create all task verticesfor (int i = 0; i < numTaskVertices; i++) {ExecutionVertex vertex = new ExecutionVertex( //初始化ExecutionVertexthis, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);this.taskVertices[i] = vertex; //
        }finishedSubtasks = new boolean[parallelism];}

 

ExecutionVertex

      public ExecutionVertex(ExecutionJobVertex jobVertex,int subTaskIndex, //第几个task,task和ExecutionVertex对应
            IntermediateResult[] producedDataSets,Time timeout,long createTimestamp,int maxPriorExecutionHistoryLength) {this.jobVertex = jobVertex;this.subTaskIndex = subTaskIndex;this.taskNameWithSubtask = String.format("%s (%d/%d)",jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1); //用于记录IntermediateResultPartitionfor (IntermediateResult result : producedDataSets) {IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex); //初始化IntermediateResultPartition
            result.setPartition(subTaskIndex, irp);resultPartitions.put(irp.getPartitionId(), irp);}this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);this.currentExecution = new Execution( //创建Execution
            getExecutionGraph().getFutureExecutor(),this,0,createTimestamp,timeout);this.timeout = timeout;}

 

connectToPredecessors,把节点用edge相连

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {List<JobEdge> inputs = jobVertex.getInputs(); //JobVertex的输入for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num); //对应的JobEdge
            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); //取出JobEdge的source IntermediateResultthis.inputs.add(ires); //List<IntermediateResult> inputs;int consumerIndex = ires.registerConsumer(); //将当前vertex作为consumer注册到IntermediateResult的每个IntermediateResultPartitionfor (int i = 0; i < parallelism; i++) {ExecutionVertex ev = taskVertices[i];ev.connectSource(num, ires, edge, consumerIndex); //为每个ExecutionVertex建立到具体IntermediateResultPartition的ExecutionEdge}}}

connectSource

public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {final DistributionPattern pattern = edge.getDistributionPattern(); // 获取edge的distribution patternfinal IntermediateResultPartition[] sourcePartitions = source.getPartitions(); // 获取souce的partitionsExecutionEdge[] edges;switch (pattern) {case POINTWISE:edges = connectPointwise(sourcePartitions, inputNumber);break;case ALL_TO_ALL:edges = connectAllToAll(sourcePartitions, inputNumber);break;default:throw new RuntimeException("Unrecognized distribution pattern.");}this.inputEdges[inputNumber] = edges;// add the consumers to the source// for now (until the receiver initiated handshake is in place), we need to register the // edges as the execution graphfor (ExecutionEdge ee : edges) {ee.getSource().addConsumer(ee, consumerNumber);}
}

看下connectPointwise

private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {final int numSources = sourcePartitions.length;  //Partitions的个数final int parallelism = getTotalNumberOfParallelSubtasks(); //subTasks的并发度// simple case same number of sources as targetsif (numSources == parallelism) { //如果1比1,简单return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; //取sourcePartitions中和subTaskIndex对应的那个partition
    }else if (numSources < parallelism) { //如果subTasks的并发度高,那一个source会对应于多个taskint sourcePartition;// check if the pattern is regular or irregular// we use int arithmetics for regular, and floating point with rounding for irregularif (parallelism % numSources == 0) { //整除的情况下,比如2个source,6个task,那么第3个task应该对应于第一个source// same number of targets per sourceint factor = parallelism / numSources;sourcePartition = subTaskIndex / factor;}else {// different number of targets per sourcefloat factor = ((float) parallelism) / numSources;sourcePartition = (int) (subTaskIndex / factor);}return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };}else {//......
    }
}

 

配置checkpoint

                executionGraph.enableSnapshotCheckpointing(snapshotSettings.getCheckpointInterval(),snapshotSettings.getCheckpointTimeout(),snapshotSettings.getMinPauseBetweenCheckpoints(),snapshotSettings.getMaxConcurrentCheckpoints(),snapshotSettings.getExternalizedCheckpointSettings(),triggerVertices,ackVertices,confirmVertices,checkpointIdCounter,completedCheckpoints,externalizedCheckpointsDir,checkpointStatsTracker);

启动CheckpointCoordinator,参考专门讨论Checkpoint机制的blog

 

Scheduler

下面看看如何将生成好的ExecutionGraph进行调度

     future { //异步try {submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) //放入submittedJobGraphs} catch {//
            }}jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) //通知用户提交成功if (leaderElectionService.hasLeadership) {executionGraph.scheduleForExecution(scheduler) //调度
          }} catch {//
        }}(context.dispatcher)}

executionGraph.scheduleForExecution

    public void scheduleForExecution(SlotProvider slotProvider) throws JobException {switch (scheduleMode) {case LAZY_FROM_SOURCES:// simply take the vertices without inputs.for (ExecutionJobVertex ejv : this.tasks.values()) { //ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks,这个tasks的命名不科学if (ejv.getJobVertex().isInputVertex()) {ejv.scheduleAll(slotProvider, allowQueuedScheduling);}}break;case EAGER:for (ExecutionJobVertex ejv : getVerticesTopologically()) {ejv.scheduleAll(slotProvider, allowQueuedScheduling);}break;default:throw new JobException("Schedule mode is invalid.");}}

对于流默认是EAGER,

public JobGraph createJobGraph() {jobGraph = new JobGraph(streamGraph.getJobName());// make sure that all vertices start immediatelyjobGraph.setScheduleMode(ScheduleMode.EAGER);

 

ExecutionJobVertex.scheduleAll

    public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {    ExecutionVertex[] vertices = this.taskVertices;// kick off the tasksfor (ExecutionVertex ev : vertices) {ev.scheduleForExecution(slotProvider, queued);}}

ExecutionVertex.scheduleForExecution

//The current or latest execution attempt of this vertex's task
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {return this.currentExecution.scheduleForExecution(slotProvider, queued);
}

Execution.scheduleForExecution

    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();if (transitionState(CREATED, SCHEDULED)) {ScheduledUnit toSchedule = locationConstraint == null ? //生成ScheduledUnitnew ScheduledUnit(this, sharingGroup) :new ScheduledUnit(this, sharingGroup, locationConstraint);final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); //从slotProvider获取slotfinal Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {@Overridepublic Void apply(SimpleSlot simpleSlot, Throwable throwable) {if (simpleSlot != null) { //slot分配成功try {deployToSlot(simpleSlot); //deploy} catch (Throwable t) {try {simpleSlot.releaseSlot();} finally {markFailed(t);}}}else {markFailed(throwable);}return null;}});}

slotProvider,参考Flink - Scheduler

 

deployToSlot,核心就是往TaskManager提交submitTask请求

    public void deployToSlot(final SimpleSlot slot) throws JobException {ExecutionState previous = this.state;if (previous == SCHEDULED || previous == CREATED) {if (!transitionState(previous, DEPLOYING)) { //状态迁移成Deployingthrow new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");}}try {// good, we are allowed to deployif (!slot.setExecutedVertex(this)) { //设置slot和ExecuteVertex关系throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);}this.assignedResource = slot;final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( //创建DeploymentDescriptor
                attemptId,slot,taskState,attemptNumber);// register this execution at the execution graph, to receive call backsvertex.getExecutionGraph().registerExecution(this);final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout); //向TaskMananger的Actor发送请求
submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {......}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/259253.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

window内容

window parent top location.href location.reload location.replace转载于:https://www.cnblogs.com/carlos-guo/p/3391784.html

计算机类公务员如何提升自己,大学毕业才发现:所学专业对考公务员如此重要,4类专业上岸率高...

导语&#xff1a;毕业季来临&#xff0c;同学们是想直接找工作积累工作经验&#xff0c;还是继续考取相关证书&#xff0c;来获得更稳定职业的入场券&#xff1f;毕业抉择很多毕业生面临的第一个问题就是未来职业规划&#xff0c;因为大学毕业之后&#xff0c;就意味着一段新的…

使用getline读入

直接上代码&#xff1a; 第一份&#xff1a;从cin 读入多行数字&#xff0c;每行2个。当输入完毕后&#xff0c;按2次回车结束 #include<iostream> #include <cstdio> #include <sstream> #include <string> #include <vector> #include <it…

POJ 1221

整数划分 划分成单峰的回文数列 dp[i][j] 表示 把i划分&#xff0c;其中划分的数不能大于j 1             i1或j1 dp[i][j]  dp[i][j-1]1         ji dp(i,j-1)dp(i-j,min(i-j,j)) i>j>1 1 #include <iostream>2 #include <cstd…

HYSBZ - 1050(旅行comf 并查集Java实现)

HYSBZ - 1050(旅行comf Java实现) 原题地址 解法&#xff1a;枚举每一条边&#xff0c;对于这条边&#xff0c;我们需要找到集合中和其值相差最小的最大边&#xff0c;这个集合是指与包括i边在内的ST联通集。对于这一要求&#xff0c;我们只需对所有的边进行从小到大的排序&…

UVA 11401 - Triangle Counting

Problem G Triangle Counting Input: Standard Input Output: Standard Output You are given n rods of length 1, 2…, n. You have to pick any 3 of them & build a triangle. How many distinct triangles can you make? Note that, two triangles will be considere…

苏州软件测试11k工资要什么水平,3个月从机械转行软件测试,他的入职薪资是11K...

原标题&#xff1a;3个月从机械转行软件测试&#xff0c;他的入职薪资是11K只要找到适合自己的学习方式&#xff0c;成功转行只是早晚的问题&#xff01;今天汇智妹给大家介绍的这位小伙伴&#xff0c;是咱们汇学联盟平台上的一位线上学员——小周。97年的小哥哥&#xff0c;19…

python idle 清屏问题的解决

在学习和使用python的过程中&#xff0c;少不了要与python idle打交道。但使用python idle都会遇到一个常见而又懊恼的问题——要怎么清屏?我在stackoverflow看到这样两种答案&#xff1a;1.在shell中输入1 import os 2 os.system(cls) 这种方法只能在windows系统中cmd模式下的…

TCP/IP 原理--链路层

链路层作用&#xff1a; &#xff08;1&#xff09;为IP模块发送和接收IP数据报&#xff1b; &#xff08;2&#xff09;为ARP发送ARP请求和接受ARP应答 &#xff08;3&#xff09;为RARP发送RARP请求和接受ARP应答 协议&#xff1a;以太网和SLIP协议 A.以太网协议数据封装格式…

Sqoop拒绝连接错误

使用Sqoop远程连接MySQL导入数据到HBase数据库&#xff1a; sqoop import --driver com.mysql.jdbc.Driver --connect "jdbc:mysql://hzhiServer:3306/myssh?autoReconnecttrue" --table table_001 --username hadoop --password 1 --hbase-table table_001 --colum…

拆解凹多边形

偶遇需要拆解凹多边形 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows; using System.Windows.Media;namespace DrawPolygon {public static class Settings{public const float…

计算机教学的弊端,信息技术在教学中的利弊及解决对策

摘 要&#xff1a;信息技术教育已成为国家信息化事业的重要组成部分&#xff0c;是当今素质教育的重要内容之一。从阐述信息技术教育的内涵和发展阶段出发&#xff0c;分析了当前信息技术在教学应用中的优势和存在的问题&#xff0c;并提出了相应的解决对策。关键词&#xff1a…

【转】Linux命令之查看文件占用空间大小-du,df

原文网址&#xff1a;http://blog.csdn.net/wangjunjun2008/article/details/19840671 du(disk usage),顾名思义,查看目录/文件占用空间大小#查看当前目录下的所有目录以及子目录的大小$ du -h $ du -ah #-h:用K、M、G的人性化形式显示 #-a:显示目录和文件 du -h tmp du -ah tm…

一个FORK的面试题

为什么80%的码农都做不了架构师&#xff1f;>>> #include <stdio.h> #include <sys/types.h> #include <unistd.h> int main(void) { int i; for(i0; i<2; i){ fork(); printf("-"); } wait(NULL); wait(NULL); return 0; }/c 如果…

C++11系列学习之二-----lambda表达式

C11添加了一项名为lambda表达式的新功能&#xff0c;通过这项功能可以编写内嵌的匿名函数&#xff0c;而不必编写独立函数和函数对象&#xff0c;使得代码更容易理解。lambda表达式的语法如下所示&#xff1a;[capture_block](parameters) exceptions_specification -> retu…

php四种基础算法:冒泡,选择,插入和快速排序法

许多人都说 算法是程序的核心&#xff0c;一个程序的好于差,关键是这个程序算法的优劣。作为一个初级phper&#xff0c;虽然很少接触到算法方面的东西 。但是对于冒泡排序&#xff0c;插入排序&#xff0c;选择排序&#xff0c;快速排序四种基本算法&#xff0c;我想还是要掌握…

GCPC2014 C Bounty Hunter

题意&#xff1a;给你一个平面上的点集&#xff08;x值各不相等&#xff09;&#xff0c;问你从最左边走到最右边&#xff08;只能以x递增的顺序&#xff09;&#xff0c;再从最右边回到最左边&#xff08;以x递减的顺序&#xff09;问你最短距离是多少。 解题思路&#xff1a;…

计算机启动时运行ccleaner,Ccleaner的使用方法

ccleaner是一款非常好用的系统优化工具&#xff0c;它可以提升电脑速度&#xff0c;可以对上网历史记录、临时文件夹、回收站垃圾清理、注册表进行垃圾项扫描和清理、软件卸载等功能&#xff0c;保护用户的个人浏览隐私&#xff0c;为Windows系统腾出更多硬盘空间。下面小编就为…

PLSQL Developer软件使用大全

PLSQL Developer软件使用大全 第一章 PLSQL Developer特性 PL/SQL Developer是一个集成开发环境&#xff0c;专门面向Oracle数据库存储程序单元的开发。如今&#xff0c;有越来越多的商业逻辑和应用逻辑转向了Oracle Server&#xff0c;因此&#xff0c;PL/SQL编程也成了整个开…

C++11系列学习之三----array/valarray

创建数组&#xff0c;是程序设计中必不可少的一环。我们一般可以有以下几种方法来创建数组。 一、C内置数组 数组大小固定&#xff0c;速度较快 通用格式是&#xff1a;数据类型 数组名[ 数组大小 ]; 如 int a[40];//一维数组 int a[5][10];//二维数组 二、vector创建数组 包…