Flink中StateBackend(工作状态)与Checkpoint(状态快照)的关系

State Backends

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现:

  • 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,将其状态快照持久化到(分布式)文件系统;
  • 另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:
    • FsStateBackend,将其状态快照持久化到(分布式)文件系统;
    • MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

在这里插入图片描述

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

Checkpoint

Flink 定期对每个算子的所有状态进行持久化快照,并将这些快照复制到更持久的地方,例如分布式文件系统。 如果发生故障,Flink 可以恢复应用程序的完整状态并恢复处理,就好像没有出现任何问题一样。

这些快照的存储位置是通过作业_checkpoint storage_定义的。 有两种可用检查点存储实现:一种持久保存其状态快照 到一个分布式文件系统,另一种是使用 JobManager 的堆。

在这里插入图片描述

Flink不同版本StateBackend(状态)与Checkpoint Storage(快照) 关系

在Flink1.14之前StateBackend与Checkpoint Storage 耦合在一起,但在Flink1.14之后把StateBackend与Checkpoint Storage 实现了解耦,使逻辑更加清晰。

Flink1.14之前

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/data/rocksdb/ck", true));//true: 增量checkpoint; false:全量checkpoint
env.setStateBackend(new RocksDBStateBackend("file:///data/rocksdb/ck", true));//本地文件系统
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/data/fs/ck"));//远程分布式文件系统
env.setStateBackend(new FsStateBackend("file:///data/fs/ck"));//本地文件系统
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend());

Flink1.14之后(推荐使用)

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));//true: 增量checkpoint; false:全量checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/rocksdb/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/rocksdb/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: rocksdbstate.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/fs/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/fs/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: hashmapstate.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

flink-conf.yaml配置:

 state.backend: hashmapstate.checkpoint-storage: jobmanager

总结

  • 默认情况下 checkpoint 是禁用的,需要手动开启:

    env.enableCheckpointing(long interval, CheckpointingMode mode)

  • 默认情况下,StateBackend是保持在 TaskManagers 的heap内存中,checkpoint 保存在 JobManager 的内存中。
  • 只有基于 RocksDB state backend的状态快照才支持增量checkpoint,基于heap的并不支持
  • Flink状态分为Keyed State和非keyed State:
    • Keyed State,可以使用RocksDB state backend和heap state backend。 所有支持的状态类型如下所示:

      • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

      • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

      • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

      • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

      • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

    • 非keyed State,不使用 RocksDB state backend,需要保存在内存中,包括:

      • 算子状态 (Operator State);
      • 广播状态 (Broadcast State),尤其需要考虑保证充足的内存;
      • 自定义 Operator State:CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

        void snapshotState(FunctionSnapshotContext context) throws Exception;
        void initializeState(FunctionInitializationContext context) throws Exception;

参考:

Fault Tolerance via State Snapshots

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/657940.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Studio 安装配置教程 - Windows版

Android Studio下载 安装&#xff1a; 下载&#xff1a; Android Studio Hedgehog | 2023.1.1 | Android Developers (google.cn) 安装&#xff1a; 基本不需要思考跟着走 默认下一步 默认下一步 自定义修改路径&#xff0c;下一步 默认下一步&#xff0c;不勾选 默认下一…

RHCE 综合项目-博客

目录 业务需求 一、准备工作 1、配置静态IP 2、修改主机名及hosts映射 3、开启防火墙 4、时间同步 5、配置免密ssh登录 二、环境搭建 1、Server-web端安装LAMP环境软件 2、Server-NFS-DNS端上传博客软件 3、Server-NFS-DNS端设置NFS共享 三、Server-web设置 1、挂…

【代码随想录-链表】反转链表

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

OpenCV 2 - 矩阵的掩膜操作

1知识点 1-1 CV_Assert(myImage.depth() == CV_8U); 确保输入图像是无符号字符类型,若该函数括号内的表达式为false,则会抛出一个错误。 1-2 Mat.ptr(int i = 0); 获取像素矩阵的指针,索引 i 表示第几行,从0开始计行数。 1-3 const uchar* current = mylmage.ptr(row); 获得…

day26 节点操作——查找节点

目录 DOM节点查找节点父节点查找子节点查找兄弟关系查找 DOM节点 DOM节点&#xff1a; DOM树里每一个内容都称之为节点 节点类型&#xff1a; 元素节点&#xff1a;所有的标签&#xff0c;比如body、div html是根节点属性节点&#xff1a;所有的属性&#xff0c;比如href、cla…

1.26布雷斯悖论(设计做减法,使效率更高,netlogo模拟),自组织映射神经网络SOM

布雷斯悖论 红色的是普通道路&#xff0c;车越多通行时间越长 假定条件是 均衡状态就是两条路的通行时间相同 纳什均衡并不一定是全局最优 纳什均衡的关键就是单个个体做出改变时&#xff0c;只会使自己的利益受到损失&#xff0c;而不会使其他人发生改变 在达到纳什平衡时&…

