Flink中的容错机制

1 checkpoint

   Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。

  在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下来,因为是有状态的流式计算所以除了当前处理的数据之外还应该有当前的状态。因为在状态编程中,我们可能会自定义状态,所以直接保存当前的数据和他的状态是不行的,还要知道在具体的操作流程里面到底执行到哪了,这样的话太复杂了,做不到。其实核心的一点就是要知道当前数据到底处理完没有。Flink提出的是不要保存当前所有的数据了,不管当前处理的数据是什么(如果要考虑就要考虑对应的每一个状态到底改变过没有),就考虑同一个数据,所有任务都处理完之后把那个状态取出来。

  在Spark中是针对RDD做存盘,里面就是数据,现在是怎样的数据全部存到硬盘,故障恢复把数据拿出来重新算一遍,这个想法非常简单,因为Spark是批处理,数据全存下来,恢复的时候全做一遍,这是基于批处理的一种简单实现。在流处理FLink中要想存数据的话要么存全量,要么直接重置偏移量到最开始全部回滚,这个效率太低了。所以把之前做到某一步的状态保存下来。

  有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份快照;这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QpJk3zNR-1596295448157)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596201467669.png)]

  假设输入数据是由自然数构成的数据流,source读取之后,按照奇偶分区,sum求和。source里面的状态是保存当前读到哪个数,也就是偏移量,现在是8表示已经处理完8这个数了(有可能在处理第6个数,但是状态是5),此时sum也已经把8这个数处理完了。现在可以做一个快照把这3个状态存起来,存到定义好的状态后端,JobManager进行管理了,会保存当前checkpoint的id,元信息(source对应哪个,sum对应哪个),这里source的状态是当前处理的偏移量,sum状态是之前所有处理完数据之后的累加和。

  为什么不存储数据,而是存储所有任务的状态?假设如果要存储数据的话,有可能source在处理9,8还在去sum的路上,sum有可能还在处理6(也有可能处理完),如果把6,4,3这三个数据存储,这样最后这样恢复的话首先没有状态,source没有偏移量要消费的数据丢掉了,sum之前累加的结果没有记录不行。那么连着状态存起来,有个问题怎么知道当前数据到底要不要重新处理,有可能处理到3状态已经改了也有可能没有改。另外还在等待sum的5恢复后会丢失,所以按照数据和状态去存会有很多问题。所以Flink不要存当前正在处理的数据而是保证所有的操作把同一个数据处理完。

2 检查点恢复状态

  在执行流应用程序期间,Flink 会定期保存状态的一致检性查点。如果发生故障, 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。就要从之前存盘的检查点恢复,从新读取数据处理,流程如下:

  (1)首先重启应用,外部系统不知道,所有状态都是空的

在这里插入图片描述

  (2)然后从checkpoint恢复状态重置,这个是8处理完的时间点,source那里保存了偏移量,需要给数据源那里重新提交偏移量。

在这里插入图片描述

  (3)开始消费并处理检查点到发生故障之间的所有数据,这样就好像错误没有发生,这就是Flink的checkpoint检查点机制保证了,内存状态的精确一次。因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

在这里插入图片描述

