17、Flink 的 Checkpointing 配置详解

Checkpointing
1.概述

Flink 中的每个方法或算子都能够是有状态的,状态化的方法在处理单个 元素/事件 的时候存储数据,为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)

2.开启与配置 Checkpoint

默认 checkpoint 是禁用的,通过调用 StreamExecutionEnvironmentenableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

checkpoint 其它属性包括

  • checkpoint 存储: 可以设置检查点快照的持久化位置,默认使用 JobManager 堆内存,建议在生产中使用持久性文件系统。

  • 精确一次(exactly-once)对比至少一次(at-least-once):可以在 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入 checkpoint 模式,推荐精确一次,至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。

  • checkpoint 超时:如果 checkpoint 执行的时间超过了配置的阈值,进行中的 checkpoint 会被抛弃。

  • checkpoints 之间的最小时间:在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展,如果值设为 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。

    使用 checkpoints 之间的最小时间,在 checkpoint 的执行时间超过平均值时不会受到影响(例如目标存储系统忽然变得很慢)这个值也意味着并发 checkpoint 的数目是一

  • checkpoint 可容忍连续失败次数:可容忍多少次连续的 checkpoint 失败,超过阈值之后会触发作业错误 fail over,默认次数为 0,不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over;可容忍的 checkpoint 失败情形:Job Manager的IOException,TaskManager 做 checkpoint 时异步部分的失败, checkpoint 超时等;TaskManager 做 checkpoint 时同步部分的失败会直接触发作业fail over;其它的 checkpoint 失败(如一个 checkpoint 被另一个 checkpoint 包含)会被忽略掉。

  • 并发 checkpoint 的数目: 默认在上一个 checkpoint 未完成(失败或成功)的情况下,系统不会触发另一个 checkpoint,以便拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程;允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法调用比较耗时的外部服务),该选项不能和 “checkpoints 间的最小时间” 同时使用。

  • externalized checkpoints: 配置周期存储 checkpoint 到外部系统中,Externalized checkpoints 将元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。

  • 非对齐 checkpoints: 启用非对齐 checkpoints 以便在背压时大大减少创建 checkpoint 的时间,仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。

  • 部分任务结束的 checkpoints: 默认,即使 DAG 的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);// 高级选项:// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 开启 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
3.相关配置选项
KeyDefaultTypeDescription
state.backend.incrementalfalseBoolean配置状态后端是否应创建增量检查点(如果可能);对于增量检查点,只存储与前一个检查点的diff,而不是完整的检查点状态,启用后,web UI中显示的或从其余API获取的状态大小仅表示增量检查点大小,而不是完整检查点大小,某些状态后端可能不支持增量检查点并忽略此选项。
state.backend.local-recoveryfalseBoolean配置状态后端的本地恢复,默认禁用,本地恢复目前只支持keyed state backends(包括EmbeddedRocksDBStateBackend和HashMapStateBackend)。
state.checkpoint-storage(none)String配置检查点存储,可通过名称指定 [“jobmanager” 或“filesystem”],也可以通过“CheckpointStorageFactory”的类名指定。
state.checkpoint.cleaner.parallel-modetrueBoolean是否使用传递到 cleaner 中的 ExecutorService 并行丢弃检查点的状态
state.checkpoints.create-subdirtrueBoolean是否在“state.checkpoints.dir”下创建以作业id命名的子目录,以存储检查点的数据文件和元数据,默认为true。
state.checkpoints.dir(none)String默认目录,用于在Flink支持的文件系统中存储检查点的数据文件和元数据,存储路径必须可从所有参与的进程/节点(即所有TaskManager和JobManager)访问。
state.checkpoints.num-retained1Integer要保留的已完成检查点的最大数量。
state.savepoints.dir(none)String保存点的默认目录,由将存储点写入文件系统的状态后端(HashMapStateBackend、EmbeddedRocksDBStateBackend)使用。
state.storage.fs.memory-threshold20 kbMemorySize状态数据文件的最小大小,所有小于的状态块都内联存储在根检查点元数据文件中,此配置的最大内存阈值为1MB。
state.storage.fs.write-buffer-size4096Integer写入文件系统的检查点流的写入缓冲区的默认大小,实际写入缓冲区大小被确定为此选项和选项“state.storage.fs.memory threshold”的最大值。
taskmanager.state.local.root-dirs(none)Stringconfig 参数定义根目录,存储用于本地恢复的基于文件的状态;本地恢复目前只覆盖keyed state backends,如果未配置,它将默认为<WORKING_DIR>/localState,<WORKING_DIR>可以通过process.taskmanager.WORKING-dir进行配置
4.配置 State Backend

Flink 的 checkpointing 机制会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及用户自定义的状态。

Checkpoint 存储在哪里取决于配置的 State Backend(比如 JobManager memory、 file system、 database)。

