Flink之状态管理

Flink状态管理

  • 状态
    • 概述
    • 状态分类
  • 键控、按键分区状态
    • 概述
    • 值状态 ValueState
    • 列表状态 ListState
    • Map状态 MapState
    • 归约状态 ReducingState
    • 聚合状态 Aggregating State
  • 算子状态
    • 概述
    • 列表状态 ListState
    • 联合列表状态 UnionListState
    • 广播状态 Broadcast State
  • 状态有效期 (TTL)
    • 概述
    • StateTtlConfig 配置对象
    • 参数说明
    • 清理
    • 使用示例
  • 状态后端 State Backend
    • 概述
    • 可用状态后端
    • 状态后端的配置

状态

概述

在流处理任务中,数据会以连续的流的形式输入到Flink中,而状态计算允许我们跟踪和处理这些输入数据的状态信息。状态可以是任何需要记录和使用的数据,例如聚合计数、累积结果、窗口中的中间状态等。

Flink中的状态管理是指在流处理任务中对数据的状态进行有效管理和维护的过程。状态管理是非常重要的,因为它允许我们在流式处理中维护和操作数据的状态信息,以实现复杂的计算逻辑和应用需求。

图片来源于网络,如有侵权,联系删除

状态分类

在Flink中,Flink状态有两种:系统状态Managed State和原始状态Raw State。通常使用系统状态,而原始状态则需要自定义实现。

系统状态根据数据集是否按照某一个Key进行分区,将状态分为算子状态Operator State和按键分区状态Keyed State。

1.系统状态

由Flink管理的全局状态,可以在整个应用程序中共享。系统状态与算子或键无关,可以被整个应用程序中的所有算子访问和更新。

2.原始状态

原始状态是一种低级别的状态表示形式,它提供了一种灵活的方式来定义和管理状态。它允许开发人员自定义状态的存储和访问方式,以满足特定的需求。

3.算子状态

用于在算子之间维护中间结果、聚合状态等。它与具体的算子实例绑定,与其他算子实例的状态相互独立。算子状态是分布式的,可以在故障恢复时进行检查点和状态恢复。

一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

图片来源于网络,如有侵权,联系删除。
4.按键分区状态

与流的键相关联的状态,用于存储和管理与每个键相关的数据信息。按键分区状态能在Keyed Stream或Keyed ProcessFunction中使用。它会根据键将数据进行分区,保证相同键的数据会被同一个状态管理。

很多有状态的操作,如聚合、窗口都是要先做keyBy进行按键分区,之后任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。

(图片来源于网络,如有侵权,联系删除。)

键控、按键分区状态

概述

按键分区状态Keyed State是任务按照键key来访问和维护的状态。它就是以key为作用范围进行隔离。

注意:

使用按键分区状态必须基于Keyed Stream。没有进行keyBy分区的Data Stream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问按键分区状态。

Keyed State在Flink中分为不同类型,具体支持的状态类型如下所示:

ValueState<T>:存储和访问单个值的状态,通常是一个单一的状态值。它可以用于存储中间结果、累加器等ListState<T>:存储和访问元素列表的状态,通常用于按键分区的列表操作MapState<UK,UV>:存储和访问键值对的状态,通常用于需要以键-值对形式存储和检索数据的情况AggregatingState<IN,OUT>:使用用户定义的聚合函数来逐个聚合元素的状态,通常用于对数据进行聚合操作,如计算平均值ReducingState<T>:使用用户定义的reduce函数来逐个聚合元素的状态,通常用于聚合操作,如求和

值状态 ValueState

值状态(ValueState)是Flink中的一种状态类型,用于存储和访问单个值。它可以用于在状态中保存和维护一个单一的值。

值状态通常用于在状态中存储一些需要随时间更新的值,例如计数器、累加器、最大/最小值等。

接口如下:

// T是泛型,表示状态的数据内容可以是任何具体的数据类型
public interface ValueState<T> extends State {// 获取当前状态的值T value() throws IOException;// 对状态进行更新,传入的参数value就是要覆写的状态值void update(T var1) throws IOException;
}

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {// 需要传入状态的名称和类型public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {super(name, typeInfo, (Object)null);}	 
}

