Flink源码剖析

写在前面

最近一段时间都没有更新博客了,原因有点离谱,在实现flink的两阶段提交的时候,每次执行自定义的notifyCheckpointComplete时候,好像就会停止消费数据,完成notifyComplete后再消费数据;基于上述原因,开始了一些奇奇怪怪的探索…

存在问题:
1、JobMaster的提交过程,在ResourceManager上,但是为什么会用到zookeeper;–》高可用
2、执行图是否会放到Checkpoint里面,–》目前没有分析,背景是SQL添加字段后,ck就无法使用了
3、本地快照和远程快照的区别,回顾一下Checkpoint的过程—》flink本身支持本地快照和远程快照,其目的自然不言而喻;

总结:

  1. 之前一直不理解flink为什么好像和kafka总是有一种莫名其妙的联系,其实flink=通过mailbox处理数数据的过程本身就利用了消息队列的思想
  2. 源码剖析,通过对整个集群启动,执行图的转换,Task的启动,Checkpoint过程了解,这部分大量运用到了并发编程的知识,真的强;
  3. 下一阶段,先把优秀的源码Copy一遍,java基础搞起来;flink历史版本的重大变更也需要看一下;

1.Flink集群启动

1.1.Flink RPC详解

Flink的RPC实现:基于Scala的网络编程库:Akka

  1. ActorSystem是管理Actor生命周期的组件,Actor是负责进行通信的组
  2. 每个Actor都有一个Mailbox,别的Actor发送给它的消息都首先存储在Mailbox中,通过这种方式可以实现异步通信;
  3. 每个Actor是单线程的处理方式,不断的从Mailbox拉取消息执行处理,所以对于Actor的消息处理,不适合调用会阻塞的处理方法。
  4. Actor可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的Actor
  5. 每一个ActorSystem和Actor都在启动的时候会给定一个name,如果要从ActorSystem中,获取一个Actor,则通过以下的方式来进行Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname
  6. 如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然
    后通过该对象发送消息即可。
  7. 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到
    返回处理结果。

Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及
到的最重要的 API 主要是以下这四个:

  1. RpcGateway:路由,RPC的老祖宗,其他个RPC组件,都是RpcGateWay的子类
  2. RpcServer:RpcService 和 RpcEndpoint 之间的粘合层
  3. RpcEndpoint: 业务逻辑载体,对应的 Actor 的封装
  4. RpcService:对应ActorSystem的封装
    RpcEndpoint子类关系图

RpcEndpoint下面有四个比较重要的子类:TaskExecutor、Dispatcher、JobMaster、ResourceManager

1.2.Flink集群启动脚本分析

Flink 集群的启动脚本在:flink-dist 子项目中,位于 flink-bin 下的 bin 目录:启动脚本为:startcluster.sh
该脚本会首先调用 config.sh 来获取 masters 和 workers,masters 的信息,是从 conf/masters 配置
文件中获取的, workers 是从 conf/workers 配置文件中获取的。然后分别:

  1. 通过 jobmanager.sh 来启动 JobManager
  2. 通过 taskmanager.sh 来启动 TaskManager

他们的内部,都通过 flink-daemon.sh 脚本来启动 JVM 进程,分析 flink-daemon.sh 脚本发现:

  1. JobManager 的启动参数:standalonesession,实现类是:StandaloneSessionClusterEntrypoint
  2. TaskManager 的启动参数:taskexecutor,实现类是:TaskManagerRunner

1.3.Flink主节点JobManager启动分析

JobManager是Flink集群的主节点,它包含三大重要的组件:

  1. ResourceManager:Flink的集群资源管理器,只有一个,关于Slot的管理和申请等工作,都由他负责
  2. Dispatcher:负责接收用户提交的JobGraph,然后启动一个JobMaster
  3. JobMaster:负责一个具体的Job的执行,在一个集群中,可能会有多个JobMaster同时执行,类似于 YARN集群中的 AppMaster 角色,类似于 Spark Job 中的 Driver 角色
  4. WebMonitorEndpoint:里面维护了很多的handler,如果客户端通过flink run的方式提交一个Job到Flink集群,最终,是由WebMonitorEndpoint来接收,并且决定使用哪一个handler来执行处理,如:

根据以上的启动脚本分析:JobManager的启动主类:StandaloneSessionClusterEntrypoint

// 入口,Entry point for the standalone session cluster
StandaloneSessionClusterEntrypoint.main()ClusterEntrypoint.runClusterEntrypoint(entrypoint);clusterEntrypoint.startCluster();runCluster(configuration, pluginManager);// 第一步:初始化各种服务initializeServices(configuration, pluginManager);// 第二步:创建DispatcherResourceManagerComponentFactory,初始化各种组件的工厂实例createDispatcherResourceManagerComponentFactory(configuration);// 第三步:创建 集群运行需要的一些组件:Dispatcher、ResourceManager等;clusterComponent = dispatcherResourceManagerComponentFactory.create(...);

第一步,initializeServices()中做了很多服务组件的初始化:

// 初始化和启动AkkaRpcService,内部其实包装了一ActorSystem
commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService();// 初始化HA服务组件,负责HA服务的是ZooKeeperHaServices
haServices = createHaServices(configuration, ioExecutor);// 初始化BlobServer服务端
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();// 初始化心跳服务组件,heartbeatServices = HeartbeatServices
heartbeatServices = createHeartbeatServices(configuration);// 初始化一个用来存储ExecutionGraph的Store,实现是:FileArchivedExecutionGraphStore
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());

第二步, createDispatcherResourceManagerComponentFactory(configuration)中负责初始化了很多组件的工厂实例;

1. DispatcherRunnerFactory,默认实现:DefaultDispatcherRunnerFactory
2. ResourceManagerFactory,默认实现:StandaloneResourceManagerFactory
3. RestEndpointFactory,默认实现:SessionRestEndpointFactory
其中,DispatcherRunnerFactory内部也实例化了一个组件:SessionDispatcherLeaderProcessFactoryFactory

第三步,dispatcherResourceManagerComponentFactory.create(…)中主要去创建三个重要的组件:

1. DispatcherRunner,实现是:DispatcherRunnerLeaderElectionLifecycleManager
2. ResourceManager,实现是:StandaloneResourceManager
3. WebMonitorEndpoint,实现是:DispatcherRestEndpoint

在这里插入图片描述

1.4.Flink从节点TaskManager启动分析

TaskManager:This class is the executable entry point(入口点) for the task manager in yarn or standalone mode.

TaskManager上的基本资源单位是Slot,一个作业的Task最终会部署在一个TaskManager的Slot上运行,TaskManager会维护本地的Slot资源列表,并与JobMaster和JobManager通信

根据以上的加班启动分析:TaskManager的启动主类:TaskManagerRunner

