Flink State 误用之痛,你中招了吗?

简介: 本文主要讨论一个问题:ValueState 中存 Map 与 MapState 有什么区别?如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:CPU 被打满、吞吐上不去。

本文主要讨论一个问题:ValueState 中存 Map 与 MapState 有什么区别?

如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:

· CPU 被打满
· 吞吐上不去

1、 结论

从性能和 TTL 两个维度来描述区别。

性能

· RocksDB 场景,MapState 比 ValueState 中存 Map 性能高很多。
· 生产环境强烈推荐使用 MapState,不推荐 ValueState 中存大对象
· ValueState 中存大对象很容易使 CPU 打满
· Heap State 场景,两者性能类似。

TTL

Flink 中 State 支持设置 TTL:

· MapState 的 TTL 是基于 UK 级别的
· ValueState 的 TTL 是基于整个 key 的

举一反三

能使用 ListState 的场景,不要使用 ValueState 中存 List。大佬们已经把 MapState 和 ListState 性能都做了很多优化,高性能不香吗?下文会详细分析 ValueState 和 MapState 底层的实现原理,通过分析原理得出上述结论。

2、 State 中要存储哪些数据

ValueState 会存储 key、namespace、value,缩写为 。MapState 会存储 key、namespace、userKey、userValue,缩写为 。

解释一下上述这些名词。

Key

ValueState 和 MapState 都是 KeyedState,也就是 keyBy 后才能使用 ValueState 和 MapState。所以 State 中肯定要保存 key。

例如:按照 app 进行 keyBy,总共有两个 app,分别是:app1 和 app2。那么状态存储引擎中肯定要存储 app1 或 app2,用于区分当前的状态数据到底是 app1 的还是 app2 的。

这里的 app1、app2 也就是所说的 key。

Namespace

Namespace 用于区分窗口。

假设需要统计 app1 和 app2 每个小时的 pv 指标,则需要使用小时级别的窗口。状态引擎为了区分 app1 在 7 点和 8 点的 pv 值,就必须新增一个维度用来标识窗口。

Flink 用 Namespace 来标识窗口,这样就可以在状态引擎中区分出 app1 在 7 点和 8 点的状态信息。

Value、UserKey、UserValue

ValueState 中存储具体的状态值。也就是上述例子中对应的 pv 值。MapState 类似于 Map 集合,存储的是一个个 KV 键值对。为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。

下面讲述状态引擎是如何存储这些数据的。

3、StateBackend 如何存储和读写State 数据

Flink 支持三种 StateBackend,分别是:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。

其中 MemoryStateBackend、FsStateBackend 两种 StateBackend 在任务运行期间都会将 State 存储在内存中,两者在 Checkpoint 时将快照存储的位置不同。RocksDBStateBackend 在任务运行期间将 State 存储在本地的 RocksDB 数据库中。

所以下文将 MemoryStateBackend、FsStateBackend 统称为 heap 模式,RocksDBStateBackend 称为 RocksDB 模式。

3.1 Heap 模式 ValueState 和 MapState 如何存储

Heap 模式表示所有的状态数据都存储在 TM 的堆内存中,所有的状态都存储的原始对象,不会做序列化和反序列化。(注:Checkpoint 的时候会涉及到序列化和反序列化,数据的正常读写并不会涉及,所以这里先不讨论。)

Heap 模式下,无论是 ValueState 还是 MapState 都存储在 CopyOnWriteStateMap 中。

· key 、 Namespace 分别对应 CopyOnWriteStateMap 的 K、N。
· ValueState 的 value 对应 CopyOnWriteStateMap 的 V。

MapState 将会把整个 Map 作为 CopyOnWriteStateMap 的 V,相当于 Flink 引擎创建了一个 HashMap 用于存储 MapState 的 KV 键值对。

具体 CopyOnWriteStateMap 是如何实现的,可以参考:万字长文详解 Flink 中的 CopyOnWriteStateTable。