当前输入数据与上一条数据差值比较:

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ValueState<Integer> lastState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型lastState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastState", Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 取出上一条数据的水位值,注意Integer默认值是nullint lastValue = lastState.value() == null ? 0 : lastState.value();// 求水位线差值的绝对值>5的数据Integer currentValue = value.f1;if (Math.abs(currentValue - lastValue) > 5) {out.collect("窗口:" + value.f0 + " 数据:" + value + " 当前值" + currentValue + " 上一条数据值:" + lastValue + " 差值>5");}// 更新状态里的水位值lastState.update(currentValue);}}

输入测试数据:

>nc -lk 8086
key1,5
key1,7
key1,13
key1,20
key1,10

控制台输出结果:

窗口:key1 数据:(key1,13) 当前值13 上一条数据值:7 差值>5
窗口:key1 数据:(key1,20) 当前值20 上一条数据值:13 差值>5
窗口:key1 数据:(key1,10) 当前值10 上一条数据值:20 差值>5

列表状态 ListState

列表状态(ListState)是Flink中的一种状态类型,用于存储和访问元素列表。它可以用于在状态中保存和维护一组元素,并对列表中的元素进行添加、删除和访问操作。

列表状态通常用于需要在状态中保存多个元素的场景,例如累积计算、聚合操作或缓冲区管理等。

在ListState接口中同样有一个类型参数T,表示列表中数据的类型。

public interface ListState<T> extends MergingState<T, Iterable<T>> {void update(List<T> var1) throws Exception;void addAll(List<T> var1) throws Exception;
}

ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>update(List<T> values):传入一个列表values,直接对状态进行覆盖add(T value):在状态列表中添加一个元素valueaddAll(List<T> values):向列表中添加多个元素,以列表values形式传入void clear(): 清空List状态 本组数据

ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

定义一个描述列表状态的描述符。描述符指定状态的名称和类型,状态描述器构造方法如下

public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {super(name, new ListTypeInfo(elementTypeInfo), (Object)null);}
}    