3 Flink 检查点算法

  上面说到Flink的checkpoint保存的是所有任务状态的快照,这个状态要求所以逇任务都处理完同一个数据之后的状态。

  处理的流程怎样保证它保存状态的时候都保证他是处理完同一份数据呢?在前面例子source完之后就分区了比如8在奇数分区求和是知道数据读进来了,偶数求和分区怎么能知道5进来了呢。假如来的数据有同样的数据怎么判断当前数据是我要处理的数据呢。假设source保存了8的状态,后面的任务怎么知道读完8之后要保存呢?这就需要告诉他哪个数据读完了接下来要保存了,后面需要一个标记,要告诉后面的任务到底什么时候触发状态的保存。

  Flink中假设8读完数据之后,在偏移量为8和9的数据之间插入一个标记,现在这个标记就是要让5做完操作之后的状态保存下来,只要看到这个标记就保存下来。我们把这个标记插入到流式处理的过程中,就像Watermark一样当做特殊的数据结构,后面的任务看到这个特殊的数据结构就做保存。

  检查点算法的实现一般有两种想法:一是暂停应用,保存状态到检查点再重新恢复应用;二是将检查点的保存和数据处理分开,不暂停应用。Flink的实现是基于Chandy-Lamport算法的分布式快照。

  Flink检查点算法的核心就是检查点分界线(Checkpoint Barrier),Barrier可以认为是在source里面要做checkpoint的时候插入的一个特殊数据结构,用来把一条流上的数据按照不同的检查点分开。分界线之前的数据的状态更改会包含在当前分界线所属的检查点,barrier之后的数据状态的更改属于之后的检查点。每个任务遇到Barrier保存自己的状态。如果有并行任务Barrier就广播出去。如果上游也不只一个分区,那就有多个Barrier,要等到所有的Barrier到齐,才能保证之前该去处理保存的数据状态都保存进去了,所以有个Barrier对齐的概念。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UTkZ1J24-1596295448173)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596202432713.png)]

  检查点算法流程:

  (1)JobManager触发Checkpoint,发出一个标记,这个标记会带着一个数,他发送给所有的source任务,source接收到消息之后就会在当前的数据流里面插入Barrier。

  (2)source任务见到了barrier就把当前刚处理完的状态(偏移量)保存了,然后把barrier广播往下游发送,同时向JobManager确认检查点已经保存好了

  (3)如果上游其中一个分区的barrier到了,接下来要做的是Barrier的对齐,没到齐之前,已经来了barrier的流新的数据又来了不能直接做计算,而是先缓存起来,而barrier还没有到达的上游分区来的数据会被正常处理

  (4)上游所有输入分区的barrier都到齐时,任务就将其状态保存到状态后端的检查点中,将barrier继续向下游转发

  (5)直到Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕。当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

4 保存点

  保存点(savepoints)是Flink 提供的可以自定义的镜像保存功能,不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 。

  算法和checkpoint一样,比checkpoint多了一点额外的元数据,可以认为是具有额外元数据的checkpoint,区别在于checkpoint是自动创建的,保存点是用户手动触发的

  保存点除了可以用于故障恢复外,还可以用于有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等。

  手动创建保存点:

bin/flink savepoint :jobId [:targetDirectory]

  创建yarn上的保存点

bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

  取消保存点

bin/flink cancel -s [:targetDirectory] :jobId

  保存点恢复,和检查点恢复方法一样的

bin/flink run -s :savepointPath [:runArgs]

  检查点(checkpoint)的目录是依赖JobID的,每次运行任务都是一个唯一的JobID,所以要找到上一次任务的JobID才能找到检查点。保存点(savepoint)需要手动触发,并且在指定目录下还生成一个唯一的子目录。根据JobID可以在任务失败后,简单的重新执行任务即可恢复到失败前的检查点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

os、os.path、shutil操作文件和文件路径的常用方法总结

os模块是python标准库中的一个用于访问操作系统功能的模块,下面简要介绍一下常用的命令 1、os.name(). 判断现在正在使用的平台,windows返回’nt’,Linux返回‘posix’ 2、os.getcwd() 得到当前工作的目录 3、os.listdir(). 指定所在目…

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k,找出是否有可能把这个数组分成 k 个非空子集,其总和都相等。 示例 1: 输入: nums [4, 3, 2, 3, 5, 2, 1], k 4 输出: True 说明:…

Linux网络服务器epoll模型的socket通讯的实现(一)

准备写一个网络游戏的服务器的通讯模块&#xff0c;参考网上看到的一些代码&#xff0c;在linux下面实现一个多线程的epoll模型的socket通讯的代码,以下是第一部分多线程的切换代码: 1 #include <stdio.h>2 #include <sys/types.h>3 #include <sys/epoll.h>…

