【Flink状态管理五】Checkpoint的设计与实现

文章目录

  • 1. Checkpoint的整体设计
  • 2. Checkpoint创建源码解析
    • 2.1. DefaultExecutionGraphBuilder.buildGraph
    • 2.2. ExecutionGraph.enableCheckpointing

由于系统原因导致Flink作业无法正常运行的情况非常多,且很多时候都是无法避免的。对于Flink集群来讲,能够快速从异常状态中恢复,同时保证处理数据的正确性和一致性非常重要。Flink主要借助Checkpoint的方式保障整个系统状态数据的一致性,也就是基于ABS算法实现轻量级快照服务。

本节我们详细了解Checkpoint的设计与实现。

 

1. Checkpoint的整体设计

Checkpoint的执行过程分为三个阶段:启动、执行以及确认完成。其中Checkpoint的启动过程由JobManager管理节点中的CheckpointCoordinator组件控制,该组件会周期性地向数据源节点发送执行Checkpoint的请求,执行频率取决于用户配置的CheckpointInterval参数。

执行过程:

  1. 在JobManager管理节点通过CheckpointCoordinator组件向每个数据源节点发送Checkpoint执行请求,此时数据源节点中的算子会将消费数据对应的Position发送到JobManager管理节点中。
  2. JobManager节点会存储Checkpoint元数据,用于记录每次执行Checkpoint操作过程中算子的元数据信息,例如在FlinkKafkaConsumer中会记录消费Kafka主题的偏移量,用于确认从Kafka主题中读取数据的位置。
  3. 在数据源节点执行完Checkpoint操作后,继续向下游节点发送CheckpointBarrier事件,下游算子通过对齐Barrier事件,触发该算子的Checkpoint操作。
    当下游的map算子接收到数据源节点的Checkpoint
    Barrier事件后,首先对当前算子的数据进行处理,并等待其他上游数据源节点的Barrier事件到达。该过程就是Checkpoint
    Barrier对齐,目的是确保属于同一Checkpoint的数据能够全部到达当前节点。

在这里插入图片描述

Barrier事件的作用就是切分不同Checkpoint批次的数据。

  • 当map算子接收到所有上游的Barrier事件后,就会触发当前算子的Checkpoint操作,并将状态数据快照到指定的外部持久化介质中,该操作主要借助状态后端存储实现。

  • 当状态数据执行完毕后,继续将Barrier事件发送至下游的算子,进行后续算子的Checkpoint操作。

  • 另外,在map算子中执行完Checkpoint操作后,也会向JobManager管理节点发送Ack消息,确认当前算子的Checkpoint操作正常执行。此时Checkpoint数据会存储该算子对应的状态数据,如果StateBackend为MemoryStateBackend,则主要会将状态数据存储在JobManager的堆内存中

sink节点的ack

像map算子节点一样,当Barrier事件到达sink类型的节点后,sink节点也会进行Barrier对齐操作,确认上游节点的数据全部接入。然后对接入的数据进行处理,将结果输出到外部系统中。完成以上步骤后,sink节点会向JobManager管理节点发送Ack确认消息,确认当前Checkpoint中的状态数据都正常进行了持久化操作。(之后呢?当任务结束之后,cp会消失还是?)

 

2. Checkpoint创建源码解析

通过调用StreamExecutionEnvironment.enableCheckpointing(),开启Checkpoint。
此时Checkpoint的配置会被存储在StreamGraph中,然后将StreamGraph中的CheckpointConfig转换为JobCheckpointingSettings数据结构存储在JobGraph对象中,并伴随JobGraph提交到集群运行。启动JobMaster服务后,JobMaster调度和执行Checkpoint操作。

2.1. DefaultExecutionGraphBuilder.buildGraph

如下代码,通过JobGraph构建ExecutionGraph的过程中,获取JobGraph中存储的JobCheckpointingSettings配置,然后创建ExecutionGraph。

1)根据snapshotSettings配置获取triggerVertices、ackVertices以及confirmVertices节点集合,并转换为对应的ExecutionJobVertex集合。

  • 其中triggerVertices集合存储了所有SourceOperator节点,这些节点通过CheckpointCoordinator主动触发Checkpoint操作。
  • ackVertices和confirmVertices集合存储了StreamGraph中的全部节点,代表所有节点都需要返回Ack确认信息并确认Checkpoint执行成功。

2)创建CompletedCheckpointStore组件,用于存储Checkpoint过程中的元数据。

  • 当对作业进行恢复操作时会在CompletedCheckpointStore中检索最新完成的Checkpoint元数据信息,然后基于元数据信息恢复Checkpoint中存储的状态数据。CompletedCheckpointStore有两种实现,分别为StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore。
  • 在CompletedCheckpointStore中通过maxNumberOfCheckpointsToRetain参数配置以及结合checkpointIdCounter计数器保证只会存储固定数量的CompletedCheckpoint。