取流中3个最大值,且排序

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ListState<Integer> listState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("listState", Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 来一条数据则存list状态里listState.add(value.f1);// 从list状态拿出来,得到一个IterableIterable<Integer> iterableList = listState.get();// 拷贝到List中List<Integer> list = new ArrayList<>();for (Integer val : iterableList) {list.add(val);}// 对List进行降序排序list.sort((o1, o2) -> o2 - o1);// list中的个数是连续变大的,一但超过3个就立即清理if (list.size() > 3) {// 元素清除,清除第4个list.remove(3);}out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 最大3个水位值:" + list.toString());// 更新list状态listState.update(list);}}
key1,1
key1,5
key1,7
key1,8
key1,9
keyBy:key1 当前数据:(key1,1) 最大3个水位值:[1]
keyBy:key1 当前数据:(key1,5) 最大3个水位值:[5, 1]
keyBy:key1 当前数据:(key1,7) 最大3个水位值:[7, 5, 1]
keyBy:key1 当前数据:(key1,8) 最大3个水位值:[8, 7, 5]
keyBy:key1 当前数据:(key1,9) 最大3个水位值:[9, 8, 7]

Map状态 MapState

Map 状态(MapState)是 Flink 中的一种状态类型,用于存储和访问键值对。它可以用于在状态中保存和维护一组键值对。

Map 状态通常用于需要根据键进行查找和更新的场景,例如缓存、索引、关联操作等。对应的是MapState<UK, UV>接口,有UK、UV两个泛型,分别表示保存的key和value的类型。

MapState提供了操作映射状态的方法,与Map的使用非常类似。另外,MapState也提供了获取整个映射相关信息的方法

public interface MapState<UK, UV> extends State {// 传入一个key作为参数,查询对应的value值UV get(UK var1) throws Exception;// 传入一个键值对,更新key对应的value值void put(UK var1, UV var2) throws Exception;// 将传入的映射map中所有的键值对,全部添加到映射状态中void putAll(Map<UK, UV> var1) throws Exception;// 将指定key对应的键值对删除void remove(UK var1) throws Exception;// 判断是否存在指定的key,返回一个boolean值boolean contains(UK var1) throws Exception;// 获取映射状态中所有的键值对Iterable<Map.Entry<UK, UV>> entries() throws Exception;// 获取映射状态中所有的键(key),返回一个可迭代Iterable类型Iterable<UK> keys() throws Exception;// 获取映射状态中所有的值(value),返回一个可迭代Iterable类型Iterable<UV> values() throws Exception;// 获取迭代器Iterator<Map.Entry<UK, UV>> iterator() throws Exception;// 判断映射是否为空,返回一个boolean值boolean isEmpty() throws Exception;
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态MapState<Integer, Integer> mapState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("mapState", Types.INT, Types.INT));}/*** 模拟统计 数字 出现频率计数*/@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 判断是否存在对应的keyInteger number = value.f1;if (mapState.contains(number)) {// 包含key,直接对value+1Integer count = mapState.get(number);mapState.put(number, ++count);} else {// 不包含key,初始化mapState.put(number, 1);}out.collect("keyBy:" + value.f0 + " 数字:" + number + " 出现次数:" + mapState.get(number));}}
nc -lk 8086
key1,1
key1,1
key1,2
key1,3
key1,2
key1,1
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:1 出现次数:2
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:1 出现次数:3

归约状态 ReducingState

归约状态(Reducing State)是 Flink 中一种特殊类型的状态,用于对输入流进行归约操作。归约操作将输入流中的元素逐个进行聚合,生成一个汇总的结果值。不同于普通的 Map、List 或 Value 状态,归约状态可以在接收到新的元素时,对当前的状态值进行相应的归约操作。

归约状态ReducingState类似于值状态,不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。

使用接口ReducingState,调用的方法类似于ListState,只不过它保存的只是一个聚合值,调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {private final ReduceFunction<T> reduceFunction;public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {super(name, typeInfo, (Object)null);this.reduceFunction = (ReduceFunction)Preconditions.checkNotNull(reduceFunction);}
}

使用归约状态来计算输入流中的累计和

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ReducingState<Integer> reducingState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、reduceFunction、存储类型reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reducingState", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}}, Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 来一条数据则添加到reducing状态里reducingState.add(value.f1);// 对本组的Reducing状态,获取结果Integer sum = reducingState.get();out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 水位值合计:" + sum);}}
key1,1
key1,2
key1,3
keyBy:key1 当前数据:(key1,1) 水位值合计:1
keyBy:key1 当前数据:(key1,2) 水位值合计:3
keyBy:key1 当前数据:(key1,3) 水位值合计:6

聚合状态 Aggregating State

聚合状态是Flink 中一种特殊类型的状态,用于对输入流进行聚合操作。聚合操作将输入流中的元素逐个进行聚合,并生成一个汇总的结果值。与归约状态不同,聚合状态可以在接收到新的元素时,根据自定义的聚合逻辑对当前的状态值进行增量聚合。

AggregatingState接口相关方法

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

