简介: 本文由 Apache Flink PMC , 阿里巴巴高级技术专家李钰分享,主要从有状态的流计算、全局一致性快照 、Flink的容错机制、Flink的状态管理 四个方面介绍 Flink 的容错机制原理。
作者 | 李钰
分享人:本文由 Apache Flink PMC , 阿里巴巴高级技术专家李钰分享,主要介绍 Flink 的容错机制原理,内容大纲如下:
- 有状态的流计算
- 全局一致性快照
- Flink的容错机制
- Flink的状态管理
一、有状态的流计算
流计算
流计算是指有一个数据源可以持续不断地发送消息,同时有一个常驻程序运行代码,从数据源拿到一个消息后会进行处理,然后把结果输出到下游。
分布式流计算
分布式流计算是指把输入流以某种方式进行一个划分,再使用多个分布式实例对流进行处理。
流计算中的状态
计算可以分成有状态和无状态两种,无状态的计算只需要处理单一事件,有状态的计算需要记录并处理多个事件。
举个简单的例子。例如一个事件由事件ID和事件值两部分组成,如果处理逻辑是每拿到一个事件,都解析并输出它的事件值,那么这就是一个无状态的计算;相反,如果每拿到一个状态,解析它的值出来后,需要和前一个事件值进行比较,比前一个事件值大的时候才把它进行输出,这就是一个有状态的计算。
流计算中的状态有很多种。比如在去重的场景下,会记录所有的主键;又或者在窗口计算里,已经进入窗口还没触发的数据,这也是流计算的状态;在机器学习/深度学习场景里,训练的模型及参数数据都是流计算的状态。
二、全局一致性快照
全局一致性快照是可以用来给分布式系统做备份和故障恢复的机制。
全局快照
什么是全局快照
全局快照首先是一个分布式应用,它有多个进程分布在多个服务器上;其次,它在应用内部有自己的处理逻辑和状态;第三,应用间是可以互相通信的;第四,在这种分布式的应用,有内部状态,硬件可以通信的情况下,某一时刻的全局状态,就叫做全局的快照。
为什么需要全局快照
- 第一,用它来做检查点,可以定期对全局状态做备份,当应用程序故障时,就可以拿来恢复;
- 第二,做死锁检测,进行快照后当前的程序继续运行,然后可以对快照进行分析,看应用程序是不是存在死锁状态,如果是就可以进行相应的处理。
全局快照举例
下图为分布式系统中全局快照的示例。
P1和P2是两个进程,它们之间有消息发送的管道,分别是C12和C21。对于 P1进程来说, C12是它发送消息的管道,称作output channel; C21是它接收消息的管道,称作 input channel。
除了管道,每个进程都有一个本地的状态。比如说P1和P2每个进程的内存里都有XYZ三个变量和相应的值。那么 P1和P2进程的本地状态和它们之间发送消息的管道状态,就是一个初始的全局状态,也可称为全局快照。
假设P1给P2发了一条消息,让P2把x的状态值从4改为7,但是这个消息在管道中,还没到达P2。这个状态也是一个全局快照。
再接下来,P2收到了P1的消息,但是还没有处理,这个状态也是一个全局快照。
最后接到消息的P2把本地的X的值从4改为7,这也是一个全局快照。
所以当有事件发生的时候,全局的状态就会发生改变。事件包括进程发送消息、进程接收消息和进程修改自己的状态。
2.全局一致性快照
假如说有两个事件,a和b,在绝对时间下,如果a发生在b之前,且b被包含在快照当中,那么则a也被包含在快照当中。满足这个条件的全局快照,就称为全局一致性快照。
2.1 全局一致性快照的实现方法
时钟同步并不能实现全局一致性快照;全局同步虽然可以实现,但是它的缺点也非常明显,它会让所有应用程序都停下来,会影响全局的性能。
3.异步全局一致性快照算法 – Chandy-Lamport
异步全局一致性快照算法Chandy-Lamport可以在不影响应用程序运行的前提下,实现全局一致性快照。
Chandy-Lamport的系统要求有以下几点:
- 第一,不影响应用运行,也就是不影响收发消息,不需要停止应用程序;
- 第二,每个进程都可以记录本地状态;
- 第三,可以分布式地对已记录的状态进行收集;
- 第四,任意进程都可以发起快照
同时,Chandy-Lamport算法可以执行还有一个前提条件:消息有序且不重复,并且消息可靠性可保障。
3.1 Chandy-Lamport算法流程
Chandy-Lamport的算法流程主要分为三个部分:发起快照、分布式的执行快照和终止快照。
发起快照
任意进程都可以发起快照。如下图所示,当由P1发起快照的时候,第一步需要记录本地的状态,也就是对本地进行快照,然后立刻向它所有 output channel发送一个marker消息,这中间是没有时间间隙的。marker消息是一个特殊的消息,它不同于应用之间传递的消息。
发出Marker消息后,P1就会开始记录所有input channel的消息,也就是图示C21管道的消息。
分布式的执行快照
如下图,先假定当 Pi接收到来自Cki的marker消息,也就是Pk发给Pi的marker消息。可以分两种情况来看:
第一种情况:这个是Pi收到的第一个来自其它管道的marker消息,它会先记录一下本地的状态,再把 C12管道记为空,也就是说后续再从 P1发消息,就不包含在此次快照里了,与此同时立刻向它所有output channel发送marker消息。 最后开始记录来自除Cki之外的所有input channel的消息。
上面提到Cki消息不包含在实时快照里,但是实时消息还是会发生,所以第二种情况是,如果此前Pi已经接收过marker消息,它会停止记录 Cki消息,同时会将此前记录的所有Cki消息作为Cki在本次快照中的最终状态来保存。
终止快照
终止快照的条件有两个:
- 第一,所有进程都已经接收到marker消息,并记录在本地快照;
- 第二,所有进程都从它的n-1个input channel里收到了marker 消息,并记录了管道状态。
当快照终止,快照收集器 (Central Server) 就开始收集每一个部分的快照去形成全局一致性快照了。
示例展示
在下图的例子里,一些状态是在内部发生的,比如A,它跟其它进程没有交互。内部状态就是 P1发给自己消息,可以将A认为是C11=[A->]。
Chandy-Lamport全局一致性快照的算法是怎么执行的呢?
假设从p1来发起快照,它发起快照时,首先对本地的状态进行快照,称之为S1,然后立刻向它所有的output channel,即P2和P3,分别发marker消息,然后再去记录它所有input channel的消息,即来自P2和P3及自身的消息。
图例所示,纵轴是绝对时间,按照绝对时间来看,为什么P3和P2收到marker消息会有时间差呢?因为假如这是一个真实的物理环境里的分布式进程,不同节点之间的网络状况是不一样的,这种情况会导致消息送达时间存在差异。
P3先收到marker消息,且是它接收到的第一个marker消息。接收到消息后,它首先会对本地状态进行快照,然后把 C13管道的标记成 close,与此同时开始向它所有的output channel发送 marker消息,最后它会把来自除了C13之外的所有input channel的消息开始进行记录。
接收到P3发出的marker信息的是P1,但这不是它接收的第一个marker,它会把来自C31 channel的管道立刻关闭,并且把当前的记录消息做这个channel的快照,后续再接收到来自P3的消息,就不会更新在此次的快照状态里了。
接下来P2接收到来自P3的消息,这是它接到的第一个marker消息。接收到消息后,它首先对本地状态进行快照,然后把 C32管道的标记成 close,与此同时开始向它所有的output channel发送 marker消息,最后它会把来自除了C32之外的所有input channel的消息开始进行记录。
再来看P2接收到来自P1的消息,这不是P2接收到的第一个marker消息,所以它会把所有的 input channel全部关闭,并且记录channel的状态。
接下来看P1接收到来自P2的消息,这也不是它接收的第一个消息。那么它就会把所有的input channel关闭,并把记录的消息作为状态。那么这里面有两个状态,一个是C11,即自己发给自己的消息;一个是C21,是P2里H发给P1D的。
最后一个时间点,P3接收到来自P2的消息,这也不是它收到的第一个消息,操作跟上面介绍的一样。在这期间P3本地有一个事件J,它也会把J作为它的状态。
当所有进程都记录了本地状态,而且每一个进程的所有输入管道都已经关闭了,那么全局一致性快照就结束了,也就是对过去时间点的全局性的状态记录完成了。
3.3 Chandy-Lamport与 Flink之间的关系
Flink 是分布式系统,所以 Flink 会采用全局一致性快照的方式形成检查点,来支持故障恢复。Flink的异步全局一致性快照算法跟Chandy-Lamport算法的区别主要有以下几点:
- 第一,Chandy-Lamput支持强连通图,而 Flink支持弱连通图;
- 第二,Flink采用的是裁剪的(Tailored)Chandy-Lamput异步快照算法;
- 第三,Flink的异步快照算法在DAG场景下不需要存储Channel state,从而极大减少快照的存储空间。
三、Flink的容错机制
容错,就是恢复到出错前的状态。流计算容错一致性保证有三种,分别是:Exactly once,At least once,At most once。
- Exactly once,是指每条event会且只会对state产生一次影响,这里的“一次”并非端到端的严格一次,而是指在 Flink内部只处理一次,不包括source和sink的处理。
- At least once,是指每条event会对state产生最少一次影响,也就是存在重复处理的可能。
- At most once,是指每条event会对state产生最多一次影响,就是状态可能会在出错时丢失。
端到端的Exactly once
Exactly once的意思是,作业结果总是正确的,但是很可能产出多次;所以它的要求是需要有可重放的source。
端到端的Exactly once,是指作业结果正确且只会被产出一次,它的要求除了有可重放的source外,还要求有事务型的sink和可以接收幂等的产出结果。
Flink的状态容错
很多场景都会要求在Exactly once的语义,即处理且仅处理一次。如何确保语义呢?
简单场景的 Exactly Once 容错方法
简单场景的做法如下图,方法就是,记录本地状态并且把 source的offset,即 Event log的位置记录下来就好了。
分布式场景的状态容错
如果是分布式场景,我们需要在不中断运算的前提下对多个拥有本地状态的算子产生全局一致性快照。Flink 分布式场景的作业拓扑比较特殊,它是有向无环并且是弱联通图,可以采用裁剪的Chandy-Lamport,也就是只记录所有输入的offset和各个算子状态,并依赖rewindable source(可回溯的source,即可以通过offset读取比较早一点时间点),从而不需要存储channel的状态,这在存在聚合 (aggregation)逻辑的情况下可以节省大量的存储空间。
最后做恢复,恢复就是把数据源的位置重新设定,然后每一个算子都从检查点恢复状态。
3.Flink 的分布式快照方法
首先在源数据流里插入Checkpoint barrier,也就是上文提到的Chandy-Lamport算法里的marker message,不同的Checkpoint barrier会把流自然地切分多个段,每个段都包含了Checkpoint的数据;
Flink 里有一个全局的Coordinator,它不像Chandy-Lamport对任意一个进程都可以发起快照,这个集中式的 Coordinator会把Checkpoint barrier注入到每个source里,然后启动快照。当每个节点收到barrier后,因为 Flink 里面它不存储 Channel state,所以它只需存储本地的状态就好。
在做完了Checkpoint 后,每个算子的每个并发都会向Coordinator发送一个确认消息,当所有任务的确认消息都被Checkpoint Coordinator接收,快照就结束了。
4.流程演示
见下图示,假设Checkpoint N 被注入到 source里,这时source会先把它正在处理分区的offset记录下来。
随着时间的流逝,它会把Checkpoint barrier发送到两个并发的下游,当barrier分别到达两个并发,这两个并发会分别把它们本地的状态都记录在Checkpoint 的里:
最后barrier到达最终的subtask,快照就完成了。
这是比较简单的场景演示,每个算子只有单流的输入,再来看下图比较复杂的场景,算子有多流输入的情况。
当算子有多个输入,需要把Barrier 对齐。怎么把Barrier对齐呢?如下图所示,在左侧原本的状态下,当其中一条barrier到达,另一条barrier命令上有的barrier还在管道中没有到达,这时会在保证Exactly once的情况下,把先到达的流直接阻塞掉,然后等待另一条流的数据处理。等到另外一条流也到达了,会把之前的流unblock,同时把barrier发送到算子。
在这个过程中,阻塞掉其中一条流的作用是,会让它产生反压。Barrier 对齐会导致反压和暂停operator的数据处理。
如果不在对齐过程中阻塞已收到barrier的数据管道,数据持续不断流进来,那么属于下个Checkpoint的数据被包含在当前的Checkpoint里,如果一旦发生故障恢复后,由于source会被rewind,部分数据会有重复处理,这就是at-least-once。 如果能接收at-least-once,那么可以选择其他可以避免barrier对齐带来的副作用。另外也可以通过异步快照来尽量减少任务停顿并支持多个Checkpoint同时进行。
5.快照触发
本地快照同步上传到系统需要state Copy-on-write的机制。
假如对元数据信息做了快照之后数据处理恢复了,在上传数据的过程中如何保证恢复的应用程序逻辑不会修改正在上传的数据呢?实际上不同状态存储后端的处理是不一样的,Heap backend会触发数据的copy-on-write,而对于RocksDB backend来说LSM的特性可以保证已经快照的数据不会被修改。
四、Flink 的状态管理
1.Flink 状态管理
首先需要去定义一个状态,在下图的例子里,先定义一个Value state。
在定义的状态的时候,需要给出以下的几个信息:
- 状态识别ID
- 状态数据类型
- 本地状态后端注册状态
- 本地状态后端读写状态
2.Flink 状态后端
又叫state backend,Flink状态后端有两种;
- 第一种,JVM Heap,它里面的数据是以Java对象形式存在的,读写也是以对象形式去完成的,所以速度很快。但是也存在两个弊端:第一个弊端,以对象方式存储所需的空间是磁盘上序列化压缩后的数据大小的很多倍,所以占用的内存空间很大;第二个弊端,虽然读写不用做序列化,但是在形成snapshot时需要做序列化,所以它的异步snapshot过程会比较慢。
- 第二种,RocksDB,这个类型在读写时就需要做序列化,所以它读写的速度比较慢。但是它有一个好处,基于LSM的数据结构在快照之后会形成sst文件,它的异步checkpoint过程就是文件拷贝的过程,CPU消耗会比较低。
原文链接
本文为阿里云原创内容,未经允许不得转载。