【Apache Flink】实现有状态函数

文章目录

  • 在RuntimeContext 中声明键值分区状态
  • 通过ListCheckPonitend 接口实现算子列表状态
  • 使用CheckpointedFunction接口
  • 接收检查点完成通知
  • 参考文档

在RuntimeContext 中声明键值分区状态

Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:

  1. ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。

  2. ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。

  3. ReducingStateAggregatingState<IN, OUT>: 这两种状态都用于合并元素,通常在窗口操作中使用。

    • ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。

    • AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。

  4. MapState<UK, UV>: 这种状态类型存储一个key-value映射。

要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。

这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。

Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等

我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。

在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。

举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态

然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:

public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {// 定义键控状态private transient ValueState<Long> scoreState;@Overridepublic void open(Configuration params) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("playerScore", // 状态的名称TypeInformation.of(new TypeHint<Long>() {}),0L); // 默认值scoreState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {// update the statelong currentScore = scoreState.value();currentScore += gameEvent.getScore();scoreState.update(currentScore);// return the updated scorereturn new Tuple2<>(gameEvent.getPlayerId(), currentScore);}
}

在这个例子中,PlayerScoreFunction 接收 GameEvent 流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时,根据事件中的分数增量用 scoreState.update(currentScore) 更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。

通过ListCheckPonitend 接口实现算子列表状态

算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。

算子状态的维护主要包括以下步骤:

  1. 定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。

  2. 读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。

  3. 保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。

  4. 状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。

维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。

在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunctionListCheckpointed 接口的抽象类型继承,然后实现 snapshotStaterestoreState 方法,以完成状态恢复。

具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:

我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过 snapshotState 方法进行状态的快照并返回。当故障发生后,Flink 会调用 restoreState 方法将状态恢复回来。

如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。

public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {private List<Integer> bufferElements;public ListStateFunction(){this.bufferElements = new ArrayList<>();}@Overridepublic Integer map(String value) throws Exception {int parsedValue = Integer.parseInt(value);bufferElements.add(parsedValue);return bufferElements.size();}// 每次 checkpoint 时,将缓存的元素进行快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp)  {return this.bufferElements;}// 从存储中恢复状态@Overridepublic void restoreState(List<Integer> state) {this.bufferElements.addAll(state);}
}

使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文,两者在功能上是相似的,但 CheckpointedFunction 提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。

使用CheckpointedFunction接口

Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。

h使用CheckpointedFunction的例子:

public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {private transient ValueState<Long> counter;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));counter = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(Long value) throws Exception {Long currentCount = counter.value();Long newCount = currentCount == null ? 1L : currentCount + 1;counter.update(newCount);return newCount;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {counter.clear();}
}

此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。

在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑,比如将状态存储至远端。

接收检查点完成通知

在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。

如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:

函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。

触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。

public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {private transient ListState<Long> checkpointedState;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("state", Long.class);checkpointedState = getRuntimeContext().getListState(descriptor);}@Overridepublic Long map(Long value) throws Exception {checkpointedState.add(value);return value;}@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {// 监听到检查点成功完成的通知,此处可以进行相关逻辑处理}
}

参考文档

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/124306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【洛谷 P1106】删数问题 题解(贪心+字符串)

删数问题 题目描述 键盘输入一个高精度的正整数 N N N&#xff08;不超过 250 250 250 位&#xff09;&#xff0c;去掉其中任意 k k k 个数字后剩下的数字按原左右次序将组成一个新的非负整数。编程对给定的 N N N 和 k k k&#xff0c;寻找一种方案使得剩下的数字组成…

Arduino驱动ME007-ULA防水测距模组(超声波传感器)

目录 1、传感器特性 2、控制器和传感器连线图 3、驱动程序 3.1、读取串口数据

css:如何通过不同的值,改变盒子的样式和字体颜色通过computed而不是v-if

在使用uniapp编写功能时&#xff0c;可以通过computed方法来实现根据num这个值也可以是后端传过来的值只要是number类型都可以。不同取值来修改盒子的背景颜色和字体颜色。首先&#xff0c;在data中定义一个num来存储当前的值&#xff0c;然后在computed中创建一个样式对象&…

如何在 uniapp 里面使用 pinia 数据持久化 (pinia-plugin-persistedstate)

想要在 uniapp 里面使用 pinia-plugin-persistedstate 会遇到的问题就是 uniapp里面没有浏览器里面的 sessionStorage localStorage 这些 api。 我们只需要替换掉 pinia-plugin-persistedstate 默认的储存 api 就可以了。使用 createPersistedState 重新创建一个实例, 把里面的…

Windows server服务器允许多用户远程的设置

在Windows Server上允许多用户同时进行远程桌面连接&#xff0c;您需要配置远程桌面服务以支持多用户并确保许可证和授权允许多用户连接。以下是在Windows Server上允许多用户远程桌面连接的步骤&#xff1a; 注意&#xff1a;这些步骤适用于 Windows Server 2012、Windows Ser…

开源B2B网站电子商务平台源码下载搭建 实现高效交易的桥梁

随着互联网的普及和电子商务的快速发展&#xff0c;B2B&#xff08;Business-to-Business&#xff09;网站电子商务平台在商业领域中发挥着越来越重要的作用。通过开源B2B网站电子商务平台源码搭建&#xff0c;企业可以构建自己的电子商务平台&#xff0c;实现高效交易的桥梁。…

Oracle修改带数据的字段类型

