Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay, baseInterval, TimeUnit.MILLISECONDS);}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(CheckpointMetaData metadata,CheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {checkNotNull(options);checkNotNull(metrics);// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) {LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());channelStateWriter.abort(metadata.getCheckpointId(),new CancellationException("checkpoint aborted via notification"),true);checkAndClearAbortedStatus(metadata.getCheckpointId());return;}// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.lastCheckpointId = metadata.getCheckpointId();if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise// connection), revert it here so that it can jump over output dataif (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {options = options.withUnalignedSupported();initInputsCheckpoint(metadata.getCheckpointId(), options);}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamLOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}",taskName,System.currentTimeMillis(),metadata.getTimestamp(),System.currentTimeMillis() - metadata.getTimestamp());CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());// Step (3): Register alignment timer to timeout aligned barrier to unaligned barrierregisterAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);// Step (4): Prepare to spill the in-flight buffers for input and outputif (options.needsChannelState()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (5): Take the state snapshot. This should be largely asynchronous, to not impact// progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}

代码中可以看到构造了CheckpointBarrier, source将barrier当成数据广播给下游的所有节点。使用的方法就是operatorChain.brodacastEvent()。这里就回到最开始提到的异步屏障快照算法。

下游收到了barrier,如何进行快照处理的?flink同时有多种类型的checkpoint,他们分别的处理时机是啥,后面我会进一步进行代码分析。

CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/5151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智慧时代的引擎:揭开机器人核心零部件的奥秘

机器人核心零部件技术现状及趋势 工业机器人是我国制造业的“顶冠明珠”&#xff0c;在机器人核心零部件的研发制造上&#xff0c;我国在很多方面已经接近国际顶尖水平&#xff0c;但一些核心技术仍无法满足复杂高端领域应用需求&#xff0c;如精密减速器的传动精度与寿命间竞争…

深度学习的瓶颈是什么!

深度学习主要的瓶颈&#xff1a; 数据依赖与标注问题&#xff1a;深度学习模型通常需要大量的标注数据来进行训练。然而&#xff0c;获取大量的标注数据不仅成本高昂&#xff0c;而且在某些领域&#xff08;如医疗、金融等&#xff09;中可能难以获取足够的标注数据。此外&…

人脸识别开源算法库和开源数据库

目录 1. 人脸识别开源算法库 1.1 OpenCV人脸识别模块 1.2 Dlib人脸识别模块 1.3 SeetaFace6 1.4 DeepFace 1.5 InsightFace 2. 人脸识别开源数据库 2.1 CelebA 2.2 LFW 2.3 MegaFace 2.4 Glint360K 2.5 WebFace260M 人脸识别 (Face Recognition) 是一种基于人的面部…

无人机反制:雷达探测+信号干扰器技术详解

固定翼无人机、旋翼无人机等&#xff0c;可折叠式无机、DIY无人机等。黑飞&#xff0c;监管困难给航空业带来了诸多隐患&#xff1b;给恐怖袭击及间谍侦察带来新的方式、引发了各国地区政府的忧虑&#xff0c;在中国存在的问题更加严峻。 反无人飞行器防御系统(AUDS)&#xff0…

【C++】手撕list(list的模拟实现)

目录 01.节点 02.迭代器 迭代器运算符重载 03.list类 &#xff08;1&#xff09;构造与析构 &#xff08;2&#xff09;迭代器相关 &#xff08;3&#xff09;容量相关 &#xff08;4&#xff09;访问操作 &#xff08;5&#xff09;插入删除 我们在学习数据结构的时候…

使用 GitHub Actions 实现项目的持续集成(CI)

目录 什么是 GitHub Actions 基础概念 Workflow 文件 Workflow 语法 实例&#xff1a;编译 OpenWrt 什么是 GitHub Actions GitHub Actions 是 GitHub 推出的持续集成&#xff08;Continuous Integration&#xff0c;简称 CI&#xff09;服务它允许你创建自定义工作流&am…

黑马面试篇1(续)

黑马面试篇1-CSDN博客&#xff08;续集&#xff09; 六、消息中间件篇 6.1 RabbitMQ 1&#xff09;使用场景&#xff1a; 异步发送&#xff08;验证码、短信、邮件…&#xff09;MYSQL和Redis , ES之间的数据同步分布式事务削峰填谷… 2&#xff09;RabbitMQ消息的重复消费问…

分享三款可以给pdf做批注的软件

PDF文件不像Word一样可以直接编辑更改&#xff0c;想要在PDF文件上进行编辑批注需要用到一些专业的软件&#xff0c;我自己常用的有三款&#xff0c;全都是官方专业正版的软件&#xff0c;功能丰富强大&#xff0c;使用起来非常方便&#xff01; 1.edge浏览器 这个浏览器不仅可…

【Spring】Spring中AOP的简介和基本使用,SpringBoot使用AOP

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 一、AOP简介 AOP的全称是Aspect-Oriented Programming&#xff0c;即面向切面编程&#xff08;也称面向方面编程&#xff09;。它是面向对象编程&#xff08;OOP&#xff09;的一种补充&#xff0c;目前已成为一种比较成…

ton-http-api安装部署

1、拉取github代码 mkdir /data git clone https://github.com/toncenter/ton-http-api.git cd ton-http-api2、创建环境变量 ./configure.py cat .env TON_API_CACHE_ENABLED0 TON_API_CACHE_REDIS_ENDPOINTcache_redis TON_API_CACHE_REDIS_PORT6379 TON_API_CACHE_REDIS_T…

Facebook’s Tectonic Filesystem: Efficiency from Exascale——论文阅读

FAST 2021 Paper 分布式元数据论文阅读笔记整理 背景 Blob storage 用来存放大量的文本、图片、视频等非结构化数据 包含 EB 级别的数据 存储内容大小不一&#xff0c;大小几KB到几MB不等 要求低时延 使用 Haystack 和 F4 Data warehouse 存放用于数据分析和机器学习的…

Leetcode—1232. 缀点成线【简单】

2024每日刷题&#xff08;122&#xff09; Leetcode—1232. 缀点成线 算法思想 实现代码 class Solution { public:bool checkStraightLine(vector<vector<int>>& coordinates) {int x0 coordinates[0][0];int y0 coordinates[0][1];int x1 coordinates[1…

Excel 中用于在一个范围中查找特定的值,并返回同一行中指定列的值 顺序不一样 可以处理吗

一、需求 Excel 中&#xff0c;在一列&#xff08;某范围内&#xff09;查找另一列特定的值&#xff0c;并返回同一行中另一指定列的值&#xff0c; 查找列和返回列的顺序不一样 二、 实现 1、下面是一个使用 INDEX 和 MATCH 函数的例子&#xff1a; 假设你有以下数据&…

python数据可视化:雷达图

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 python数据可视化&#xff1a; 雷达图 选择题 关于以下代码输出的雷达图中&#xff0c;以下说法正确的是&#xff1f; import numpy as np import matplotlib.pyplot as plt from pylab impor…

看懂原理图

EL3H7光耦 作用&#xff1a; 光耦还可以隔离驱动电机什么的、485隔离通讯啊、pwm信号传输&#xff0c;韦根&#xff0c;强电压。 参考&#xff1a;光耦应用及参数设计_el3h7光耦中文资料-CSDN博客

C# winform 漂亮的日期时间控件

源代码下载&#xff1a; https://download.csdn.net/download/gaoxiang19820514/89242240 效果图 在 HZH-Controls控件 基础上修改的日期控件 因为HZH_Controls控件 中的日期控件太大了&#xff0c; 我的程序中需要多个日期时间的控件放不下&#xff0c;主题是绿色的&#…

【介绍下Selenium】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

SpringWebFlux RequestBody多出双引号问题——ProxyPin抓包揪出真凶

缘起 公司有个服务做埋点收集的&#xff0c;可以参考我之前的文章埋点日志最终解决方案&#xff0c;今天突然发现有些数据日志可以输出&#xff0c;但是没法入库。 多出的双引号 查看Flink日志发现了JSON解析失败&#xff0c;Flink是从Kafka拿数据&#xff0c;Kafka本身不处…

C++信息学奥赛 数据结构认识

数据结构 1.1数据结构分类 1.2基本数据类型 1.3数字编码 1.4字符编码 1.1数据结构分类 数据结构如同一副稳固而多样的框架。为数据的有序组织提供了蓝图&#xff0c;算法得以在此基础上生动起来。 常用的数据结构包括哪些 &#xff0c; &#xff0c; &…

摩根大通推出创新工具 FlowMind,引领金融自动化新变革

近日&#xff0c;摩根大通人工智能研究部推出了一款极具创新性的工具——FlowMind&#xff0c;为金融行业带来了全新的工作模式和效率提升。 FlowMind 能够自动化金融工作流程&#xff0c;在信贷审批、风险评估、合规监测等重要任务中发挥着关键作用。它利用 GPT 自动生成工作…