Flink中的容错机制

1 checkpoint

   Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。

  在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下来,因为是有状态的流式计算所以除了当前处理的数据之外还应该有当前的状态。因为在状态编程中,我们可能会自定义状态,所以直接保存当前的数据和他的状态是不行的,还要知道在具体的操作流程里面到底执行到哪了,这样的话太复杂了,做不到。其实核心的一点就是要知道当前数据到底处理完没有。Flink提出的是不要保存当前所有的数据了,不管当前处理的数据是什么(如果要考虑就要考虑对应的每一个状态到底改变过没有),就考虑同一个数据,所有任务都处理完之后把那个状态取出来。

  在Spark中是针对RDD做存盘,里面就是数据,现在是怎样的数据全部存到硬盘,故障恢复把数据拿出来重新算一遍,这个想法非常简单,因为Spark是批处理,数据全存下来,恢复的时候全做一遍,这是基于批处理的一种简单实现。在流处理FLink中要想存数据的话要么存全量,要么直接重置偏移量到最开始全部回滚,这个效率太低了。所以把之前做到某一步的状态保存下来。

  有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份快照;这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QpJk3zNR-1596295448157)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596201467669.png)]

  假设输入数据是由自然数构成的数据流,source读取之后,按照奇偶分区,sum求和。source里面的状态是保存当前读到哪个数,也就是偏移量,现在是8表示已经处理完8这个数了(有可能在处理第6个数,但是状态是5),此时sum也已经把8这个数处理完了。现在可以做一个快照把这3个状态存起来,存到定义好的状态后端,JobManager进行管理了,会保存当前checkpoint的id,元信息(source对应哪个,sum对应哪个),这里source的状态是当前处理的偏移量,sum状态是之前所有处理完数据之后的累加和。

  为什么不存储数据,而是存储所有任务的状态?假设如果要存储数据的话,有可能source在处理9,8还在去sum的路上,sum有可能还在处理6(也有可能处理完),如果把6,4,3这三个数据存储,这样最后这样恢复的话首先没有状态,source没有偏移量要消费的数据丢掉了,sum之前累加的结果没有记录不行。那么连着状态存起来,有个问题怎么知道当前数据到底要不要重新处理,有可能处理到3状态已经改了也有可能没有改。另外还在等待sum的5恢复后会丢失,所以按照数据和状态去存会有很多问题。所以Flink不要存当前正在处理的数据而是保证所有的操作把同一个数据处理完。

2 检查点恢复状态

  在执行流应用程序期间,Flink 会定期保存状态的一致检性查点。如果发生故障, 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。就要从之前存盘的检查点恢复,从新读取数据处理,流程如下:

  (1)首先重启应用,外部系统不知道,所有状态都是空的

在这里插入图片描述

  (2)然后从checkpoint恢复状态重置,这个是8处理完的时间点,source那里保存了偏移量,需要给数据源那里重新提交偏移量。

在这里插入图片描述

  (3)开始消费并处理检查点到发生故障之间的所有数据,这样就好像错误没有发生,这就是Flink的checkpoint检查点机制保证了,内存状态的精确一次。因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

在这里插入图片描述

