Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8465.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FTTR(光猫)ITMS注册NCE纳管

ITMS注册 TR069交互过程&#xff1a; 1.1. TR069交互—主动连接机制 主动连接机制是指CPE主动发出请求连接事件(事件可以为&#xff1a; 0 BOOTSTRAP&#xff1b; 1 BOOT; PERIODIC等等)给ACS。在连接建立之后才能进行业务处理(通过调用RPC方法实现)。 备注&#xff1a;政企…

2024.5.8

聊天框完善 #include "mywidget.h" #include "ui_mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent), ui(new Ui::MyWidget) {ui->setupUi(this);//设置窗口大小this->resize(400,560);//设置窗口图标和标题this->setWindowTit…

Android C++ 开发调试 LLDB 工具的使用

文章目录 调试环境准备基础命令Breakpoint CommandsWatchpoint CommandsExamining VariablesEvaluating ExpressionsExamining Thread StateExecutable and Shared Library Query Commands 参考&#xff1a; Android 中在进行 NDK 开发的时候&#xff0c;我们经常需要进行 C 代…

隐式3D形状表示:Occupancy Networks

OccNet 的关键思想是隐式地表示3D形状&#xff0c;而不是显式地表示。与直接编码形状几何信息不同&#xff0c;OccNet 将形状的表面建模为非线性分类器的决策边界。 隐式表示&#xff1a;Occupancy Networks 将 3D 形状表示为非线性分类器函数的决策边界 f θ : R 3 X → [ 0…

2024年颠覆商业模式《本草生活》项目,巧妙三招营销引流裂变套路

2024年颠覆商业模式《本草生活》项目&#xff0c;巧妙三招营销引流裂变套路 文丨微三云营销总监胡佳东&#xff0c;点击上方“关注”&#xff0c;为你分享市场商业模式电商干货。 - 引言&#xff1a;现如今流量枯竭、降本增效、红利不再已是线上营销的常态&#xff0c;互联网…

静态照片怎么合成gif?详细介绍一个方法

我们在各大平台中都能看到各种样式的gif动图。Gif动图其实就是由一帧一帧的静态图片合成的动态效果的gif&#xff0c;想要制作gif动画可以通过使用在线图片合成&#xff08;https://www.gif5.net/&#xff09;工具-GIF5工具网&#xff0c;手机、pc均可操作&#xff0c;只需要上…

nestjs 全栈进阶--自定义装饰器

视频教程 20_nest中自定义装饰器_哔哩哔哩_bilibili nest new custom-decorator -p pnpm pnpm start:dev 在Nestjs 中我们使用了大量装饰器 decorator &#xff0c;所以Nestjs 也允许我们去自定义装饰器。 1. 自定义方法装饰器 nest g decorator aaa --flat 它生产的代码…

详细分析McCabe环路复杂度(附例题)

目录 前言1. 基本知识2. 例题 前言 该知识点常出在408或者软考中&#xff0c;对此此文重点讲讲理论知识以及例题 对于例题平时看到也会更新 1. 基本知识 McCabe环路复杂度是一种用于衡量软件代码复杂性的指标&#xff0c;主要是通过计算代码中的控制流图中的环路数量来衡量…

机房——蓝桥杯十三届2022国赛大学B组真题

问题分析 这题用深搜广搜都能做&#xff0c;不过我更倾向于用广搜&#xff0c;因为广搜能更容易找到目标点。那么是采用结构体存储边还是采用二维数组存储临接矩阵呢&#xff1f;我们注意到n的取值范围为1e5,用二维数组哪怕是bool类型就需要至少1e10Byte的连续空间,这个空间太大…

5V升8.4V2A升压恒压WT3231

5V升8.4V2A升压恒压WT3231 WT3231 是一种高性能直流-直流&#xff08;DC-DC&#xff09;转换器&#xff0c;集成了能够承受10A电流和26mΩ低导通电阻的功率MOSFET。该转换器能提供高达12V的稳定输出电压&#xff0c;并具有固定600KHz开关频率&#xff0c;使得小型外部电感和电…

