Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay, baseInterval, TimeUnit.MILLISECONDS);}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(CheckpointMetaData metadata,CheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {checkNotNull(options);checkNotNull(metrics);// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) {LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());channelStateWriter.abort(metadata.getCheckpointId(),new CancellationException("checkpoint aborted via notification"),true);checkAndClearAbortedStatus(metadata.getCheckpointId());return;}// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.lastCheckpointId = metadata.getCheckpointId();if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise// connection), revert it here so that it can jump over output dataif (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {options = options.withUnalignedSupported();initInputsCheckpoint(metadata.getCheckpointId(), options);}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamLOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}",taskName,System.currentTimeMillis(),metadata.getTimestamp(),System.currentTimeMillis() - metadata.getTimestamp());CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());// Step (3): Register alignment timer to timeout aligned barrier to unaligned barrierregisterAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);// Step (4): Prepare to spill the in-flight buffers for input and outputif (options.needsChannelState()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (5): Take the state snapshot. This should be largely asynchronous, to not impact// progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}

代码中可以看到构造了CheckpointBarrier, source将barrier当成数据广播给下游的所有节点。使用的方法就是operatorChain.brodacastEvent()。这里就回到最开始提到的异步屏障快照算法。

下游收到了barrier,如何进行快照处理的?flink同时有多种类型的checkpoint,他们分别的处理时机是啥,后面我会进一步进行代码分析。

CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/5151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac运行npm run serve报错opensslErrorStack

问题描述 mac电脑运行npm run serve报错 opensslErrorStack: [error:03000086:digital envelope routines::initialization error,error:0308010C:digital envelope routines::unsupported],library: digital envelope routines,reason: unsupported,code: ERR_OSSL_EVP_UNSUP…

Python3+Request+Pytest接口自动化测试框架

Python3+Request+Pytest接口自动化测试框架 作者:HMF 2023/11/26 1、Python3 接口自动化测试框架 框架名称:Python3+Request(requests_pkcs12)+Pytest 1.1、框架优点 采用数据驱动方式来解决大量功能重复性接口的测试。 代码分层(工具类(数据处理),服务类(接口请求处理),…

人工智慧时代的引擎:揭开机器人核心零部件的奥秘

机器人核心零部件技术现状及趋势 工业机器人是我国制造业的“顶冠明珠”&#xff0c;在机器人核心零部件的研发制造上&#xff0c;我国在很多方面已经接近国际顶尖水平&#xff0c;但一些核心技术仍无法满足复杂高端领域应用需求&#xff0c;如精密减速器的传动精度与寿命间竞争…

深度学习的瓶颈是什么!

深度学习主要的瓶颈&#xff1a; 数据依赖与标注问题&#xff1a;深度学习模型通常需要大量的标注数据来进行训练。然而&#xff0c;获取大量的标注数据不仅成本高昂&#xff0c;而且在某些领域&#xff08;如医疗、金融等&#xff09;中可能难以获取足够的标注数据。此外&…

人脸识别开源算法库和开源数据库

目录 1. 人脸识别开源算法库 1.1 OpenCV人脸识别模块 1.2 Dlib人脸识别模块 1.3 SeetaFace6 1.4 DeepFace 1.5 InsightFace 2. 人脸识别开源数据库 2.1 CelebA 2.2 LFW 2.3 MegaFace 2.4 Glint360K 2.5 WebFace260M 人脸识别 (Face Recognition) 是一种基于人的面部…

无人机反制:雷达探测+信号干扰器技术详解

固定翼无人机、旋翼无人机等&#xff0c;可折叠式无机、DIY无人机等。黑飞&#xff0c;监管困难给航空业带来了诸多隐患&#xff1b;给恐怖袭击及间谍侦察带来新的方式、引发了各国地区政府的忧虑&#xff0c;在中国存在的问题更加严峻。 反无人飞行器防御系统(AUDS)&#xff0…

【C++】手撕list(list的模拟实现)

目录 01.节点 02.迭代器 迭代器运算符重载 03.list类 &#xff08;1&#xff09;构造与析构 &#xff08;2&#xff09;迭代器相关 &#xff08;3&#xff09;容量相关 &#xff08;4&#xff09;访问操作 &#xff08;5&#xff09;插入删除 我们在学习数据结构的时候…

商城数据库88章表72~75

schooldb库——utf8字符集——utf8_general_ci排序规则 先创建库&#xff0c;再去使用下列的DDL语句。 &#xff08;72&#xff09;DDL——消息类型表 CREATE TABLE huang_shop_message_cats (id int(11) NOT NULL AUTO_INCREMENT COMMENT 自增ID,msgDateId int(11) DEFAULT…

使用 GitHub Actions 实现项目的持续集成(CI)

目录 什么是 GitHub Actions 基础概念 Workflow 文件 Workflow 语法 实例&#xff1a;编译 OpenWrt 什么是 GitHub Actions GitHub Actions 是 GitHub 推出的持续集成&#xff08;Continuous Integration&#xff0c;简称 CI&#xff09;服务它允许你创建自定义工作流&am…

黑马面试篇1(续)

黑马面试篇1-CSDN博客&#xff08;续集&#xff09; 六、消息中间件篇 6.1 RabbitMQ 1&#xff09;使用场景&#xff1a; 异步发送&#xff08;验证码、短信、邮件…&#xff09;MYSQL和Redis , ES之间的数据同步分布式事务削峰填谷… 2&#xff09;RabbitMQ消息的重复消费问…

分享三款可以给pdf做批注的软件

PDF文件不像Word一样可以直接编辑更改&#xff0c;想要在PDF文件上进行编辑批注需要用到一些专业的软件&#xff0c;我自己常用的有三款&#xff0c;全都是官方专业正版的软件&#xff0c;功能丰富强大&#xff0c;使用起来非常方便&#xff01; 1.edge浏览器 这个浏览器不仅可…

【Spring】Spring中AOP的简介和基本使用,SpringBoot使用AOP

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 一、AOP简介 AOP的全称是Aspect-Oriented Programming&#xff0c;即面向切面编程&#xff08;也称面向方面编程&#xff09;。它是面向对象编程&#xff08;OOP&#xff09;的一种补充&#xff0c;目前已成为一种比较成…

ton-http-api安装部署

1、拉取github代码 mkdir /data git clone https://github.com/toncenter/ton-http-api.git cd ton-http-api2、创建环境变量 ./configure.py cat .env TON_API_CACHE_ENABLED0 TON_API_CACHE_REDIS_ENDPOINTcache_redis TON_API_CACHE_REDIS_PORT6379 TON_API_CACHE_REDIS_T…

Facebook’s Tectonic Filesystem: Efficiency from Exascale——论文阅读

FAST 2021 Paper 分布式元数据论文阅读笔记整理 背景 Blob storage 用来存放大量的文本、图片、视频等非结构化数据 包含 EB 级别的数据 存储内容大小不一&#xff0c;大小几KB到几MB不等 要求低时延 使用 Haystack 和 F4 Data warehouse 存放用于数据分析和机器学习的…

Leetcode—1232. 缀点成线【简单】

2024每日刷题&#xff08;122&#xff09; Leetcode—1232. 缀点成线 算法思想 实现代码 class Solution { public:bool checkStraightLine(vector<vector<int>>& coordinates) {int x0 coordinates[0][0];int y0 coordinates[0][1];int x1 coordinates[1…

Excel 中用于在一个范围中查找特定的值,并返回同一行中指定列的值 顺序不一样 可以处理吗

一、需求 Excel 中&#xff0c;在一列&#xff08;某范围内&#xff09;查找另一列特定的值&#xff0c;并返回同一行中另一指定列的值&#xff0c; 查找列和返回列的顺序不一样 二、 实现 1、下面是一个使用 INDEX 和 MATCH 函数的例子&#xff1a; 假设你有以下数据&…

python数据可视化:雷达图

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 python数据可视化&#xff1a; 雷达图 选择题 关于以下代码输出的雷达图中&#xff0c;以下说法正确的是&#xff1f; import numpy as np import matplotlib.pyplot as plt from pylab impor…

看懂原理图

EL3H7光耦 作用&#xff1a; 光耦还可以隔离驱动电机什么的、485隔离通讯啊、pwm信号传输&#xff0c;韦根&#xff0c;强电压。 参考&#xff1a;光耦应用及参数设计_el3h7光耦中文资料-CSDN博客

python 调用 llama

参考&#xff1a; https://blog.51cto.com/u_16175437/9317548 方法一&#xff1a; 要在Python中调用Llama.ai模型来生成回答&#xff0c;你可以使用transformers库&#xff0c;它提供了调用不同的预训练模型的接口。以下是一个简单的例子&#xff0c;展示了如何使用transform…

课程35:Docker容器:Windows Containers与Linux Containers区别

这里写目录标题 🚀前言一、Windows Containers与Linux Containers区别二、基于Linux本地Docker调试2.1 添加Docker支持2.2 Windows与Linux本地Docker调试的区别2.2.1 对比不同目标OS2.2.2 Dockerfile区别2.2.3 项目文件2.3 总结三、Windows下基于Linux Containers部署3.1 发布…