Checkpoint 执行机制原理解析

在介绍Checkpoint的执行机制前,我们需要了解一下state的存储,因为stateCheckpoint进行持久化备份的主要角色。Checkpoint作为Flink最基础也是最关键的容错机制,Checkpoint快照机制很好地保证了Flink应用从异常状态恢复后的数据准确性。同时 Checkpoint相关的metrics(指标)也是诊断Flink应用健康状态最为重要的指标,成功且耗时较短的Checkpoint表明作业运行状况良好,没有异常或反压。然而,由于Checkpoint与反压的耦合,反压反过来也会作用于Checkpoint,导致Checkpoint的种种问题。Flink1.11引入Unaligned(未对齐)Checkpoint来解耦Checkpoint机制与反压机制,优化高反压情况下的Checkpoint表现。

Statebackend 的分类

下图阐释了目前Flink内置的三类state backend,其中MemoryStateBackendFsStateBackend在运行时都是存储在java heap中的,只有在执行Checkpoint时,FsStateBackend才会将数据以文件格式持久化到远程存储上。 而RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的LSM DB)对state进行存储。
[点击并拖拽以移动] ​
对于在这里插入图片描述
HeapKeyedStateBackend,有两种实现:
【1】支持异步Checkpoint(默认): 存储格式CopyOnWriteStateMap
【2】仅支持同步Checkpoint 存储格式NestedStateMap

特别在MemoryStateBackend内使用HeapKeyedStateBackend时,Checkpoint序列化数据阶段默认有最大5 MB数据的限制。对于 RocksDBKeyedStateBackend,每个state都存储在一个单独的column family内,其中keyGroupKeyNamespace进行序列化存储在 DB作为key
[点击并拖拽以移动] ​

Checkpoint 执行机制详解

Checkpoint的执行流程逐步拆解进行讲解,下图左侧是Checkpoint Coordinator,是整个Checkpoint的发起者,中间是由两个 source,一个sink组成的Flink作业,最右侧的是持久化存储,在大部分用户场景中对应HDFS
[点击并拖拽以移动] ​

【1】Checkpoint Coordinator向所有source节点触发trigger Checkpoint
【2】source节点向下游广播barrier(分界线),这个barrier就是实现Chandy-Lamport分布式快照算法的核心,下游的task只有收到所有inputbarrier才会执行相应的Checkpoint

Chandy-Lamport算法将分布式系统抽象成DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。
[点击并拖拽以移动] ​

从实现上看,Flink通过在DAG数据源定时向数据流注入名为Barrier的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint周期。每当接收到Barrier,算子进行本地的Checkpoint快照,并在完成后异步上传本地快照,同时将Barrier以广播方式发送至下游。当某个Checkpoint的所有Barrier到达DAG末端且所有算子完成快照,则标志着全局快照的成功。

[点击并拖拽以移动] ​

【3】当task完成state备份后,会将备份数据的地址state handle通知给Checkpoint coordinator
[点击并拖拽以移动] ​

【4】下游的sink节点收集齐上游两个inputbarrier之后,会执行本地快照,这里特地展示了RocksDB incremental(增量) Checkpoint的流程,首先RocksDB会全量刷数据到磁盘上(红色大三角表示),然后Flink框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
[点击并拖拽以移动] ​

【5】同样的,sink节点在完成自己的Checkpoint之后,会将state handle返回通知Checkpoint Coordinator
[点击并拖拽以移动] ​

【6】最后,当Checkpoint coordinator收集齐所有taskstate handle,就认为这一次的Checkpoint全局完成了,向持久化存储中再备份一个Checkpoint meta文件。
[点击并拖拽以移动] ​

Checkpoint 的 EXACTLY_ONCE 语义

EXACTLY ONCE语义: 在有多个输入Channel的时候,为了数据准确性,算子会等待所有流的Barrier都到达之后才会开始本地的快照,这种机制被称为Barrier对齐。在对齐的过程中,算子只会继续处理的来自未出现Barrier Channel的数据,而其余Channel的数据会被写入输入队列(Flink通过一个input buffer将在对齐阶段收到的数据缓存起来),直至在队列满后被阻塞。当所有Barrier到达后(对齐),算子进行本地快照,输出 Barrier 到下游并恢复正常处理。
比起其他分布式快照,该算法的优势在于辅以Copy-On-Write技术的情况下不需要Stop The World影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