解决github无法克隆私有仓库,Repository not found问题(2024最新)

一、背景 这个问题出现&#xff0c;是你用了其他主机设备&#xff0c;需要重新clone私有库时&#xff0c;发现一直报找不到仓库&#xff0c;如下报错&#xff1a; remote: Repository not found.二、解决方法 &#xff08;1&#xff09;账号密码方式&#xff08;已不支持&am…

构建自己的docker镜像node.js

学习资源&#xff1a; 构建自己的 Docker 镜像_哔哩哔哩_bilibili 针对其中的一些比较困难的点写篇文章。 以下是对app.js的注释&#xff1a; // 使用 Koa 框架搭建 Node.js 应用的示例代码// 这两行代码引入了 koa 模块&#xff0c;并创建了一个新的 Koa 应用实例&#xf…

C++之QT文本处理QDir、QFileDialog、QStringList、QFile

一、相应的头文件 #include <QFileDialog> #include <QDir> #include <QStringList> 二、简介 1.QFileDialog 实际效果如下&#xff1a;比如需要选择打开的文件夹或者文件名&#xff0c;通过调用资源管理器的方式进行可视化操作。 代码示例为&#xff1a…

gitlab集群高可用架构拆分部署

目录 前言 负载均衡器准备 外部负载均衡器 内部负载均衡器 (可选)Consul服务 Postgresql拆分 1.准备postgresql集群 手动安装postgresql插件 2./etc/gitlab/gitlab.rb配置 3.生效配置文件 Redis拆分 1./etc/gitlab/gitlab.rb配置 2.生效配置文件 Gitaly拆分 1.…

五月加仓比特币

作者&#xff1a;Arthur Hayes Co-Founder of 100x. 编译&#xff1a;Liam 编者注&#xff1a;本文略有删减 (以下内容仅代表作者个人观点&#xff0c;不应作为投资决策的依据&#xff0c;也不应被视为参与投资交易的建议或意见&#xff09;。 从四月中旬到现在&#xff0c;当你…

flask框架的初步认识

flask框架的初步认识 这是一个轻量级的网页框架&#xff0c;在运行后&#xff0c;就相当于服务器&#xff0c;当用户输入URL就会触发对应的事件调用方法&#xff0c;返回给用户一个网页文件&#xff0c;并通过自动识别html标签&#xff0c;来为用户呈现对应的样式和效果&#…

小红书达人置换合作推广怎么做?

小红书作为国内领先的生活方式分享平台&#xff0c;已成为品牌与消费者沟通的重要桥梁。达人置换合作推广&#xff0c;即品牌与小红书上的意见领袖&#xff08;KOL&#xff09;合作&#xff0c;通过他们的影响力推广产品&#xff0c;已成为品牌营销的重要手段。本文伯乐网络传媒…

【光速上手 Hydra 】一行代码自动跑多次实验,Hydra 中的 Multirun 参数如何使用?

Hydra 是一个开源的 Python 框架&#xff0c;简化了研究和其他复杂应用的开发。其关键特性是能够通过组合动态地创建一个分层次的配置&#xff0c;并通过配置文件和命令行进行覆盖。Hydra 的名称来源于其能够运行多个类似的作业 - 就像一个有多个头的九头蛇一样。 主要特性&am…

TikTok 正式起诉美国政府;全新 iPad Pro 将搭载苹果 M4 芯片丨 RTE 开发者日报 Vol.199

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」&#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「…

机器学习 | 时间序列预测中的AR模型及应用

自回归模型&#xff0c;通常缩写为AR模型&#xff0c;是时间序列分析和预测中的一个基本概念。它们在金融、经济、气候科学等各个领域都有广泛的应用。在本文中&#xff0c;我们将探索自回归模型&#xff0c;它们如何工作&#xff0c;它们的类型和实际例子。 自回归模型 自回…