让MySQL和Redis数据保持一致的4种策略

1 前言 先阐明一下 MySQL 和 Redis 的关系&#xff1a;MySQL 是数据库&#xff0c;用来持久化数据&#xff0c;一定程度上保证数据的可靠性&#xff1b;Redis 是用来当缓存&#xff0c;用来提升数据访问的性能。 关于如何保证 MySQL 和 Redis 中的数据一致&#xff08;即缓存…

DevSecOps 平台需求来源分析

目录 一、为什么要开展DevSecOps平台建设 1.1 产业发展的角度方面分析 1.2 企业内部角度分析 二、 DevSecOps平台建设需求来源 2.1 从外因看DevSecOps平台建设的需求来源 2.1.1 网络安全和数据合规在国内外快速发展 2.1.2 法规的落地促使安全管理的数字化和平台建设成为刚…

网安文件包含漏洞

文件包含概念&#xff1a; 开发人员通常会把可重复使用的函数写到单个文件中&#xff0c;在使用某些函数时&#xff0c;直接调用此文件&#xff0c;而无需再次编写&#xff0c;这种调用文件的过程一般被称为包含。为了使代码更加灵活&#xff0c;通常会将被包含的文件设置为变…

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP&#xff1f; Flink CEP即 Flink Complex Event Processing&#xff0c;是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型&#xff0c;即对于无界流中的各种数据(称为事件)&#xff0c;提供一种组合匹配的…

Keepalived + DR 集群

目录 1、Keepalive VRRP 说明 故障切换 工作原理 核心组件 2、Keepalived DR 集群 拓扑规划 前期准备 配置 Httpd 服务 配置 Nginx 服务 配置 LVS 主 node_01 配置 LVS 从 node_02 测试 LVS 集群 测试主备切换 3、Keepalived 脑裂现象 4、Keepalived 心态检测 …

平安健康与中航健康时尚集团携手并进,共创会员制健康管理美好未来

近日&#xff0c;深圳市中航健康时尚集团股份有限公司(以下简称“中航健康时尚”)与平安健康正式达成战略合作。平安健康总裁吴军、中航健康时尚董事长王岚等领导出席签约仪式&#xff0c;就此次战略合作展开深入交流。 据了解&#xff0c;中航健康时尚集团创建于1995年&#x…

内存泄漏的原因及排查方法

&#x1f9d1;‍&#x1f393; 个人主页&#xff1a;《爱蹦跶的大A阿》 &#x1f525;当前正在更新专栏&#xff1a;《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 随着网页应用的逐渐复杂化,内存管理也变得越来越重要。内存泄漏不仅会…

YUDIAN(宇电)温控器参数笔记(二)

没想到啊&#xff0c;时隔3年&#xff0c;我又用到了这个温控器&#xff0c;又来更新一下&#xff0c;因为我刚好要做一个简易的控温系统&#xff0c;类似于恒温水槽。 这个系统大概就是&#xff1a; 温控器用pt100测温&#xff0c;作为输入&#xff0c;输入连接到一个ssr上&a…

Start gtkmm 4 Programming (range controls)_

文章目录 基础解析 Chapter 7. Range Widgets https://gtkmm.org/en/documentation.htmlhttps://gnome.pages.gitlab.gnome.org/gtkmm-documentation/index.html 基础 容器: 容器小部件与其他小部件一样&#xff0c;派生自Gtk::Widget.例如Gtk::Grid可以容纳许多子小部件&…

vue3教程,如何手动获取后端数据(入门到精通3,新人必学篇)

概述&#xff1a;没有后端数据的前端&#xff0c;就失去了灵魂&#xff0c;由于本人没有写后端数据&#xff0c;所有调用黑马的&#xff0c;往下看相信对你会有收获的。 目录 第一步&#xff1a;安装axios 第二步&#xff1a;编写后端访问地址 第三步&#xff1a;编写具体的…

如何更新github上fork的项目(需要一定git基础)

如何更新Fork的项目(需要一定git基础) 前言&#xff1a;本文记录一下自己在github上fork了大佬的开源博客项目https://github.com/tangly1024/NotionNext&#xff0c;如何使用git克隆以及自定义开发和同步合并原项目更新迭代内容的的步骤 如何更新fork的项目(进阶版) 首先你…

解决:ModuleNotFoundError: No module named ‘selenium’

解决&#xff1a;ModuleNotFoundError: No module named ‘selenium’ 文章目录 解决&#xff1a;ModuleNotFoundError: No module named selenium背景报错问题报错翻译报错位置代码报错原因解决方法方法一&#xff0c;直接安装方法二&#xff0c;手动下载安装方法三&#xff0…

数字图像处理(实践篇)三十七 OpenCV-Python 使用SIFT和BFmatcher对两个输入图像的关键点进行匹配实践

目录 一 涉及的函数 二 实践 三 报错处理 使用SIFT(尺度不变特征变换)算法