与归约状态不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的
。里面通过一个累加器Accumulator来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {private final AggregateFunction<IN, ACC, OUT> aggFunction;public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> stateType) {super(name, stateType, (Object)null);this.aggFunction = (AggregateFunction)Preconditions.checkNotNull(aggFunction);}
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}/*** param1 键的类型* param2 输入类型* param3 输出元素的类型*/public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态AggregatingState<Integer, HashMap<Integer, Integer>> aggregatingState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、AggregateFunction、累加器类型aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>>("aggregatingState",new MyAggregateFunction(),TypeInformation.of(new TypeHint<HashMap<Integer, Integer>>() {})));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 将水位值添加到聚合状态中aggregatingState.add(value.f1);// 从聚合状态中获取结果HashMap<Integer, Integer> res = aggregatingState.get();out.collect("keyBy:" + value.f0 + " 数字:" + value.f1 + " 出现次数:" + res.get(value.f1));}}/*** param1 聚合的值的类型 (输入值)* param2 累加器的类型 (中间聚合状态)* param3 聚合结果的类型*/public static class MyAggregateFunction implements AggregateFunction<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>> {// 创建累加器,类型HashMap<Integer, Integer>@Overridepublic HashMap<Integer, Integer> createAccumulator() {HashMap<Integer, Integer> map = new HashMap<>();return map;}@Overridepublic HashMap<Integer, Integer> add(Integer value, HashMap<Integer, Integer> accumulator) {if (accumulator.containsKey(value)) {Integer sum = accumulator.get(value) + 1;accumulator.put(value, sum);} else {accumulator.put(value, 1);}return accumulator;}@Overridepublic HashMap<Integer, Integer> getResult(HashMap<Integer, Integer> accumulator) {return accumulator;}@Overridepublic HashMap<Integer, Integer> merge(HashMap<Integer, Integer> a, HashMap<Integer, Integer> b) {return null;}}
key1,1
key1,2
key1,3
key1,2
key1,3
key1,2
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:3 出现次数:2
keyBy:key1 数字:2 出现次数:3

算子状态

概述

算子状态(Operator State)是 Flink 中一种用于保存和管理算子(Operator)状态的机制。算子状态通常用于在算子之间保持一些中间结果,或者用于保存全局信息。

算子状态是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个算子状态。

算子状态一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

在Flink中,算子任务可以分为无状态和有状态两种情况。

无状态算子

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。例如:基本转换算子map、filter、flatMap等计算时不依赖其他数据,就都属于无状态的算子。

有状态算子

有状态的算子任务,除当前数据之外,还需要一些其他数据来得到计算结果。其他数据就是所谓的状态。例如:聚合算子、窗口算子都属于有状态的算子。

算子状态有以下几个特点:

算子状态是与算子实例绑定的,每个算子实例都会维护自己的状态。这意味着在并行计算中,每个并行实例都会有独立的状态算子状态可以是一种类型,也可以是多种类型的组合。常见的算子状态类型包括 ValueState、ListState、MapState 等算子状态可以在算子实例之间进行快速的备份和恢复,以保证程序的容错性算子状态可以存储在内存中,也可以通过配置选择将其存储在外部持久化存储系统中,如 RocksDB

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

列表状态 ListState

在算子状态的上下文中,不会按键分别处理状态,每一个并行子任务上会保留一个列表

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个大列表,然后再均匀地分配给所有并行任务。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流DataStreamSource<String> source = env.socketTextStream("IP", 8086);source.map(new MyMapFunction()).print();env.execute();}// 实现CheckpointedFunction接口public static class MyMapFunction implements MapFunction<String, Integer>, CheckpointedFunction {// 本地变量private Integer count = 0;// 定义状态private ListState<Integer> state;@Overridepublic Integer map(String value) throws Exception {return ++count;}/*** 本地变量持久化:将本地变量拷贝到算子状态中,开启checkpoint时才会调用*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 清空算子状态state.clear();// 将 本地变量 添加到 算子状态 中state.add(count);}/*** 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 从上下文初始化 子状态state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Integer>("state", Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count += val;}}}}

初始化本地变量方法与并行度设置有关

initializeState...
initializeState...

输入测试数据

1
2
3
4
1> 1
2> 1
1> 2
2> 2

联合列表状态 UnionListState

它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