MySQL中的表中增加删除字段

1增加两个字段&#xff1a; mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu编写开机自启动脚本(转载)

From:http://blog.csdn.net/marujunyy/article/details/8466255 1、首先编写一个简单的shell脚本test.sh #! /bin/bash echo "Hello world!" filenamedate"%Y%m%d" echo $filename 2、设置脚本开机自启动 方法一&#xff1a; 编辑/etc/init.d/rc.local文件…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具&#xff0c;在Window下我们使用TortoiseSVN(小乌龟)&#xff0c;可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL介绍及调用

1 概述 DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的&#xff0c;Table API和SQL是在DateStreamAPI上又包了一层。对于新版本的Blink在DateStream基础上又包了一层实现了批流统一&#xff0c;上层执行环境都是基于流处理&#xff0c;做批流统一的查询。T…

Python编程中一些异常处理的小技巧

编程中经常会需要使用到异常处理的情况&#xff0c;在阅读了一些资料后&#xff0c;整理了关于异常处理的一些小技巧记录如下。 1 如何自定义异常 1.1 定义异常类 在实际编程中&#xff0c;有时会发现Python提供的内建异常的不够用&#xff0c;我们需要在特殊业务场景下的异常…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

MySQL如何跨机器迁移数据?

经常会遇到如此需求&#xff0c;需把A主机上的MySQL数据库所有迁移到B主机上&#xff0c;或者部分数据库&#xff0c;所以接下来将介绍迁移所有数据库和迁移单个数据库时的数据迁移步骤。 1 实验环境 A主机&#xff08;源主机&#xff09;&#xff1a; IP地址&#xff1a;19…

ClickHouse的特性及读写

1 ClickHouse特性 OLAP数据库一般有2个要求&#xff1a;①容量要比关系型数据库大&#xff0c;②在线查询的速度要快。ClickHouse这两点都满足并且还支持标准的sql&#xff0c;支持比较复杂的语句&#xff0c;支持分布式。ClickHouse的几个显著特点如下&#xff1a; &#xff0…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

imagick用法!

https://coderwall.com/p/9hj97w sudo apt-get install imagemagick sudo apt-get install php5-imagick sudo service apache2 restart 使用imagick类&#xff1a; http://www.wodezhan.cn/?p15转载于:https://www.cnblogs.com/vincedotnet/p/3592957.html

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

Python中如何在一行里获取多个异常

我知道这样&#xff1a; try:# 可能错的地方 except:# 如果错了执行这里也知道这样&#xff1a; try:# 可能错的地方 except IDontLikeYourFaceException:# 给爷笑一个 except YouAreTooShortException:# 踩高跷但是我想在两个不同的异常里做同样的事&#xff0c;我能想到的办法…

DolphinScheduler对比Airflow

DolphinSchedulerAirFlow稳定性单点故障去中心化的多Master和多Worke是&#xff08;单一调度程序&#xff09;HA额外要求不需要(本身就支持HA)Celery / Dask / Mesos Load Balancer DB过载处理任务队列机制&#xff0c;单个机器上可调度的任务数量可以灵活配置&#xff0c;当…

Python中字符串格式化:%和format

Python2.6推出了[str.format()]方法&#xff0c;和原有的%格式化方式有小小的区别。那个方法更好&#xff1f; 下面的方法有同样的输出&#xff0c;它们的区别是什么&#xff1f; #!/usr/bin/pythonsub1 "python string!"sub2 "an arg"a "i am a …

jsAutomation 服务器不能创建对象(转)

var ExApp new ActiveXObject("Excel.Application") “automation服务器不能创建对象”的问题的解决方案大全本人工作中的应用系统都是jsp的&#xff0c;大量javascript程序&#xff0c;一旦出“automation服务器不能创建对象”问题&#xff0c;大量报表及查询无法保…