回到正题:Heap 模式下,ValueState 中存 Map 与 MapState 有什么区别?

Heap 模式下没有区别。

ValueState 中存 Map,相当于用户手动创建了一个 HashMap 当做 V 放到了状态引擎中。而 MapState 是 Flink 引擎帮用户创建了一个 HashMap 当做 V 放到了状态引擎中。

所以实质上 ValueState 中存 Map 与 MapState 都是一样的,存储结构都是 CopyOnWriteStateMap。区别在于 ValueState 是用户手动创建 HashMap,MapState 是 Flink 引擎创建 HashMap。

3.2 RocksDB 模式 ValueState 和 MapState 如何存储

RocksDB 模式表示所有的状态数据存储在 TM 本地的 RocksDB 数据库中。RocksDB 是一个 KV 数据库,且所有的 key 和 value 都是 byte 数组。所以无论是 ValueState 还是 MapState,存储到 RocksDB 中都必须将对象序列化成二进制当前 kv 存储在 RocksDB 中。

 3.2.1 ValueState 如何映射成 RocksDB 的 kv

ValueState 有 key、namespace、value 需要存储,所以最简单的思路:

1、将 ValueState 的 key 序列化成 byte 数组
2、将 ValueState 的 namespace 序列化成 byte 数组
3、将两个 byte 数组拼接起来做为 RocksDB 的 key
4、将 ValueState 的 value 序列化成 byte 数组做为 RocksDB 的 value

然后就可以写入到 RocksDB 中。

查询数据也用相同的逻辑:将 key 和 namespace 序列化后拼接起来作为 RocksDB 的 key,去 RocksDB 中进行查询,查询到的 byte 数组进行反序列化就得到了 ValueState 的 value。

这就是 RocksDB 模式下,ValueState 的读写流程。

 3.2.2 MapState 如何映射成 RocksDB 的 kv

MapState 有 key、namespace、userKey、userValue 需要存储,所以最简单的思路:

1、将 MapState 的 key 序列化成 byte 数组
2、将 MapState 的 namespace 序列化成 byte 数组
3、将 MapState 的 userKey 序列化成 byte 数组
4、将三个 byte 数组拼接起来做为 RocksDB 的 key
5、将 MapState 的 value 序列化成 byte 数组做为 RocksDB 的 value

然后就可以写入到 RocksDB 中。

查询数据也用相同的逻辑:将 key、namespace、userKey 序列化后拼接起来作为 RocksDB 的 key,去 RocksDB 中进行查询,查询到的 byte 数组进行反序列化就得到了 MapState 的 userValue。

这就是 RocksDB 模式下,MapState 的读写流程。

3.3 RocksDB 模式下,ValueState 中存 Map 与 MapState 有什么区别?

■ 3.3.1 假设 Map 集合有 100 个 KV 键值对,具体两种方案会如何存储数据?

ValueState 中存 Map,Flink 引擎会把整个 Map 当做一个大 Value,存储在 RocksDB 中。对应到 RocksDB 中,100 个 KV 键值对的 Map 集合会序列化成一个 byte 数组当做 RocksDB 的 value,存储在 RocksDB 的 1 行数据中。

MapState 会根据 userKey,将 100 个 KV 键值对分别存储在 RocksDB 的 100 行中。

■ 3.3.2 修改 Map 中的一个 KV 键值对的流程

ValueState 的情况,虽然要修改 Map 中的一个 KV 键值对,但需要将整个 Map 集合从 RocksDB 中读出来。具体流程如下:

1、将 key、namespace 序列化成 byte 数组,生成 RocksDB 的 key
2、从 RocksDB 读出 key 对应 value 的 byte 数组
3、将 byte 数组反序列化成整个 Map
4、堆内存中修改 Map 集合
5、将 Map 集合写入到 RocksDB 中,需要将整个 Map 集合序列化成 byte 数组,再写入

