11、Flink 的 Keyed State 详解

1.Keyed DataStream

使用 keyed state,首先需要为DataStream指定 key(主键);这个 key 用于状态分区(数据流中的 Record 也会被分区)可以使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key,将生成 KeyedStream

Key selector 函数接收单条 Record 作为输入,返回这条记录的 key,该 key 可以为任何类型,但是它的计算产生方式必须具备确定性,Flink 的数据模型不基于 key-value 对,将数据集在物理上封装成 key 和 value 是没有必要的,Key 是“虚拟”的,用以操纵分组算子。

案例: key selector 函数。

// some ordinary POJO
public class WC {public String word;public int count;public String getWord() { return word; }
}DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words.keyBy(WC::getWord);

Flink 有两种不同定义 key 的方式

可以通过 tuple 字段索引,或者选取对象字段的表达式来指定 key 即 Tuple Keys 和 Expression Keys。

2.使用 Keyed State
a)概述

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下,即这些状态仅可在 KeyedStream 上使用,在Java/Scala API上可以通过 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通过 stream.key_by(...) 得到 KeyedStream

支持的状态类型如下

  • ValueState: 保存一个可以更新和检索的值(每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState: 保存一个元素的列表,可以往这个列表中追加数据,并在当前的列表上进行检索,通过 add(T) 或者 addAll(List) 添加元素,通过 Iterable get() 获得整个列表,还可以通过 update(List) 覆盖当前的列表。
  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合,使用 add(T) 增加的元素会用提供的 ReduceFunction 进行聚合。
  • AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合,和 ReducingState 相反的是,聚合类型可能与添加到状态的元素的类型不同,使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
  • MapState: 维护了一个映射列表,可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器,使用 put(UK,UV) 或者 putAll(Map) 添加映射,使用 get(UK) 检索特定 key,使用 entries()keys()values() 分别检索映射、键和值的可迭代视图,还可以通过 isEmpty() 来判断是否包含任何键值对。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置;从状态中获取的值取决于输入元素所代表的 key,在不同 key 上调用同一个接口,可能得到不同的值。

必须创建一个 StateDescriptor,才能得到对应的状态句柄,它保存了状态名称, 状态所持有值的类型,可能包含用户指定的函数,例如ReduceFunction,根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorAggregatingStateDescriptor, ReducingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,只能在 rich functions 中使用,RichFunctionRuntimeContext 提供如下方法

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

案例FlatMapFunction 使用状态

实现了计数窗口,把元组的第一个元素当作 key,该函数将出现的次数以及总和存储在 “ValueState” 中,一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。

注意:会为每个不同的 key(元组中第一个元素)保存一个单独的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)
b)状态有效期 (TTL)
参数配置配置2配置3
状态的可见性NeverReturnExpired 不返回过期数据ReturnExpiredIfNotCleanedUp 会返回过期但未清理的数据
TTL 更新策略OnCreateAndWrite 仅在创建和写入时更新OnReadAndWrite 读取和写入时更新
状态清理策略cleanupFullSnapshot 全量快照时进行清理cleanupIncrementally 增量数据清理cleanupInRocksdbCompactFilter RocksDB 压缩过滤器

任何类型的 keyed state 都可以有 有效期 (TTL),如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,所有状态类型都支持单元素的 TTL,列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象,然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下几个选项

newBuilder 的第一个参数表示数据的有效期,是必选项;

TTL 的更新策略(默认是 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新

  • StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新

    (注意: 如果同时将状态的可见性配置为 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp, 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)

数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据

    (注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据

NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除,这对于不能访问过期数据的场景下非常有用,比如敏感数据, ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

注意:

  • 状态上次的修改时间会和数据一起保存在 state backend 中,开启该特性会增加状态数据的存储;Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
  • 暂时只支持基于 processing time 的 TTL。
  • 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
  • TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
  • 不建议 checkpoint 恢复前后将 state TTL 从短调长,这可能会产生潜在的数据错误。
  • 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null;如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。
  • 启用 TTL 配置后,StateDescriptor 中的 defaultValue(已标记 deprecated)将会失效,在此基础上,用户需要手动管理那些实际值为 null 或已过期的状态默认值。
c)过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,会有后台线程定期清理(需要 StateBackend 支持)可以通过 StateTtlConfig 配置关闭后台清理。

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).disableCleanupInBackground().build();

可以配置更细粒度的后台清理策略,当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

d)全量快照时进行清理