默认情况下,状态是保存在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中,为了持久化大体量状态, Flink 支持存储 checkpoint 状态到其他的 state backends 上。

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);
5.迭代作业中的状态和 checkpoint

Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证,在迭代作业上开启 checkpoint 会导致异常。

为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志:

 env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true);

请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。

6.部分任务结束后的 Checkpoint
a)概述

从 1.14 版本开始 Flink 支持在部分任务结束后继续进行Checkpoint,如果一部分数据源是有限数据集,那么就可以。

从 1.15 版本开始,这一特性被默认打开,如果想要关闭这一功能,可以执行:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

此时,结束的任务不会参与 Checkpoint 的过程,在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。

为了支持部分任务结束后的 Checkpoint 操作,调整了 任务的生命周期 并且引入了 StreamOperator#finish 方法,在这一方法中,用户需要写出所有缓冲区中的数据。

在 finish 方法调用后的 checkpoint 中,这一任务不能再有缓冲区中的数据,因为在 finish() 后没有办法输出这些数据,大部分情况下,finish() 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), 在这种情况下,在 finish() 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。

b)对 operator state 的影响

在部分 Task 结束后的 checkpoint 中,Flink 对 UnionListState 进行了特殊的处理, UnionListState 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。

如果在算子的某个并发调用 close() 方法后丢弃它的状态,就会丢失它所分配的分区的偏移量信息,为了解决这一问题,对于使用 UnionListState 的算子,只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。

ListState 一般不会用于类似的场景,仍然需要注意在调用 close() 方法后进行的 checkpoint 会丢弃算子的状态并且 这些状态在算子重启后不可用。

任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作,从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。

c)任务结束前等待最后一次 Checkpoint

为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。

注意:这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显,极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

指针读取文件(简单、干净的知识点罗列)

一.基本操作 FILE *fp;//定义一个文件指针 fpfopen("打开文件名","打开方式");//把文件指针具体的去指向一个文件 //然后对文件进行操作 fclose(fp);//要关闭文件 二.对文件操作的函数 Ⅰ: 1&#xff09;fgetc函数&#xff1a; 功能&#xff1a;从f…

Vue2前端增加上下文

需求描述: 1- 项目部署时存在两个Nginx, 一个Nginx支持vue项目的基本配置, 但是项目入口要配置在上一层Nginx上, 而且上一层Nginx包装了一层上下文; 2- 页面跳转时要保持上一层Nginx的上下文; 3- URL可以正常刷新 方案一: 上层Nginx的上下文转发时不转发到下层Nginx, 只做一…

git中的rebase命令与merge命令

使用git rebase的情境 使用rebase的原因&#xff1a;在 Git 中使用 rebase 命令通常是为了将一系列的更改从一个分支重新应用到另一个分支上。这样做主要是基于下面这种情境&#xff1a; 假设你正在一个特性分支上开发&#xff08;例如 feature 分支&#xff09;&#xff0c;而…

海外静态IP购买指南:探索全球网络连接的奥秘

在数字化时代&#xff0c;互联网连接的重要性不言而喻。对于企业和个人而言&#xff0c;拥有稳定、高速的网络连接是成功的关键。而在特定应用场景下&#xff0c;如远程办公、跨境电商、服务器托管等&#xff0c;海外静态IP地址的需求日益凸显。本文将为您详细解读海外静态IP购…

object.key()用法

object.key(obj) 一、概念&#xff1a;返回一个由一个给定对象的自身可枚举属性组成的数组。 二、用法&#xff1a; 1、参数为对象&#xff1a;则返回为 对象属性名组成的数组。 let obj {日期&#xff1a;date,姓名&#xff1a;userName,地址:address}console.log(Object.k…

国产化开源鸿蒙系统智能终端RK3568主板在电子班牌项目的应用

国产化开源鸿蒙系统智能终端主板AIoT-3568A、人脸识别算法的的电子班牌方案可支持校园信息发布、人脸识别考勤、考场管理、查询互动等多项功能&#xff0c;助力学校在硬件上实现信息化、网络化、数字化&#xff0c;构建“学校、教师、学生”三个维度的智慧教育空间。 方案优势 …

HTTP调用API框架Forest

Forest是一个高层的、极简的声明式HTTP调用API框架 相比于直接使用Httpclient您不再用写一大堆重复的代码了&#xff0c;而是像调用本地方法一样去发送HTTP请求 maven <dependency><groupId>com.dtflys.forest</groupId><artifactId>forest-spring-b…

事件高级。

一、注册事件&#xff08;绑定事件&#xff09; 就是给元素添加事件 注册事件有两种方式&#xff1a;传统方式和方法监听注册方式 1 传统注册方式 方法监听注册事件 2、 addEventListener 事件监听方式 里面的事件类型是字符串&#xff0c;必定加引号&#xff0c;而且不带o…

