详解 Flink 的状态管理

一、Flink 状态介绍

1. 流处理的无状态和有状态

  • 无状态的流处理:根据每一次当前输入的数据直接转换输出结果的过程,在处理中只需要观察每个输入的独立事件。例如, 将一个字符串类型的数据拆分开作为元组输出或将每个输入的数值加 1 后输出。Flink 中的基本转换算子 (map、filter、flatMap 等) 在计算时不依赖其他数据,所以都属于无状态的算子。

在这里插入图片描述

  • 有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。

    在这里插入图片描述

2. Flink 的状态管理

  • 在传统的事务型处理架构中,状态数据一般是保存在数据库中的,在业务处理过程中与数据库交互进行状态的读取和更新;但对于大数据实时处理架构来说,在业务处理时频繁地读写外部数据库会造成性能达不到要求,因此不能使用数据库进行状态管理
  • 在实时流处理中一般将状态直接保存在内存中来保证性能,但必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题随之产生
  • Flink 拥有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态一致性、状态的高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整。开发者只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上

二、Flink 状态分类

1. 托管状态

Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现

1.1 算子状态

Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少

在这里插入图片描述

  • 由同一并行任务所处理的所有数据都可以访问到相同的算子状态
  • 算子状态对于同一任务而言是共享的
  • 算子状态不能由相同或不同算子的另一个任务访问
1.1.1 算子状态数据结构
  • 列表状态(List state):将状态表示为一组数据的列表
  • 联合列表状态(Union list state):也是将状态表示为一组数据的列表。与列表状态的区别在于,在发生故障时或者从保存点(savepoint)启动应用程序时恢复的方式不同
  • 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
1.1.2 案例
public class TestFlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的map算子,用于统计输入数据个数DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());resultStream.print();env.execute();}//定义有状态的 map 操作//实现 ListCheckpointed 接口,泛型为状态数据类型public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {//定义一个本地变量作为状态private Integer count = 0;@Overridepublic Integer map(SensorReading value) throws Exception {count++;return count;}//对状态做快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {return Collections.singletonList(count);}//容错恢复状态@Overridepublic void restoreState(List<Integer> state) throws Exception {for(Integer num : state) {count += num;}}}}
1.2 按键分区状态

Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用

在这里插入图片描述

  • 在进行按键分区(keyBy)之后,具有相同 key 的所有数据,都会分配到同一个并行子任务中,这个任务会维护和处理这个 key 对应的状态实例
  • 一个并行子任务可能会处理多个 key 的数据,所以该任务会为每个 key 都维护一个状态实例
  • 在底层,同一个并行子任务的所有 KeyedState 会根据 key 保存成键值对(key-value)的形式,当一条数据到来时,任务会自动将状态的访问范围限定为当前数据的 key,并从键值对(key-value)存储中读取出对应的状态值
  • 具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的
  • 在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN,  OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • 值状态:ValueState<T>,将状态表示为单个的值,值的类型为 T
    • ValueState.value():获取状态值
    • ValueState.update(T value):添加或更新状态值
    • ValueState.clear():清空操作
  • 列表状态:ListState<T>,将状态表示为一组数据的列表,列表里的元素的数据类型为 T
    • ListState.add(T value):追加状态值
    • ListState.addAll(List<T> values):追加状态值列表
    • ListState.get():获取状态值的 Iterable<T>
    • ListState.update(List<T> values):更新状态值列表
    • ListState.clear():清空操作
  • 映射状态:MapState<K, V>,将状态表示为一组 Key-Value 对
    • MapState.get(UK key):获取状态值
    • MapState.put(UK key , UV value):添加或更新状态值
    • MapState.contains(UK key):判断状态值是否存在
    • MapState.remove(UK key):删除状态值
    • MapState.clear():清空操作
  • 聚合状态:ReducingState<T>AggregatingState<I, O>,将状态表示为一个用于聚合操作的列表
    • ReducingState.add():聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理
    • ReducingState.clear():清空操作,AggregatingState 同理
1.2.2 案例
/**按键分区状态的使用步骤:1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});/*需求:自定义有状态的map算子,按sensor_id统计个数*///使用按键分区状态必须先进行keyByDataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());resultStream.print();env.execute();}//使用继承富函数类的方式自定义MapFunctionpublic static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {//定义一个值状态属性private ValueState<Integer> myValueState;//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));}@Overridepublic Integer map(SensorReading value) throws Exception {//获取状态值Integer count = myValueState.value();if(count == null) {count = 0;}count++;//更新状态值myValueState.update(count);return count;}}
}

2. 原始状态

Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复

  • Flink 不会对原始状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
  • 只有在遇到托管状态无法实现的特殊需求时,才考虑使用原始状态;一般情况下不推荐使用