TaskManagerRunner.main();runTaskManagerSecurely(args, ResourceID.generate());// 加载配置Configuration configuration = loadConfiguration(args);// 启动TaskManagerrunTaskManagerSecurely(configuration, resourceID);// 启动TaskManagerrunTaskManager(configuration, resourceID, pluginManager);// 构建TaskManagerRunner实例taskManagerRunner = new TaskManagerRunner(configuration, resourceId, pluginManager);// 初始化一个线程池this.executor = java.util.concurrent.Executors.newScheduledThreadPool(...);// 获取高可用模式highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...);// 创建RPC服务rpcService = createRpcService(configuration, highAvailabilityServices);// 创建心跳服务heartbeatServices = HeartbeatServices.fromConfiguration(configuration);// 创建BlobCacheServiceblobCacheService = new BlobCacheService(...);// 创建TaskManagertaskManager = startTaskManager(...);// 初始化TaskManagerServicestaskManagerServices = TaskManagerServices.fromConfiguration(...);// 初始化TaskEventDispatchertaskEventDispatcher = new TaskEventDispatcher();// 初始化IOManagerAsyncioManager = new IOManagerAsync(...);// 初始化NettyShuffleEnvironmentshuffleEnvironment = createShuffleEnvironment(...);// 初始化KVStageServicekvStateService = KvStateService.fromConfiguration(...);// 初始化BroadcastVariableManagerbroadcastVariableManager = new BroadcastVariableManager();// 初始化TaskSlotTabletaskSlotTable = createTaskSlotTable();// 初始化JobTablejobTable = DefaultJobTable.create();// 初始化JobLeaderServicejobLeaderService = new DefaultJobLeaderService(...);// 初始化TaskStateManagertaskStateManager = new TaskExecutorLocalStateStoresManager(...);// 初始化LibraryCacheManagerlibraryCacheManager = new BlobLibraryCacheManager(...);// 返回return new TaskManagerServices(...);//  初始化一个TaskExecutorreturn new TaskExecutor(...);// 初始化心跳管理器:jobManagerHeartbeatManagerthis.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);// 初始化心跳管理器:resourceManagerHeartbeatManagerthis.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(...);// 转到TaskExecutor的onStart()方法TaskExecutor.onStart();startTaskExecutorService();// 启动TaskManagerRunnertaskManagerRunner.start();

2. Flink 应用程序的提交

2.1.Flink Program编程流程总结

Flink底层提供了一个功能完善且复杂的分布式流式计算引擎,但是上层的应用API却很简单,简单来说,把整个Flink应用程序的编写,抽象成三个方面:

  • 执行环境 ExecutionEnvironment
  • 数据抽象 DataSet DataStream
  • 逻辑操作 Source Transformation Sink

所以Flink的应用程序在编写的时候,基本是一个简单的统一套路:

1. 获取执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
2. 通过执行环境对象,注册数据源Source,得到数据抽象DataStream ds = env.socketTextStream(...);
3. 调用数据抽象的各种Transformation执行逻辑计算DataStream resultDs = ds.flatMap(...).keyBy(...).sum(...);
4. 将各自Transformation执行完毕之后得到的计算结果数据抽象注册SinkresultDs.addSink(...);
5. 提交Job执行env.execute(...);

在Flink应用程序中,其实所有的操作,都是StreamOperator,分为SourceOperator,SinkOperator,StreamOperator,然后能被优化的Operator就会Chain在一起,形成一个OperatorChain。
基本路数,和Spark一致,并且,在Flink-1.13版本后,将会完全统一批处理的API。
三个类似的概念:

  1. Function:函数
  2. Operator:对Function的封装
  3. Transformation:等价于Operator的概念,Flink中,process函数底层调用的依旧是transform

2.2.Flink Job提交脚本解析-Session模式

当编写好Flink的应用程序,正常的提交方式为:打成jar包,通过Flink命令来进行提交。
Flink命令脚本的底层,是通过java命令启动:CliFronted类来启动JVM经常执行任务的构造和提交。

flink run xxx.jar class arg1 arg2

2.3.CliFronted提交分析

当用户吧Flink应用程序打成jar使用flink run … 的shell命令提交的时候,底层是通过CliFrontend来处理。底层的逻辑,就是通过反射来调用用户剩下的main()方法执行。
在CliFronted内部,主要有以下几件事情要做:

  1. 根据Flink后面的执行命令来确定执行方法(run===>run(params))
  2. 解析main参数,构建PackageProgram,然后执行PackageProgram
  3. 通过反射获取应用程序的main方法的实例,通过反射调用执行起来

总得来说,就是准备执行Program所需要的配置,jar包,运行主类等的必要信息,然后提交执行。

2.4.ExecutionEnvironment源码解析

Flink应用程序的执行,首先就是创建运行环境StreamExecutionEnvironment,一般在企业环境中,都是通过getExecutionEnvironment()来获取ExecutionEnvironment,如果是本地运行的话,则会获取到:LocalStreamEnvironment,如果是提交到Flink集群运行,则获取到:StreamExecutionEnvironment。

final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

StreamExecutionEnvironment是Flink应用程序的执行入口,提供了一些重要的操作机制(包括但不限于):

  1. 提供了readTextFile(),socketTextStream(),createInput(),addSource()等方法去对接数据源
  2. 提供了setParallelism()设置程序的并行度
  3. StreamExecutionEnvironment管理了ExecutionConfig对象,该对象负责Job执行的一些行为配置管理
  4. StreamExecutionEnvironment管理了一个List<Transformation<?>> transformations 成员变量,该成员变了,主要用于保持Job的各种算子转化得到的Transformation,把这些Transformation按照逻辑拼接起来,就能得到StreamGraph(Transformation–>StreamOperator–>StreamNode)
  5. StreamExecutionEnvironment提供了execute()方法用于提交Job执行,该方法接收的参数就是:StreamGraph

2.5.Job提交流程源码分析

核心流程如下:

//核心入口
env.execute("Streaming WordCount");//负责生成StreamGraph//负责执行StreamGraphexecute(getStreamGraph(jobName));

第一步:getStreamGraph(jobName)生成StreamGraph解析

//入口
StreamGraph streamGraph = getStreamGraph(jobName, true);// 通过StreamGraphGenerator来生成StreamGraphStreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);for (Transformation<?> transformation: transformations) {transform(transformation);}	

transform(transformation)的内部实现:

transform(transformation);// 先递归处理改Transformation的输入,transformOneInputTransformCollection<Integer> inputIds = transform(transform.getInput());// 将Transformation变成Operator设置到StreamGraph中,其实就是添加StreamNodestreamGraph.addOperator(...);// 设置该 StreamNode 的并行度streamGraph.setParallelism(transform.getId(), parallelism);// 设置该StreamNode的入边StreamEdgefor(Integer inputId : inputIds){streamGraph.addEdge(inputId,sink.getId(),0);// 内部实现// 构建StreamNode之间的边(StreamEdge)对象StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode...);// 给上游StreamNode设置出边getStreamNode(edge.getSourceId()).addOutEdge(edge);// 给下游StreamNode设置入边getStreamNode(edge.getTargetId()).addInEdge(edge);}

