Flink源码之Checkpoint执行流程

checkpoint

Checkpoint完整流程如上图所示:

  1. JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint
  2. SourceTask向下游广播CheckpointBarrier
  3. SouceTask完成状态快照后向JobMaster发送快照结果
  4. 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果
  5. JobMaster保存SubTask快照结果
  6. JobMaster收到所有SubTask快照结果后保存快照信息,想SubTask通知Checkpoint完成

以下对整个流程具体说明。

CheckpointCoordinator

JobMaster将JobGraph转换为ExecutionGraph时,如果开启Checkpoint,会为ExecutionGraph生成一个CheckpointCoordinator

DefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorDefaultExecutionGraph::createCheckpointPlanCalculator//创建DefaultCheckpointPlanCalculatorCheckpointCoordinator::new 

CheckpointCoordinator封装了StateBackend和CheckpointStorage

StateBackend负责管理状态:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

CheckpointStorage则是负责存储StateBackend管理的状态:

  • JobManagerCheckpointStorage //checkpoint state放入JobManager内存
  • FileSystemCheckpointStorage //配置state.checkpoints.dir时

在为StreamTask构造SubtaskCheckpointCoordinatorImpl时会调用:

CheckpointStorage::createCheckpointStorage

创建CheckpointStorageAccess用于执行Checkpoint时解析状态存储位置

  • MemoryBackendCheckpointStorageAccess //对应JobManagerCheckpointStorage
  • FsCheckpointStorageAccess //对应FileSystemCheckpointStorage

CheckpointCoordinator在执行状态快照时会调用

CheckpointStorageAccess::resolveCheckpointStorageLocation

生成CheckpointStreamFactory用于生成读写状态数据流

  • MemCheckpointStreamFactory //对应JobManagerCheckpointStorage
  • FsCheckpointStreamFactory //对应FileSystemCheckpointStorage

Checkpoint触发流程

JobMaster状态转换为running后,通过CheckpointCoordinator向SourceTask发送TriggerCheckpoint

JobMaster端触发流程

JobMaster::start  //RPCServer启动
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunningDefaultExecutionGraph::transitionToRunning //调用ExecutionGraph监听器通知状态变化CheckpointCoordinatorDeActivator::jobStatusChanges//触发checkpointCheckpointCoordinator::startCheckpointSchedulerCheckpointCoordinator::scheduleTriggerWithDelay //定时不断触发CheckpointCheckpointCoordinator::triggerCheckpointCheckpointCoordinator::startTriggeringCheckpointDefaultCheckpointPlanCalculator::calculateCheckpointPlan//Plan中会隔离出SourceTask作为作为Trigger Checkpoint的入口CheckpointCoordinator::createPendingCheckpointCheckpointCoordinator::triggerCheckpointRequestCheckpointCoordinator::triggerTasks Execution::triggerCheckpoint //向每个SourceTask发送TriggerCheckpoint请求Execution::triggerCheckpointHelperTaskManagerGateway::triggerCheckpoint//向TaskExecutor发RPC

StreamTask端执行流程

SourceTask

SourceTask由JobMaster RPC直接触发,执行时先广播CheckpointBarrier,然后对状态执行异步快照

TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointStateOperatorChain.broadcastEvent //广播CheckpointBarrier
CheckpointStorage::createCheckpointStorage//为JobId创建CheckpointStorageAccess
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation//CheckpointStorageAccess创建 CheckpointStreamFactoryOperatorChain::snapshotState //对每个OperatorRegularOperatorChain::buildOperatorSnapshotFuturesRegularOperatorChain::checkpointStreamOperatorAbstractStreamOperator::snapshotStateStreamOperatorStateHandler::snapshotState//调用Operator/Keyed Backend的snapshotStateSnapshotContextSynchronousImpl::newAbstractUdfStreamOperator::snapshotState //调用UDF中snapshotState方法,一般用于更新OperatorStateDefaultOperatorStateBackend::snapshotSnapshotStrategyRunner::snapshotDefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources//深copy operator state,便于后续进行异步快照DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot//异步快照					  	  CheckpointStateOutputStream::closeAndGetHandleOperatorStreamStateHandle::new //包装元信息及数据StreamStateHandleHeapKeyedStateBackend::snapshotSnapshotStrategyRunner::snapshotHeapSnapshotStrategy::syncPrepareResourcesHeapSnapshotStrategy::asyncSnapshot //采用COWSateTable异步快照CheckpointStateOutputStream::closeAndGetHandleKeyGroupsStateHandle::new //包装KeyGroup及数据StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync //向JobMaster发送checkpoint的结果AsyncCheckpointRunnable::new AsyncCheckpointRunnable::runAsyncCheckpointRunnable::finalizeNonFinishedSnapshotsOperatorSnapshotFinalizer::new //等待TaskSnapshot状态信息序列化完成AsyncCheckpointRunnable::reportCompletedSnapshotStatesTaskStateManagerImpl::reportTaskStateSnapshotsRpcCheckpointResponder::acknowledgeCheckpoint//向JobMaster发送Ack,带上State信息
非SourceTask