3 Flink 检查点算法

  上面说到Flink的checkpoint保存的是所有任务状态的快照,这个状态要求所以逇任务都处理完同一个数据之后的状态。

  处理的流程怎样保证它保存状态的时候都保证他是处理完同一份数据呢?在前面例子source完之后就分区了比如8在奇数分区求和是知道数据读进来了,偶数求和分区怎么能知道5进来了呢。假如来的数据有同样的数据怎么判断当前数据是我要处理的数据呢。假设source保存了8的状态,后面的任务怎么知道读完8之后要保存呢?这就需要告诉他哪个数据读完了接下来要保存了,后面需要一个标记,要告诉后面的任务到底什么时候触发状态的保存。

  Flink中假设8读完数据之后,在偏移量为8和9的数据之间插入一个标记,现在这个标记就是要让5做完操作之后的状态保存下来,只要看到这个标记就保存下来。我们把这个标记插入到流式处理的过程中,就像Watermark一样当做特殊的数据结构,后面的任务看到这个特殊的数据结构就做保存。

  检查点算法的实现一般有两种想法:一是暂停应用,保存状态到检查点再重新恢复应用;二是将检查点的保存和数据处理分开,不暂停应用。Flink的实现是基于Chandy-Lamport算法的分布式快照。

  Flink检查点算法的核心就是检查点分界线(Checkpoint Barrier),Barrier可以认为是在source里面要做checkpoint的时候插入的一个特殊数据结构,用来把一条流上的数据按照不同的检查点分开。分界线之前的数据的状态更改会包含在当前分界线所属的检查点,barrier之后的数据状态的更改属于之后的检查点。每个任务遇到Barrier保存自己的状态。如果有并行任务Barrier就广播出去。如果上游也不只一个分区,那就有多个Barrier,要等到所有的Barrier到齐,才能保证之前该去处理保存的数据状态都保存进去了,所以有个Barrier对齐的概念。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UTkZ1J24-1596295448173)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596202432713.png)]

  检查点算法流程:

  (1)JobManager触发Checkpoint,发出一个标记,这个标记会带着一个数,他发送给所有的source任务,source接收到消息之后就会在当前的数据流里面插入Barrier。

  (2)source任务见到了barrier就把当前刚处理完的状态(偏移量)保存了,然后把barrier广播往下游发送,同时向JobManager确认检查点已经保存好了

  (3)如果上游其中一个分区的barrier到了,接下来要做的是Barrier的对齐,没到齐之前,已经来了barrier的流新的数据又来了不能直接做计算,而是先缓存起来,而barrier还没有到达的上游分区来的数据会被正常处理

  (4)上游所有输入分区的barrier都到齐时,任务就将其状态保存到状态后端的检查点中,将barrier继续向下游转发

  (5)直到Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕。当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

4 保存点

  保存点(savepoints)是Flink 提供的可以自定义的镜像保存功能,不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 。

  算法和checkpoint一样,比checkpoint多了一点额外的元数据,可以认为是具有额外元数据的checkpoint,区别在于checkpoint是自动创建的,保存点是用户手动触发的

  保存点除了可以用于故障恢复外,还可以用于有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等。

  手动创建保存点:

bin/flink savepoint :jobId [:targetDirectory]

  创建yarn上的保存点

bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

  取消保存点

bin/flink cancel -s [:targetDirectory] :jobId

  保存点恢复,和检查点恢复方法一样的

bin/flink run -s :savepointPath [:runArgs]

  检查点(checkpoint)的目录是依赖JobID的,每次运行任务都是一个唯一的JobID,所以要找到上一次任务的JobID才能找到检查点。保存点(savepoint)需要手动触发,并且在指定目录下还生成一个唯一的子目录。根据JobID可以在任务失败后,简单的重新执行任务即可恢复到失败前的检查点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k,找出是否有可能把这个数组分成 k 个非空子集,其总和都相等。 示例 1: 输入: nums [4, 3, 2, 3, 5, 2, 1], k 4 输出: True 说明:…

MySQL中的表中增加删除字段

1增加两个字段: mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具,在Window下我们使用TortoiseSVN(小乌龟),可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL,本质上还是基于关系型表的操作方式;而关系型表、SQL本身,一般是有界的,更适合批处理的场景。所以在流处理的过程中,有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作,用新的字母组替换原有的字母组(不一定大小相同)。 每个替换操作具有 3 个参数:起始索引 i,源字 x 和目标字 y。 规则是:如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的,而在很多情况下我们无法…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

天池 在线编程 音乐组合

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744274 2. 解题 对60求余后&#xff0c;0, 30的为 Cn2C_n^2Cn2​&#xff0c;其余的相加等于60的&#xff0c;种类相乘 class Solution { public:/*** param …

java之NIO(Channel,Buffer,Selector)

