Flink State 状态原理解析 | 京东物流技术团队

一、Flink State 概念

State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。

Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。在Flink应用运行过程中,通过 checkpoint 快照定期地保存状态数据。并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。

1.1 Apache Flink 中 State 的存储实现 StateBackend 分类

分为以下3类:

  • 基于内存的 HeapStateBackend。状态存储在内存中。
  • 基于 HDFS 或 OSS 的 FsStateBackend。状态存储在内存,并在做 cp(checkpoint)时存到远端。
  • 基于 RocksDB 的 RocksDBStateBackend。将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。

HeapStateBackend 和 RocksDBStateBackend 分别对应在 TaskManager 内存模型中的位置:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

RocksDBStateBackend 中存储结构:

namespace: 在不同的 namespace 下存在相同名称的状态。

1.1.1 State 状态持久化

通过 Chandy-Lamport 分布式快照算法进行 checkpoint 完成状态数据的持久化。然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。

chekcpoint 分类:

  • 基于内存的全量 checkpoint
  • HDFS 全量 checkpoint
  • RocksDB 全量 checkpoint/增量 checkpoint

1.2 State 基于算子和数据分组的分类

State 可分为 Operator State 和 Keyed State 两类。

  • Operator State(称为 non-keyed state)

常常存在于Source, Sink中。具体实现类例如:

  • BroadcastState

例:Kafka Source 中用 OperatorState 记录 offset。

  • Keyed State

任何类型的 keyed state 都可以有有效期(TTL),所有状态类型都支持单元素的 TTL。 这意味着 List 元素和 Map 映射元素将独立到期。

例:SQL GroupBy/PartitionBy 后的窗口中的数据,每个 key 都有对应的 State。key 与 key 之间的 State 数据不可见。

keyed state 的具体实现类:

  • ValueState
  • MapState
  • ListState
  • AggregatingState
  • ReducingState
  • 。。。。。

Flink State思维导图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Keyed StateOperator State
适用算子类型只适用于KeyedStream上的算子可用于所有算子
状态分配每个Key对应一个状态一个算子子任务对应一个状态
横向扩展状态随着keyBy的分组KeyGroup自动在多个算子子任务上迁移有多种状态重新分配的方式
创建和访问方式自定义算子(重写RichFunction,通过State 名称从 getRuntimeContext方法创建或获得 State )实现 CheckpointedFunction 等接口
支持数据结构ValueState、ListState、MapState等ListState、BroadcastState等

二、常见状态相关处理流程

2.1 Flink 应用中状态是如何存储的?

1. Kafka Source 如何存储 OperatorState?

class FlinkKafkaConsumerBase {private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // state名称:"topic-partition-offset-states"
// 特殊的State类型:Union State 
}

unionOffsetStates这个变量就是 OperatorState类型的。

2. Map算子如何存储需要累计的数据?

  • ValueState/MapState/ListState/…

思考:keyby 后的数据分发与多并行度 subtask 之间的关系是怎样的?

首先,datastream 中数据经过 keyby 之后,会划分到各个 KeyedStream 中。每个 KeyedStream 有自己的 KeyedState(如ValueState/ListState/MapState)。

其次,KeyedStream 中的数据会以 KeyGroup 方式组织在一起。KeyGroup 是 Flink 重新分发 key state 的最小单元。

最后,KeyGroup 中的数据会通过取模最大并行度的方式分散到各个 subtask 中。以下是关键源码:

