Flink中的状态管理

1 Flink中的状态

  当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。可以简单的任务状态就是一个本地变量,可以被任务的业务逻辑访问。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WppGyApk-1596294459379)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1595942654393.png)]

  有些算子有些任务是没有状态的,如map操作,只跟输入数据有关。像窗口操作不管是增量窗口函数还是全窗口函数都要保持里面的信息的,一开始在窗口到达结束时间之前是不输出数据的,所以最后输出数据的时候,他的计算是要依赖之前的,全窗口可以认为是把所有数据都作为状态保存下来。增量聚合窗口来一个聚合一次要保存的是中间聚合状态。像ProcessFunction可以有状态也可以没有状态。

  无状态流处理和有状态流处理的主要区别:无状态流处理分别接收每条输入数据,根据最新输入的数据生成输出数据;有状态流处理会维护状态,根据每条输入记录进行更新,并基于最新输入的记录和当前的状态值生成输出记录,即综合考虑多个事件之后的结果。

需要状态操作的一些例子如下:

  • 应用程序搜索某些事件模式时,状态将存储迄今遇到的事件序列。
  • 每分钟/小时/天聚合事件时,将状态保存挂起的聚合。
  • 在数据流上训练机器学习模型时,状态保存模型参数的当前版本。
  • 需要管理历史数据时,状态允许有效访问过去发生的事件。

2 状态类型

  每个状态都是当前任务去管理维护,每个状态都是和当前算子关联在一起的,如果需要Flink真正的把他管理起来的话在运行时的时候Flink就必须要知道当前状态定义的类型是什么,所以一开始必须注册对应的状态,要有所谓的描述器。Flink有两种基本的状态:Operator State算子状态和Keyed State键控状态,他们的主要区别就是作用范围不一样,算子状态的作用范围就是限定为算子任务(也就是当前一个分区执行的时候,所有数据来了都能访问到状态)。键控状态中并不是当前分区所有的数据都能访问所有的状态,而是按照keyby之后的key做划分,当前key只能访问自己的状态

2.1 Operator State

  每个算子状态绑定到一个并行算子实例,作用范围限定为算子任务,同一并行任务的状态是共享的,并行处理的所有数据都可以访问到相同的状态。Kafka Connector就是使用算子状态的很好的一个例子,Kafka consumer的每个并行实例都维护一个主题分区和偏移,作为算子状态。当并行性发生变化时,算子状态接口支持在并行运算符实例之间重新分配状态。可以有不同的方案来进行这种再分配。

  因为同一个并行任务处理的所有数据都可以访问到当前的状态,所以就相当于本地变量

  算子状态有3种基本数据结构:①列表状态(List state):状态表示为一组数据的列表②联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。③广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。那就可以访问到别的并行子任务的状态。

  算子状态运用的时候可能应用场景没那么多,一般都是keyby之后根据不同的key做分区讨论。如果所有数据来了全部统一处理的话一般还要划分成不同的状态要保存为链表,并行度调整的时候可以根据这个列表拆开,做进一步调整。

  联合列表状态与列表状态的区别:主要是并行度调整状态怎样重新分配,列表状态本身分配的时候直接分配;联合列表状态的话就是把所有元素都联合起来,然后由每个任务自己定义最后留下哪些,也就是自己截取要哪一部分。

2.2 Keyed State

  Keyed State只能在KeyedStream后使用,键控状态总是相对于键,根据键来维护和访问的

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sh5kgIiP-1596294459387)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1595941884637.png)]

  Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。键控状态基于每个key去管理,一般keyby进行HashCode重分区后基于它自己独享的内存空间就会针对每一个不同的key分别保存一份独立的存储状态,而且接下来来了一个新的数据只能访问自己的状态,不能访问其他key的,Flink会为每一个key维护一个状态。

  Flink的Keyed State支持的数据类型如下:

序号类型说明方法
1ValueState[T]用来保存单个的值ValueState.update(value: T)
ValueState.value()
2ListState[T]保存一个列表ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.update(values: java.util.List[T])
ListState.get()(注意:返回的是Iterable[T])
3MapState[K, V]保存Key-Value对MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)
4ReducingState[T]保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供ReduceFunctionReducingState.add(value: T)
ReducingState.get()
5AggregatingState[I, O]保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供AggregateFunctionAggregatingState.add(value: T)
AggregatingState.get()
6FoldingState<T, ACC>保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供FoldFunctionAggregatingState.add(value: T)
AggregatingState.get()

  每个状态都有clear()是清空操作。

  在进行状态编程时需要通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。案例如下:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {private var sum: ValueState[(Long, Long)] = _override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0L)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit = {sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))}
}object ExampleCountWindowAverage extends App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()// the printed output will be (1,4) and (1,5)env.execute("ExampleManagedState")
}

声明状态操作为:

    sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))

读取状态为:

    val tmpCurrentSum = sum.value

  更新状态为:

    sum.update(newSum)

3 状态后端

  Flink提供不同的State Backends状态后端,指定如何和在何处存储状态。

  (1)MemoryStateBackend

  状将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,将checkpoint存储在JobManager的内存中

  (2)FsStateBackend

  本地状态存在TaskManager的JVM堆上,checkpoint存到远程的持久化文件系统(FileSystem)上

  (3)RocksDBStateBackend

  将所有状态序列化后,存入本地的RocksDB中存储。

  设置状态后端如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