java之NIO 1 什么是NIO Java NIO (New IO&#xff0c;Non-Blocking IO)是从Java 1.4版本开始引入的一套新的IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO的三大核心部分&#xff1a;通道(Channel)&#xff0c;缓冲区(Buffer), 选择器(Selector)&#xff0c;数据总是从…

LeetCode 1652. 拆炸弹(前缀和)

文章目录1. 题目2. 解题1. 题目 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每一个数字。所有数字会 同时 被替换。 如果 k > 0 &#xff0c;将…

MYSQL从入门到精通

SQL是数据库的查询语言&#xff0c;语法结构简单&#xff0c;相信本文会让你从入门到熟练。 掌握SQL后&#xff0c;不论你是产品经理、运营人员或者数据分析师&#xff0c;都会让你分析的能力边界无限拓展。别犹豫了&#xff0c;赶快上车吧&#xff01; SQL最小化的查询结构如下…

LeetCode 1653. 使字符串平衡的最少删除次数(DP)

文章目录1. 题目2. 解题1. 题目 给你一个字符串 s &#xff0c;它仅包含字符 a 和 b​​​​ 。 你可以删除 s 中任意数目的字符&#xff0c;使得 s 平衡 。 我们称 s 平衡的 当不存在下标对 (i,j) 满足 i < j 且 s[i] b 同时 s[j] a 。 请你返回使 s 平衡 的 最少 删除…

LeetCode 1654. 到家的最少跳跃次数(BFS)

文章目录1. 题目2. 解题1. 题目 有一只跳蚤的家在数轴上的位置 x 处。请你帮助它从位置 0 出发&#xff0c;到达它的家。 跳蚤跳跃的规则如下&#xff1a; 它可以 往前 跳恰好 a 个位置&#xff08;即往右跳&#xff09;。它可以 往后 跳恰好 b 个位置&#xff08;即往左跳&…

Maven详解及实例

1 什么是Maven Maven对项目进行模型抽象&#xff0c;充分运用的面向对象的思想&#xff0c;Maven可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的软件项目管理工具。Maven 除了以程序构建能力为特色之外&#xff0c;还提供高级项目管理工具。简单的来说Maven是…

Python多线程详解

1、多线程的理解 多进程和多线程都可以执行多个任务&#xff0c;线程是进程的一部分。线程的特点是线程之间可以共享内存和变量&#xff0c;资源消耗少&#xff08;不过在Unix环境中&#xff0c;多进程和多线程资源调度消耗差距不明显&#xff0c;Unix调度较快&#xff09;&…

LeetCode 1655. 分配重复整数(回溯)

文章目录1. 题目2. 解题2.1 回溯1. 题目 给你一个长度为 n 的整数数组 nums &#xff0c;这个数组中至多有 50 个不同的值。 同时你有 m 个顾客的订单 quantity &#xff0c;其中&#xff0c;整数 quantity[i] 是第 i 位顾客订单的数目。请你判断是否能将 nums 中的整数分配给…

Flink的异步I/O及Future和CompletableFuture

1 概述 Flink在做流数据计算时&#xff0c;经常要外部系统进行交互&#xff0c;如Redis、Hive、HBase等等存储系统。系统间通信延迟是否会拖慢整个Flink作业&#xff0c;影响整体吞吐量和实时性。 如需要查询外部数据库以关联上用户的额外信息&#xff0c;通常的实现方式是向数…

LeetCode 1656. 设计有序流(数组)

文章目录1. 题目2. 解题1. 题目 有 n 个 (id, value) 对&#xff0c;其中 id 是 1 到 n 之间的一个整数&#xff0c;value 是一个字符串。不存在 id 相同的两个 (id, value) 对。 设计一个流&#xff0c;以 任意 顺序获取 n 个 (id, value) 对&#xff0c;并在多次调用时 按 …

flask框架+mysql数据库并与前台数据交互

在Flask使用数据库 我们将使用Flask-SQLAlchemy 的扩展来管理数据库。由SQLAlchemy项目提供的&#xff0c;已封装了关系对象映射&#xff08;ORM&#xff09;的一个插件。 ORMs允许数据库程序用对象的方式替代表和SQL语句。面向对象的操作被ORM转化为数据库命令。这样就意味着&…