3)创建CheckpointStatsTracker实例
用于监控和追踪Checkpoint执行和更新的情况,包括Checkpoint执行的统计信息以及执行状况,WebUI中显示的Checkpoint监控数据主要来自CheckpointStatsTracker。

4)创建StateBackend,从UserClassLoader中反序列化出应用指定的StateBackend并设定为applicationConfiguredBackend。

5)初始化用户自定义的Checkpoint Hook函数

6)最终调用executionGraph.enableCheckpointing()方法,在作业的执行和调度过程中开启Checkpoint。

// 配置状态数据checkpointing
// 从jobGraph中获取JobCheckpointingSettings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
//如果snapshotSettings不为空,则开启checkpoint功能
if (snapshotSettings != null) {List<ExecutionJobVertex> triggerVertices =idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);List<ExecutionJobVertex> ackVertices =idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);List<ExecutionJobVertex> confirmVertices =idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);//创建CompletedCheckpointStoreCompletedCheckpointStore completedCheckpoints;CheckpointIDCounter checkpointIdCounter;try {int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);if (maxNumberOfCheckpointsToRetain <= 0) {maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();}// 通过recoveryFactory创建CheckpointStorecompletedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);   // 通过recoveryFactory创建CheckpointIDCountercheckpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);}catch (Exception e) {throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);}// 获取checkpoints最长的记录次数int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);// 创建CheckpointStatsTracker实例CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(historySize,ackVertices,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// 从application中获取StateBackendfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;}else {try {applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined state backend.", e);}}// 获取最终的rootBackendfinal StateBackend rootBackend;try {rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend, jobManagerConfig, classLoader, log);}catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// 初始化用户自定义的checkpoint Hooks函数final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;// 如果serializedHooks为空,则hooks为空if (serializedHooks == null) {hooks = Collections.emptyList();}else {// 加载MasterTriggerRestoreHookfinal MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);}catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}// 设定ClassLoader为UserClassLoaderfinal Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);// 创建hooks函数try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}}// 将thread的ContextClassLoader设定为originalClassLoaderfinally {thread.setContextClassLoader(originalClassLoader);}}// 获取CheckpointCoordinatorConfigurationfinal CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();// 开启executionGraph中的Checkpoint功能executionGraph.enableCheckpointing(chkConfig,triggerVertices,ackVertices,confirmVertices,hooks,checkpointIdCounter,completedCheckpoints,rootBackend,checkpointStatsTracker);
}

 

2.2. ExecutionGraph.enableCheckpointing

继续看ExecutionGraph.enableCheckpointing()方法的实现,包含如下逻辑。

  1. 将tasksToTrigger、tasksToWaitFor以及tasksToCommitTo三个ExecutionJobVertex集合转换为ExecutionVertex[]数组,每个ExecutionVertex代表ExecutionJobVertex中的一个SubTask节点。
  2. 容错管理:创建CheckpointFailureManager,用于Checkpoint执行过程中的容错管理,包含failJob和failJobDueToTaskFailure两个处理方法。
  3. 定时调度和执行:创建checkpointCoordinatorTimer,用于Checkpoint异步线程的定时调度和执行
  4. 协调和管理作业中的Checkpoint:创建CheckpointCoordinator组件,通过CheckpointCoordinator协调和管理作业中的Checkpoint,同时收集各Task节点中Checkpoint的执行状况等信息。
  5. Hook:将Master Hook注册到CheckpointCoordinator中,实现用户自定义Hook代码的调用。
  6. 控制CheckpointCoordinator的启停:将JobStatusListener的实现类CheckpointCoordinatorDeActivator注册到JobManager中,此时系统会根据作业的运行状态控制CheckpointCoordinator的启停,当作业的状态为Running时会触发启动CheckpointCoordinator组件。
