Flink源码之State创建流程

StreamOperatorStateHandler

在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。

 OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState(StreamTaskStateInitializer) StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStoreCheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingRuntimeContext::setKeyedStateStore

Flink中主要有两种StateBackend:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState创建流程:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::operatorStateBackendHashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackendStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerStreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeStateStateInitializationContextImpl::new //该实例可getOperatorStateStore

使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:

void initializeState(FunctionInitializationContext context) throws Exception;void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::newPartitionableListState::new  //内部是ArrayList

因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。

OperatorStateStore有三个获取状态方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)throws Exception

KeyedState

KeyedState创建流程如下:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::keyedStatedBackendHashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackendHeapKeyedStateBackendBuilder::buildInternalKeyContextImpl::new //用于保存当前正在处理的keyStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerDefaultKeyedStateStore::new //创建DefaultKeyedStateStoreStreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量AbstractStreamUdfOperator::openFunctionUtils::openFunctionRichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:

getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedStateLatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabledTtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTLHeapKeyedStateBackend::createInternalStateHeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTableCopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContextStateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMapCopyOnWriteStateMap::new //存储key及其对应的状态HeapValueState::createHeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMapHeapValueState::setCurrentNamespace  //默认为VoidNamespace

KeyedState有以下几种类型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueStateListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListStateMapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapStategetAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingStategetReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

总结

Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39695.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web framework-Gin

一、Gin Go Web--Go Module 软件框架&#xff08;software framework&#xff09;&#xff0c;通常指的是为了实现某个业界标准或完成特定基本任务的软件组件规范&#xff0c;也指为了实现某个软件组件规范时&#xff0c;提供规范所要求之基础功能的软件产品。 框架就是&#…

机器学习|Softmax 回归的数学理解及代码解析

机器学习&#xff5c;Softmax 回归的数学理解及代码解析 Softmax 回归是一种常用的多类别分类算法&#xff0c;适用于将输入向量映射到多个类别的概率分布。在本文中&#xff0c;我们将深入探讨 Softmax 回归的数学原理&#xff0c;并提供 Python 示例代码帮助读者更好地理解和…

HarmonyOS NEXT新能力,一站式高效开发HarmonyOS应用

2023年8月6日华为开发者大会2023&#xff08;HDC.Together&#xff09;圆满收官&#xff0c;伴随着HarmonyOS 4的发布&#xff0c;华为向开发者发布了汇聚所有最新开发能力的HarmonyOS NEXT开发者预览版&#xff0c;并分享了围绕“一次开发&#xff0c;多端部署” “可分可合&a…

代码随想录算法训练营第60天|动态规划part17| 647. 回文子串、516.最长回文子序列、动态规划总结篇

代码随想录算法训练营第60天&#xff5c;动态规划part17&#xff5c; 647. 回文子串、516.最长回文子序列、动态规划总结篇 647. 回文子串 647. 回文子串 思路&#xff1a; 暴力解法 两层for循环&#xff0c;遍历区间起始位置和终止位置&#xff0c;然后还需要一层遍历判断…

【mysql】—— 表的增删改查

目录 序言 &#xff08;一&#xff09;Create操作 1、单行数据 全列插入 2、多行数据 指定列插入 3、插入否则更新 4、直接替换 &#xff08;二&#xff09;Retrieve操作 1、SELECT 列 1️⃣全列查询 2️⃣指定列查询 3️⃣查询字段为表达式 4️⃣为查询结果指定…

数据结构——堆

数据结构——堆 堆堆简介堆的分类 二叉堆过程插入操作 删除操作向下调整&#xff1a; 增加某个点的权值实现参考代码&#xff1a;建堆方法一&#xff1a;使用 decreasekey&#xff08;即&#xff0c;向上调整&#xff09;方法二&#xff1a;使用向下调整 应用对顶堆 其他&#…

Python Flask+Echarts+sklearn+MySQL(评论情感分析、用户推荐、BI报表)项目分享

Python FlaskEchartssklearnMySQL(评论情感分析、用户推荐、BI报表)项目分享 项目背景&#xff1a; 随着互联网的快速发展和智能手机的普及&#xff0c;人们越来越倾向于在网上查找餐厅、购物中心、酒店和旅游景点等商户的点评和评分信息&#xff0c;以便做出更好的消费决策。…

YOLOv8 : 网络结构

一. YOLOv8网络结构 1. Backbone YOLOv8的Backbone同样参考了CSPDarkNet-53网络&#xff0c;我们可以称之为CSPDarkNet结构吧&#xff0c;与YOLOv5不同的是&#xff0c;YOLOv8使用C2f(CSPLayer_2Conv)代替了C3模块(如果你比较熟悉YOLOv5的网络结构&#xff0c;那YOLOv8的网络…