第二步:execute(StreamGraph)解析

// 入口
final JobClient jobClient = executeAsync(streamGraph);// 执行一个StreamGraph,假定使用的是:AbstractSessionClusterExecutorexecutorFactory.getExecutor(configuration).execute(streamGraph, configuration);// 第一件事:由StreamGraph生成JobGraphfinal JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);// 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群clusterClient.submitJob(jobGraph)	

继续提交:

// 通过RestClusterClient来提交JobGraph
RestClusterClient.submitJob(JobGraph jobGraph);// 继续提交RestClusterClient.sendRetriableRequest(...);// 通过ResetClient提交RestClient.sendRequest(webMonitorHost, webMonitorPort, ...)// 继续提交
RestClient.submitRequest(targetAddress, targetPort, httpRequest, responseType);

最终通过Channel把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

2.6.Flink Graph演变

Flink的一个job,本质还是构建一个高效率的能用于分布式执行的DAG执行图。

  1. 帮我们把上下游两个相邻算作如果能Chain到一起,则Chain到一起做优化
  2. Chain到一起的多个Operator就会组成一个OperatorChain,当OperatorChain执行的时候,到底要执行多少个Task,则需要把DAG进行并行化变成实实在在的Task来调度执行。
最开始:dataStream.xx1().xx2()xxx3()....xxxn();env.execute();
到最后:List<StreamTask> 执行(不同的StreamTask内部逻辑计算操作不一样)
总结要点相邻两个阶段之间的StreamTask是有关系的,到底哪些上游StreamTask生产数据给下游消费StreamTask,即shuffle

一个Flink流式作业,从Client提交到Flink集群,到最后执行,总共会经历四种不同的状态。总得来说:

  1. Client 首先根据用户编写的代码生成StreamGraph,然后把StreamGraph构建成JobGraph交给Flink集群主节点
  2. 然后启动的JobMaster在接收到JobGraph后,会对其进行并行化生成ExecutionGraph后调度启动StreamTask执行。
  3. StreamTask并行化的运行在Flink集群中,就是最终的物理执行图状态结构。

Flink中的执行图可以分成四层:StreamGraph==>JobGraph==>ExecutionGraph==>物理执行图。
参考链接:

  • JobGraph中,数据从上一个Operator(JobVertex)流到下一个Operator(JobVertex)的过程中,上游作为生产者提供了IntermediateDataSet,而下游作为消费者需要JobEdge。事实上,JobEdge是一个通信管道,连接了上游生产的dataset和下游的JobVertex节点。【注:优化算子链以提高效率】
  • 在JobGraph转换到ExecutionGraph的过程中,主要发生了以下转变:
    1.加入了并行度的概念,成为真正可调度的图结构;
    2.生成了与JobVertex对应的ExecutionJobVertex,ExecutionVertex;与IntermediateDataSet对应的IntermediateResult和IntermediateResultPartition;并行将通过这些类实现
  • ExecutionGraph已经可以用于调度任务。Flink根据该图生成了一一对应的Task,每个Task对应一个ExecutionGraph的一个Execution。Task用InputGate、InputChannel和ResultPartition对对应了上面图中IntermediateResult和ExecutionEdge。

那么,设计中为什么要设计这四层执行逻辑呢?它的意义是什么?

  1. StreamGraph是对用户逻辑的映射
  2. JobGraph在StreamGraph基础上进行了一些优化,比如吧一部分操作串成Chain以提高效率
  3. ExecutionGraph是为了调度存在的,加入了并行处理的概念
  4. 物理执行结构,真正执行的是Task及其相关结构。

2.6.1.StreamGraph构建和提交源码解析

StreamGraph:把每一个算子transform成一个对流的转换(比如SingleOutputStreamOperator,它就是一个DataStream的子类),并且注册到环境中,用于生成StreamGraph。
它包含的主要抽象概念有:

  1. StreamNode:用来代表Operator的类,并具有所有相关的配置,如并发度、入边和出边等。
  2. StreamEdge:表示连接两个StreamNode的边。

源码核心代码入口:

StreamExecutionEnvironment.execute(getStreamGraph(jobName))

StreamGraph生成过程中,生成StreamNode的代码入口:

streamGraph.addOperator(vertexID, slotSharingGroup, coLocationGroup,operatorFactory, inTypeInfo, outTypeInfo, operatorName);

StreamGraph生成过程中,生成StreamEdge的代码入口:

streamGraph.addEdge(inputId, transform.getId(), 0);

2.6.2.JobGraph构建和提交源码解析

JobGraph:StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构
它包含的主要抽象概念有:

  1. JobVertex:经过优化后符合条件的多个StreamNode可能会Chain子啊一起生成一个JobVertex,即一个JobVertex包含一个或多个Operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
  2. IntermediateDataSet:表示JobVertex的输出,即经过Operator处理产生的数据集。
    producer是JobVertex,consumer是JobEdge。
  3. JobEdge:代表了Job Graph中的一条数据传输通道。Source是IntermediateDataSet,target事故JobVertex,即数据通过JobEdge由InterMediateDataSet传递给目标JobVertex。

源码核心代码入口:

final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline,configuration);

注:这里的Pipeline其实本质上就是StreamGraph

经过层层递进:

StreamingJobGraphGenerator.createJobGraph(this,jobID);

在StreamGraph构建JobGraph的过程中,最重要的事情就是Operator的Chain优化,那么到底什么情况下的Operator能Chain在一起呢?

1.下游节点的入度为1(也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;2.上下游节点都在同一个Slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);3.前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);4.上游节点的Chain策略为always或head(只能与下游链接,不能与上游链接,Source默认是head)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;5.下游节点的Chain策略为always(可以与上下游链接,map、flatmap、filter等默认是always)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;6.两个节点间物理分区逻辑是ForwardPartitioner
partitioner instanceof ForwardPartitioner;7.两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH8.上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism()9.用户没有禁用Chain
streamGraph.isChainingEnabled();

注:这里的9就是常用到的断开算子链!!!

2.6.3.ExecutionGraph构建和提交源码解析

ExecutionGraph:JobManager(JobMaster)根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据架构。
它包含的主要抽象概念有:

  1. ExecutionJobVertex:和JobGraph中的JobVertex一一对应;每一个ExecutionJobVertex都有和并发度一样的ExecutionVertex
  2. ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
  3. IntermediateResult:和JobGraph中IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该Operator的并发度。
  4. IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。
  5. ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。
  6. Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下
    ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过ExecutionAttemptID 来确定消息接受者

源码核心代码入口:

SchedulerBase.createAndRestoreExecutionGraph(...);

在SchedulerBase这个类的内部,有两个成员变量:一个是JobGraph,一个是ExecutionGraph;
在创建SchedulerBase这个类的子类:DefaultSchedule的实例对象的时候,会再SchedulerBase的构造方法中去生成ExecutionGraph。
源码核心流程:

SchedulerBase.createAndRestoreExecutionGraph();ExecutionGraph newExecutionGraph = createExecutionGraph(...);ExecutionGraphBuilder.buildGraph(jobGraph,...);//创建ExecutionGraph对象executionGraph = (prior != null) ? prior : new ExecutionGraph(...);//生成JobGraph的JSON表达式executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));//重点,从JobGraph构建ExecutionGraphexecutionGraph.attachJobGraph(sortedTopology);//遍历JobVertex执行并行化生成ExecutionVertexfor (JobVertex jobVertex : topologiallySorted) {//每一个JobVertex对应到一个ExecutionJobVertex// create the execution job vertex and attach it to the graphExecutionJobVertex ejv = new ExecutionJobVertex(this,jobVertex);ejv.connectToPredecessors(this.intermediateResults);List<JobEdge> inputs = jobVertex.getInputs();for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());this.inputs.add(ires);// 根据并行度来设置ExecutionVertexfor (int i = 0; i < parallelism; i++) {ExecutionVertex ev = taskVertices[i];ev.connectSource(num, ires, edge, consumerIndex);}}

2.6.4.物理执行图

物理执行图:JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的“图”,并不是一个具体的数据结构。
它包含的主要抽象概念有:

  1. Task:Execution被调度后再分配的TaskManager中启动对应的Task,Task包裹了具有用户执行逻辑的Operator。
  2. ResultPartition:A result partition for data produced by a single task.
  3. ResultSubpartition:A single subpartition of a {@link ResultPartition} instance.
  4. InputGate:An input gate consumes one or more partitions of a single produced intermediate result.
  5. InputChannel:An input channel consumes a single {@link ResultSubpartitionView}.

2.7.WebMonitorEndpoint处理RetClient的JobSubmit请求

最终处理这个请求:JobSubmitHandler来处理!
核心入口:

// JobManager服务端处理入口
JobSubmitHandler.handleRequest();// 恢复得到JobGraphCompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);// 通过Dispatcher提交JobGraphDispatcher.submitJob(jobGraph, timeout)

Dispatcher的提交执行逻辑:

Dispatcher.persistAndRunJob()// 保存JobGraph在ZK上jobGraphWriter.putJobGraph(jobGraph);// 提交JobGraph执行Dispatcher.runJob(jobGraph);// 第一件事&&主要的事情:创建JobMaster,注:这里方法命名为createJobMaster更合理Dispatcher.createJobManagerRunner(jobGraph);// 初始化new JobManagerImplnew JobManagerRunnerImpl(...);// 初始化 JobMasternew JobMaster(...);// 创建DefaultSchedulerthis.schedulerNG = createScheduler(jobManagerJobMetricGroup);schedulerNGFactory.createInstance(...)new DefaultScheduler()super()this.executionGraph = createAndRestoreExecutionGraph(...);ExecutionGraph newExecutionGraph = createExecutionGraph(...);// 因为JobMaster是RpcEndpoint的子类,然后跳转到JobMaster的onStart()方法onStart();/*空*/// 第二件事:启动JobMasterDispatcher.startJobManagerRunner();jobManagerRunner.start();leaderElectionService.start(this);// 选举成功,会调用isLeader()方法// Leader election service for multiple JobManagerZooKeeperLeaderElectionService.isLeader()JobManagerRunnerImpl.grantLeadership(final UUID leaderSessionID);verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);	

接着上面的过程继续:

startJobMaster(leaderSessionId);runningJobsRegistry.setJobRunning(jobGraph.getJobID());jobMasterService.start(new JobMasterId(leaderSessionId));//内部完成以下两件事://startJobMasterServices();//resetAndStartScheduler();JobMaster.startJobExecution(JobMasterId newJobMasterId);// 第一件事:跑起来JobMaster相关的服务,主要是注册和心跳startJobMasterServices();startHeartbeatServices();slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());...// 和ResourceManager建立联系,监听ResourceManagerresourceManagerLeadRetriever.start(new ResourceManagerLeaderListener());// 第二件事:开始申请Slot,并且部署TaskresetAndStartScheduler();JobMaster.startScheduling();schedulerNG.startScheduling();// 启动所有的服务协调组startAllOperatorCoordinators();// 开始调度startSchedulingInternal();prepareExecutionGraphForNgScheduling();schedulingStrategy.startScheduling();allocateSlotsAndDeploy(...);

接着继续申请Slot然后部署:

schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);DefaultScheduler.allocateSlotsAndDeploy(executionVertexDeploymentOptions);// 申请SlotallocateSlots(executionVertexDeploymentOptions);// 部署Task运行waitForAllSlotsAndDeploy(deploymentHandles);

3.1.Slot管理(申请和释放)源码解析

核心入口:allocateSlots(executionVertexDeploymentOptions);
接下来看TaskManager的Slot管理:

// JobMaster发送请求申请Slot
DefaultScheduler.allocateSlots();
DefaultExecutionSlotAllocator.allocateSlotsFor();
NormalSlotProviderStrategy.allocateSlot();
SchedulerImpl.allocateSlot();
SchedulerImpl.allocateSlotInternal();
SchedulerImpl.internalAllocateSlot();
SchedulerImpl.allocateSingleSlot();
SchedulerImpl.requestNewAllocatedSlot();
SlotPoolImpl.requestNewAllocatedBatchSlot();
SlotPoolImpl.requestNewAllocatedSlotInternal();
SlotPoolImpl.requestSlotFromResourceManager();// ResourceManager接收到请求,执行Slot请求处理
ResourceManager.requestSlot();
SlotManagerImpl.registerSlotRequest(slotRequest);
SlotManagerImpl.internalRequestSlot();
SlotManagerImpl.allocateSlot();
TaskExecutorGateway.requestSlot();// TaskManager处理ResourceManager发送过来的Slot请求
TaskExecutor.requestSlot();
TaskExecutor.offerSlotsToJobManager();
TaskExecutor.internalOfferSlotsToJobManager();
JobMasterGateway.offerSlots();// JobMaster接收到TaskManager发送过来的Slot申请处理结果
JobMaster.offerSlots();
SlotPoolImpl.offerSlots();

大体上,分为四个大步骤:

  1. JobManager发送请求申请Slot
  2. ResourceManager接收到请求,执行Slot请求处理
  3. TaskManager处理ResourceManager发送过来的Slot请求
  4. JobMaster接收到TaskManager发送过来的Slot申请处理结果

4.1.StreamTask初始化和执行

4.1.1.TaskExecutor执行一个Task

TaskExecutor接收提交Task执行的请求,则调用:

TaskExecutor.submitTask(TaskDeploymentDescriptor tdd,JobMasterId jobMasterId,Time timeout);

在该方法的内部,会封装一个Task对象,在Task的构造方法中,也做了一些相应的初始化动作:

public Task(...){//封装一个Task信息对象 TaskInfothis.taskInfo = new TaskInfo(...);//一个Task的执行输出,输出的抽象ResultPartition和ResultSubPartitionfinal ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(taskShuffleContext,resultPartitionDeploymentDescriptors);this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(...);//一个Task的执行输入,输入的抽象:InputGate和InputChannel(从上游一个Task节点拉取数据)//InputChannel可能有两种实现:local和remote//初始化InputGate和InputChannelfinal IndexedInputGate[] gates = shuffleEnvironment.createInputGates(...);// 初始化一个用来执行Task的线程,目标对象,就是Task本身executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask)}

封装一个Task的时候,调用构造方法执行,会去初始化该Task的输入(InputGate和InputChannel)和输出(ResultPartition和ResultSubPartition)组件相关,然后初始化用来执行该Task的一个线程。
总之,都是通过封装一个Task对象,包括一个executingThread,其目标对象,就是Task,所以在Task构建完成之后,调用:

task.startTaskThread();

之后,跳转到Task.run()方法,从此,真正开始一个Task的启动和执行。
启动一个Task的执行,这个Task有可能是SourceStreamTask,也有可能是非SourceStreamTask(比如OneInputStreamTask,TwoInputStreamTask)等。

4.1.2.SourceStreamTask和StreamTask初始化

前提知识:在最开始一个Job提交到Flink standalone集群运行的时候,在client构建StreamGraph(顶点是StreamNode,边是StreamEdge)的时候,会根据用户调用的算子生成Transformation为StreamGraph生成StreamNode,在生成StreamNode的时候,会通过OperatorFactory执行判断,如果该StreamOperator是StreamSource的时候,就会指定该StreamTask的invokableClass为SourceStreamTask,否则为(OneInputStreamTask,TwoInputStreamTask,StreamTask),核心代码如下:

Stream.addOperator(...){Class<? extends AbstractInvokable> invokableClass = operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
}

因此,当ExecutionVertex真正被提交到TaskExecutor中运行的时候,被封装的Execution对应的Task类的启动类AbstractInvokeable就是在构建StreamGraph的时候指定的对应的invokableClass。即:

  1. 如果启动SourceStreamTask,则启动类是:SourceStreamTask
  2. 如果启动非SourceStreamTask,则启动类是StreamTask

注:TaskExecutor、TaskManager和Slot之间的关系如何?
首先,查看SourceStreamTask的构造过程,核心入口:

Task.run();Task.doRun();// 将状态从CREATED改为DEPLOYING,这里的ExecutionState使用volatile修饰transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)// 拉起ResultPartitionWriter和InputGate,本质初始化BufferPool,分配Buffer// TaskManager---TaskExecutor(包括多个Task)---ResultPartitionManager(管理多个ResultPartition)//参考链接:https://cloud.tencent.com/developer/article/1878476//注:通过分配Buffer的方式,避免JVM的垃圾回收setupPartitionsAndGates(consumableNotifyingPartitionWriters,inputGates)//保证在Task执行过程中需要的各种组件Environment env = new RuntimeEnvironment(...);//通过反射实例化StreamTask实例(包括两种情况:SourceStreamTask,OneInputStreamTask)AbstractInvokable invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);//将状态从DEPLOYING改为RUNNINGtransitionState(ExecutionState.DEPLOYING,Execution.RUNNING);//启动StreamTaskinvokable.invoke();//StreamTask需要正常结束,处理Buffer中的数据ResultPartitionWriter.finish();

拉起ResultPartitionWriter和InputGate的时候到底是怎么做的?

for(ResultPartitionWriter partition : producePartitions){//注册当前Task的ResultPartition到启动当前Task的TaskManager之上的用来跟踪管理ResultPartition的ResultPartitionManager之中partition.setup();
}for(InputGate gate : inputGates){//为这个Task的InputGate中的InputChannel分配BufferPoolgate.setup();
}

SourceStreamTask构造方法:
内部通过反射来实例化AbstractInvokable的具体实例,最终跳转到SourceStreamTask的构造方法,同样,如果非SourceStreamTask的话,则跳转到OneInputStreamTask的带Environment参数的构造方法

public SourceStreamTask(Environment env) throw Exception{this(env,new Object());
}

然后跳转到重载构造:

private SourceStreamTask(Enviorment env, Object lock) throw Exception{//调用父类StreamTask的构造方法super(env, null, FatalExitExceptionHandler.INSTANCE, StreamTaskActionExecutor.synchronizedExecutor(lock));this.lock = Preconditions.checkNotNull(lock);//初始化一个接收数据的线程this.sourceThread = new LegacySourceFunctionThread();}

StreamTask的构造方法:

protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox) throws Exception {//StreamTask(ResultPartition + InputGate)//创建RecordWriter,写入数据到 ResultPartition,this.recordWriter = createRecordWriterDelegate(configuration, environment);//处理输入 Stream.processInput(),读取InputChannel的数据this.mailboxProcessor = new MailboxProcessor(this::processInput,mailbox,actionExecutor);//创建状态后端StateBacked,一般使用FsStateBackedthis.stateBackend = createStateBackend();//初始化SubtaskCheckpointCoordinatorImpl实例,主要作用是通过StateBackend创建CheckpointStoragethis.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl(//创建CheckpointStorage,使用FsStateBackend,创建的就是FsCheckpointStoragestateBackend.createCheckpointStorage(getEnvironment().getJobID())...);}

其中在SourceStreamTask的processInput()方法中,主要是启动接收数据的线程LegacySourceFunctionThread;在执行构造方法完毕后,LegacySourceFunctionThread已经初始化好了,但是 headOperator 还是null,所以,LegacySourceFunctionThread 还未真正启动。

4.1.3.SourceStreamTask和StreamTask执行

接下来要进入到StreamTask.invoke()方法执行,核心分为四个步骤:

public final void invoke() throws Exception{//Task正式工作之前beforeInvoke();//Task开始工作runMailboxLoop();//Task要结束afterInvoke();//Task最后执行清理cleanUpInvoke();}

在beforeInvoke()中,主要初始化OperatorChain,然后调用init()执行初始化,然后恢复状态,更改Task的状态isRuning=true;
在runMailboxLoop()中,主要是不停的处理mail,是Flink-1.10的改进,使用了mailbox模型来处理任务;
参考链接:http://matt33.com/2020/03/20/flink-task-mailbox/
在afterInvoke()中,主要是完成Task要结束之前需要完成的一些细节,比如,把buffer中的数据flush;
最后,在cleanUpInvoke()主要做一些资源的释放,执行各自关闭动作:set false,interrupt,shutdown,close,cleanup,dispose等;

整个Task的生命周期中,前两个步骤非常重要。
首先进入beforeInvoke()方法:

protected void beforeInvoke() throws Exception{//初始化OperatorChainopeatorChain = new OperatorChain<>(this,recordWriter);//执行初始化SourceStream.init();//初始化状态actionExecutor.runThrowing(() -> {operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());readRecoveredChannelState();});//更改运行状态isRunning = true;
}

首先看ChainOperator的初始化,首先会为每个Operator创建一个RecordWriterOutput,再为每个Operator创建一个OutputCollector;然后把每一个Operator都包装成OperatorWrapper放入List< StreamOperatorWrapper > allOpWrappers集合中。最后调用linkOperatorWrappers(allOpWrappers)方法以逻辑正序的方式来构建StreamOperator的链式关系。
在这里插入图片描述
然后就是init()方法,对于SourceStreamTask来说,就是看Source是不是ExternallyInduceSource,如果是,则注册一个savepoint钩子。对于OneInputStreamTask来说,主要就是创建CheckpointedInputGate,StreamTaskNetworkOutput,StreamTaskNetworkInput,StreamOneInputProcessor用来进行shuffle相关的数据传输。
到此为止,Task初始化和预执行相关的,都基本到位了,然后就开始我们的SourceStreamTask的HeadOperator的数据接收线程,开始流式处理。
核心代码入口:

LegacySourceFunctionThread.run();headOperator.run(lock,getStreamStatusMaintainer(),operatorChain);StreamSource.run();userFunction.run(ctx);

具体Task的执行,参考源码注释。
关于Flink Task之间的shuffle有一对这样的概念:

StreamTaskNetworkInput,存在StreamTask中用于接收上游T啊算了发过来的数据的
StreamTaskNetworkOutput

4.2~4.3.State和Checkpoint的过程剖析

首先明确一个观点,Statebackend核心的工作是管理State(生命周期),State是用于状态数据,Checkpoint是实现快照的过程

  1. 假定现在一个Task在正常的处理数据,即从mailBox中不断的取出数据,处理数据;更新该Task的State,如:KeyedState通过CopyOnWriteMap实现状态更新,但是本身受到HashMapStateBackend管理,
  2. TaskManager接收到JobMaster发送的checkpoint请求,向SourceTask注入barrier,barrier首先会进入mailbox,处理完成后开始准成Checkpoint,完成的工作:
    1. 广播当前barrier到下游StreamTask
    2. 启动一个异步线程AsyncCheckpointRunnable完成快照,这个过程Task会继续处理数据;
  3. Task完成Checkpoint后,汇报JobManager完成了Checkpoint,JobManager通知Task完成Checkpoint(不是等所有Task完成后再通知),会在StreamTask的notifyCheckpointOperation方法中,产生一个command(可以理解为mailbox的Task,优先级为MAX_PRIORITY)完成notifyCheckpointComplete

其实Checkpoint过程本身说复杂不负责,但是对于每一个步骤的细节如何,似乎单纯死记硬背不是一个好的建议,下面给出网上一些大佬的帖子,真的再次感谢各位大佬对源码的分析:
Flink 源码阅读笔记(10)- State 管理
AsyncCheckpointRunnable
怎么理解flink的异步检查点机制
Flink 基于 MailBox 实现的 StreamTask 线程模型
Flink Mailbox模型
Apache Flink and the input data reading
分布式数据流的轻量级异步快照
论文阅读-Lightweight Asynchronous Snapshots for Distributed Dataflows

4.4.Checkpoint的触发

Checkpoint是Flink Default Tolerance机制的重要组成部分,Flink Checkpoint的核心类名为org.apache.flink.runtime.checkpoint.CheckpointCoordinator

4.4.1.Client端生成Checkpoint配置

Flink的应用程序,都是通过StreamExecutionEnvironment的execute()方法提交执行的,在StreamExecutionEnvironment初始化的时候,会调用:

this.configure(this.configuration, this.userClassloader);

方法来执行配置参数的初始化,其中就会涉及到:

checkpointCfg.configure(configuration);

代码的执行。这句代码中的相关参数的初始化,就是跟Checkpoint相关的各种参数。
生成的CheckpointConfig会最先被设置盗StreamGraph中,然后由StreamingJobGraphGenerator解析到StreamGraph生成JobGraph的时候,由

StreamingJobGraphGenerator.configureCheckpointing();

将CheckpointConfig中的各自参数,封装成JobCheckpointingSettings对象,然后设置到JobGraph中由成员变量snapshotSettings来进行保存。

4.4.2.Checkpoint CheckpointCoordinator启动源码详解

然后再ExecutionGraphBuilder构建ExecutionGraph的时候,会生成CheckpointCoordinatorConfiguration对象,来保存成JobGraph中的snapshotSettings参数,最终该交给:

ExecutionGraphBuilder.enableCheckpointing();

执行解析,保存到ExecutionGraph中。
在ExecutionGraphBuilder生成ExecutionGraph的时候,总结一下,大概做了以下几件事情:

  1. 解析ExecutionGraph中的各种ExecutionVertex,设置到tasksToTrigger,tasksToWaitFor,tasksToCommitTo数组中
  2. 注册了CheckpointFailureManager组件,用来汇总Checkpoint的统计信息。
  3. 创建CheckpointFailureManager,管理Checkpoint失败后的策略
  4. 创建定时器CheckpointCoordinatorTimer(ScheduledExecutorService),用于定时触发Checkpoint
  5. 创建CheckpointCoordinator,并注册CheckpointCoordinatorDeActivator
    首先是CheckCoordinator的触发机制,核心入口是:
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());

checkpointCoordinator.createActivatorDeactivator()方法返回的是一个JobStatusListener,具体实现是:CheckpointCoordinatorDeActivator,它的作用是:当监听到Job的状态为JobStatus.RUNNING的时候,就开始执行CheckpointCoordinatorDeActivator.jobStatusChanges()的回调处理。而具体的间隔时间,一般都由用户自己设置。
总结一下:
CheckpointCoordinator是Flink执行Checkpoint的核心组件,JobManager在接收到client端的submitJob请求后将JobGraph转化为ExecutionGraph,并调用enableCheckpointing方法初始化CheckpointCoordinator,然后为CheckpointCoordinator注册一个Job状态变化的监听器CheckpointCoordinatorDeActivator。CheckpointCoordinatorDeActivator实现了JobStatusListener接口,当Job状态变成RUNNING时,调用startCheckpointScheduler方法开启CheckpointScheduler,当Job变成其他状态时,调用stopCheckpointScheduler方法停止CheckpointScheduler。

4.4.3.CheckpointCoordinator Checkpoint执行源码详解

所以,真正开始执行Checkpoint的入口是:

CheckpointCoordinator.startCheckpointScheduler();

内部具体通过scheduleTriggerWithDelay(getRandomInitDelay())来实现调度!
其中:getRandomInitDelay()存在意义是:ScheduledExecutor timer不要一上来就执行Checkpoint,而是等一段随机事件(在minPauseBetweenCheckpoints和baseInterval + 1L之间)。ScheduledTrigger就是定时调度的一个Checkpoint触发器。
具体的minPauseBetweenCheckpoints和baseInterval是多少,就看用户的设置是多少:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置:baseInterval = 10s,默认 500 = 0.5s
env.enableCheckpointing(1000*10);
//设置:minPauseBetweenCheckpoints = 1s,默认 0
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

接下来看调度逻辑:

CheckpointCoordinator.startCheckpointScheduler();CheckpointCoordinator.scheduleTriggerWithDelay(getRandomInitDelay());//定时调度任务timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay,baseInterval,TimeUnit.MILLISECONDS);ScheduledTrigger.run();//定时任务,其实就是定时调用CheckpointCoordinator的triggerCheckpoint()方法触发CheckpointCheckpointCoordinator.triggerCheckpoint(true);CheckpointCoordinator.triggerCheckpoint(checkpointProperties,null,isPeriodic,false);CheckpointCoordinator.startTriggerCheckpoint(CheckpointTriggerRequest);CheckpointCoordinator.initializeCheckpoint(...);CheckpointCoordinator.createPendingCheckpoint(...);CheckpointCoordinator.snapshotTaskState();//遍历每一个Source ExecutionVertex,触发CheckpointExecution.triggerCheckpoint(checkpointID,timestamp,checkpointOptions);Execution.triggerCheckpointHelper(checkpointId,timestamp,checkpointOptions,false);TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(.....);// 发送 Checkpoint RPC 给对应的 Source ExecutionVertex 组件taskExecutorGateway.triggerCheckpoint(....);

到此为止,JobMaster终于把Checkpoint请求发送给了对应执行了Source ExecutionVertex的TaskManager节点了。
在这个过程中,可能会取消Checkpoint:

  1. coordinator处于shutdown状态
  2. 周期性Checkpoint调度被取消(periodicScheduling=false),一般periodicScheduling=false,是因为用户手动触发了savepoint
  3. 当前有排队的Checkpoint请求
  4. 当前pendingCheckpoints数量达到设定上限
  5. 与上一次Checkpoint间隔小于设定的最小值,如果间隔太小,会取消并重新设定调度器
  6. 如果Job的所有Source ExecutionVertex没有全处于RUNNING的状态的时候

4.4.4.Checkpoint TaskManager端处理state保存

当TaskManager接收到Checkpoint请求的时候,TaskManager端的Checkpoint分为两种情况:

  1. SourceStreamTask
  2. 其他StreamTask

当SourceStreamTask所在的TaskExecutor收到trigger Checkpoint消息,继续进行Checkpoint,核心入口是:

TaskExecutor.triggerCheckpoint(executionAttemptID,checkpointId,checkpointTimestamp,checkpointOptions,advanceToEndOfEventTime);

下一步调用:

task.triggerCheckpointBarrier(checkpointId,checkpointTimestamp,checkpointOptions, advanceToEndOfEventTime);

下一步调用:获取Task的AbstractInvokeable。并生成CheckpointMetaData,然后执行SourceStreamTask的状态判断,继续调用Checkpoint还是取消:

if(executionState == ExecutionState.RUNNING && invokable != null){//继续CheckpointSourceStreamTask.triggerCheckpointAsync(checkpointMetaData,CheckpointOptions,advanceToEndOfEventTime);
}else{//取消CheckpointcheckpointResponder.declineCheckpoint(jobId,executionId,checkpointID,new CheckpointException("xxx"));
}

经过一些跳转,最终跳转到:StreamTask

StreamTasl.triggerCheckpointAsync(checkpointMetaData,checkpointOptions,advanceToEndOfEventTime);

内部通过MailBox模型来调度执行,内部调用:

triggerCheckpoint(checkpointMetaData,checkpointOptions,advanceToEndOfEventTime);

来触发执行Checkpoint。从这里开始,就进入到Mailbox的主线程来执行Checkpoint了。在该方法中,核心逻辑为:

//1、执行SubtaskCheckpointCoordinatorImpl的初始化
subtaskCheckpointCoordinator.initCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
//2、执行Checkpoint
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
//3、通过上述的返回值来判断是否要取消Checkpoint
if (!success) {declineCheckpoint(checkpointMetaData.getCheckpointId());
}

在performCheckpoint()方法中,会调用:SubtaskCheckpointCoordinatorImpl的checkpointState() 执行 state 的快照!内部分为这么几个步骤:

// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
//           The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastEvent(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),options.isUnalignedCheckpoint());// Step (3): Prepare to spill the in-flight buffers for input and output
if (options.isUnalignedCheckpoint()) {prepareInflightDataSnapshot(metadata.getCheckpointId());
}// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
// streaming topology
takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)// Step (5): report Task State Snapshots to jobmanager
finishAndReportAsync(snapshotFutures, metadata, metrics, options);

事实上一个Job的所有Task的state的Checkpoint是由takeSnapshotSync来真正完成的。最底层会调用StreamOperatorStateHandler的snapshotState()方法来完成具体的工作,它的内部主要做三件事情:

//1、对StreamOperator完成snapshot
streamOperator.snapshotState(snapshotContext);//2、针对Operator类型的状态执行snapshot
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));//3、针对KeyedState类型的状态执行snapshot
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));