MapState 的情况,要修改 Map 中的一个 KV 键值对,根据 key、namespace、userKey 即可定位到要修改的那一个 KV 键值对。具体流程如下:

1、将 key、namespace、userKey 序列化成 byte 数组,生成 RocksDB 的 key
2、从 RocksDB 读出 key 对应 value 的 byte 数组
3、将 byte 数组反序列化成 userValue
4、堆内存中修改 userValue 的值
5、将 userKey、userValue 写入到 RocksDB 中,需要先序列化,再写入

■ 3.3.3 结论

要修改 Map 中的一个 KV 键值对:

如果使用 ValueState 中存 Map,则每次修改操作需要序列化反序列化整个 Map 集合,每次序列化反序列大对象会非常耗 CPU,很容易将 CPU 打满。

如果使用 MapState,每次修改操作只需要序列化反序列化 userKey 那一个 KV 键值对的数据,效率较高。

举一反三:其他使用 ValueState、value 是大对象且 value 频繁更新的场景,都容易将 CPU 打满。

例如:ValueState 中存储的位图,如果每条数据都需要更新位图,则可能导致 CPU 被打满。

为了便于理解,上述忽略了一些实现细节,下面补充一下。

3.4 直接拼接 key 和 namespace 可能导致 RocksDB 的 key 冲突

假设 ValueState 中有两个数据:

· key1 序列化后的二进制为 0x112233, namespace1 序列化后的二进制为0x4455
· key2 序列化后的二进制为 0x1122, namespace2 序列化后的二进制为0x334455

这两个数据对应的 RocksDB key 都是 0x1122334455,这样的话,两个不同的 key、namespace 映射到 RocksDB 中变成了相同的数据,无法做区分。

解决方案:

在 key 和 namespace 中间写入 key 的 byte 数组长度,在 namespace 后写入 namespace 的 byte 长度。

写入这两个长度就不可能出现 key 冲突了,具体为什么,读者可以自行思考。

3.5 RocksDB 的 key 中还会存储 KeyGroupId

对 KeyGroup 不了解的同学可以参考:Flink 源码:从 KeyGroup 到 Rescale。

加上 KeyGroupId 也比较简单。只需要修改 RocksDB key 的拼接方式,在序列化 key 和 namespace 之前,先序列化 KeyGroupId 即可。

4. State TTL 简述

Flink 中 TTL 的实现,都是将用户的 value 封装了一层,具体参考下面的 TtlValue 类:

public class TtlValue<T> implements Serializable {@Nullableprivate final T userValue;private final long lastAccessTimestamp;
}

TtlValue 类中有两个字段,封装了用户的 value 且有一个时间戳字段,这个时间戳记录了这条数据写入的时间。

如果开启了 TTL,则状态中存储的 value 就是 TtlValue 对象。时间戳字段也会保存到状态引擎中,之后查询数据时,就可以通过该时间戳判断数据是否过期。

· ValueState 将 value 封装为 TtlValue。
· MapState 将 userValue 封装成 TtlValue。
· ListState 将 element 封装成 TtlValue。

ValueState 中存 Map 与 MapState 有什么区别?

如果 ValueState 中存 Map,则整个 Map 被当做 value,只维护一个时间戳。所以要么整个 Map 过期,要么都不过期。

MapState 中如果存储了 100 个 KV 键值对,则 100 个 KV 键值对都会存储各自的时间戳。因此每个 KV 键值对的 TTL 是相互独立的。

5.总结

本文从实现原理详细分析了 ValueState 中存 Map 与 MapState 有什么区别?下面将从性能和 TTL 两个维度来描述两者的区别。

 

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514783.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Dubbo-go 源码笔记(一)Server 端开启服务过程

简介&#xff1a; 随着微服务架构的流行&#xff0c;许多高性能 rpc 框架应运而生&#xff0c;由阿里开源的 dubbo 框架 go 语言版本的 dubbo-go 也成为了众多开发者不错的选择。本文将介绍 dubbo-go 框架的基本使用方法&#xff0c;以及从 export 调用链的角度进行 server 端源…