【GitHub】Pycharm本地项目打包上传到Github仓库的操作步骤

文章目录 1、Pycharm端的设置操作2、Github端的设置操作3、Pycharm上配置Github4、Git本地项目至GitHub仓库5、前往Github中查看确认6、常见报错 1、Pycharm端的设置操作 通过CtrlAltS快捷组合键的方式&#xff0c;打开设置&#xff0c;导航到版本控制一栏中的Git&#xff0c;…

Gin安装解决国内go 与 热加载

get 方式安装超时问题&#xff0c;国内直接用官网推荐的下面这个命令大概率是安装不成功的 go get -u github.com/gin-gonic/gin 可以在你的项目目录下执行下面几个命令&#xff1a; 比如我的项目在E:\Oproject\zl cmd E:\Oproject\zl>就在目录下执行 go env -w GO111…

回归预测 | MATLAB实现GRU门控循环单元多输入多输出

回归预测 | MATLAB实现GRU门控循环单元多输入多输出 目录 回归预测 | MATLAB实现GRU门控循环单元多输入多输出预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 MATLAB实现GRU门控循环单元多输入多输出&#xff0c;数据为多输入多输出预测数据&#xff0c;输入10个…

pytorch安装VAE项目详解

安装VAE项目 一、 基本环境二、代码来源三、搭建conda环境四、下载数据集五、启动项目六、其他相关问题 一、 基本环境 工具版本号OSwin 11pycharm2020.1GPU3050 二、代码来源 github地址为&#xff1a; https://github.com/AntixK/PyTorch-VAE/blob/8700d245a9735640dda458d…

ZooKeeper的应用场景(集群管理、Master选举)

5 集群管理 随着分布式系统规模的日益扩大&#xff0c;集群中的机器规模也随之变大&#xff0c;因此&#xff0c;如何更好地进行集群管理也显得越来越重要了。 所谓集群管理&#xff0c;包括集群监控与集群控制两大块&#xff0c;前者侧重对集群运行时状态的收集&#xff0c;后…

08 - 追加commit和修改最新的commit message

查看所有文章链接&#xff1a;&#xff08;更新中&#xff09;GIT常用场景- 目录 文章目录 1. 追加提交2. 修改最新的commit message 1. 追加提交 将改动追加到上一次的commit 现在我已经修改了Readme文件并且已经add、commit操作&#xff0c;但是还没有push到远程仓库&#x…

【左神算法刷题班】第17节:在有序二维数组中查找目标值、等于目标字符串的子序列个数

第17节 题目1&#xff1a;在有序二维数组中查找目标值 给定一个每一行有序、每一列也有序&#xff0c;整体可能无序的二维数组 再给定一个数num&#xff0c; 返回二维数组中有没有num这个数 例子 数组如下&#xff0c;找 6 是否存在。 1 3 5 7 2 4 6 13 3 9 14 …

“心理健康人工智能产学研创新联盟”揭牌成立|深兰科技

8月14日上午&#xff0c;“2023树洞救援年会”在上海举行&#xff0c;会上举行了“心理健康人工智能产学研创新联盟”的签约和揭牌仪式。“树洞行动救援团”创始人深兰科技科学院智能科学首席科学家、荷兰阿姆斯特丹自由大学人工智能系终身教授黄智生&#xff0c;深兰科技集团创…

华纳云:ubuntu启动宝塔的方法是什么

要在Ubuntu上启动宝塔面板&#xff08;BT Panel&#xff09;&#xff0c;请按照以下步骤操作&#xff1a; 下载并安装宝塔面板&#xff1a; 在终端中执行以下命令&#xff0c;以下载并运行宝塔面板的安装脚本&#xff1a; sudo su wget -O install.sh http://download.bt.c…

Git 常用操作

一、Git 常用操作 1、Git 切换分支 git checkout命令可以用于三种不同的实体&#xff1a;文件&#xff0c;commit&#xff0c;以及分支。checkout的意思就是对于一种实体的不同版本之间进行切换的操作。checkout一个分支&#xff0c;会更新当前的工作空间中的文件&#xff0c;…

68 # 中间层如何请求其他服务

前端 ajax 有跨域问题&#xff0c;可以先访问中间层&#xff0c;在通过 node 去请求别的服务端口&#xff0c;可以解决跨域问题 编写中间层调用 // 中间层的方式const http require("http");// http.get 默认发送 get 请求 // http.request 支持其他请求格式 postl…

Java基础知识实际应用(学生信息管理系统、猜拳小游戏、打印日历)

一、Java学生信息管理系统 这个系统包含了添加、修改、删除、查询和显示所有学生信息等功能。您可以在此基础上进行修改和完善&#xff0c;以适应您的需求。 import java.util.Scanner;public class StudentManagementSystem {private static Scanner scanner new Scanner(S…