4.4.5.Checkpoint CheckCoordinator端反馈处理

当上述,第四步完成的时候,第五步就可以对JobMaster进行Checkpoint状态汇报了。然后当TaskExecutor执行完Checkpoint之后,发送回反馈CheckCoordinator执行处理。
核心入口是:JobMaster.acknowledgeCheckpoint()方法
下面看详细流程:

JobMaster.acknowledgeCheckpoint();SchedulerBase.acknowledgeCheckpoint();CheckpointCoordinator.receiveAcknowledgeMessage();//处理Task节点返回的ack信息PendingCheckpoint.acknowledgeTask();//判断该 PendingCheckpoint 该发的和该收到的 ack 是否都已经成功 ackif (checkpoint.isFullyAcknowledged()) {//更改PendingCheckpoint为CompletedCheckpointCheckpointCoordinator.completePendingCheckpoint(checkpoint);}	

4.5.Checkpoint State恢复源码剖析

JobMaster实例创建时,通过调用链:

JobMaster.createScheduler();DefaultSchedulerFactory.createInstance();new DefaultScheduler();SchedulerBase.createAndRestoreExecutionGraph();SchedulerBase.tryRestoreExecutionGraphFromSavepoint();CheckpointCoordinator.restoreSavepoint();

到达CheckpointCoordinator的restoreSavepoint()方法,进入Checkpoint State restore流程。
所以Checkpoint State Restore的核心入口是:

CheckpointCoordinator.restoreSavepoint();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/55789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣16~20题

题16&#xff08;中等&#xff09;&#xff1a; 思路&#xff1a; 双指针法&#xff0c;和15题差不多&#xff0c;就是要排除了&#xff0c;如果total<target则排除了更小的&#xff08;left右移&#xff09;&#xff0c;如果total>target则排除了更大的&#xff08;rig…

kafka的成神秘籍(java)

kafka的成神秘籍 kafka的简介 ​ Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统&#xff0c;现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在&#xff0c;而事实上kafka已然成为一个流行的分布式流…

【mmengine】配置器(config)(进阶)继承与导出,命令行修改配置

一、配置文件的继承 1.1 继承机制概述 新建optimizer_cfg.py: optimizer dict(typeSGD, lr0.02, momentum0.9, weight_decay0.0001)新建runtime_cfg.py: device "cuda" gpu_ids [0, 1] batch_size 64 epochs 100 num_workers 8新建resnet50.py: _base_ […

Vue 路由设置

为了防止遗忘&#xff0c;记录一下用Vue写前端配置路由时的过程&#xff0c;方便后续再需要用到时回忆。 一、举个例子 假如需要实现这样的界面逻辑&#xff1a; 在HomePage中有一组选项卡按钮用于导航到子页面&#xff0c;而子页面Page1中有一个按钮&#xff0c;其响应事件是…

笔记-stm32移植ucos

文章目录 一、UCOS的基础知识1.1 前后台系统:1.2 RTOS系统可剥夺型内核:前后台系统和RTOS系统 1.3 UCOS系统简介学习方法 二、ucossii移植Step1&#xff1a;在工程中建立存放UCOSS代码的文件夹UCOSIIStep2:向CORE文件夹添加文件Step3:向Config文件夹添加文件Step4:向port文件夹…

LLM4Rec最新工作: 字节发布用于序列推荐的分层大模型HLLM

前几个月 Meta HSTU 点燃各大厂商对 LLM4Rec 的热情&#xff0c;一时间&#xff0c;探索推荐领域的 Scaling Law、实现推荐的 ChatGPT 时刻、取代传统推荐模型等一系列话题让人兴奋&#xff0c;然而理想有多丰满&#xff0c;现实就有多骨感&#xff0c;尚未有业界公开真正复刻 …

vscode中配置python虚拟环境

python虚拟环境作用 Python虚拟环境允许你为每个独立的项目创建一个隔离的环境&#xff0c;这样每个项目都可以拥有自己的一套Python安装包和依赖&#xff0c;不会互相影响。实际使用中&#xff0c;可以在vscode或pycharm中使用虚拟环境。 1.创建虚拟环境的方法&#xff1a; …

【NLP自然语言处理】01-基础学习路径简介

目的&#xff1a;让大家能够在 AI-NLP 领域由基础到入门具体安排&#xff1a; NLP介绍 文本预处理RNN 及其变体&#xff08;涉及案例&#xff09;Transformer 原理详解迁移学习 和 Bert 模型详解 &#xff08;涉及案例&#xff09;特点&#xff1a; 原理 实践每个文章会有练习…

04-SpringBootWeb案例(中)

3. 员工管理 完成了部门管理的功能开发之后&#xff0c;我们进入到下一环节员工管理功能的开发。 基于以上原型&#xff0c;我们可以把员工管理功能分为&#xff1a; 分页查询&#xff08;今天完成&#xff09;带条件的分页查询&#xff08;今天完成&#xff09;删除员工&am…

算法题总结(十)——二叉树上

#二叉树的递归遍历 // 前序遍历递归LC144_二叉树的前序遍历 class Solution {public List<Integer> preorderTraversal(TreeNode root) {List<Integer> result new ArrayList<Integer>(); //也可以把result 作为全局变量&#xff0c;只需要一个函数即可。…

Linus Torvalds 要求内核开发人员编写更好的 Git 合并提交信息

昨天在宣布 Linux 6.12-rc2 内核时&#xff0c;Linus Torvalds 要求内核维护者在提交信息方面做得更好。Torvalds 尤其希望内核维护者在描述拉取请求中的变更时&#xff0c;能更好地使用积极、命令式的语气。 Linux创建者在6.12-rc2 公告中解释道&#xff1a; 总之&#xff0c…

论文阅读笔记-XLNet: Generalized Autoregressive Pretraining for Language Understanding

前言 Google发布的XLNet在问答、文本分类、自然语言理解等任务上都大幅超越BERT,XLNet提出一个框架来连接语言建模方法和预训练方法。我们所熟悉的BERT是denoising autoencoding模型,最大的亮点就是能够获取上下文相关的双向特征表示,所以相对于标准语言模型(自回归)的预…

【AIGC】ChatGPT提示词Prompt高效编写模式:结构化Prompt、提示词生成器与单样本/少样本提示

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;结构化Prompt (Structured Prompt)组成元素应用实例优势结论 &#x1f4af;提示词生成器 (Prompt Creator)如何工作应用实例优势结论 &#x1f4af;单样本/少样本提示 (O…

什么是安全运营中心 SOC?

SOC 代表安全运营中心&#xff0c;它是任何企业中负责组织安全、保护企业免受网络风险的单一、集中的团队或职能。 安全运营中心将管理和控制业务运营的所有安全要素&#xff0c;从监控资产到雇用合适的人员和流程&#xff0c;再到检测和应对威胁。 在本文中&#xff0c;我们…

PHP变量(第④篇)

本栏目教学是php零基础到精通&#xff0c;如果你还没有安装php开发工具请查看下方链接&#xff1a; Vscode、小皮面板安装-CSDN博客 今天来讲一讲php中的变量&#xff0c;变量是用于存储信息的"容器"&#xff0c;这些数据可以在程序执行期间被修改&#xff08;即其…

ThinkBook 16+ 锐龙6800h 安装ubuntu键盘失灵

问题&#xff1a;在ThinkBook 16 锐龙6800h 安装ubuntu18.04 出现笔记本键盘按下延迟非常高&#xff0c;输出卡死的情况&#xff0c;但是外接键盘可以正常使用 解决&#xff1a;更新内核 1、进入 https://kernel.ubuntu.com/~kernel-ppa/mainline/ 下载所需内核版本&#x…

Node.js+Express毕设论文选题最新推荐题目和方向

目录 一、前言 二、毕设选题推荐 三、总结 四、附录&#xff08;手册、官网、资源教程等&#xff09; 1. Node.js 官方资源 2. Express 官方资源 3.安装方法 4 创建示例 一、前言 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;它允许开发者使用…

智能医疗:Spring Boot医院管理系统开发

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常适…

x++、++x的一些问题

x、x在字面上无非就说一个先前置递增然后再运算&#xff0c;另一个是运算完再递增&#xff0c;是不是有些许模棱两可的感觉&#xff0c;接下来引用一个简单的for循环就能够大致理解&#xff1a; 先是x&#xff1a; int i0,x0;for(i0;(i)<5;){xi;printf("%d\n",x)…

ubuntu 安装baget

一、安装netcore3.1 环境 二、下载运行文件 下载&#xff1a;github.com/loic-sharma/BaGet/releases 修改&#xff1a;appsettings.json文件 mkdir -p /root/apps/baget mkdir -p /root/apps/datas touch /root/apps/baget.db cd /root/apps/baget dotnet BaGet.dll --urls&…