并行度缩放之后的并行子任务就获取到了联合后完整的大列表,可以自行选择要使用的状态项和要丢弃的状态项。

        /*** 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 从上下文初始化 子状态state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("union-state", Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count += val;}}}

广播状态 Broadcast State

广播状态是 Flink 中一种特殊的算子状态类型,可用于在流处理任务中将数据广播到所有并行任务中共享和访问。它适用于将少量的全局信息广播到算子的每个实例,以便进行更灵活的计算。

因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展。而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接删除,因为状态都是复制出来的,并不会丢失

广播状态具有以下特点:

广播状态只需要占用少量的内存,因为它通常用于存储比较小的全局数据或配置信息广播状态在整个任务中共享,使得每个算子实例都可以访问广播状态中的数据,而无需进行网络通信广播状态在任务开始时被广播并分发到每个算子实例,保持数据的一致性

更改广播状态示例

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> sourceMap = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});// 广播配置流DataStreamSource<String> dataStreamSource = env.socketTextStream("IP", 8087);// 使用给定的名称和给定的类型信息新建一个MapStateDescriptorMapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);// 得到广播流BroadcastStream<String> broadcastStream = dataStreamSource.broadcast(broadcastMapState);// 数据流和广播配置流进行关联BroadcastConnectedStream<Tuple2<String, Integer>, String> broadcastConnectedStream = sourceMap.connect(broadcastStream);// 调用 processbroadcastConnectedStream.process(new BroadcastProcessFunction<Tuple2<String, Integer>, String, String>() {/*** 数据流的处理* 数据流只能读取广播状态,不能修改* @param value 非广播侧的输入类型* @param ctx 广播端的输入类型* @param out 运算符的输出类型*/@Overridepublic void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 通过上下文获取广播状态,取出值ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer configValue = broadcastState.get("myConfig");// 注意:刚启动时,可能是数据流的第一条数据先来configValue = (configValue == null ? 0 : configValue);if (value.f1 > configValue) {out.collect("输入数字:" + value.f1 + " > 广播状态值:" + configValue);} else {out.collect("输入数字:" + value.f1 + " <= 广播状态值:" + configValue);}}/*** 广播后配置流的处理* 只有广播流才能修改广播状态*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// 通过上下文获取广播状态,往里面写数据BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("myConfig", Integer.valueOf(value));}}).print();env.execute();}

输入测试数据

nc -lk 8086
key1,1
key1,2
key1,3

输出:

1> 输入数字:1 > 广播状态值:0
2> 输入数字:2 > 广播状态值:0
1> 输入数字:3 > 广播状态值:0

更改广播状态

nc -lk 8087
5

输入测试数据

nc -lk 8086
key1,6
key1,8

输出:

2> 输入数字:6 > 广播状态值:5
1> 输入数字:8 > 广播状态值:5

状态有效期 (TTL)

概述

状态效期、生存时间(State TTL,Time-to-Live)是 Flink 中的一个功能,用于为状态设置过期时间。通过设置状态生存时间,可以自动清理过期的状态数据,避免无限增长的状态。

任何类型的keyed state都可以有 有效期 (TTL)。如果配置了TTL且状态值已过期,则会尽最大可能清除对应的值

所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。

StateTtlConfig 配置对象

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的enableTimeToLive()方法启动TTL功能。

创建一个StateTtlConfig配置对象

        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();

启动TTL功能

        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("MyState", String.class);valueStateDescriptor.enableTimeToLive(stateTtlConfig);

参数说明

newBuilder()

状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig。方法需要传入一个Time作为参数,这就是设定的状态生存时间。

在这里插入图片描述

setUpdateType()

设置更新类型。更新类型指定了什么时候更新状态失效时间

在这里插入图片描述

DisabledTTL 已禁用。这意味着状态不会过期,它将一直保持有效,直到显式删除或状态存储由于其他原因而被清理OnCreateAndWrite:表示只有创建状态和更改状态(写操作)时更新失效时间。配置默认为OnCreateAndWriteOnReadAndWrite:表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间

setStateVisibility()

设置状态的可见性。状态可见性是指因为清除操作并不是实时的,当状态过期之后还可能继续存在,如果对它进行访问,能否正常读取到是一个问题

在这里插入图片描述

NeverReturnExpired:默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除,不能继续读取ReturnExpireDefNotCleanedUp:如果过期状态还存在,就返回它的值

清理

过期数据的清理

默认情况下,过期数据会在读取的时候被删除,同时会有后台线程定期清理(StateBackend支持)。可以通过StateTtlConfig配置关闭后台清理:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();