public void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig,List<ExecutionJobVertex> verticesToTrigger,List<ExecutionJobVertex> verticesToWaitFor,List<ExecutionJobVertex> verticesToCommitTo,List<MasterTriggerRestoreHook<?>> masterHooks,CheckpointIDCounter checkpointIDCounter,CompletedCheckpointStore checkpointStore,StateBackend checkpointStateBackend,CheckpointStatsTracker statsTracker) {checkState(state == JobStatus.CREATED, "Job must be in CREATED state");checkState(checkpointCoordinator == null, "checkpointing already enabled");ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");// 创建CheckpointFailureManagerCheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(),new CheckpointFailureManager.FailJobCallback() {@Overridepublic void failJob(Throwable cause) {getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));}@Overridepublic void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {getJobMasterMainThreadExecutor().execute(()  -> failGlobalIfExecutionIsStillRunning(cause, failingTask));}});// 创建checkpointCoordinatorTimercheckState(checkpointCoordinatorTimer == null);checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));// 创建checkpointCoordinatorcheckpointCoordinator = new CheckpointCoordinator(jobInformation.getJobId(),chkConfig,tasksToTrigger,tasksToWaitFor,tasksToCommitTo,checkpointIDCounter,checkpointStore,checkpointStateBackend,ioExecutor,new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),SharedStateRegistry.DEFAULT_FACTORY,failureManager);// 向checkpoint Coordinator中注册master Hooksfor (MasterTriggerRestoreHook<?> hook : masterHooks) {if (!checkpointCoordinator.addMasterHook(hook)) {LOG.warn("Trying to register multiple checkpoint hooks with the name: {}",hook.getIdentifier());}}//向checkpointCoordinator中设定checkpointStatsTrackercheckpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);// 注册JobStatusListener,用于自动启动CheckpointCoordinatorif (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());}this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/694040.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv图像处理(一)

一. OpenCV 简介 OpenCV 是一个跨平台计算机视觉库&#xff0c;可以运行在Linux、Windows、Android和Mac OS操作系统上。 应用领域 1、人机互动 2、物体识别 3、图像分割 4、人脸识别 5、动作识别 6、运动跟踪 7、机器人 8、运动分析 9、机器视觉 10、…

整数分块 (因数平方和)(余数之和)

整数分块 文章目录 整数分块例题1&#xff1a;因数平方和分析:具体代码&#xff1a;__int128写法逆元写法 例题2&#xff1a;余数之和思想&#xff1a;代码 一般在算法中遇到时间复杂度为1e9的&#xff0c; 那么一次 O ( n ) O(n) O(n)的遍历无法解决问题 求 ∑ i 1 n [ n i ]…

3.网络游戏逆向分析与漏洞攻防-游戏启动流程漏洞-游戏启动流程的分析

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;项目搭建 首先下图红框里是游戏启动的程序 游戏启动之后的名字&#xff08;fxgame.exe&#xff09; 一般游戏启动的架构&#xff1a; 第一种&#xff1a;登录器程序启动游戏主程序&#xff0c;然后游…

java面向对象上:类的结构之一

目录 1.相同点 2.不同点 2.1 在类中声明的位置的不同 2.2 关于权限修饰符的不同 2.3 默认初始化值的情况&#xff1a; 2.4 在内存中加载的位置 补充&#xff1a;回顾变量的分类&#xff1a; 方式一&#xff1a;按照数据类型&#xff1a; 方式二&#xff1a;按照在类中…

【Flutter】底部导航BottomNavigationBar的使用

常用基本属性 属性名含义是否必须items底部导航栏的子项List是currentIndex当前显示索引否onTap底部导航栏的点击事件&#xff0c; Function(int)否type底部导航栏类型&#xff0c;定义 [BottomNavigationBar] 的布局和行为否selectedItemColor选中项图标和label的颜色否unsel…

工业网关的功能和优势,以及如何选择合适的工业网关-天拓四方

工业网关是连接各种工业设备和系统的通信设备&#xff0c;可以实现不同设备和系统之间的数据交换和通信。它可以作为一个中心节点&#xff0c;将各种工业设备连接起来&#xff0c;形成一个统一的通信网络&#xff0c;从而实现设备的远程监控、数据采集、分析和控制等功能。在工…

抖音小店新手应该怎么做?4个必须掌握的运营步骤,助你快速入门

大家好&#xff0c;我是电商花花。 很多新手在刚开始接触电商&#xff0c;接触抖音小店的时候都会感到迷茫吗&#xff0c;不知所措&#xff0c;新店刚开始都是从没有流量&#xff0c;没有销量&#xff0c;没有订单走过来的&#xff0c;我们也是。 新手做店都是需要方法&#…

wpf grid 列之间存在间隙

上图为grid的两列布局&#xff0c;中间的白线实际为两列的间隙&#xff0c;BorderThickness"0" 并不能消除 解决方法&#xff1a; <Grid RenderOptions.EdgeMode"Aliased"> # 在grid上添加属性

二分图模型即状态整理

二分图首先是个无向图。 主要有以下几类问题&#xff1a; 1.二分图&#xff0c;不存在奇数环&#xff0c;染色法不存在矛盾 2.匈牙利算法&#xff0c;匹配&#xff0c;最大匹配&#xff0c;匹配点&#xff0c;增广路径 3.最小点覆盖&#xff0c;最大独立集&#xff0c;最小路径…

在VS里使用C#制作窗口应用

新建项目 创建项目的时候搜索net&#xff0c;选择这个。 打开应该是这样 第一个控件 选择公共控件 - PictureBox - 拖入Form 在Image处选择上传本地资源&#xff0c;建议上传一个小一点的图片。 修改一下尺寸。 ctrls 保存 从“属性”切换到“事件” 双击Click事件…

DBSCAN密度聚类介绍 样本点 样本集合 半径 邻域 核心对象 边界点 密度直达 密度可达 密度相连

DBSCAN密度聚类介绍 样本点 样本集合 半径 邻域 核心对象 边界点 密度直达 密度可达 密度相连 简介概念定义原理DBSCAN的优点DBSCAN的缺点小尝试制作不易&#xff0c;感谢三连&#xff0c;谢谢啦 简介 DBSCAN&#xff08;Density-Based Spatial Clustering of Applications wi…

【算法】动态规划1,最小花费爬楼梯,解码方法

一、动态规划简介 动态规划 , 英文名称 Dynamic Programming , 简称 DP , 不是具体的某种算法 , 是一种算法思想 ; 动态规划 , 没有具体的步骤 , 只有一个核心思想 ; 动态规划 的 核心思想 是 由大化小 , 大规模问题 使用 小规模问题 计算结果 解决 , 类似于 分治算法 ; 二、…

srs集群下行edge处理逻辑

官方关于源站集群的介绍&#xff1a; Origin Cluster | SRS 下行边缘是指观众端从边缘edge拉流&#xff0c;边缘edge回源到源站origin节点拉流&#xff0c;然后再 把流转给客户端 边缘处理类SrsPlayEdge 当服务器收到播放请求时&#xff0c;创建对应的consumer消费者。在创…

Docker后台启动镜像,如何查看日志信息

执行 docker run -d -p 9090:8080 core-backend-image 命令后&#xff0c;Docker 会在后台运行一个新的容器实例&#xff0c;并映射宿主机的 9090 端口到容器的 8080 端口。要查看启动的容器日志&#xff0c;您需要先获取容器的 ID 或名称&#xff0c;然后使用 docker logs 命令…

Linux系统之iptables应用SNAT与DNAT

一、SNAT&#xff1a; 1.应用环境 局域网主机共享单个公网IP地址接入Internet &#xff08;私有IP不能在Internet中正常路由&#xff09; 2.SNAT原理 源地址转换&#xff0c;根据指定条件修改数据包的源IP地址&#xff0c;通常被叫做源映谢数据包从内网发送到公网时&#x…

Fiddler与wireshark使用

Fiddler解决三个问题 1、SSL证书打勾&#xff0c;解析https请求 2、响应回来乱码&#xff0c;不是中文 3、想及时中止一下&#xff0c;查看实时的日志 4、搜索对应的关键字 问题1解决方案&#xff1a; 标签栏Tools下 找到https&#xff0c;全部打勾 Actions里面 第一个 t…

从输入url到页面显示中间发生了什么

文章目录 整体概述URL释义用户输入缓存处理域名解析IP 地址什么是域名解析浏览器查找域名对应IP小结 TCP 三次握手握手时序三次握手数据包分析为什么需要三次握手 HTTP 请求HTTP 响应服务器MVC 后台处理阶段http 响应报文 TCP 四次挥手浏览器渲染 整体概述 浏览器输入 URL 到页…

如何搭建Facebook直播网络?

在当今数字化时代&#xff0c;Facebook直播已经成为了一种极具吸引力的社交形式&#xff0c;为个人和企业提供了与观众直接互动的机会&#xff0c;成为推广产品、分享经验、建立品牌形象的重要途径。然而&#xff0c;对于许多人来说&#xff0c;搭建一个稳定、高质量的 Faceboo…

创意办公:专注 ONLYOFFICE,探索办公新境界

一.ONLYOFFICE 介绍 ONLYOFFICE 是一个基于 Web 的办公套件&#xff0c;提供了文档处理、电子表格和演示文稿编辑等功能。它被设计为一个协作工具&#xff0c;支持多人实时协作编辑文档&#xff0c;并且可以在本地部署或者作为云服务使用。 二.ONLYOFFICE 特点和功能 以下是 …

品牌渠道管控的目标是什么

品牌做渠道管控的根本原因是解决渠道中的各种问题&#xff0c;常见的渠道问题包含破价、窜货、假货等&#xff0c;在治理渠道的过程中&#xff0c;其实也是对渠道中各角色关系的梳理&#xff0c;比如通过治理破价链接&#xff0c;可以及时发现渠道中不符合品牌价值的经销商&…