13、Flink 的 Operator State 详解

1.算子状态 (Operator State)

算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态,Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。

当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例,处理重分发过程有多种不同的方案。

算子状态作为一种特殊类型的状态使用,用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。

注意: Python DataStream API 仍无法支持算子状态。

2.使用 Operator State

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。

a)CheckpointedFunction 概述

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState(),自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复;因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在;这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派,根据状态的不同访问方式,有如下几种重新分配的模式

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成;当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。;比如说,算子 A 的并发度为 1,包含两个元素 element1element2,当并发度增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
  • Union redistribution: 每个算子保存一个列表形式的状态集合;整个状态由所有的列表拼接而成;当作业恢复或重新分配时,每个算子都将获得所有的状态数据【不建议使用】
b)带缓冲的 SinkFunction

案例SinkFunctionCheckpointedFunction 中进行数据缓存,然后统一发送到下游,演示了列表状态数据的 event-split redistribution。

public class BufferingSinkimplements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() >= threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.update(bufferedElements);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}

initializeState 方法接收一个 FunctionInitializationContext 参数,用来初始化 non-keyed state 的 “容器” 即 ListState 用于在 checkpoint 时保存 non-keyed state 对象,和 keyed state 类似,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法,比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则使用 even-split redistribution 算法。

当初始化状态对象后,通过 isRestored() 方法判断是否从之前的故障中恢复,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用,然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

c)带状态的 Source Function

需要保证更新状态以及输出的原子性(用于支持 exactly-once 语义),需要在发送数据前获取数据源的全局锁。

public static class CounterSourceextends RichParallelSourceFunction<Long>implements CheckpointedFunction {/**  current offset for exactly once semantics */private Long offset = 0L;/** flag for job cancellation */private volatile boolean isRunning = true;/** 存储 state 的变量. */private ListState<Long> state;@Overridepublic void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("state",LongSerializer.INSTANCE));// 从已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法for (Long l : state.get()) {offset = l;}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {state.update(Collections.singletonList(offset));}
}

要获取 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener 接口

【当算子完成 checkpoint 后会回调 notifyCheckpointComplete() 方法】。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/5907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel 批量获取sheet页名称,并创建超链接指向对应sheet页

参考资料 用GET.WORKBOOK函数实现excel批量生成带超链接目录且自动更新 目录 一. 需求二. 名称管理器 → 自定义获取sheet页名称函数三. 配合Index函数&#xff0c;获取所有的sheet页名称四. 添加超链接&#xff0c;指向对应的sheet页 一. 需求 ⏹有如下Excel表&#xff0c;需…

Java 正则表达式代码演示

正则表达式&#xff08;Regular Expressions&#xff09;是一种用于描述字符串匹配模式的强大工具。在 Java 中&#xff0c;可以使用 java.util.regex 包来处理正则表达式。 文章目录 一、基本用法二、高级用法 一、基本用法 导入正则表达式类: import java.util.regex.Patte…

Easy TCP Analysis上线案例库功能,为用户提供一个TCP抓包分析案例分享学习的平台

​案例库&#xff0c;提供给用户相互分享TCP抓包故障排查案例或是经典学习案例的功能&#xff0c;任何用户都可从案例库查看其它用户分享的案例&#xff0c;每个用户也都可以上传自己的案例&#xff0c;经过平台审核去重即可展示在案例库。 对于学习&#xff0c;最典型的三次握…

webscoket+webrtc实现语音通话

1.项目方案 前端采用webrtc创建音频上下文&#xff0c;后创建音频源输入和音频处理器&#xff0c;连接音频输入与处理器&#xff0c;处理器再连接到音频输出&#xff08;扬声器&#xff09;&#xff0c;再通过事件获取音频数据&#xff0c;把音频数据转换成字节数据通过webscok…

【Docker学习】docker start深入研究

docker start也是很简单的命令。但因为有了几个选项&#xff0c;又变得复杂&#xff0c;而且... 命令&#xff1a; docker container start 描述&#xff1a; 启动一个或多个已停止的容器。 用法&#xff1a; docker container start [OPTIONS] CONTAINER [CONTAINER...] 别名&…

【网络编程】网络基础

TCP/IP五层模型 物理层&#xff1a;负责光/电信号的传递方式. 比如现在以太网通用的网线&#xff08;双绞线&#xff09;、早期以太网采用的的同轴电缆&#xff08;现在主要用于有线电视&#xff09;、光纤&#xff0c;现在的 WIFI无线网使用电磁波等都属于物理层的概念。物理层…

搭建AI大模型步骤

搭建AI大模型需要以下步骤&#xff1a; 数据收集和预处理&#xff1a;收集大量的训练数据&#xff0c;并进行清洗、标注和预处理&#xff0c;使其适合模型训练。 模型选择&#xff1a;根据具体的任务需求&#xff0c;选择适合的深度学习模型&#xff0c;如卷积神经网络&#x…

QtConcurrent::run操作界面ui的注意事项

先说结论&#xff1a;QtConcurrent::run启动的耗时处理函数&#xff0c;不允许处理ui界面对象&#xff0c;如控件&#xff0c;如进度条等等&#xff01; QtConcurrent::run非常好用&#xff0c;胜过QThead的两种方式&#xff08;run和moveToThread&#xff09;&#xff0c;例如…