AT LEAST ONCE语义: 无需缓存收集到的数据,会对后续直接处理,所以导致restore(恢复)时,数据可能会被多次处理。下图是官网文档里面就Checkpoint align的示意图:
[点击并拖拽以移动] ​

需要特别注意的是,FlinkCheckpoint机制只能保证Flink的计算过程可以做到EXACTLY ONCE,端到端的EXACTLY ONCE需要 sourcesink支持。

Checkpoint 与反压的耦合

目前的Checkpoint算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的Barrier对齐反而会加剧作业的反压,甚至导致作业的不稳定。

首先, Chandy-Lamport分布式快照的结束依赖于Marker的流动,而反压则会限制Marker的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致Checkpoint的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果Checkpoint连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的Lag更大,通常带来更大的反压,形成一个恶性循环。

其次,Barrier对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的Sink。通常来说,这些不同的Sink会复用公共的算子以减少重复计算,但并不希望不同Source间相互影响。
[点击并拖拽以移动] ​

假设一个作业要分别统计AB两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果B业务线某天的业务量突涨,使得Checkpoint Barrier有延迟,那么会导致公用的Window Aggregate进行Barrier对齐,进而阻塞业务AFlatMap,最终令业务A的计算也出现延迟。
当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合Flink用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。

Unaligned Checkpoint

为了解决这个问题,Flink1.11版本引入了Unaligned Checkpoint的特性。要理解Unaligned Checkpoint的原理,首先需要了解 Chandy-Lamport论文中对于Marker处理规则的描述:自行百度翻译

Marker-Sending Rule for a Process p. For each channel c, incident on, and
directed away from p:
p sends one marker along c after p records its state and before p sends further messages
along c.Marker-Receiving Rule for a Process q. On receiving a marker along a channel
C:
if q has not recorded its state thenbegin q records its state;q records the state c as the empty sequenceend
else q records the state of c as the sequence of messages received along c after q’s state
was recorded and before q received the marker along c.

其中关键是if q has not recorded its state,也就是接收到Marker时算子是否已经进行过本地快照。一直以来FlinkAligned Checkpoint通过Barrier对齐,将本地快照延迟至所有Barrier到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint时长和吞吐量的降低 。实际上这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行equal-join,输出匹配上的元素。按照Flink Aligned Checkpoint的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的Checkpoint周期):
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别读取Channel一个元素,输出2。随后接收到Channel 1Barrier,停止处理Channel 1后续的数据,只处理 Channel 2的数据。
图 c: 算子再消费2个自Channel 2的元素,接收到Barrier,开始本地快照并输出Barrier

对于相同的情况,Chandy-Lamport算法的状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别处理两个Channel一个元素,输出结果2。此后接收到Channel 1Barrier,算子开始本地快照记录自己的状态,并输出Barrier
图 c: 算子继续正常处理两个Channel的输入,输出9。特别的地方是Channel 2后续元素会被保存下来,直到Channel 2Barrier出现(即Channel 297)。保存的数据会作为Channel的状态成为快照的一部分。

两者的差异主要可以总结为两点:
快照的触发是在接收到第一个Barrier时还是在接收到最后一个Barrier时。
是否需要阻塞已经接收到BarrierChannel的计算。