可以启用全量快照时进行清理的策略,可以减少整个快照的大小,当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据,该策略可以通过 StateTtlConfig 进行配置。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupFullSnapshot().build();

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

注意:

  • 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。
e)增量数据清理

可以选择增量式清理状态数据,在状态访问或/和处理时进行,如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器,每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupIncrementally(10, true).build();

该策略有两个参数

  • 第一个参数表示每次清理时检查状态的条目数,在每个状态访问时触发;
  • 第二个参数表示是否在处理每条记录时触发清理,Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制,在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用,但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
f)在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器,RocksDB 会周期性的对数据进行合并压缩从而减少存储空间,Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)).build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 可以通过 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数;时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能;RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目,比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中,该功能可以确保文件定期通过压缩过滤器压缩,可以通过StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime) 方法设定定期压缩的时间,定期压缩的时间的默认值是 30 天,可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。

还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 压缩时调用 TTL 过滤器会降低速度,TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查,对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
  • 定期压缩功能只在 TTL 启用时生效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/6440.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搜好货API接口:快速获取商品列表的利器

搜好货商品列表API接口允许开发者根据关键字搜索并获取相关的商品列表数据。接口支持多种参数配置&#xff0c;可以根据需求灵活调整搜索条件和结果返回格式。 点击获取key和secret API接口请求说明 请求地址&#xff1a;https://api.souhaohuo.com/goods/search请求方法&…

速卖通关键字搜索API接口:快速获取商品列表的利器

速卖通关键字搜索API接口允许开发者根据用户输入的关键字进行商品搜索&#xff0c;并返回与之相关的商品列表。通过调用该接口&#xff0c;您可以快速获取与关键字匹配的商品信息&#xff0c;包括商品标题、价格、图片等&#xff0c;为您的电商业务提供有力支持。 三、API接口…

以信息挖掘为关键技术的智慧校园建设

随着信息技术的快速发展&#xff0c;数据信息资源以井喷的姿态涌现。数据信息的大量涌现给人们带来丰富的数据信息资源&#xff0c;但面对海量的信息资源时&#xff0c;加大了人们对有效信息资源获取的难度&#xff0c;数据挖掘技术正是这一背景下的产物&#xff0c;基于数据挖…

【Redis】Redis安装、配置、卸载使用可视化工具连接Redis

文章目录 1.前置条件2.安装Redis2.1下载Redis安装包并解压2.2在redis目录下执行make命令2.3修改Redis配置文件2.4启动Redis服务2.5连接redis服务 3.Redis卸载4.使用可视化工具连接Redis 1.前置条件 Linux操作系统需要要是64位.如果不清楚自己Linux上是多少位的,可以使用以下命…

C语言之详细讲解文件操作(抓住文件操作的奥秘)

什么是文件 与普通文件载体不同&#xff0c;文件是以硬盘为载体存储在计算机上的信息集合&#xff0c;文件可以是文本文档、图片、程序等等。文件通常具有点三个字母的文件扩展名&#xff0c;用于指示文件类型&#xff08;例如&#xff0c;图片文件常常以KPEG格式保存并且文件…

一文了解复杂度

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、算法效率二、时间复杂度1.定义2.大O的渐进表示法3.一般常见复杂度4.实例 三、空间复杂度1.定义2.空间复杂度计算3.实例 总结 前言 计算复杂性理论&#xf…

Redis的持久化方法,各自优缺点,怎么选择?

持久化&#xff1a; redis基于内存是数据库&#xff0c;内容存到内存中&#xff0c;也可以存到硬盘中&#xff0c;这个过程就叫持久化。有两种方案&#xff0c;RDB和AOP两种。 RDB RDB持久化就是把当前进程数据生成快照保存到硬盘的过程RDB文件是⼀个压缩的二进制文件&#…

VisualGLM-6B微调(V100)

Visualglm-6b-CSDN博客文章浏览阅读1.3k次。【官方教程】XrayGLM微调实践&#xff0c;&#xff08;加强后的GPT-3.5&#xff09;能力媲美4.0&#xff0c;无次数限制。_visualglm-6bhttps://blog.csdn.net/u012193416/article/details/131074962?ops_request_misc%257B%2522req…

使用Axios从前端上传文件并且下载后端返回的文件

前端代码&#xff1a; function uploadAndDownload(){showLoading();const fileInput document.querySelector(#uploadFile);const file fileInput.files[0];const formData new FormData()formData.append(file, file)return new Promise((resolve, reject) > {axios({…

