一、Flink 状态介绍
1. 流处理的无状态和有状态
- 无状态的流处理:根据每一次当前输入的数据直接转换输出结果的过程,在处理中只需要观察每个输入的独立事件。例如, 将一个字符串类型的数据拆分开作为元组输出或将每个输入的数值加 1 后输出。Flink 中的基本转换算子 (map、filter、flatMap 等) 在计算时不依赖其他数据,所以都属于无状态的算子。
-
有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。
2. Flink 的状态管理
- 在传统的事务型处理架构中,状态数据一般是保存在数据库中的,在业务处理过程中与数据库交互进行状态的读取和更新;但对于大数据实时处理架构来说,在业务处理时频繁地读写外部数据库会造成性能达不到要求,因此不能使用数据库进行状态管理
- 在实时流处理中一般将状态直接保存在内存中来保证性能,但必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题随之产生
- Flink 拥有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态一致性、状态的高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整。开发者只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上
二、Flink 状态分类
1. 托管状态
Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现
1.1 算子状态
Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少
- 由同一并行任务所处理的所有数据都可以访问到相同的算子状态
- 算子状态对于同一任务而言是共享的
- 算子状态不能由相同或不同算子的另一个任务访问
1.1.1 算子状态数据结构
- 列表状态(List state):将状态表示为一组数据的列表
- 联合列表状态(Union list state):也是将状态表示为一组数据的列表。与列表状态的区别在于,在发生故障时或者从保存点(savepoint)启动应用程序时恢复的方式不同
- 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
1.1.2 案例
public class TestFlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的map算子,用于统计输入数据个数DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());resultStream.print();env.execute();}//定义有状态的 map 操作//实现 ListCheckpointed 接口,泛型为状态数据类型public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {//定义一个本地变量作为状态private Integer count = 0;@Overridepublic Integer map(SensorReading value) throws Exception {count++;return count;}//对状态做快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {return Collections.singletonList(count);}//容错恢复状态@Overridepublic void restoreState(List<Integer> state) throws Exception {for(Integer num : state) {count += num;}}}}
1.2 按键分区状态
Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用
- 在进行按键分区(keyBy)之后,具有相同 key 的所有数据,都会分配到同一个并行子任务中,这个任务会维护和处理这个 key 对应的状态实例
- 一个并行子任务可能会处理多个 key 的数据,所以该任务会为每个 key 都维护一个状态实例
- 在底层,同一个并行子任务的所有 KeyedState 会根据 key 保存成键值对(key-value)的形式,当一条数据到来时,任务会自动将状态的访问范围限定为当前数据的 key,并从键值对(key-value)存储中读取出对应的状态值
- 具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的
- 在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- 值状态:
ValueState<T>
,将状态表示为单个的值,值的类型为 TValueState.value()
:获取状态值ValueState.update(T value)
:添加或更新状态值ValueState.clear()
:清空操作
- 列表状态:
ListState<T>
,将状态表示为一组数据的列表,列表里的元素的数据类型为 TListState.add(T value)
:追加状态值ListState.addAll(List<T> values)
:追加状态值列表ListState.get()
:获取状态值的Iterable<T>
ListState.update(List<T> values)
:更新状态值列表ListState.clear()
:清空操作
- 映射状态:
MapState<K, V>
,将状态表示为一组 Key-Value 对MapState.get(UK key)
:获取状态值MapState.put(UK key , UV value)
:添加或更新状态值MapState.contains(UK key)
:判断状态值是否存在MapState.remove(UK key)
:删除状态值MapState.clear()
:清空操作
- 聚合状态:
ReducingState<T>
和AggregatingState<I, O>
,将状态表示为一个用于聚合操作的列表ReducingState.add()
:聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理ReducingState.clear()
:清空操作,AggregatingState 同理
1.2.2 案例
/**按键分区状态的使用步骤:1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});/*需求:自定义有状态的map算子,按sensor_id统计个数*///使用按键分区状态必须先进行keyByDataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());resultStream.print();env.execute();}//使用继承富函数类的方式自定义MapFunctionpublic static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {//定义一个值状态属性private ValueState<Integer> myValueState;//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));}@Overridepublic Integer map(SensorReading value) throws Exception {//获取状态值Integer count = myValueState.value();if(count == null) {count = 0;}count++;//更新状态值myValueState.update(count);return count;}}
}
2. 原始状态
Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复
- Flink 不会对原始状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
- 只有在遇到托管状态无法实现的特殊需求时,才考虑使用原始状态;一般情况下不推荐使用
三、Flink 状态编程案例
/**需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警//报警信息:sensor_id,前一次温度值,当前温度值DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));warningStream.print();env.execute();}//使用继承富函数类的方式自定义FlatMapFunctionpublic static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {//定义温度差阈值属性private Double threshold;//定义值状态属性,保存上一次的温度值private ValueState<Double> lastTempState;public TempChangeWarning(Double threshold) {this.threshold = threshold;}//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));}//重写flatMap方法@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {//获取上一次温度状态值Double lastTemp = lastTempState.value();//如果状态值不为null,则进行差值判断if(lastTemp != null) {Double diff = Math.abs(lastTemp - value.getTemperature());//差值超过阈值,则输出报警信息if(diff >= threshold) {out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}}//更新状态值lastTempState.update(value.getTemperature());}//在close方法中清空状态@Overridepublic void close() throws Exception {lastTempState.clear();}}
}
四、Flink 状态后端
State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件
1. 介绍
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
2. 分类
-
MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。 -
FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。
-
RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>1.10.1</version> </dependency>
3. 配置
3.1 配置文件配置
-
进入 flink 安装目录下的 conf 目录,打开
flink-conf.yaml
文件cd /opt/module/flink/conf vim flink-conf.yaml
-
在文件中的
Fault tolerance and checkpointing
部分进行配置#Fault tolerance and checkpointing #============================================================ state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb#state.checkpoints.dir: hdfs://namenode:port/flink/checkpointsjobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
3.2 代码配置
在代码中为每个作业单独配置状态后端
public class TestStatebackend {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//配置状态后端//1.MemoryStateBackendenv.setStateBackend(new MemoryStateBackend());//2.FsStateBackendenv.setStateBackend(new FsStateBackend("hdfs://......"));//3.RocksDBStateBackend,需要先引入依赖env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});dataStream.print();env.execute();}
}