mysql怎么多重查询_mysql基于值的多重查询

{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":4,"count":4}]},"card":[{"des":"阿里云数据库专家保驾护航&#xff0c;为用户…

华为在中国建立其全球最大的网络安全透明中心

2021年6月9日&#xff0c;华为最大的网络安全透明中心今天在中国东莞正式启用&#xff0c;来自GSMA、阿联酋、印尼的监管机构及英国标准协会、SUSE等机构代表出席并在活动上发言。借此机会&#xff0c;华为发布了《华为产品安全基线》白皮书&#xff0c;首次将产品安全需求基线…

浅析云控平台画面传输的视频流方案

简介&#xff1a; 本文将小结本次云控平台画面传输的视频流方案。 背景 ARC&#xff08;高德车机云控平台&#xff09;是一个基于车载设备业务深度定制的云控平台&#xff0c;通过该平台我们能够实现远程使用不同类型的车载设备。为了让远程使用者像在本地一样使用车载设备&am…

解读云原生基础设施

简介&#xff1a; 云原生是云计算领域的热点之一。就像 “一千个人眼里有一千个哈姆雷特”&#xff0c;大家对"云原生"的定义也见仁见智。本文将介绍云原生应用架构和生命周期管理的进化方向。 作者 | 易立 阿里云资深技术专家 导读&#xff1a;云原生是云计算领域的…

mysql al32utf8_Oracle 11g更改字符集AL32UTF8为ZHS16GBK

Oracle 9i更改字符集AL32UTF8为ZHS16GBKSQLgt; conn /as sysdba SQLgt; shutdown immediate; SQLgt; startup mount SQLgt; A首页 → 数据库技术背景&#xff1a;阅读新闻Oracle 11g更改字符集AL32UTF8为ZHS16GBK[日期&#xff1a;2011-04-26]来源&#xff1a;Linux社区作者&am…

共筑全场景智慧生态,华为HMS全球应用创新大赛火热开启

6月10日&#xff0c;2021华为HMS全球应用创新大赛&#xff08;Apps UP&#xff09;正式启动。此次大赛以“HMS Innovate For All”为主题&#xff0c;激励全球开发者集成华为HMS Core开放能力开发创新应用&#xff0c;打造全场景数字创新体验&#xff0c;为全球消费者带来全场景…

2020-11-06

一、背景介绍 &#xff08;一&#xff09;流平台通用框架 目前流平台通用的架构一般来说包括消息队列、计算引擎和存储三部分&#xff0c;通用架构如下图所示。客户端或者 web 的 log 日志会被采集到消息队列&#xff1b;计算引擎实时计算消息队列的数据&#xff1b;实时计算…

移动端堆栈关键行定位的新思路

简介&#xff1a; 崩溃堆栈是我们日常应用问题排查中的重要辅助手段&#xff0c;在移动开发上也不例外&#xff0c;为了支持用户在堆栈上的快速定位&#xff0c;我们面临一个看似比较简单问题&#xff1a;高亮崩溃中的关键行, 辅助用户快速定位问题。 阿里云 云原生应用研发平台…

mysql exists in join_子查询、left join、 exists 替代 not in

如果要实现一张表有而另外一张表没有的数据时&#xff0c;我们通常会这么写&#xff1a;SELECT id FROM user WHERE id NOT IN (SELECT id FROM student)not in不会走索引, 可以用exists替代SELECT id FROM user WHERE NOT exists (SELECT id FROM student WHERE user.id stud…

华云数据升级发布“信创云基座“ 用“全芯全栈”支持“信创强国”

2021年6月10日&#xff0c;北京——2021年是我国“十四五”规划的开局之年&#xff0c;也是我国“加快数字发展 建设数字中国”的关键之年。值此历史交汇的关键点&#xff0c;云计算、大数据、人工智能、物联网、工业互联网、区块链等重点产业将对国家数字经济发展起到巨大推动…

最IN的云原生架构,阿里云 Serverless 事件总线 EventBridge 重磅发布

简介&#xff1a; Serverless 是云计算下一个10年的主要形态&#xff0c;通过大量端到端的整合和云服务的集成&#xff0c;能极大地提高研发效率。了解阿里云 Serverless 产品家族的最新进展&#xff0c;包括函数计算FC、Serverless应用引擎SAE和 Serverless事件总线EventBridg…

智能技术改变淘宝,阿里巴巴首次详解核心商业AI体系

简介&#xff1a; 双11背后的万亿人次商品需求&#xff1a;淘宝创造新一代智能科技&#xff0c;淘宝成为超大规模智能APP&#xff0c;前沿科技重塑双11人货场。 图&#xff1a;淘宝APP已成为超大规模智能APP “淘宝APP已成为超大规模智能APP。”阿里巴巴集团资深副总裁周靖人11…

wow mysql dbc_DBC中悲观锁介绍附案例详解

DBC中悲观锁介绍附案例详解了解下DBC中悲观锁&#xff1a;代码如下&#xff1a;BDUtils 工具类&#xff1a;package JDBC;import java.sql.*;public class BDUtils {private BDUtils() {}static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoun…

融云任杰:强互动,RTC下一个“爆点”场景|拟合

从无序中寻找踪迹&#xff0c;从眼前事探索未来。 2021 年正值黄金十年新开端&#xff0c;CSDN 以中立技术社区专业、客观的角度&#xff0c;深度探讨中国前沿 IT 技术演进&#xff0c;推出年度重磅企划栏目——「拟合」&#xff0c;通过对话企业技术高管大咖&#xff0c;跟踪报…

直面最大挑战双11 阿里数据中台为商家带来确定性保障

2020双11将成为史上最具科技含量的一届双11。 11月3日&#xff0c;在阿里巴巴双11技术沟通会上&#xff0c;阿里巴巴集团首席技术官程立公布了大规模运用于2020双11的十大前沿技术&#xff0c;既有基于数字技术的原生商业创新&#xff0c;也有引领时代的基础技术突破。 阿里巴巴…

mysql rpm包安装指定路径_安装rpm包时指定路径

1、安装rpm包可以指定路径&#xff0c;但是安装包时它可能执行一些内置的命令。如果手动指定路径&#xff0c;可能造成部分功能失效比如下面安装jdk的rpm包。默认安装后它会创建个软链接。下面就提示创建软链接失败了。但是不影响使用[rootdawn-cobbler-1-1 /]# mkdir /tools[r…

玩转ECS第6讲 | 弹性计算Region化部署和跨可用区容灾介绍

弹性计算Region化部署和跨可用区容灾本身是非常复杂的课题&#xff0c;本次分享由阿里云弹性计算架构负责人李钟&#xff08;谢顿&#xff09;为大家介绍如何选择Region&#xff0c;同时结合阿里云在Region化部署和跨可用区容灾的实践经验&#xff0c;分享多region部署场景中如…

为产业数字化赋能!施耐德电气数字产业示范园落户北京

无人车在工厂里按部就班输送货物&#xff0c;机械臂快速装备各零件&#xff0c;庞大的车间内&#xff0c;智能生产线只有寥寥的工人……小时候畅想过的智能工厂&#xff0c;在北京亦庄新成立的施耐德电气数字产业示范园里实现了。 新成立的示范园包括施耐德电气&#xff08;中…

在大规模 Kubernetes 集群上实现高 SLO 的方法

简介&#xff1a; 随着 Kubernetes 集群规模和复杂性的增加&#xff0c;集群越来越难以保证高效率、低延迟的交付 pod。本文将分享蚂蚁金服在设计 SLO 架构和实现高 SLO 的方法和经验。 作者 | 蚂蚁金服技术专家 姚菁华&#xff1b;蚂蚁金服高级开发工程师 范康 导读&#xf…