使用 uni-app 开发 iOS 应用的操作步骤

哈喽呀&#xff0c;大家好呀&#xff0c;淼淼又来和大家见面啦&#xff0c;上一期和大家一起探讨了使用uniapp开发iOS应用的优势及劣势之后有许多小伙伴想要尝试使用uniapp开发iOS应用&#xff0c;但是却不懂如何使用uniapp开发iOS应用&#xff0c;所以这一期淼淼就来给你们分享…

数据结构复习指导之数组和特殊矩阵

文章目录 数组和特殊矩阵 考纲内容 复习提示 前言 1.数组的定义 2.数组的存储结构 3.特殊矩阵的压缩存储 3.1对称矩阵 3.2三角矩阵 3.3三对角矩阵 4.稀疏矩阵 5.知识回顾 数组和特殊矩阵 考纲内容 &#xff08;一&#xff09;栈和队列的基本概念 &#xff08;二&a…

【C++】:const成员,取地址及const取地址操作符重载

目录 一&#xff0c;const成员二&#xff0c;取地址及const取地址操作符重载 一&#xff0c;const成员 将const修饰的“成员函数”称之为const成员函数&#xff0c;const修饰类成员函数&#xff0c;实际修饰该成员函数隐含的this指针&#xff0c;表明在该成员函数中不能对类的…

百度竞价开户详解:步骤、优势与注意事项

随着互联网的普及&#xff0c;网络营销已成为企业不可或缺的一部分。其中&#xff0c;百度竞价作为一种高效的网络推广方式&#xff0c;受到了越来越多企业的青睐。本文将详细介绍百度竞价开户的流程、优势以及注意事项&#xff0c;帮助企业更好地利用这一工具提升品牌知名度和…

UnityWebGL获取话筒实时数据

看了木子李大佬的数字人https://digital.lkz.fit/之后&#xff0c;我也想搞一个&#xff0c;于是开始研究起来&#xff0c;先从WebGL录音开始&#xff0c;一共试了三个插件&#xff0c;个个都有问题…… 1、UnityWebGLMicrophone 用起来没啥问题&#xff0c;但是只能录音&#…

数据结构的队列(c语言版)

一.队列的概念 1.队列的定义 队列是一种常见的数据结构&#xff0c;它遵循先进先出的原则。类似于现实生活中排队的场景&#xff0c;最先进入队列的元素首先被处理&#xff0c;而最后进入队列的元素则要等到前面的元素都被处理完后才能被处理。 在队列中&#xff0c;元素只能…

《恶意不息》是一款什么样的游戏,苹果电脑怎么玩《恶意不息》恶意不息游戏内怎么存档 mac电脑玩游戏

近日steam游戏商城新上架了一款名叫《恶意不息》的游戏十分火爆&#xff0c;那么《恶意不息》是一款什么样的游戏&#xff0c;苹果电脑怎么玩《恶意不息》&#xff1f;一起来看看吧&#xff01; 一、《恶意不息》是一款什么样的游戏&#xff1f; Private Division&#xff0c;…

【蓝桥杯嵌入式】第七届省赛 - 模拟液位检测告警系统

代码开源&#xff0c;Gitee自取 代码开源&#xff0c;Gitee自取 代码开源&#xff0c;Gitee自取 目录 0 前言 1 展示 1.1 源码 1.2 演示视频 1.3 题目展示 2 工程配置 3 资源配置&代码实现 3.1 定时器 3.2 液位检测 3.3 液位阈值设定 3.4 液位阈值设定 3.5 串…

BST二叉搜索树

概念 二叉搜索树&#xff08;Binary Search Tree&#xff0c;简称BST&#xff09;&#xff0c;又称为二叉排序树或二叉查找树&#xff0c;是一种特殊的二叉树数据结构。它具有以下基本性质&#xff1a; 节点的值的有序性&#xff1a;对于BST中的任意一个节点&#xff0c;其左…

Angular基础-搭建Angular运行环境

这篇文章介绍了在Angular项目中进行开发环境搭建的关键步骤。包括node.js安装和配置、安装Angular CLI工具、安装angular-router、创建Angular项目等步骤。这篇文章为读者提供了清晰的指南&#xff0c;帮助他们快速搭建Angular开发环境&#xff0c;为后续的项目开发奠定基础。 …

【Debug】TensorRT报错汇总

搭建TensorRT过程参见&#xff1a;【通俗易懂】Windows系统安装TensorRT 下面是运行中的问题汇总及解决。 报错1. No module named ‘pycuda’ 解决&#xff1a; pip install pycuda报错2. AttributeError: ‘tensorrt.tensorrt.Builder’ object has no attribute ‘max_wor…

在ubuntu 24.04 上安装vmware workstation 17.5.1

ubuntu安装在新组装的i9 14900机器上&#xff0c;用来学习笨叔的ARM64体系结构编程&#xff0c;也熟悉Linux的用法。但有时候写文档总是不方便&#xff0c;还是需要window来用。因此想在ubuntu 24.04上安装Linux版本的vmware worksation 17.5.1以虚拟机的方式安装windows 11。其…