在StreamTask启动后调用StreamTask::processInput不断读取数据进行处理, 非SourceTask在收到上游的CheckpointBarrier对齐后触发Checkpoint,

StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理CheckpointedInputGate::pollNextCheckpointedInputGate::handleEventSingleCheckpointBarrierHandler::processBarrierSingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformStateWaitingForFirstBarrier::barrierReceivedAbstractAlignedBarrierHandlerState::barrierReceivedSingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived//判断对齐AbstractAlignedBarrierHandlerState::triggerGlobalCheckpointSingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpointSingleCheckpointBarrierHandler::triggerCheckpointCheckpointBarrierHandler::notifyCheckpoint //触发StreamTask CheckpointStreamTask::triggerCheckpointOnBarrierStreamTask::performCheckpoint //后续调用过程与SourceTask一样SubtaskCheckpointCoordinatorImpl::checkpointState   		

根据调用栈看出,非SourceStreamTask执行Checkpoint只是触发时机不同,SourceTask由JobMaster RPC定时不断触发,非SourceTask则是在上游的CheckpointBarrier对齐后触发Checkpoint,最终执行逻辑都是将当前算子的信息写入CheckpointStorage后向JobMaster发送确认信息。

StreamTask向JobMaster ACK信息中包含状态元信息及StreamStateHandle,根据状态存储位置分为:

  • ByteStreamStateHandle //对应JobManagerCheckpointStorage,将状态序列化为byte[]发送给JobMaster
  • FileStateHandle //对应FileSystemCheckpointStorage,将状态写入文件系统后将文件路径发送给JobMaster

JobMaster端完成流程

JobMaster收到StreamTask的acknowledgeCheckpoint后:

JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessagePendingCheckpoint::acknowledgeTask //某一个Task的确认PendingCheckpoint::updateOperatorState//更新SubTask状态信息CheckpointCoordinator::completePendingCheckpoint//所有Task Ack后PendingCheckpoint::finalizeCheckpointCheckpoints.storeCheckpointMetadata//保存CheckpointMetadataCompletedCheckpoint::newCheckpointCoordinator::sendAcknowledgeMessages//向Task通知Checkpoint完成消息ExecutionVertex::notifyCheckpointCompleteTaskManagerGateway.notifyCheckpointComplete

JobMaster收到所有StreamTask的Checkpoint状态信息后,标志一次Checkpoint完成,这时会通知StreamTask CheckPoint完成消息,便于SubTask监听Checkpoint完成后做后续动作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/48539.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LION AI 大模型落地,首搭星纪元 ES

自新能源汽车蓬勃发展以来,随着潮流不断进步和变革的“四大件”有着明显变化。其中有:平台、智能驾驶、配置、以及车机。方方面面都有着不同程度的革新。 而车机方面,从以前老旧的媒体机、 CD 机发展至如今具有拓展性、开放性、智能化的车机…

[保研/考研机试] KY207 二叉排序树 清华大学复试上机题 C++实现

题目链接: 二叉排序树_牛客题霸_牛客网二叉排序树,也称为二叉查找树。可以是一颗空树,也可以是一颗具有如下特性的非空二叉树: 1。题目来自【牛客题霸】https://www.nowcoder.com/share/jump/437195121692721757794 描述&#x…

滑动窗口介绍

1.基本概念 利用单调性,使用同向双指针,两个指针之间形成一个窗口 子串与子数组都是连续的一段子序列时不连续的 2.为什么可以用滑动窗口? 暴力解决时发现两个指针不需要回退(没必要回退,一定不会符合结果&#xf…

【网络】数据链路层——MAC帧协议 | ARP协议

🐱作者:一只大喵咪1201 🐱专栏:《网络》 🔥格言:你只管努力,剩下的交给时间! 来到数据链路层后,完整的数据被叫做数据帧,习惯上称之为MAC帧。 MAC帧协议 | A…

jenkins全量迁移

文章目录 1、目的2、迁移1)查看jenkins的主目录2)登录要迁出的服务器打包3)找到对应的war包4)登录对应迁入服务,上传war包和打包的jenkins数据等5)在新的服务器解压迁入的数据等,并查看端口是否…

# Lua与C++交互(二)———— 交互

C 调用lua 基础调用 再来温习一下 myName “beauty girl” C想要获取myName的值,根据规则,它需要把myName压入栈中,这样lua就能看到;lua从堆栈中获取myName的值,此时栈顶为空;lua拿着myName去全局表中查…

【Jenkins】rpm方式安装Jenkins(2.401,jdk版本17)

目录 【Jenkins】rpm方式安装Jenkins 1、主机初始化 2、软件要求 RPM包安装的内容 配置文件说明 3、web操作 【Jenkins】rpm方式安装Jenkins 1、主机初始化 [rootlocalhost ~]# hostname jenkins[rootlocalhost ~]# bash[rootjenkins ~]# systemctl stop firewalld[roo…