三、Flink 状态编程案例

/**需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警//报警信息:sensor_id,前一次温度值,当前温度值DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));warningStream.print();env.execute();}//使用继承富函数类的方式自定义FlatMapFunctionpublic static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {//定义温度差阈值属性private Double threshold;//定义值状态属性,保存上一次的温度值private ValueState<Double> lastTempState;public TempChangeWarning(Double threshold) {this.threshold = threshold;}//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));}//重写flatMap方法@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {//获取上一次温度状态值Double lastTemp = lastTempState.value();//如果状态值不为null,则进行差值判断if(lastTemp != null) {Double diff = Math.abs(lastTemp - value.getTemperature());//差值超过阈值,则输出报警信息if(diff >= threshold) {out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}}//更新状态值lastTempState.update(value.getTemperature());}//在close方法中清空状态@Overridepublic void close() throws Exception {lastTempState.clear();}}
}

四、Flink 状态后端

State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件

1. 介绍

​ 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2. 分类

  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
    在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。

  • FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。

  • RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>1.10.1</version>
    </dependency>
    

3. 配置

3.1 配置文件配置
  • 进入 flink 安装目录下的 conf 目录,打开 flink-conf.yaml 文件

    cd /opt/module/flink/conf
    vim flink-conf.yaml
    
  • 在文件中的 Fault tolerance and checkpointing 部分进行配置

    #Fault tolerance and checkpointing
    #============================================================
    state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb#state.checkpoints.dir: hdfs://namenode:port/flink/checkpointsjobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
    
3.2 代码配置

在代码中为每个作业单独配置状态后端

public class TestStatebackend {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//配置状态后端//1.MemoryStateBackendenv.setStateBackend(new MemoryStateBackend());//2.FsStateBackendenv.setStateBackend(new FsStateBackend("hdfs://......"));//3.RocksDBStateBackend,需要先引入依赖env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});dataStream.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25535.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Set up a WordPress blog with Nginx

CentOS7 配置Nginx域名HTTPS Here is the revised guideline for setting up a WordPress blog with Nginx: Step 1: Install Nginx, MySQL, and PHP (LEMP Stack) Install Nginx: sudo yum install nginx sudo systemctl start nginx sudo systemctl enable nginxInstall MyS…

Java_字符串、字符与数字之间的相互转换

一、数字转字符串 //将整个数字转化为字符串int i456;//方法一 String str1 Integer.toString(i);System.out.println(str1);//方法二String str2i"";System.out.println(str2);二、字符串转数字 //整数方法一String str"123";int num1Integer.parseInt(st…

Vue3路由跳转并传递参数

文章目录 1. 前言2. 准备工作2.1 编写路由规则2.2 源页面2.3 目标页面 3. 源页面如何传递参数给目标页面3.1 通过 router-link 标签传递参数&#xff08;很少使用&#xff09;3.2 通过 js 代码传递参数&#xff08;经常使用&#xff09; 4. 目标页面接收源页面传递过来的参数5.…

台积电代工!Intel新AI PC芯片Lunar Lake发布:AI算力120TOPS!

根据英特尔披露的数据显示&#xff0c;Lunar Lake的GPU性能提升50%、NPU内核的AI算力增加了四倍、SoC耗电量减少40%、GPU AI算力增加3.5倍&#xff0c;整个SoC的算力超过了120TOPS。 6月4日&#xff0c;英特尔CEO帕特基辛格在COMPUTEX 2024上发表主题演讲&#xff0c;正式公布…

在 React 应用中,怎么封装一个路由权限

在React应用中,封装一个路由权限控制通常涉及到几个关键步骤。这通常涉及到React Router(特别是React Router v5或v6)和自定义的权限检查逻辑。以下是一个基本的步骤指南,以及如何使用React Hooks(如useEffect和useState)来封装路由权限: 定义权限检查逻辑: 首先,你需…

如何确保redis缓存中的数据与数据库一致

一、双写模式&#xff1a; 在写入数据库时&#xff0c;也写入缓存。 二&#xff1a;失效模式&#xff1a; 在写入新数据后&#xff0c;删除缓存中数据&#xff0c;下次请求时查询数据库&#xff0c;并把查到的最新数据写入缓存。 不管是双写模式还是失效模式&#xff0c;缓…

Letcode-Top 100二叉树专题

94. 二叉树的中序遍历 方法一&#xff1a;递归法 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeN…

SpringBoot的学习要点

目录 SpringBoot 创建项目 配置文件 注解 命名规范 SpringBoot整合第三方技术 …… 中文文档&#xff1a;Spring Boot 中文文档 SpringBoot Spring Boot 是基于 Spring 框架的一种快速构建微服务应用的方式它主要提供了自动配置、简化配置、运行时应用监控等功能它…

大水文之------端午练练JS好了

最近有点不太知道要干啥了&#xff0c;昨天看了集cocos的介绍&#xff0c;下载了个DashBoard&#xff0c;看了看里面的内容&#xff0c;确实有点小震惊&#xff0c;还有些免费的源码可以学习&#xff0c;挺好的。 昨天学习ts&#xff0c;感觉自己的js水平好像不太行&#xff0c…

Functional ALV系列 (10) - 将填充FieldCatalog封装成函数

在前面的博文中&#xff0c;已经讲了封装的思路和实现&#xff0c;主要是利用 cl_salv_data_descr>read_structdescr () 方法来实现。在这里&#xff0c;贴出代码方便大家参考。 编写获取内表组件的通用方法 form frm_get_fields using pt_data type any tablechanging…

排序---基数排序

前言 个人小记 一、简介 基数排序是一种非比较排序&#xff0c;所以排序速度较快&#xff0c;当为32位int整数排序时&#xff0c;可以将数分为个位十位分别为2^16,使得拷贝只需要两轮&#xff0c;从而达到2*n&#xff0c;然后给一个偏移量&#xff0c;使得可以对负数排序。以…

C++期末复习提纲(血小板)

目录 1.this指针 2.静态成员变量 3.面向对象程序设计第一阶段 4.面向对象程序设计第二阶段 5.面向对象程序设计第三阶段 6.简答题 &#xff08;1&#xff09;拷贝构造函数执行的三种情况&#xff1a; &#xff08;2&#xff09;虚析构函数的作用&#xff1a; &#xff…

55.ReentrantReadWriteLock应用于缓存

简单的缓存案例 模拟一个数据层dao @Slf4j public class GenericDao {public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {try {log.debug("进入数据库查询.....");Constructor<T> constructor = beanClass.getDeclaredCo…

如何看待华为去google化自己做鸿蒙系统,对开发人员有什么影响

华为去Google化并自主研发鸿蒙系统是一个重要的战略决策&#xff0c;这一决策对开发人员产生了深远的影响。以下是对这一决策及其对开发人员影响的详细分析&#xff1a; 一、华为去Google化自主研发鸿蒙系统的背景 在美国的技术封锁和限制下&#xff0c;华为面临着使用Androi…

Python基础——字符串

一、Python的字符串简介 Python中的字符串是一种计算机程序中常用的数据类型【可将字符串看作是一个由字母、数字、符号组成的序列容器】&#xff0c;字符串可以用来表示文本数据。 通常使用一对英文的单引号&#xff08;&#xff09;或者双引号&#xff08;"&#xff09;…

html接口响应断言

接口响应值除类json格式&#xff0c;还有html格式 断言步骤 第一步&#xff1a;替换空格replace 原本返回的格式和网页内容一致&#xff0c;每行前面有很多空格&#xff0c;需要去除这些空格 第二步&#xff1a;分割split 因为行与行之前有回车符&#xff0c;所以把回车符替…

Spring之SpringMVC源码

SpringMVC源码 一、SpringMVC的基本结构 1.MVC简介 以前的纯Servlet的处理方式&#xff1a; Overrideprotected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {String type req.getParameter(Constant.REQUEST_PA…

【Java面试】十六、并发篇:线程基础

文章目录 1、进程和线程的区别2、并行和并发的区别3、创建线程的四种方式3.1 Runnable和Callable创建线程的区别3.2 线程的run和start 4、线程的所有状态与生命周期5、新建T1、T2、T3&#xff0c;如何保证线程的执行顺序6、notify和notifyAll方法有什么区别7、wait方法和sleep方…

QT-轻量级的笔记软件MyNote

MyNote v2.0 一个轻量级的笔记软件&#x1f4d4; Github项目地址: https://github.com/chandlerye/MyNote/tree/main 应用简介 MyNote v2.0 是一款个人笔记管理软件&#xff0c;没有复杂的功能&#xff0c;旨在提供便捷的笔记记录、管理以及云同步功能。基于Qt 6.6.3 个人开…

MATLAB入门知识

目录 原教程链接&#xff1a;数学建模清风老师《MATLAB教程新手入门篇》https://www.bilibili.com/video/BV1dN4y1Q7Kt/ 前言 历史记录 脚本文件&#xff08;.m&#xff09; Matlab帮助系统 注释 ans pi inf无穷大 -inf负无穷大 i j虚数单位 eps浮点相对精度 0/&a…