全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();

增量数据清理

在状态访问或处理时进行,会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

 StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))/*** @cleanupSize 每次清理时检查状态的条目数,在每个状态访问时触发* @runCleanupForEveryRecord  表示是否在处理每条记录时触发清理*/.cleanupIncrementally(10, true).build();

在RocksDB压缩时清理

如果使用RocksDBstatebackend,则会启用Flink为RocksDB定制的压缩过滤器。RocksDB会周期性的对数据进行合并压缩从而减少存储空间。Flink提供的RocksDB压缩过滤器会在压缩时过滤掉已经过期的状态数据。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();

使用示例

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ValueState<Integer> lastState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 时更新过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 时更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// 状态描述器 启用TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 获取状态值Integer lastValue = lastState.value();out.collect("keyBy:" + value.f0 + " 状态值: " + lastValue);// 更新状态lastState.update(value.f1);}}

快速输入测试数据

key1,1
key1,2
key1,4
keyBy:key1 状态值: null
keyBy:key1 状态值: 1
keyBy:key1 状态值: 2

等待超过5秒输入测试数据

key1,6
keyBy:key1 状态值: null

状态后端 State Backend

概述

状态后端是 Flink 中用于管理和持久化状态数据的机制。状态后端负责将算子状态和键控状态Keyed State存储在可靠且可恢复的存储系统中,并提供对状态数据的读取和写入操作。

状态后端主要负责管理本地状态的存储方式和位置

可用状态后端

Flink内置了以下这些开箱即用的state backends :

如果不设置,默认使用HashMapStateBackend。两种状态后端最大的区别,就在于本地状态存放在哪里

HashMapStateBackend: 哈希表状态后端EmbeddedRocksDBStateBackend:内嵌RocksDB状态后端

1.HashMapStateBackend

在HashMapStateBackend内部,数据以Java对象的形式存储在堆中。Key/value形式的状态和窗口算子会持有一个hash table,其中存储着状态值、触发器。

具有以下特点:

高性能:由于状态存储在内存中,哈希表状态后端提供极快的数据读取和写入性能低延迟:状态的访问速度非常快,因为无需进行磁盘或网络访问低容错性:哈希表状态后端不提供持久化能力,即在故障发生时可能会丢失状态数据。适用于开发和调试环境,或对数据一致性要求较低的场景

2.EmbeddedRocksDBStateBackend

将状态数据存储在硬盘上的RocksDB数据库中,RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置开启后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。

RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

不同于HashMapStateBackend中的java对象,状态数据被以序列化字节数组的方式存储,需要序列化、反序列化,因此key之间的比较是以字节序的形式进行而不是使用Java的调用.hashCode()和.equals()方法。

执行是异步快照,不会因为保存检查点而阻塞数据的处理,并且还提供了增量式保存检查点的机制,在很多情况下可以大大提升保存效率。

具有以下特点:

持久化和可恢复性:内嵌RocksDB状态后端可将状态数据持久化到磁盘,并在故障发生时能够恢复状态数据高容量:由于状态存储在磁盘上,内嵌RocksDB状态后端可以处理大规模的状态数据中等性能:相较于哈希表状态后端,内嵌RocksDB状态后端的读写性能略低。但由于RocksDB是一个高效的键值存储引擎,它仍然提供了相对较好的读写性能

状态后端的配置

默认状态后端是由集群配置文件flink-conf.yaml指定的,配置的键名称为state.backend

默认配置对集群上运行的所有作业都有效,可以通过更改配置值来改变默认的状态后端。还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

1.配置默认全局的状态后端

flink-conf.yaml中,可以使用state.backend来配置默认状态后端。

# 默认状态后端,哈希表状态后端
state.backend.type: hashmap# 内嵌RocksDB状态后端
state.backend.type: rocksdb# 定义检查点和元数据写入的目录
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints

2.设置每个Job的状态后端

使用hashmap状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HashMapStateBackend hashMapStateBackend = new HashMapStateBackend();env.setStateBackend(hashMapStateBackend);

