在介绍Checkpoint
的执行机制前,我们需要了解一下state
的存储,因为state
是Checkpoint
进行持久化备份的主要角色。Checkpoint
作为Flink
最基础也是最关键的容错机制,Checkpoint
快照机制很好地保证了Flink
应用从异常状态恢复后的数据准确性。同时 Checkpoint
相关的metrics
(指标)也是诊断Flink
应用健康状态最为重要的指标,成功且耗时较短的Checkpoint
表明作业运行状况良好,没有异常或反压。然而,由于Checkpoint
与反压的耦合,反压反过来也会作用于Checkpoint
,导致Checkpoint
的种种问题。Flink
在1.11
引入Unaligned
(未对齐)Checkpoint
来解耦Checkpoint
机制与反压机制,优化高反压情况下的Checkpoint
表现。
Statebackend 的分类
下图阐释了目前Flink
内置的三类state backend
,其中MemoryStateBackend
和FsStateBackend
在运行时都是存储在java heap
中的,只有在执行Checkpoint
时,FsStateBackend
才会将数据以文件格式持久化到远程存储上。 而RocksDBStateBackend
则借用了 RocksDB
(内存磁盘混合的LSM DB
)对state
进行存储。
[点击并拖拽以移动]
对于
HeapKeyedStateBackend
,有两种实现:
【1】支持异步Checkpoint
(默认): 存储格式CopyOnWriteStateMap
;
【2】仅支持同步Checkpoint
: 存储格式NestedStateMap
;
特别在MemoryStateBackend
内使用HeapKeyedStateBackend
时,Checkpoint
序列化数据阶段默认有最大5 MB
数据的限制。对于 RocksDBKeyedStateBackend
,每个state
都存储在一个单独的column family
内,其中keyGroup
,Key
和Namespace
进行序列化存储在 DB
作为key
。
Checkpoint 执行机制详解
对Checkpoint
的执行流程逐步拆解进行讲解,下图左侧是Checkpoint Coordinator
,是整个Checkpoint
的发起者,中间是由两个 source
,一个sink
组成的Flink
作业,最右侧的是持久化存储,在大部分用户场景中对应HDFS
。
【1】Checkpoint Coordinator
向所有source
节点触发trigger Checkpoint
;
【2】source
节点向下游广播barrier
(分界线),这个barrier
就是实现Chandy-Lamport
分布式快照算法的核心,下游的task
只有收到所有input
的barrier
才会执行相应的Checkpoint
。
Chandy-Lamport
算法将分布式系统抽象成DAG
(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。
从实现上看,Flink
通过在DAG
数据源定时向数据流注入名为Barrier
的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint
周期。每当接收到Barrier
,算子进行本地的Checkpoint
快照,并在完成后异步上传本地快照,同时将Barrier
以广播方式发送至下游。当某个Checkpoint
的所有Barrier
到达DAG
末端且所有算子完成快照,则标志着全局快照的成功。
【3】当task
完成state
备份后,会将备份数据的地址state handle
通知给Checkpoint coordinator
。
【4】下游的sink
节点收集齐上游两个input
的barrier
之后,会执行本地快照,这里特地展示了RocksDB incremental
(增量) Checkpoint
的流程,首先RocksDB
会全量刷数据到磁盘上(红色大三角表示),然后Flink
框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
【5】同样的,sink
节点在完成自己的Checkpoint
之后,会将state handle
返回通知Checkpoint Coordinator
。
【6】最后,当Checkpoint coordinator
收集齐所有task
的state handle
,就认为这一次的Checkpoint
全局完成了,向持久化存储中再备份一个Checkpoint meta
文件。
Checkpoint 的 EXACTLY_ONCE 语义
EXACTLY ONCE
语义: 在有多个输入Channel
的时候,为了数据准确性,算子会等待所有流的Barrier
都到达之后才会开始本地的快照,这种机制被称为Barrier
对齐。在对齐的过程中,算子只会继续处理的来自未出现Barrier Channel
的数据,而其余Channel
的数据会被写入输入队列(Flink
通过一个input buffer
将在对齐阶段收到的数据缓存起来),直至在队列满后被阻塞。当所有Barrier
到达后(对齐),算子进行本地快照,输出 Barrier 到下游并恢复正常处理。
比起其他分布式快照,该算法的优势在于辅以Copy-On-Write
技术的情况下不需要Stop The World
影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。
AT LEAST ONCE
语义: 无需缓存收集到的数据,会对后续直接处理,所以导致restore
(恢复)时,数据可能会被多次处理。下图是官网文档里面就Checkpoint align
的示意图:
需要特别注意的是,Flink
的Checkpoint
机制只能保证Flink
的计算过程可以做到EXACTLY ONCE
,端到端的EXACTLY ONCE
需要 source
和sink
支持。
Checkpoint 与反压的耦合
目前的Checkpoint
算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的Barrier
对齐反而会加剧作业的反压,甚至导致作业的不稳定。
首先, Chandy-Lamport
分布式快照的结束依赖于Marker
的流动,而反压则会限制Marker
的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致Checkpoint
的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果Checkpoint
连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的Lag
更大,通常带来更大的反压,形成一个恶性循环。
其次,Barrier
对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个Source
,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的Sink
。通常来说,这些不同的Sink
会复用公共的算子以减少重复计算,但并不希望不同Source
间相互影响。
假设一个作业要分别统计A
和B
两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果B
业务线某天的业务量突涨,使得Checkpoint Barrier
有延迟,那么会导致公用的Window Aggregate
进行Barrier
对齐,进而阻塞业务A
的FlatMap
,最终令业务A
的计算也出现延迟。
当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合Flink
用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。
Unaligned Checkpoint
为了解决这个问题,Flink
在1.11
版本引入了Unaligned Checkpoint
的特性。要理解Unaligned Checkpoint
的原理,首先需要了解 Chandy-Lamport
论文中对于Marker
处理规则的描述:自行百度翻译
Marker-Sending Rule for a Process p. For each channel c, incident on, and
directed away from p:
p sends one marker along c after p records its state and before p sends further messages
along c.Marker-Receiving Rule for a Process q. On receiving a marker along a channel
C:
if q has not recorded its state thenbegin q records its state;q records the state c as the empty sequenceend
else q records the state of c as the sequence of messages received along c after q’s state
was recorded and before q received the marker along c.
其中关键是if q has not recorded its state
,也就是接收到Marker
时算子是否已经进行过本地快照。一直以来Flink
的Aligned Checkpoint
通过Barrier
对齐,将本地快照延迟至所有Barrier
到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint
时长和吞吐量的降低 。实际上这和Chandy-Lamport
算法是有一定出入的。举个例子,假设我们对两个数据流进行equal-join
,输出匹配上的元素。按照Flink Aligned Checkpoint
的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的Checkpoint
周期):
图 a: 输入Channel 1
存在3
个元素,其中2
在Barrier
前面;Channel 2
存在4
个元素,其中2
、9
、7
在Barrier
前面。
图 b: 算子分别读取Channel
一个元素,输出2
。随后接收到Channel 1
的Barrier
,停止处理Channel 1
后续的数据,只处理 Channel 2
的数据。
图 c: 算子再消费2
个自Channel 2
的元素,接收到Barrier
,开始本地快照并输出Barrier
。
对于相同的情况,Chandy-Lamport
算法的状态变化如下:
图 a: 输入Channel 1
存在3
个元素,其中2
在Barrier
前面;Channel 2
存在4
个元素,其中2
、9
、7
在Barrier
前面。
图 b: 算子分别处理两个Channel
一个元素,输出结果2
。此后接收到Channel 1
的Barrier
,算子开始本地快照记录自己的状态,并输出Barrier
。
图 c: 算子继续正常处理两个Channel
的输入,输出9
。特别的地方是Channel 2
后续元素会被保存下来,直到Channel 2
的 Barrier
出现(即Channel 2
的9
和7
)。保存的数据会作为Channel
的状态成为快照的一部分。
两者的差异主要可以总结为两点:
快照的触发是在接收到第一个Barrier
时还是在接收到最后一个Barrier
时。
是否需要阻塞已经接收到Barrier
的Channel
的计算。
从这两点来看,新的 Unaligned Checkpoint
将快照的触发改为第一个Barrier
且取消阻塞Channel
的计算 ,算法上与Chandy-Lamport
基本一致,同时在实现细节方面结合Flink
的定位做了几个改进。
首先,不同于 Chandy-Lamport
模型的只需要考虑算子输入Channel
的状态,Flink
的算子有输入和输出两种Channel
,在快照时两者的状态都需要被考虑。其次,无论在Chandy-Lamport
还是Flink Aligned Checkpoint
算法中,Barrier
都必须遵循其在数据流中的位置,算子需要等待Barrier
被实际处理才开始快照。而Unaligned Checkpoint
改变了这个设定,允许算子优先摄入并优先输出Barrier
。如此一来,第一个到达Barrier
会在算子的缓存数据队列(包括输入Channel
和输出Channel
)中往前跳跃一段距离,而被”插队”的数据和其他输入Channel
在其Barrier
之前的数据会被写入快照中。
这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport
的Barrier
仍会被阻塞,但Unaligned Checkpoint
则可以在 Barrier
进入输入Channel
就马上开始快照。这可以从很大程度上加快Barrier
流经整个DAG
的速度,从而降低Checkpoint
整体时长。回到之前的例子,用Unaligned Checkpoint
来实现,状态变化如下:
图 a: 输入Channel 1
存在3
个元素,其中2
在Barrier
前面;Channel 2
存在4
个元素,其中2
、9
、7
在Barrier
前面。输出 Channel
已存在结果数据1
。
图 b: 算子优先处理输入Channel 1
的Barrier
,开始本地快照记录自己的状态,并将Barrier
插到输出Channel
末端。
图 c: 算子继续正常处理两个Channel
的输入,输出2
、9
。同时算子会将Barrier
越过的数据(即输入Channel 1
的2
和输出 Channel
的1
)写入Checkpoint
,并将输入Channel 2
后续早于Barrier
的数据(即 2
、9
、7
)持续写入Checkpoint
。
比起Aligned Checkpoint
中不同Checkpoint
周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint
进行快照和输出Barrier
时,部分本属于当前Checkpoint
的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint
的输出数据却落到Barrier
之后(因此未反映到下游算子的状态中)。
这也正是 Unaligned
的含义: 不同Checkpoint
周期的数据没有对齐,包括不同输入Channel
之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint
恢复时,不对齐的数据并不能由Source
端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被Checkpoint
恢复到对应Channel
中,所以依然能提供只计算一次的准确结果。
当然,Unaligned Checkpoint
并不是百分百优于Aligned Checkpoint
,它会带来的已知问题就有:
【1】由于要持久化缓存数据,State Size
会有比较大的增长,磁盘负载会加重。
【2】随着State Size
增长,作业恢复时间可能增长,运维管理难度增加。
目前看来,Unaligned Checkpoint
更适合容易产生高反压同时又比较重要的复杂作业。对于像数据ETL
同步等简单作业,更轻量级的 Aligned Checkpoint
显然是更好的选择。
总结:Flink 1.11
的Unaligned Checkpoint
主要解决在高反压情况下作业难以完成Checkpoint
的问题,同时它以磁盘资源为代价,避免了Checkpoint
可能带来的阻塞,有利于提升Flink
的资源利用率。随着流计算的普及,未来的Flink
应用大概会越来越复杂,在未来经过实战打磨完善后Unaligned Checkpoint
很有可能会取代Aligned Checkpoint
成为Flink
的默认Checkpoint
策略。