Flink状态存储-StateBackend

文章目录

  • 前言
  • 一、MemoryStateBackend
  • 二、FSStateBackend
  • 三、RocksDBStateBackend
  • 四、StateBackend配置方式
  • 五、状态持久化
  • 六、状态重分布
          • OperatorState 重分布
          • KeyedState 重分布
  • 七、状态过期


前言

Flink是一个流处理框架,它需要对数据流进行状态管理以支持复杂的计算逻辑。在Flink中,状态存储是指如何和在哪里存储这些状态数据。Flink提供了多种状态后端(State Backend)来实现这种存储,以满足不同的应用场景和性能需求。 StateBackend需要具备如下两种能力:
1、在计算过程中提供访问 State 的能力,开发者在编写业务逻辑中能够使用 StateBackend 的接口读写数据。
2、能够将 State 持久化到外部存储,提供容错能力。
根据使用场景的不同, Flink 内置了 3 种 StateBackend 。其体系结构如下图所示。
在这里插入图片描述
纯内存:MemoryStateBackend,适用于验证、测试,不推荐生产环境。
内存+文件:FsStateBackend,适用于长周期大规模的数据。
RocksDB:RocksDBStateBackend,适用于长周期大规模的数据。

在运行时,MemoryStateBackend 和 FsStateBackend 本地的 State 都保存在 TaskManager 的内存中,所以其底层依赖于 HeapKeyedStateBackend。HeapKeyedStateBackend 面向 Flink 引擎内部,使用者无须感知。


一、MemoryStateBackend

默认情况下,状态信息是通过MemoryStateBackend 存储在 TaskManager 的堆内存中的, KV 类型的State,窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。 MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

基于内存的 StateBackend 在生产环境下不建议使用,可以在本地开发调试测试 。
注意点如下 :

  • State 存储在 JobManager 的内存中,受限于 JobManager 的内存大小。
  • 每个 State 默认 5MB,可通过 MemoryStateBackend 构造函数调整。
  • 每个 State 不能超过 Akka Frame 大小。

二、FSStateBackend

文件型状态存储 FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中,如使用 HDFS 的路径为 “hdfs://namenode:40010/flink/checkpoints”,使用本地文件系统的路径为:“file:///data/flink/checkpoints”。

FSStateBackend 适用于处理大状态、长窗口,或大键值状态的有状态处理任务。
缺点:
状态大小受TaskManager内存限制(默认支持5M)
优点:
状态访问速度很快
状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况

三、RocksDBStateBackend

RocksDBStateBackend 跟内存型和文件型 StateBackend 不同,其使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中,在 JobManager 内存中会存储少量的检查点元数据。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。 但是 RocksDBStateBackend 相比基于内存的 StateBackcnd ,访问 State 的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。

适用场景:
最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 是目前唯一支持增量检查点的后端,增量检查点非常适用于超大状态的场景。
注意点

  • 总 State 大小仅限于磁盘大小,不受内存限制。
  • RocksDBStateBackend 也需要配置外部文件系统,集中保存 State。
  • RocksDB的 JNI API 基于byte数组,单 key 和单 Value 的大小不能超过 231 字节。
  • 对于使用具有合并操作状态的应用程序,如 ListState ,随着时间可能会累积到超过 231 字节大小,这将会导致在接下来的查询中失败。

四、StateBackend配置方式

  • 单任务调整
修改当前任务代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(newFsStateBackend("hdfs://namenode:9000/flink/checkpoints"));或者new MemoryStateBackend()或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
}
  • 全局调整(不建议)
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

五、状态持久化

StateBackend 中的数据最终需要持久化到第三方存储中,确保集群故障或者作业故障能够恢复。 HeapSnapshotStrategy 策略对应于 HeapKeyedStateBackend,RocksDBStateBackend 的持久化策略有两种:全量持久化策略(RocksFullSnapshotStrategy)和 增量持久化策略 (RocksIncementalSnapshotStrategy)。

1、全量持久化策略
全盘持久化,也就是说每次把全量的 Slate 写人到状态存储中 (如 HDFS)。内存型、文件型、 RocksDB 类型的 StatcBackend 支持全量持久化策略。 在执行持久化策略的时候,使用异步机制,每个算子启动 1 个独立的线程,将自身的状态写入分布式存储中。在做持久化的过程中,状态可能会被持续修改,基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend 则使用 RocksDB 的快照机制,使用快照来保证线程安全。

2、增量持久化策略
增量持久化就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB 为基础, RocksDB 是一个基于 LSM-Tree 的 KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦 memtable 写满了,RocksDB 就会将数据压缩并写入磁盘。memtable 的数据持久化到磁盘后,就变成了不可变的 sstable。

因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。

为了确保 sstable 是不可变的,Flink 会在 RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在 Flink 执行检查点时,会将新的 sstable 持久化到 HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的 sstable,因为本地的一部分历史 sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable 文件的引用次数就可以。 RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的 sstable中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史sstable。可以减少检查点的历史文件,避免大量小文件的产生。