使用rocksdb状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();env.setStateBackend(embeddedRocksDBStateBackend);

注意:Flink发行版中默认包含了RocksDB(解压的Flink安装包),在IDE中使用rocksdb状态后端,需要为Flink项目添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

3.提交参数指定

flink run-application -t yarn-application-p 2 -Dstate.backend.type=rocksdb -c 全类名 jar包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

换服还是掀桌?哪条才是程序员的出路?

站在时代的风口浪尖&#xff0c;猪都能起飞。 大数据互联网正是时代的宠儿&#xff0c;IT行业的发展也正如火如荼。 人人都眼红程序员的高薪资&#xff0c;认为他们吃着时代的红利。 但是三百六十行&#xff0c;行行出社畜。”996“也好&#xff0c;甚至"007"也罢…

OpenAI开发者大会掀起风暴:GPT模型价格狂降50%,应用商店即将亮相,AI技术将引爆全球!

OpenAI首届开发者大会召开了&#xff01; 关键信息&#xff1a; GPT-4升级版GPT-4 Turbo来了&#xff0c;上下文窗口达到128k&#xff0c;为GPT-4的4倍&#xff1b;OpenAI还降低了几乎所有模型的API使用价格&#xff0c;整体便宜了一半多&#xff1b;GPT-4系列的多模态能力向B…

竞赛选题 深度学习手势识别 - yolo python opencv cnn 机器视觉

文章目录 0 前言1 课题背景2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存 5 模型训练5.1 修…

ZYNQ_project:key_beep

通过按键控制蜂鸣器工作。 模块框图&#xff1a; 时序图&#xff1a; 代码&#xff1a; /*1位按键消抖 */ module key_filter (input wire sys_clk ,input wire sys_rst_n ,input wire key_in ,output …

AM@向量代数@向量基本概念和向量线性运算

文章目录 abstract向量的基本概念向量向量的坐标分解式和坐标&#x1f47a;向量的模向量的长度(大小)&#x1f47a;零向量单位向量&#x1f47a;方向向量非零向量的单位向量正规化向量夹角&#x1f47a; 向量方向角和向量间夹角投影几何描述向量的线性运算向量的加减运算向量的…

Linux中固定ip端口和修改ip地址

一&#xff0c;更改虚拟网络编辑器 1&#xff0c;首先启动VMware&#xff0c;选择自己要更改ip或固定ip的虚拟机&#xff0c;并找到虚拟网络配编辑器&#xff0c;点击进入 2&#xff0c;进入之后需要点击右下角获取管理员权限后才能修改&#xff0c;有管理员权限之后图片如下 …

技术分享 | app自动化测试(Android)--元素定位方式与隐式等待

元素定位是 UI 自动化测试中最关键的一步&#xff0c;假如没有定位到元素&#xff0c;也就无法完成对页面的操作。那么在页面中如何定位到想要的元素&#xff0c;本小节讨论 Appium 元素定位方式。 Appium的元素定位方式 定位页面的元素有很多方式&#xff0c;比如可以通过 I…

python使用selenium做自动化,最新版Chrome与chromedriver不兼容

目前Chrome版本是118.0.5993.118 下方是版本对应的下载地址&#xff1a; chrome版本118&#xff1a; https://download.csdn.net/download/qq_35845339/88510476 chrome版本119&#xff1a; chromedriverlinux64https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testin…

华为取消6000万订单影响在扩大,高通嘴硬强调不受影响

高通公布了2023年第三季度的业绩&#xff0c;业绩显示营收下滑24%&#xff0c;净利润下滑36%&#xff0c;不过高通强调预计今年四季度业绩将回升&#xff0c;意思是说华为取消订单带来的影响较小。 一、高通处境不利已延续4年时间 2019年美国对华为采取措施&#xff0c;众多中国…

优雅设计之美:实现Vue应用程序的时尚布局

