Flink checkpoint 源码分析

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay, baseInterval, TimeUnit.MILLISECONDS);}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(CheckpointMetaData metadata,CheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {checkNotNull(options);checkNotNull(metrics);// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) {LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());channelStateWriter.abort(metadata.getCheckpointId(),new CancellationException("checkpoint aborted via notification"),true);checkAndClearAbortedStatus(metadata.getCheckpointId());return;}// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.lastCheckpointId = metadata.getCheckpointId();if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise// connection), revert it here so that it can jump over output dataif (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {options = options.withUnalignedSupported();initInputsCheckpoint(metadata.getCheckpointId(), options);}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamLOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}",taskName,System.currentTimeMillis(),metadata.getTimestamp(),System.currentTimeMillis() - metadata.getTimestamp());CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());// Step (3): Register alignment timer to timeout aligned barrier to unaligned barrierregisterAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);// Step (4): Prepare to spill the in-flight buffers for input and outputif (options.needsChannelState()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (5): Take the state snapshot. This should be largely asynchronous, to not impact// progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/5085.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《ElementPlus 与 ElementUI 差异集合》el-dialog 显示属性有差异

ElementPlus 用属性 v-model ElementUI 用属性 visible 其实也是 Vue2/Vue3 的差异&#xff1a;v-model 指令在组件上的使用已经被重新设计&#xff0c;替换掉了 v-bind.sync

JENKINS 安装,学习运维从这里开始

Download and deployJenkins – an open source automation server which enables developers around the world to reliably build, test, and deploy their softwarehttps://www.jenkins.io/download/首先点击上面。下载Jenkins 为了学习&#xff0c;从windows开始&#x…

preg_match详解(反向引用和捕获组)

在讲preg_match函数之前&#xff0c;我们先了解一下什么是php可变变量 php可变变量 在PHP中双引号包裹的字符串中可以解析变量&#xff0c;而单引号则不行 也就是在php中&#xff0c;双引号里面如果包含有变量&#xff0c;php解释器会将其替换为变量解释后的结果&#xff1b…

AI系列:大语言模型的RAG(检索增强生成)技术(上)

前言 大型语言模型&#xff08;LLM&#xff09;虽然在生成文本方面表现出色&#xff0c;但仍然存在一些局限性&#xff1a;数据是静态的&#xff0c;而且缺乏垂直细分领域的知识。为了克服这些限制&#xff0c;有时候会进行进一步的模型训练和微调。在实际应用中&#xff0c;我…

基于深度学习检测恶意流量识别框架(80+特征/99%识别率)

基于深度学习检测恶意流量识别框架 目录 基于深度学习检测恶意流量识别框架简要示例a.检测攻击类别b.模型训练结果输出参数c.前端检测页面d.前端训练界面e.前端审计界面&#xff08;后续更新了&#xff09;f.前端自学习界面&#xff08;自学习模式转换&#xff09;f1.自学习模式…

【嵌入式Linux】阻塞与非阻塞IO为何能降低CPU使用率

本文主要记录嵌入式Linux内核中阻塞与非阻塞IO访问的应用&#xff0c;以及解释了为何二者可以降低CPU使用率 阻塞与非阻塞IO为何能降低CPU使用率 0. 授权须知1. 通俗解释2. 场景描述3. 阻塞IO之———等待队列使用详解4. 非阻塞IO之———poll select4.1 poll 访问4.2 selct 访…

华为L410终端及麒麟KOS上如何安装安卓应用

原文链接&#xff1a;华为L410终端及麒麟KOS上如何安装安卓应用 Hello&#xff0c;大家好啊&#xff01;随着移动应用的普及&#xff0c;越来越多的用户希望在个人电脑上运行安卓应用&#xff0c;以便更好地整合工作和生活中的信息。特别是在华为L410终端和麒麟KOS操作系统上&a…

在线教程|零门槛部署 Llama 3,70B 版本只占 1.07G 存储空间,新用户免费体验 8B 版本

4 月 18 日&#xff0c;Meta 宣布开源 Llama 3&#xff0c;这个号称「迄今为止最好的开源大模型」一经发布&#xff0c;立刻引爆科技圈&#xff01; 发布当天恰逢斯坦福大学教授、AI 顶尖专家吴恩达的生日&#xff0c;作为 AI 开源倡导者&#xff0c;他激动地发文表示&#xff…

亿图图示使用教程

亿图图示是一款强大的图形绘制工具&#xff0c;可以用于创建流程图、思维导图、组织结构图等多种类型的图表。下面是一些基本的使用教程&#xff1a; 下载和安装&#xff1a;首先&#xff0c;你需要在官方网站上下载亿图图示的安装包&#xff0c;然后按照提示进行安装。 新建项…

Tesla P4终于在DL580 Gen9上面跑起来了!

正文共&#xff1a;666 字 11 图&#xff0c;预估阅读时间&#xff1a;1 分钟 跌跌撞撞&#xff0c;从Tesla M4终于走到了Tesla P40&#xff0c;显存从4 GB到8 GB&#xff0c;最后再到24 GB&#xff0c;真是不容易。 回顾一下&#xff0c;Tesla M4是最早开始搞的&#xff0c;经…

CI/CD:基于kubernetes的Gitlab搭建

1. 项目目标 &#xff08;1&#xff09;熟悉使用k8s环境搭建Gitlab &#xff08;2&#xff09;熟练应用Gitlab基本配置 2. 项目准备 2.1. 规划节点 主机名 主机IP 节点规划 k8s-master 10.0.1.1 kube_master k8s-node1 10.0.1.2 kube_node k8s-node2 10.0.1.3 k…

【AI心理咨询测评】一年后,AI心理咨询的路还有多远?——5例AI模型心理咨询能力测评对比

前言 随着GPT横空出世&#xff0c;AI心理健康的市场开始逐渐被开拓。有人联想到线上以GPT作为基础&#xff0c;开发可线上心理咨询的AI&#xff0c;例如国内的聆心智能。然而&#xff0c;这一想法也遭到了无数人的质疑&#xff1a;“连聊天都尚不能很好完成&#xff0c;去做心…

第⑰讲:Ceph集群各组件的配置参数调整

文章目录 1.Ceph集群各组件的配置文件1.1.Ceph各组件配置方式1.2.ceph临时查看、修改配置参数的方法 2.调整Monitor组件的配置参数删除Pool资源池2.1.临时调整配置参数2.2.永久修改配置参数 1.Ceph集群各组件的配置文件 1.1.Ceph各组件配置方式 Ceph集群中各个组件的默认配置…

【Jenkins】持续集成与交付 (一):深入理解什么是持续集成?

🟣【Jenkins】持续集成与交付 (一):深入理解什么是持续集成? 1、软件开发生命周期与持续集成2、 持续集成的流程3、持续集成的好处4、Jenkins的应用实践5、结语💖The Begin💖点点关注,收藏不迷路💖 1、软件开发生命周期与持续集成 软件开发生命周期(SDLC)是指软…

C语言:项目实践(贪吃蛇)

前言&#xff1a; 相信大家都玩过贪吃蛇这款游戏吧&#xff0c;贪吃蛇是久负盛名的游戏&#xff0c;它也和俄罗斯方块&#xff0c;扫雷等游戏位列经典游戏的行列&#xff0c;那贪吃蛇到底是怎么实现的呢&#xff1f; 今天&#xff0c;我就用C语言带着大家一起来实现一下这款游戏…

微软如何打造数字零售力航母系列科普04 - 微软联合Adobe在微软365应用程序中工作时推出新的生成式AI功能

微软和Adobe正在合作&#xff0c;将情境营销见解和工作流程引入微软Copilot&#xff0c;以提供生成的人工智能功能&#xff0c;使营销人员和营销团队能够在自然的工作流程中实现更多目标。 这些新的集成功能将在生产力和协作工具&#xff08;如Outlook、Teams和Word&#xff0…

【事业单位专场】联考、省市统考、单独招考

一、考编概述 1、事业单位类别 事业单位是指由国家出资或委托管理的公共机构&#xff0c;其主要职能是为社会提供公共服务。在中国&#xff0c;事业单位覆盖了科研、教育、文化和卫生等多个领域&#xff0c;并且有着不同的类型。以下是一些主要的分类&#xff1a; 教育事业单…

NLP(10)--TFIDF优劣势及其应用Demo

前言 仅记录学习过程&#xff0c;有问题欢迎讨论 TF*IDF&#xff1a; 优势&#xff1a; 可解释性好 可以清晰地看到关键词 即使预测结果出错&#xff0c;也很容易找到原因 计算速度快 分词本身占耗时最多&#xff0c;其余为简单统计计算 对标注数据依赖小 可以使用无标注语…

【状态机dp 状态压缩 分组】1994. 好子集的数目

本文涉及知识点 动态规划汇总 动态规划 状态机dp 状态压缩 分组 LeetCode1994. 好子集的数目 给你一个整数数组 nums 。如果 nums 的一个子集中&#xff0c;所有元素的乘积可以表示为一个或多个 互不相同的质数 的乘积&#xff0c;那么我们称它为 好子集 。 比方说&#xff…

离散数学之命题逻辑思维导图+大纲笔记(预习、期末复习,考研,)

大纲笔记&#xff1a; 命题逻辑的基本概念 命题与联结词 命题 命题是推理的基本单位 真命题&#xff0c;假命题 特征 陈述句 唯一的真值 是非真即假的陈述句 非命题 疑问句 祈使句 可真可假 悖论 模糊性 三个基本概念 复合命题 真值取决于原子命题的值和逻辑联结词 原子命题 逻…