六、状态重分布

在实际的生产环绕中,作业预先设置的并行度很多时候并不合理,太多则浪费资源,太少则资源不足,可能导致数据积压延迟变大或者处理时间太长,所以在运维过程中,需要根据作业的运行监控数据调整其并行度。调整并行度的关键是处理 State。回想一下前文中的内容,State 位于算子内,改变了并行度,则意味着算子个数改变了,需要将 State 重新分配给算子。下面从 OperatorState 和 KeyedState 两种 State 角度,介绍如何将 State 重新分配给算子。

OperatorState 重分布

1、ListState
并行度在改变的时候,会将并发上的每个 List 都取出,然后把这些 List 合并到一个新的 List,根据元素的个数均匀分配给新的 Task。

2、UnionListState
比 ListState 更加灵活, 把划分的方式交给用户去做,当改变并发的时候,会将原来的 List 拼接起来,然后不做划分,直接交给用户。

3、BroadcastState
操作 BroadcastState 的 UDF 需要保证不可变性,所以各个算子的同一个 BroadcastState 完全一样。在改变并发的时候,把这些数据分发到新的 Task 即可。

KeyedState 重分布

基于 Key-Group ,每个 Key 隶属于唯一的 Key-Group。Key Group 分配给 Task 实例,每个 Task 至少有 一个 Key-Group 。 Key-Group 数量取决于最大并行度 (MaxParallism) 。 KeyedStream 并发的上限是 Key-Group 的数量,等于最大并行度。

七、状态过期

1、DataStream 中状态过期
可以对 每一个 State 设置 清理策略 StateTtlConfig,可以设置的内容如下:
过期时间:超过多长时间未访问,视为 State 过期,类似于缓存。
过期时间更新策略:创建和写时更新、读取和写时更新。
State 可见性:未清理可用,超时则不可用。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

2、Flink SQL 中状态过期
Flink SQL 在流 Join、聚合类的场景中,使用了 State,如果 State 不定时清理。 则可能会导致 State 过多,内存溢出。 为了稳妥起见,最好为每个 FLink SQL 作业提供 State 清理的策略。如果定时清理 State,则存在可能因为 State 被清理而导致计算结果不完全准确的风险。FLink 的 Table API 和 SQL 接口中提供了参数设置选项,能够让使用者在精确和资源消耗做折中。

StreamQueryConfig qConfig = ... 
//设置过期时间为 min = 12 小时 ,max = 24 小时 
qConfig.withIdleStateRetentionTime(Time.hours(12)Time.hours(24));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/714863.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10个技巧,3分钟教会你高效寻找开源项目

作为程序员&#xff0c;不论是开发还是学习&#xff0c;肯定会用到开源项目&#xff0c;那么怎么快速在开源网站找到这些项目呢&#xff1f; 常用的开源网站有&#xff1a;github 和 gitee github是全球最大的开源社区&#xff0c;今天就以github为例&#xff0c;演示一下 gi…

JavaWeb之 Servlet(2万6千字详解)

目录 前言1. Servlet 简介2. Servlet 前世今生3. Servlet 执行流程4. Servlet 快速入门5. 两种配置 Servlet程序 URL的方式5.1 使用 注解来配置 Servlet程序 的 URL5.1.1 urlPattern 的配置规则精确匹配目录匹配&#xff1a;使用 * 符号代表任意路径扩展名匹配任意匹配 5.1.2 小…

【MATLAB】语音信号识别与处理:SG滤波算法去噪及谱相减算法呈现频谱

1 基本定义 SG 滤波算法&#xff08;Savitzky - Golay 滤波算法&#xff09;是一种数字信号处理算法&#xff0c;用于对信号进行平滑处理。该算法利用最小二乘法拟合局部数据段&#xff0c;然后用拟合的函数来估计每个数据点的值&#xff0c;从而实现平滑处理。 SG 滤波算法的…

redis05 sprngboot整合redis

redis的Java客户端 整合步骤 添加redis的pom依赖 <!-- 引入redis依赖 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency><!-- 引入redis连…

51单片机学习day02

基于普中的stc89c52&#xff0c; 串口&#xff1a; 通讯接口&#xff0c;51单片机自带UART&#xff08;通用异步收发器&#xff09;&#xff0c;可实现窗口通讯。 硬件电路&#xff1a; 简单双向串口通信有两根通信线&#xff08;发送端TXD和接收端RXD&#xff09;&#xff0…

HelixToolKit的模型旋转操作

前面加载了模型以后&#xff0c;鼠标拖动和缩放比较好操作&#xff1b;但是旋转似乎没有&#xff0c; 操作了一阵&#xff0c;也不是没有&#xff0c;应该是还不熟悉&#xff1b; 旋转的指示器在右下角&#xff0c;现在U面看到正面&#xff0c; 想看一下模型的背面&#xff0…

【Java项目介绍和界面搭建】拼图小游戏——添加图片

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

扼杀网络中的环路:STP、RSTP、MSTP