//val checkpointPath: String = checkpoint_Path
//val backend = new RocksDBStateBackend(checkpointPath)
//env.setStateBackend(backend)env.setStateBackend(new FsStateBackend(YOUR_PATH))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473554.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python之日志处理(logging模块)

主要内容 日志相关概念logging模块简介使用logging提供的模块级别的函数记录日志logging模块日志流处理流程使用logging四大组件记录日志配置logging的几种方式向日志输出中添加上下文信息参考文档 一、日志相关概念 日志是一种可以追踪某些软件运行时所发生事件的方法。软件开…

LeetCode 514. 自由之路(记忆化递归 / DP)

文章目录1. 题目2. 解题1. 题目 电子游戏“辐射4”中&#xff0c;任务“通向自由”要求玩家到达名为“Freedom Trail Ring”的金属表盘&#xff0c;并使用表盘拼写特定关键词才能开门。 给定一个字符串 ring&#xff0c;表示刻在外环上的编码&#xff1b;给定另一个字符串 ke…

Flink中的容错机制

1 checkpoint Flink 故障恢复机制的核心&#xff0c;就是应用状态的一致性检查点checkpoint。 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint&#xff0c;处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下…

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k&#xff0c;找出是否有可能把这个数组分成 k 个非空子集&#xff0c;其总和都相等。 示例 1&#xff1a; 输入&#xff1a; nums [4, 3, 2, 3, 5, 2, 1], k 4 输出&#xff1a; True 说明&#xff1a;…

MySQL中的表中增加删除字段

1增加两个字段&#xff1a; mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具&#xff0c;在Window下我们使用TortoiseSVN(小乌龟)&#xff0c;可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

天池 在线编程 音乐组合

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744274 2. 解题 对60求余后&#xff0c;0, 30的为 Cn2C_n^2Cn2​&#xff0c;其余的相加等于60的&#xff0c;种类相乘 class Solution { public:/*** param …

java之NIO(Channel,Buffer,Selector)

java之NIO 1 什么是NIO Java NIO (New IO&#xff0c;Non-Blocking IO)是从Java 1.4版本开始引入的一套新的IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO的三大核心部分&#xff1a;通道(Channel)&#xff0c;缓冲区(Buffer), 选择器(Selector)&#xff0c;数据总是从…

LeetCode 1652. 拆炸弹(前缀和)

文章目录1. 题目2. 解题1. 题目 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每一个数字。所有数字会 同时 被替换。 如果 k > 0 &#xff0c;将…

MYSQL从入门到精通

SQL是数据库的查询语言&#xff0c;语法结构简单&#xff0c;相信本文会让你从入门到熟练。 掌握SQL后&#xff0c;不论你是产品经理、运营人员或者数据分析师&#xff0c;都会让你分析的能力边界无限拓展。别犹豫了&#xff0c;赶快上车吧&#xff01; SQL最小化的查询结构如下…

LeetCode 1653. 使字符串平衡的最少删除次数(DP)

文章目录1. 题目2. 解题1. 题目 给你一个字符串 s &#xff0c;它仅包含字符 a 和 b​​​​ 。 你可以删除 s 中任意数目的字符&#xff0c;使得 s 平衡 。 我们称 s 平衡的 当不存在下标对 (i,j) 满足 i < j 且 s[i] b 同时 s[j] a 。 请你返回使 s 平衡 的 最少 删除…

LeetCode 1654. 到家的最少跳跃次数(BFS)

文章目录1. 题目2. 解题1. 题目 有一只跳蚤的家在数轴上的位置 x 处。请你帮助它从位置 0 出发&#xff0c;到达它的家。 跳蚤跳跃的规则如下&#xff1a; 它可以 往前 跳恰好 a 个位置&#xff08;即往右跳&#xff09;。它可以 往后 跳恰好 b 个位置&#xff08;即往左跳&…

Maven详解及实例

1 什么是Maven Maven对项目进行模型抽象&#xff0c;充分运用的面向对象的思想&#xff0c;Maven可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的软件项目管理工具。Maven 除了以程序构建能力为特色之外&#xff0c;还提供高级项目管理工具。简单的来说Maven是…

Python多线程详解

1、多线程的理解 多进程和多线程都可以执行多个任务&#xff0c;线程是进程的一部分。线程的特点是线程之间可以共享内存和变量&#xff0c;资源消耗少&#xff08;不过在Unix环境中&#xff0c;多进程和多线程资源调度消耗差距不明显&#xff0c;Unix调度较快&#xff09;&…

LeetCode 1655. 分配重复整数(回溯)

文章目录1. 题目2. 解题2.1 回溯1. 题目 给你一个长度为 n 的整数数组 nums &#xff0c;这个数组中至多有 50 个不同的值。 同时你有 m 个顾客的订单 quantity &#xff0c;其中&#xff0c;整数 quantity[i] 是第 i 位顾客订单的数目。请你判断是否能将 nums 中的整数分配给…