insert into TNW_FUND_SELORG(TFDINFOID,TSOINFOID) select TFD_INFO_ID,TSO_INFO_ID from TFD_SEL_FUNDLINK_TO_OLDFUNDWEB_DB /*修改原字段名*/ ALTER TABLE 表名 RENAME COLUMN 字段名 TO 字段名1; /*添加一个和原字段同名的字段*/ ALTER TABLE 表名 ADD 字段名 VARCHAR…

轻量级 IDE 文本编辑器 Geany 发布 2.0

Geany 是功能强大、稳定、轻量的开发者专用文本编辑器&#xff0c;支持 Linux、Windows 和 macOS&#xff0c;内置支持 50 多种编程语言。 2005 年Geany 发布首个版本 0.1。上周四刚好是 Geany 诞生 18 周年纪念日&#xff0c;官方发布了 2.0 正式版以表庆祝。 下载地址&#…

骨传导耳机优缺点是什么,这几点骨感耳机的利与弊一定得知道!

随着近几年骨感耳机的风头逐渐兴起&#xff0c;骨感耳机受到了不少人的关注&#xff0c;并且存在很多人对于骨感耳机的利与弊还存在着一定的盲点&#xff0c;下面让我来给大家讲解一下。 骨感耳机的利&#xff1a; 1、不入耳的设计对耳道的损伤更小 骨感耳机采用一种独特的声…

好用工具分享 | tmux 终端会话分离工具

目录 1 tmux的安装 2 tmux的基本操作 2.1 启动与退出 2.2 分离会话 2.3 查看会话 2.4 重接会话 2.5 杀死会话 2.6 切换会话 tmux是一个 terminal multiplexer&#xff08;终端复用器&#xff09;&#xff0c;它可以启动一系列终端会话。 我们使用命令行时&#xff0c;…

vscode免密码认证ssh连接virtual box虚拟机

文章目录 安装软件virtual box配置vscode配置创建并传递密钥连接虚拟机最后 安装软件 安装vscode和virtual box&#xff0c;直接官网下载对应软件包&#xff0c;下载之后&#xff0c;点击执行&#xff0c;最后傻瓜式下一步安装即可 virtual box配置 创建一个仅主机网络的网卡 …

听GPT 讲Rust源代码--library/std(7)

题图来自 Programming languages: How Google is using Rust to reduce memory safety vulnerabilities in Android[1] File: rust/library/std/src/sys/unix/kernel_copy.rs 在Rust的标准库中&#xff0c;kernel_copy.rs文件位于sys/unix目录下&#xff0c;其主要作用是实现特…

Aware接口回调的作用及其意义

Aware接口回调的作用是让Bean获取Spring容器的一些资源或上下文信息&#xff0c;从而更方便地访问其他Bean或资源。 Aware接口包括以下几种&#xff1a; ApplicationContextAware&#xff1a;实现该接口的Bean可以获取到Spring容器的ApplicationContext对象&#xff0c;从而可以…

github搜索技巧探索

毕设涉及到推荐系统&#xff0c;那么就用搜索推荐系统相关资料来探索一下GitHub的搜搜技巧 文章目录 1. 基础搜索2. 限定在特定仓库搜索3. 按照语言搜索4. 按照star数量搜索5. 搜索特定用户/组织的仓库6. 查找特定文件或路径7. 按时间搜索8. 搜索不包含某个词的仓库9. 搜索特定…

stream流—关于Collectors.toMap使用详解

目录 使用规则&#xff1a;1.将list转成以id为key的map&#xff0c;value是id对应的某对象2.假如id存在重复值&#xff0c;则会报错Duplicate key xxx3.想获得一个id和name对应的Map<String, String>3.1 name为空时null3.2 id重复时 4.分组 使用groupingby 使用规则&…

Visual Studio 2019部署桌面exe(笔记)

一、使用Visual Studio自带的Publish功能 上述两张图片一般会自动加载&#xff0c;只需要查看一下即可。 签名问题&#xff1a; 生成exe执行文件 双击setup.exe 桌面生成&#xff08;默认图标&#xff09; 换图标&#xff1a; 对应桌面生成的exe

Redis快速上手篇七(集群-六台虚拟机)

Redis集群 主从复制的场景无法吗满足主机单点故障时需要引入集群配置 一般数据库要处理的读请求远大于写请求 &#xff0c;针对这种情况&#xff0c;我们优化数据库可以采用读写分离的策略。我们可以部 署一台主服务器主要用来处理写请求&#xff0c;部署多台从服务器 &#…

BEVFusion论文与模型代码分享

BEVFusion有两篇撞名的文章: 一篇是:BEVFusion: Multi-Task Multi-Sensor Fusion with Unified Bird’s-Eye View Representation Zhijian 论文:https://arxiv.org/pdf/2205.13790.pdf 代码:https://github.com/mit-han-lab/bevfusion 另外一篇是:BEVFusion:A Simple …

pycharm 2023.2.3设置conda虚拟环境

分两步&#xff1a; &#xff08;1&#xff09;设置Virtualenv Environment &#xff08;2&#xff09;设值Conda Executable 加载conda环境&#xff0c;然后选择conda环境

异步 AIMD 收敛

给出的一直都是同步 AIMD 收敛&#xff0c;所以简单&#xff0c;但不至于 bbr 单流情形退化成简陋。 给出一个异步 AIMD 收敛过程是必要的&#xff0c;可见&#xff0c;它同样是简洁优美的&#xff1a; 虽然我没有标注太多&#xff0c;它始终没有成为一团乱麻。 和同步 AIM…