【经典论文阅读1】FM模型——搜推算法里的瑞士军刀

全文由『说文科技』原创出品&#xff0c;文章同步更新于公众号『说文科技』。版权所有&#xff0c;翻版必究。 FM模型发表于2010年&#xff0c;它灵活好用且易部署。作者行文极其流畅&#xff0c;作者首先对要处理的问题进行介绍&#xff0c;接着作者提出FM模型&#xff0c;这…

Vue单页面应用和多页面应用的区别

概念&#xff1a; SPA单页面应用&#xff08;SinglePage Web Application&#xff09;&#xff0c;指只有一个主页面的应用&#xff0c;一开始只需要加载一次js、css等相关资源。所有内容都包含在主页面&#xff0c;对每一个功能模块组件化。单页应用跳转&#xff0c;就是切换…

Grad-CAM(梯度加权类激活图)

Grad-CAM&#xff08;Gradient-weighted Class Activation Mapping&#xff09;是一种可视化技术&#xff0c;用于解释卷积神经网络&#xff08;CNN&#xff09;的决策过程。它通过生成类激活图&#xff08;Class Activation Map&#xff0c;CAM&#xff09;来突出显示对网络预…

笔试强训-day17_T2 十字爆破

一、题目链接 十字爆破 二、题目描述 牛牛在玩一个游戏&#xff1a; 一共有n行m列共nm个方格&#xff0c;每个方格中有一个整数。 牛牛选择一个方格&#xff0c;可以得到和这个方格同行、同列的所有数之和的得分。 例如&#xff1a;对于一个22的方格&#xff1a; 1 2 3 4 牛牛…

Python字符串常用方法(全网最细,仅此一份)

🥇作者简介:CSDN内容合伙人、新星计划第三季Python赛道Top1 🔥本文已收录于Python系列专栏: 👉Python从入门到精通 💬订阅专栏后可私信博主进入Python学习交流群,进群可领取Python180G全栈视频教程以及Python相关电子书合集 😊私信未回可以加V:hacker0327 备注P…

Nodejs-异步并发控制

异步并发控制 在 node 中可以利用异步发起并行调用。但是如果并发量过大&#xff0c;就会导致下层服务器吃不消。 bagpipe 解决方案 解决方案 通过一个队列来控制并发量如果当前活跃的异步调用小于限定值&#xff0c;从队列中取出执行如果活跃调用达到限定值&#xff0c;调…

文心一言 VS 讯飞星火 VS chatgpt (251)-- 算法导论18.2 4题

四、假设关键字 {1&#xff0c;2&#xff0c;…&#xff0c;n} 被插入一棵最小度数为 2 的空 B 树中&#xff0c;那么最终的B树有多少个结点&#xff1f;如果要写代码&#xff0c;请用go语言。 文心一言&#xff1a; B树&#xff08;B-tree&#xff09;是一种自平衡的树&…

对于Java和mysql的时间类型不一样而引发的问题应该怎么处理

在Java和MySQL中&#xff0c;时间类型有一些不同&#xff0c;可能会导致一些问题。一种常见的情况是Java中的时间类型与MySQL中的时间类型不匹配&#xff0c;例如Java中的java.sql.Timestamp和MySQL中的TIMESTAMP类型之间的差异。这种情况下&#xff0c;可以采取以下处理方法&a…

深入探索微信小程序:图像处理与优雅预览的艺术

深入探索微信小程序&#xff1a;图像处理与优雅预览的艺术 微信小程序中的图片基础一、图片上传与压缩二、图片预览技巧三、图片处理进阶&#xff1a;Canvas与滤镜四、性能优化与最佳实践参考资料 微信小程序中的图片基础 图片资源存储&#xff1a;本地资源与网络资源的使用区…

Linux系统下设置命令的别名

给常用Docker命令起别名&#xff0c;方便我们访问&#xff1a; 第一步&#xff1a;修改/root/.bashrc文件 vi /root/.bashrc第二版&#xff1a;添加别名 # .bashrc # User specific aliases and functionsalias rmrm -i alias cpcp -i alias mvmv -i alias dpsdocker ps --f…

sql数据库——增删改

1.插入 insert into <表><字段1&#xff0c;字段2&#xff0c;字段3> values<值1&#xff0c;值2&#xff0c;值3> 2.更新/修改 update <表名> set 修改字段1值1&#xff0c;修改字段2值2&#xff0c;修改字段3值3&#xff0c; 3.删除 delete …