YOLOv8教程系列:三、K折交叉验证——让你的每一份标注数据都物尽其用(yolov8目标检测+k折交叉验证法)

YOLOv8教程系列:三、K折交叉验证——让你的每一份标注数据都物尽其用(yolov8目标检测k折交叉验证法) 0.引言 k折交叉验证(K-Fold Cross-Validation)是一种在机器学习中常用的模型评估技术,用于估计模型的性…

Java详解编译型和解释型语言

在计算机的高级编程语言类型分为两种,分别是编译型和解释型,而Java既有编译型又有解释型 什么是编译型?什么是解释型? 字面上来说编译和解释都有‘翻译’的意思,而她们两个的区别是‘翻译’的时机不同,什…

Python采集电商平台泳衣数据进行可视化分析

前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: python 3.8 解释器 pycharm 编辑器 模块使用: 第三方模块 需要安装 requests —> 发送 HTTP请求 内置模块 不需要安装 csv —> 数据处理中经常会用到的一种文件格式 第三方模块安装&#xff1a…

实验五 Linux 内核的安装与加载

【实验目的】 掌握 uboot 的使用方法,能够使用 uboot 安装和加载内核 【实验环境】 ubuntu 14.04 发行版FS4412 实验平台 【注意事项】 实验步骤中以“$”开头的命令表示在 ubuntu 环境下执行,以“#”开头的命令表 示在开发板下执行 【实验步骤】 …

计算机视觉 -- 图像分割

文章目录 1. 图像分割2. FCN2.1 语义分割– FCN (Fully Convolutional Networks)2.2 FCN--deconv2.3 Unpool2.4 拓展–DeconvNet 3. 实例分割3.1 实例分割--Mask R-CNN3.2 Mask R-CNN3.3 Faster R-CNN与 Mask R-CNN3.4 Mask R-CNN:Resnet1013…

ES搭建集群

一、创建 elasticsearch-cluster 文件夹 创建 elasticsearch-7.8.0-cluster 文件夹,在内部复制三个 elasticsearch 服务。 然后每个文件目录中每个节点的 config/elasticsearch.yml 配置文件 node-1001 节点 #节点 1 的配置信息: #集群名称&#xff0…

【数据备份、恢复、迁移与容灾】上海道宁与云祺科技为企业用户提供云数据中心容灾备份解决方案

云祺容灾备份系统支持 主流虚拟化环境下的虚拟机备份 提供对云基础设施 云架构平台以及 应用系统的全方位数据保护 云祺容灾备份系统规范功能 增强决策能力 高效恢复数据至可用状态 有效降低恢复成本 更大限度减少业务中断时间 保障业务可访问性 开发商介绍 成都云祺…

LSTM数学计算公式

LSTM(长短期记忆网络)是一种循环神经网络(RNN)的变体,常用于处理时间序列相关的任务。下面将简要介绍LSTM的数学推导和公式模型。 在训练一般神经网络模型时,通常用,其中W为权重,X为输入&#…

算法通关村第九关——中序遍历与搜索树

1 中序遍历和搜索树原理 二叉搜索树按照中序遍历正好是一个递增序列。其比较规范的定义是: 若它的左子树不为空,则左子树上所有节点的值均小于它的根节点的值;若它的右子树不为空,则右子树所有节点的值均大于它的根节点的值&…

【网络层协议】ARP攻击与欺骗常见的手段以及工作原理

个人主页:insist--个人主页​​​​​​ 本文专栏:网络基础——带你走进网络世界 本专栏会持续更新网络基础知识,希望大家多多支持,让我们一起探索这个神奇而广阔的网络世界。 目录 一、ARP攻击的常见手段 第一种:IP…

【健康医疗】Axure用药提醒小程序原型图,健康管理用药助手原型模板

作品概况 页面数量:共 20 页 兼容软件:Axure RP 9/10,不支持低版本 应用领域:健康管理,用药助手 作品申明:页面内容仅用于功能演示,无实际功能 作品特色 本作品为「用药提醒」小程序原型图…

Spring Boot 知识集锦之actuator监控端点详解

文章目录 0.前言1.参考文档2.基础介绍默认支持的端点 3.步骤3.1. 引入依赖3.2. 配置文件3.3. 核心源码 4.示例项目5.总结 0.前言 背景: 一直零散的使用着Spring Boot 的各种组件和特性,从未系统性的学习和总结,本次借着这个机会搞一波。共同学…

Android NDK JNI与Java的相互调用

一、Jni调用Java代码 jni可以调用java中的方法和java中的成员变量,因此JNIEnv定义了一系列的方法来帮助我们调用java的方法和成员变量。 以上就是jni调用java类的大部分方法,如果是静态的成员变量和静态方法,可以使用***GetStaticMethodID、CallStaticObjectMethod等***。就…