本文为翻译文章&#xff0c;原文链接&#xff1a; ** https://fadamakis.com/clean-layout-architecture-for-vue-applications-a738201a2a1e 前言 页面布局是减少代码重复和创建可维护且具有专业外观的应用程序的基本模式。如果使用的是Nuxt&#xff0c;则可以提供开箱即用…

11.8旧有报错与修改

我将uart_done&#xff08;出问题的信号&#xff09;的变量类型设为reg了&#xff0c;也就是我是reg uart_done这个信号的&#xff0c;这样做是错误的&#xff0c;哪怕你在接收模块确实定义的是reg类型&#xff0c;但是在顶层模块的时候&#xff0c;它可以视为是一条单纯的线而…

oled显示器程序(IIC)从stm32f103移植到stm32f429出现bug不显示-解决移植失败问题

出现问题处&#xff1a; 刚开始更换了这两行代码&#xff0c;然后更换位置后&#xff0c;oled正常显示&#xff0c;如下为正确顺序 I2C_Configuration();//配置CPU的硬件I2COLED_Init();//OLED初始化 在这段代码中&#xff0c;I2C_Configuration() 函数用于配置CPU的硬件 I2C…

AJAX-解决回调函数地狱问题

一、同步代码和异步代码 1.同步代码 浏览器是按照我们书写代码的顺序一行一行地执行程序的。浏览器会等待代码的解析和工作&#xff0c;在上一行完成之后才会执行下一行。这也使得它成为一个同步程序。 总结来说&#xff1a;逐行执行&#xff0c;需原地等待结果后&#xff0…

深度学习之基于YoloV5-Deepsort人物识别与追踪系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 YoloV5-Deepsort是一种基于深度学习的人物识别与追踪系统&#xff0c;具有较高的准确率和实时性能。 YoloV5是一种…

【算法与数据结构】77、LeetCode组合

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;如果k是固定的&#xff0c;最直接的方法就是建立k个for循环&#xff0c;将结果全部压入result容器中。…

一篇文章带你使用(MMKV--基于 mmap 的高性能通用 key-value 组件)

一、MMKV是什么&#xff1f; MMKV 是基于 mmap 内存映射的 key-value 组件&#xff0c;底层序列化/反序列化使用 protobuf 实现&#xff0c;性能高&#xff0c;稳定性强。也是腾讯微信团队使用的技术。 支持的数据类型 支持以下 Java 语言基础类型&#xff1a; boolean、int…

第23章(上)_索引原理之索引与约束

文章目录 索引索引分类主键选择索引的代价 约束外键约束约束与索引的区别 索引使用场景不要使用索引的场景总结 索引 索引的概念&#xff1a;索引是一种有序的存储结构。索引按照单个或多个列的值进行排序。 索引的目的&#xff1a;提升搜索效率。 索引分类 按照数据结构分为…

蓝桥杯双周赛算法心得——串门(双链表数组+双dfs)

大家好&#xff0c;我是晴天学长&#xff0c;树和dfs的结合&#xff0c;其邻接表的存图方法也很重要。需要的小伙伴可以关注支持一下哦&#xff01;后续会继续更新的。&#x1f4aa;&#x1f4aa;&#x1f4aa; 1) .串门 2) .算法思路 串门&#xff08;怎么存图很关键&#xf…

TLS回调函数

TLS在逆向中的作用 TLS回调函数常用于反调试 TLS先于EP代码执行 TLS是什么 TLS是各线程的独立的数据存储空间 使用TLS技术可以在线程内部独立使用或修改进程的全局数据或静态数据 创建和终止某进程时&#xff0c;TLS回调函数都会自动调用执行 使用OD调试TLS函数

ElasticSearch与Lucene是什么关系?Lucene又是什么?

一. ElasticSearch 与 Lucene 的关系 Elasticsearch&#xff08;ES&#xff09;和Apache Lucene之间有密切的关系&#xff0c;可以总结如下&#xff1a; Elasticsearch构建于Lucene之上&#xff1a;Elasticsearch实际上是一个分布式的、实时的搜索和分析引擎&#xff0c;它构建…