1 checkpoint
Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。
在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下来,因为是有状态的流式计算所以除了当前处理的数据之外还应该有当前的状态。因为在状态编程中,我们可能会自定义状态,所以直接保存当前的数据和他的状态是不行的,还要知道在具体的操作流程里面到底执行到哪了,这样的话太复杂了,做不到。其实核心的一点就是要知道当前数据到底处理完没有。Flink提出的是不要保存当前所有的数据了,不管当前处理的数据是什么(如果要考虑就要考虑对应的每一个状态到底改变过没有),就考虑同一个数据,所有任务都处理完之后把那个状态取出来。
在Spark中是针对RDD做存盘,里面就是数据,现在是怎样的数据全部存到硬盘,故障恢复把数据拿出来重新算一遍,这个想法非常简单,因为Spark是批处理,数据全存下来,恢复的时候全做一遍,这是基于批处理的一种简单实现。在流处理FLink中要想存数据的话要么存全量,要么直接重置偏移量到最开始全部回滚,这个效率太低了。所以把之前做到某一步的状态保存下来。
有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份快照;这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。
假设输入数据是由自然数构成的数据流,source读取之后,按照奇偶分区,sum求和。source里面的状态是保存当前读到哪个数,也就是偏移量,现在是8表示已经处理完8这个数了(有可能在处理第6个数,但是状态是5),此时sum也已经把8这个数处理完了。现在可以做一个快照把这3个状态存起来,存到定义好的状态后端,JobManager进行管理了,会保存当前checkpoint的id,元信息(source对应哪个,sum对应哪个),这里source的状态是当前处理的偏移量,sum状态是之前所有处理完数据之后的累加和。
为什么不存储数据,而是存储所有任务的状态?假设如果要存储数据的话,有可能source在处理9,8还在去sum的路上,sum有可能还在处理6(也有可能处理完),如果把6,4,3这三个数据存储,这样最后这样恢复的话首先没有状态,source没有偏移量要消费的数据丢掉了,sum之前累加的结果没有记录不行。那么连着状态存起来,有个问题怎么知道当前数据到底要不要重新处理,有可能处理到3状态已经改了也有可能没有改。另外还在等待sum的5恢复后会丢失,所以按照数据和状态去存会有很多问题。所以Flink不要存当前正在处理的数据而是保证所有的操作把同一个数据处理完。
2 检查点恢复状态
在执行流应用程序期间,Flink 会定期保存状态的一致检性查点。如果发生故障, 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。就要从之前存盘的检查点恢复,从新读取数据处理,流程如下:
(1)首先重启应用,外部系统不知道,所有状态都是空的
(2)然后从checkpoint恢复状态重置,这个是8处理完的时间点,source那里保存了偏移量,需要给数据源那里重新提交偏移量。
(3)开始消费并处理检查点到发生故障之间的所有数据,这样就好像错误没有发生,这就是Flink的checkpoint检查点机制保证了,内存状态的精确一次。因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
3 Flink 检查点算法
上面说到Flink的checkpoint保存的是所有任务状态的快照,这个状态要求所以逇任务都处理完同一个数据之后的状态。
处理的流程怎样保证它保存状态的时候都保证他是处理完同一份数据呢?在前面例子source完之后就分区了比如8在奇数分区求和是知道数据读进来了,偶数求和分区怎么能知道5进来了呢。假如来的数据有同样的数据怎么判断当前数据是我要处理的数据呢。假设source保存了8的状态,后面的任务怎么知道读完8之后要保存呢?这就需要告诉他哪个数据读完了接下来要保存了,后面需要一个标记,要告诉后面的任务到底什么时候触发状态的保存。
Flink中假设8读完数据之后,在偏移量为8和9的数据之间插入一个标记,现在这个标记就是要让5做完操作之后的状态保存下来,只要看到这个标记就保存下来。我们把这个标记插入到流式处理的过程中,就像Watermark一样当做特殊的数据结构,后面的任务看到这个特殊的数据结构就做保存。
检查点算法的实现一般有两种想法:一是暂停应用,保存状态到检查点再重新恢复应用;二是将检查点的保存和数据处理分开,不暂停应用。Flink的实现是基于Chandy-Lamport算法的分布式快照。
Flink检查点算法的核心就是检查点分界线(Checkpoint Barrier),Barrier可以认为是在source里面要做checkpoint的时候插入的一个特殊数据结构,用来把一条流上的数据按照不同的检查点分开。分界线之前的数据的状态更改会包含在当前分界线所属的检查点,barrier之后的数据状态的更改属于之后的检查点。每个任务遇到Barrier保存自己的状态。如果有并行任务Barrier就广播出去。如果上游也不只一个分区,那就有多个Barrier,要等到所有的Barrier到齐,才能保证之前该去处理保存的数据状态都保存进去了,所以有个Barrier对齐的概念。
检查点算法流程:
(1)JobManager触发Checkpoint,发出一个标记,这个标记会带着一个数,他发送给所有的source任务,source接收到消息之后就会在当前的数据流里面插入Barrier。
(2)source任务见到了barrier就把当前刚处理完的状态(偏移量)保存了,然后把barrier广播往下游发送,同时向JobManager确认检查点已经保存好了
(3)如果上游其中一个分区的barrier到了,接下来要做的是Barrier的对齐,没到齐之前,已经来了barrier的流新的数据又来了不能直接做计算,而是先缓存起来,而barrier还没有到达的上游分区来的数据会被正常处理
(4)上游所有输入分区的barrier都到齐时,任务就将其状态保存到状态后端的检查点中,将barrier继续向下游转发
(5)直到Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕。当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
4 保存点
保存点(savepoints)是Flink 提供的可以自定义的镜像保存功能,不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 。
算法和checkpoint一样,比checkpoint多了一点额外的元数据,可以认为是具有额外元数据的checkpoint,区别在于checkpoint是自动创建的,保存点是用户手动触发的
保存点除了可以用于故障恢复外,还可以用于有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等。
手动创建保存点:
bin/flink savepoint :jobId [:targetDirectory]
创建yarn上的保存点
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
取消保存点
bin/flink cancel -s [:targetDirectory] :jobId
保存点恢复,和检查点恢复方法一样的
bin/flink run -s :savepointPath [:runArgs]
检查点(checkpoint)的目录是依赖JobID的,每次运行任务都是一个唯一的JobID,所以要找到上一次任务的JobID才能找到检查点。保存点(savepoint)需要手动触发,并且在指定目录下还生成一个唯一的子目录。根据JobID可以在任务失败后,简单的重新执行任务即可恢复到失败前的检查点。