KeyGroupStreamPartitioner#selectChannel(record)
{K key;key = keySelector.getKey(record.getInstance().getValue());return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
--KeyGroupRangeAssignment#assignKeyToParallelOperator(){return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}--KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup()公式:OperatorIndex = keyGroupId * parallelism / maxParallelism--KeyGroupRangeAssignment#assignToKeyGroup(){return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}

2.2 修改并行度场景时 State 状态存储的变化

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.3 State 与 Checkpoint 关系

分布式快照 Checkpoint 的概念,定期将 State 持久化到 外部存储系统(HDFS/OSS) 上。用户可以通过实现 CheckpointedFunction 接口来使用 operator state。通过 barrier 来对齐 checkpoint,等待 State 持久化完成(此过程参数不同也可能是异步的)。

常见 State 与 CP 相关的问题

  • State 状态过大。现象为多个算子或单个算子多个 subtask 做 checkpoint 慢,可导致 CP 对齐时间长,严重时会导致 CP 超时。
  • 数据倾斜导致某个 subtask 处理不及时。现象为单个算子少数几个 subtask 做 checkpoint 慢,导致 CP 对齐时间长。严重时会导致 CP 超时。
  • 大作业(并行度搞)频繁做 CP,会频繁上传小文件,导致 HDFS 集群小文件过多。

常用解决措施:调大托管内存大小。

三、参考文档:

  • Flink State 官方文档:Flink 状态与容错
  • https://cloud.tencent.com/developer/article/1403939
  • https://www.modb.pro/db/81206

作者:京东物流 吴云涛

来源:京东云开发者社区 自猿其说Tech 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202929.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA加载阿里Java规范插件

IDEA加载阿里巴巴Java开发手册插件&#xff0c;在写代码的时候会自动扫描代码规范。 1、打开Settings 2、打开Plugins 3、搜索Alibaba Java Code Guidelines&#xff08;XenoAmess TPM&#xff09;插件&#xff0c;点击Install进行安装&#xff0c;然后重启IDE生效。 4、鼠标右…

测试新手百科:Postman简介、安装、入门使用方法详细攻略!

一、Postman背景介绍 用户在开发或者调试网络程序或者是网页B/S模式的程序的时候是需要一些方法来跟踪网页请求的&#xff0c;用户可以使用一些网络的监视工具比如著名的Firebug等网页调试工具。今天给大家介绍的这款网页调试工具不仅可以调试简单的css、html、脚本等简单的网…

要求CHATGPT高质量回答的艺术:提示工程技术的完整指南—第 12 章:多选提示

要求CHATGPT高质量回答的艺术&#xff1a;提示工程技术的完整指南—第 12 章&#xff1a;多选提示 这种技术会向模型提出一个问题或任务&#xff0c;并将一组预定义的选项作为可能的答案。 该技术可用于生成仅限于特定选项集的文本&#xff0c;并可用于问题解答、文本补全和其…

记录一个困难(python)

在从一个网页跳转另一个网页&#xff08;该网页是登录页面&#xff09; 采用python的selenium库对网页进行自动化登录 import time from selenium import webdriver path chromedriver.exe driver webdriver.Chrome(path) driver.get("url") time.sleep(2) driver.f…

显存优化 Trick(gradient_accumulation、gradient_checkpointing、xformers)

目录 Out of MemoryGradient AccumulationGradient CheckpointingXformersDiffusers的显存优化 Out of Memory 先来说下OOM问题&#xff0c;其实也是日常会遇到的情况。模型申请的显存超过了设备实际显存大小&#xff0c;则会报错Out of Memory。一般情况下&#xff0c;batch …

java--匿名内部类

1.匿名内部类 ①就是一种特殊的局部内部类&#xff1b;所谓匿名&#xff1a;指的是程序员不需要为这个类声明名字。 ②特点&#xff1a;匿名内部类本质就是一个子类&#xff0c;并会立即创建出一个子类对象。 ③作用&#xff1a;用于更方便的创建一个子类对象。 2.匿名内部类…

Stm32 CubeIDE对RTC的日期、时间读写,后备存储的读写的部分做个补充说明

上一篇文章讲了Stm32 CubeIDE对RTC的日期、时间读写&#xff0c;后备存储的读写&#xff0c;发现几个问题&#xff0c;再次说明一下情况&#xff1a; 1.如果对RTC做初始化&#xff1a; hrtc.Instance RTC;hrtc.Init.HourFormat RTC_HOURFORMAT_24;hrtc.Init.AsynchPrediv 12…

用队列实现栈的功能(c++实现)

使用一个队列实现栈的基本功能&#xff1a;push、pop、判断栈是否为空等&#xff0c;实现的代码如下&#xff1a; #include<iostream> #include<queue> #include<ctime>//计算代码所需要的时间 using namespace std;class MyStack { public:queue<int>…

Dagger2使用

在android引入Dagger2库 //引入Dagger2implementation("com.google.dagger:dagger:2.48.1")annotationProcessor ("com.google.dagger:dagger-compiler:2.48.1") 构造器注入 创建一个类 public class Car {//在构造器上面添加dagger的Inject即可Injectp…

Java利用UDP实现简单群聊

一、创建新项目 首先新建一个新的项目&#xff0c;并按如下操作 二、实现代码 界面ChatFrame类 package 群聊; import javax.swing.*; import java.awt.*; import java.awt.event.*; import java.net.InetAddress; public abstract class ChatFrame extends JFrame { p…

C语言之多维数组

所谓多维数组就是以多个数组为单位组成的数组&#xff0c;即元素本身是数组的数组。下面我们来学习多维数组的基本知识&#xff1a; 多维数组 上一节学习的数组都是int型或double型等单一类型&#xff0c;实际上数组本身也可以作为组成数组的元素。 以数组作为元素的数组时二…

JavaBean是什么

详情请参考JavaBean规范&#xff1a;https://www.oracle.com/java/technologies/javase/javabeans-spec.html JavaBean是可重用的软件组件&#xff0c;是一个java类&#xff0c;方法名称符合一定的规范&#xff0c;这样使用方使用起来方便&#xff0c;例如框架和工具可以根据规…

Java架构师系统架构设计原则应用

目录 1 导语2 如何设计高并发系统:局部并发原则3 如何设计高并发系统:服务化与拆分4 高可用系统有哪些设计原则?5 如何保持简单轻量的架构-DRY、KISS,YAGNI原则6 如何设计组件间的交互和行为-HCLC,CQS,SOC7 框架层面的发展趋势-约定大于配置想学习架构师构建流程请跳转:…

[leetcode 差分数组] 拼车 M

车上最初有 capacity 个空座位。车 只能 向一个方向行驶&#xff08;也就是说&#xff0c;不允许掉头或改变方向&#xff09; 给定整数 capacity 和一个数组 trips , trip[i] [numPassengersi, fromi, toi] 表示第 i 次旅行有 numPassengersi 乘客&#xff0c;接他们和放他们…

C++11:智能指针

文章目录 前言正文(1) 三种智能指针(2) 智能指针的设计思路(3) unique_ptrunique_ptr的几种初始化方法获取unique_ptr的地址unique_ptr的使用 (4) shared_ptr 前言 一般来说&#xff0c;我们想要在堆上开辟内存空间需要使用关键字new&#xff0c;但由于使用new之后我们还是需要…

【数据结构】动态规划(Dynamic Programming)

一.动态规划&#xff08;DP&#xff09;的定义&#xff1a; 求解决策过程&#xff08;decision process&#xff09;最优化的数学方法。 将多阶段决策过程转化为一系列单阶段问题&#xff0c;利用各阶段之间的关系&#xff0c;逐个求解。 二.动态规划的基本思想&#xff1a; …

cpper绝不当炮灰选手-剑指大厂-c++后端面试大成攻略副本

针对于c后端&#xff0c;本文会直接从面试角度出发&#xff0c;盘点整理在c后端面试中出现的面试题型与经典题目。 包含&#xff1a; c/c&#xff08;36道&#xff09; 设计模式&#xff08;14道&#xff09; 数据结构与算法&#xff08;35道&#xff09; 操作系统&#xf…

【vue】点击导航菜单切换局部页面,打开展示默认栏目,页面刷新等问题

非专业前端&#xff0c;局限性较高&#xff0c;有些问题看起来很小&#xff0c;但是初次接触很棘手&#xff0c;需要查找很多博客&#xff0c;内容也很杂。以下只是过程中总结下来的&#xff0c;要解决的就是标题中的三个问题。 这是我需要达成的效果。 1.第一个是进入导航菜单…

浅谈web性能测试

什么是性能测试&#xff1f; web性能应该注意些什么&#xff1f; 性能测试&#xff0c;简而言之就是模仿用户对一个系统进行大批量的操作&#xff0c;得出系统各项性能指标和性能瓶颈&#xff0c;并从中发现存在的问题&#xff0c;通过多方协助调优的过程。而web端的性能测试…

Clion运行QT,模拟VS弹出CMD框打印

参考&#xff1a;https://stackoverflow.com/questions/35385772/running-clion-on-the-system-console-like-visual-studio 在运行配置的地方进行编辑&#xff1a; 可执行文件设置&#xff1a;C:\Windows\System32\cmd.exe程序实参&#xff1a;/c “start cmd.exe cmd /c “…