目录 前言&#xff1a; 一、STP&#xff08;Spanning Tree Protocol&#xff09; 1.1 STP功能 1.2 STP应用 二、RSTP&#xff08;Rapid Spanning Tree Protocol&#xff09; 2.1 RSTP功能 2.2 RSTP应用 三、MSTP&#xff08;Multiple Spanning Tree Protocol&#xff0…

Angular 由一个bug说起之四:jsonEditor使用不当造成的bug

一&#xff1a;问题 项目中使用了一个JSON第三方库&#xff1a; GitHub - josdejong/jsoneditor: A web-based tool to view, edit, format, and validate JSON 当用户编辑JSON格式的数据&#xff0c;查找替换时&#xff1a; 用户的期望结果是&#xff1a;$$ 被替换为$$_text&a…

[物联网] OneNet 多协议TCP透传

[物联网] OneNet 多协议TCP透传 STM32物联网–ONENET云平台的多协议接入产品创建 : https://blog.csdn.net/qq_44942724/article/details/134492924 Onenet tcp 透传 : https://blog.csdn.net/flyme2010/article/details/107086001 tcp服务端测试工具 : http://tcp.xnkiot.com/…

zephyr学习

zephyr内核对象学习 定时器 类似linux的定时器&#xff0c; 可以分别设置第一次到期时间和后续的周期触发时间&#xff0c; 可以注册到期回调和停止回调 还有一个计数状态&#xff0c;用于标记timer到期了多少次 duration&#xff1a;设定timer第一次到期的时间。 period: …

【蛀牙】日常生活如何正确护理牙齿?刷牙、洗牙、补牙

程序员生活指南之 【蛀牙】日常生活如何正确护理牙齿&#xff1f;刷牙、洗牙、补牙 文章目录 一、日常如何清洗牙齿&#xff1f;——刷牙与洗牙1、牙齿污垢1.1 牙菌斑1.2 软垢1.3 牙结石1.4 牙龈出血 2、如何刷牙2.1 关于时间2.2 各种工具2.3 巴氏刷牙法 二、定期进行洗牙3、如…

题目 1076: 内部收益率

题目描述: 在金融中&#xff0c;我们有时会用内部收益率IRR来评价项目的投资财务效益&#xff0c;它等于使得投资净现值NPV等于0的贴现率。换句话说&#xff0c;给定项目的期数T、初始现金流CF0和项目各期的现金流CF1, CF2, ...&#xff0c;CFT&#xff0c;IRR是下面方程的解&…

RISC-V特权架构 - 特权模式与指令

RV32/64 特权架构 - 特权模式与指令 1 特权模式2 特权指令2.1 mret&#xff08;从机器模式返回到先前的模式&#xff09;2.2 sret&#xff08;从监管模式返回到先前的模式&#xff09;2.3 wfi&#xff08;等待中断&#xff09;2.4 sfence.vma&#xff08;内存屏障&#xff09; …

SpringBoot+Vue+MySQL:装修管理新架构探索

✍✍计算机毕业编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java、…

FPGA开源项目分享——2D N-Body重力模拟器

​导语 今天继续康奈尔大学FPGA 课程ECE 5760的典型案例分享——2D N-Body重力模拟器。 &#xff08;更多其他案例请参考网站&#xff1a; Final Projects ECE 5760&#xff09; 1. 项目概述 项目网址 Grav Sim 项目说明 该项目的目标是创建一个用DE1-SOC进行硬件加速的2…

简易内存池2 - 华为OD统一考试(C卷)

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 请实现一个简易内存池,根据请求命令完成内存分配和释放。 内存池支持两种操作命令&#xff0c;REQUEST和RELEASE&#xff0c;其格式为: REQUEST请求的内存大小 …

Redis 【1】—— 安装 与 配置

Redis 【1】—— 安装 与 配置 一、安装 与 配置&#xff08;一&#xff09;使用 yum 安装&#xff08;二&#xff09;创建符号链接1. 软链接2. 相关指令 &#xff08;三&#xff09;修改配置文件&#xff08;四&#xff09;Redis 的启停 一、安装 与 配置 &#xff08;一&…

外贸业务员没客户的7大原因+解决办法!

业务员没有客户&#xff0c;就是无源之水&#xff0c;无本之木&#xff0c;这自然也就没有业绩。那些吃空饷的业务员&#xff0c;迟早会拖垮公司。所以不管是什么原因导致的业务员没客户&#xff0c;都要一一查验清楚。七个业务员没有客户的原因&#xff0c;七种对策&#xff0…

华为数通方向HCIP-DataCom H12-821题库(多选题:21-40)

第21题 管理员在配置 VRRP 时,下面哪些不是必须配置的? A.抢占模式 B.抢占延时 C.虚拟IP 地址 D.虚拟路由器的优先级 【参考答案】ABD 【答案解析】 VRRP的作用之一是提供一个虚拟的IP地址,用作默认网关,用来实现冗余和故障转移。因此,配置虚拟IP地址是必须的。华为设备vr…