从这两点来看,新的 Unaligned Checkpoint将快照的触发改为第一个Barrier且取消阻塞Channel的计算 ,算法上与Chandy-Lamport基本一致,同时在实现细节方面结合Flink的定位做了几个改进。
首先,不同于 Chandy-Lamport模型的只需要考虑算子输入Channel的状态,Flink的算子有输入和输出两种Channel ,在快照时两者的状态都需要被考虑。其次,无论在Chandy-Lamport还是Flink Aligned Checkpoint算法中,Barrier都必须遵循其在数据流中的位置,算子需要等待Barrier被实际处理才开始快照。而Unaligned Checkpoint改变了这个设定,允许算子优先摄入并优先输出Barrier。如此一来,第一个到达Barrier会在算子的缓存数据队列(包括输入Channel和输出Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入Channel在其Barrier之前的数据会被写入快照中。
[点击并拖拽以移动] ​

这样的主要好处是,如果本身算子的处理就是瓶颈Chandy-LamportBarrier仍会被阻塞,但Unaligned Checkpoint则可以在 Barrier进入输入Channel就马上开始快照。这可以从很大程度上加快Barrier流经整个DAG的速度,从而降低Checkpoint整体时长。回到之前的例子,用Unaligned Checkpoint来实现,状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。输出 Channel已存在结果数据1
图 b: 算子优先处理输入Channel 1Barrier,开始本地快照记录自己的状态,并将Barrier插到输出Channel末端。
图 c: 算子继续正常处理两个Channel的输入,输出29。同时算子会将Barrier越过的数据(即输入Channel 12和输出 Channel1)写入Checkpoint,并将输入Channel 2后续早于Barrier的数据(即 297)持续写入Checkpoint

比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。

这也正是 Unaligned的含义: 不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能提供只计算一次的准确结果。

当然,Unaligned Checkpoint并不是百分百优于Aligned Checkpoint,它会带来的已知问题就有:
【1】由于要持久化缓存数据,State Size会有比较大的增长,磁盘负载会加重。
【2】随着State Size增长,作业恢复时间可能增长,运维管理难度增加。

目前看来,Unaligned Checkpoint更适合容易产生高反压同时又比较重要的复杂作业。对于像数据ETL同步等简单作业,更轻量级的 Aligned Checkpoint显然是更好的选择。

总结:Flink 1.11Unaligned Checkpoint主要解决在高反压情况下作业难以完成Checkpoint的问题,同时它以磁盘资源为代价,避免了Checkpoint可能带来的阻塞,有利于提升Flink的资源利用率。随着流计算的普及,未来的Flink应用大概会越来越复杂,在未来经过实战打磨完善后Unaligned Checkpoint很有可能会取代Aligned Checkpoint成为Flink的默认Checkpoint策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/582607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zblog主题模板:ZblogitseanPage博客主题模板

zblog主题模板:ZblogitseanPage博客主题模板 ZblogitseanPage博客主题模板主要是以文字内容为主导,将页面的设计杂乱的图片和元素进行最小化或者去除,从而使整个页面更加简洁、清晰,突出信息的呈现。 下面介绍一下zblog主题模板:Z…

Docker自建私人云盘系统

Docker自建私人云盘系统。 有个人云盘需求的人,主要需求有这几类: 文件同步、分享需要。 照片、视频同步需要,尤其是全家人都是用的同步。 影视观看需要(分为家庭内部、家庭外部) 搭建个人网站/博客 云端OFFICE需…

配置代理解决跨域(CORS)问题

一、跨域 ? 我们在完成前后端分离项目时(VueSpringBoot),有很多人会遇到跨域问题(CORS)。 跨域(Cross-Origin Resource Sharing,CORS)是浏览器的一项安全功能&#xff…

H5向微信小程序发送信息(小程序web-view打开H5)

引入weixin-js-sdk npm i weixin-js-sdk 页面引入 // 引入wxjsimport wx from "weixin-js-sdk"; 点击触发方法 methods: {goweap(id){console.log(wx);// H5传递数据 (navigateBack)wx.miniProgram.navigateBack({delta: 1});wx.min…

Django 中集成 CKEditor 富文本编辑器详解

概要 在 Web 应用中,富文本编辑器是提高用户体验的重要组件之一。CKEditor 是一款流行的、功能丰富的富文本编辑器。在 Django 项目中集成 CKEditor 不仅可以提升内容编辑的灵活性,还能丰富用户的互动体验。本文将详细介绍如何在 Django 中集成和配置 C…

Apache Commons JCS缓存解决方案

第1章:引言 大家好,我是小黑!今天,咱们来聊聊Apache Commons JCS,一个Java界里的缓存大杀器。缓存技术,对于提高应用性能来说,就像是给它加了一剂兴奋剂,能让数据访问变得快如闪电。…

windows进行udp端口转发,解决项目中服务器收不到组播数据的问题

说明 windows7的netsh interface portproxy命令只支持tcp端口转发 如果要进行udp端口转发可以使用sokit 运行sokit 端口转发(以为tcp作为讲解,udp类似) 选择转发器 输入监听地址(SRC地址)和端口 输入转发地址&am…

零基础学Java第三天

复习与回顾: 1.八大基本数据类型整型: byte short int long浮点: float double字符: char布尔: boolean2.声明整型和浮点和布尔类型的数据 一个变量数据类型 变量名字 初始化的值;int a 234;float f1 34.5f;double d1 782.9;boolean b1 true;3.变量名…

深度学习核心技术与实践之深度学习基础篇

非书中全部内容,只是写了些自认为有收获的部分 神经网络 生物神经元的特点 (1)人体各种神经元本身的构成很相似 (2)早期的大脑损伤,其功能可能是以其他部位的神经元来代替实现的 (3&#x…

nmap扫描和MS17-010(永恒之蓝)漏洞攻击

情况介绍 攻击机kali中使用ifconfig可以看到ip是192.168.0.106。 靶机Window 7使用ipconfig可以看到ip是192.168.0.100。 Nmap Nmap全称为Network Mapper,又成为“上帝之眼”,可以扫描ip和端口。 然后在kali攻击机上使用nmap 192.168.0.103可以看…

一元函数微分学——刷题(2

目录 1.题目:2.解题思路和步骤:3.总结:小结: 1.题目: 2.解题思路和步骤: 这个题目可能看上去比较吓人,但是,又很简单;因为一切都太明确了,跟着我仔细看一下&…

SpingBoot的项目实战--模拟电商【2.登录】

🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于SpringBoot电商项目的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.功能需求 二.代码编写 …

目标检测损失函数:IoU、GIoU、DIoU、CIoU、EIoU、alpha IoU、SIoU、WIoU原理及Pytorch实现

前言 损失函数是用来评价模型的预测值和真实值一致程度,损失函数越小,通常模型的性能越好。不同的模型用的损失函数一般也不一样。损失函数主要是用在模型的训练阶段,如果我们想让预测值无限接近于真实值,就需要将损失值降到最低…

uniApp中uView组件库的丰富布局方法

目录 基本使用 #分栏间隔 #混合布局 #分栏偏移 #对齐方式 API #Row Props #Col Props #Row Events #Col Events UniApp的uView组件库是一个丰富的UI组件库,提供了各种常用的UI组件和布局方法,帮助开发者快速构建美观、灵活的界面。下面给你写一…

C++设计模式 #8 抽象工厂(Abstract Factory)

抽象工厂这个名字比较难以帮助理解,可以把抽象工厂理解为“品牌工厂”或者“家族工厂”。 动机 在软件系统中,经常面临着“一系列相互依赖的对象”的创建工作;同时,由于需求的变化,往往存在更多系列对象的创建工作。如…

C# 常用数据类型及取值范围

1.常见数据类型和取值范围 序号数据类型占字节数取值范围1byte10 到 2552sbyte1-128 到 1273short 2-32,768 到 32,7674ushort20 到 65,5355int4-2,147,483,648 到 2,147,483,6476uint40 到 4,294,967,2957float41.5 x 10−45 至 3.4 x 10388double85.0 10−324 到 1.…

位运算|比特位计数、汉明距离

位运算|比特位计数、汉明距离 338 比特位计数 /** 比特位计数法一:Brian Kernighan 算法的原理是:对于任意整数 x,令 xx & (x−1),该运算将 x 的二进制表示的最后一个 1 变成 0。因此,对 x 重复该操作&#xff0…

极智开发 | 解读英伟达软件生态 深度神经网络库cuDNN

欢迎关注,获取我的更多经验分享 大家好,我是极智视界,本文来介绍一下 解读英伟达软件生态 深度神经网络库cuDNN。 邀您加入我的知识星球「极智视界」,星球内有超多好玩的项目实战源码下载,链接:https://t.zsxq.com/0aiNxERDq cuDNN,全称为 NVIDIA CUDA Deep Neural Net…

Transform环境搭建与代码调试——Attention Is All Y ou Need

1、源代码 2、环境搭建 conda create -n transform python3.8 -y conda activate transform cd /media/lhy/Transforms/annotatedtransformerpip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple # # Uncomment for colabpip install -q torchdata…

一篇文章带你轻松入门Python

Python基础 1. Hello World! Python命令行 假设你已经安装好了Python, 那么在命令提示符输入: python 将直接进入python。然后在命令行提示符>>>后面输入: >>>print(Hello World!) 可以看到,随后在屏幕上输出: print是一个常用函数&#xf…