交流负载箱:电力系统的智能升级

随着科技的不断发展&#xff0c;电力系统也在不断地进行升级和改进。在这个过程中&#xff0c;交流负载箱作为一种新型的电力设备&#xff0c;为电力系统的智能升级提供了有力的支持。本文将对交流负载箱在电力系统中的应用及其优势进行简要分析。 首先&#xff0c;交流负载箱…

Flutter 中的 Image 小部件:全面指南

Flutter 中的 Image 小部件&#xff1a;全面指南 在 Flutter 中&#xff0c;Image 小部件用于展示图片。Flutter 提供了多种方式来加载和展示图片&#xff0c;包括从本地资源、网络 URL 或者通过其他方式获取的图片流。Image 小部件是 Flutter 中显示图片的基础&#xff0c;并…

springboot本地文件同步到nacos 本地文件上传到nacos 使用nacos

导入依赖 <!--配置文件使用nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>nacos版本</version></dependency><dependency…

Linux 操作系统MySQL 数据库APL函数

1 MYSQL 相关API函数 在使用API函数之前需要提前使用指令创建一个数据库 函数功能&#xff1a;链接一个数据库 函数头文件&#xff1a; <mysql/mysql.h> 链接库&#xff1a; -lmysqlclient 函数的原型&#xff1a; MYSQL *mysql_real_connect(MYSQL *mysql,const …

Wifi——Wifi断连问题分析

一、iperf测试wifi断连 1.信号强度差 -36表示非常强&#xff1b;但网络质量依然非常差。 可以分析出四个原因&#xff1a; 2.与throughput相关 为什么同一个网络的信号强度估算会有一定差异&#xff1f;&#xff01; 下图是上述log的一些信息&#xff1a;

如何在Python中自定义异常?

在Python中自定义异常类型是一个简单而强大的特性&#xff0c;它允许开发者创建特定于应用的异常&#xff0c;从而提供更清晰的错误处理逻辑。以下是如何在Python中自定义异常的详细步骤&#xff1a; ### 1. 理解内置异常层次结构 Python有一个内置的异常层次结构&#xff0c;…

TypeScript 类型系统深度解析:类型全览

TypeScript 类型系统深度解析&#xff1a;类型全览 引言 TypeScript 类型系统是其核心特性之一&#xff0c;它为 JavaScript 提供了静态类型检查的能力。这不仅帮助开发者在编译时发现错误&#xff0c;还增强了代码的可读性和可维护性。本文将为您详细介绍 TypeScript 中的各…

高考志愿系统-信息管理模块:院校信息分析

信息模块包括三个信息实体&#xff1a;招生学校&#xff0c;专业&#xff0c;分数线。 学校实体中有一个叫院校代码的属性&#xff0c;专业实体中含有院校代码这个属性&#xff0c;属于外键&#xff0c;一个学校有多个专业&#xff0c;所以学校和专业属于一对多关系。 专业实…

【Python】使用PyTorch训练一个手写数字识别模型(MNIST)

文章目录 1. 准备工作2. 训练网络3. 测试网络4. 训练和测试循环5. 模型保存6. 最终完整代码7. 结果截图使用PyTorch训练一个手写数字识别模型(MNIST) 在这篇博客中,使用了PyTorch构建一个简单的神经网络来识别手写数字。将使用MNIST数据集,这是一个经典的机器学习基准数据集…

KBPC2510-ASEMI开关电源整流方桥KBPC2510

编辑&#xff1a;ll KBPC2510-ASEMI开关电源整流方桥KBPC2510 型号&#xff1a;GBU810 品牌&#xff1a;ASEMI 封装&#xff1a;KBPC-4 正向电流&#xff08;Id&#xff09;&#xff1a;25A 反向耐压&#xff08;VRRM&#xff09;&#xff1a;1000V 正向浪涌电流&#x…

阅读送书抽奖?玩转抽奖游戏,js-tool-big-box工具库新上抽奖功能

先讨论一个问题&#xff0c;你做软件工作是为了什么&#xff1f;从高中选专业&#xff0c;就喜欢上了软件开发&#xff1f;还是当初毕业不知道干啥&#xff0c;不喜欢自己的专业&#xff0c;投入软件开发的怀抱&#xff1f;还是干着干着别的&#xff0c;突然觉得互联网行业真不…

WordPress中插入视频的两种方法详解

最近我在建设WordPress网站的时候需要上传视频&#xff0c;我使用的是Hostease的主机安装的WordPress&#xff0c;随后在咨询了他们的技术支持后获得了一些解决方法。下面将介绍WordPress中插入视频的两种方法&#xff1a;本地上传和外部引用。 本地上传视频 使用WordPress的古…