Flink State 最佳实践

本文主要分享与交流 Flink 状态使用过程中的一些经验与心得,当然标题取了“最佳实践”之名,希望文章内容能给读者带去一些干货。本文内容首先是回顾 state 相关概念,并认识和区别不同的 state backend;之后将分别对 state 使用访问以及 checkpoint 容错相关内容进行详细讲解,分享一些经验和心得。

State 概念回顾

我们先回顾一下到底什么是 state,流式计算的数据往往是转瞬即逝, 当然,真实业务场景不可能说所有的数据都是进来之后就走掉,没有任何东西留下来,那么留下来的东西其实就是称之为 state,中文可以翻译成状态。

在下面这个图中,我们的所有的原始数据进入用户代码之后再输出到下游,如果中间涉及到 state 的读写,这些状态会存储在本地的 state backend(可以对标成嵌入式本地 kv 存储)当中。

图片 1.png

接下来我们会在四个维度来区分两种不同的 state:operator state 以及 keyed state。

1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed state 的数值总是与一个 current key 对应。
2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state backend 有 on-heap 和 off-heap(RocksDB)的多种实现。
3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现 snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。
4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。

StateBackend 的分类

下面这张图对目前广泛使用的三类 state backend 做了区分,其中绿色表示所创建的operator/keyed state backend 是 on-heap 的,黄色则表示是 off-heap 的。

图片 2.png

一般而言,在生产中,我们会在 FsStateBackend 和 RocksDBStateBackend 间选择:

  • FsStateBackend:性能更好;日常存储是在堆内存中,面临着 OOM 的风险,不支持增量 checkpoint。
  • RocksDBStateBackend:无需担心 OOM 风险,是大部分时候的选择。

RocksDB StateBackend 概览和相关配置讨论

RocksDB 是 Facebook 开源的 LSM 的键值存储数据库,被广泛应用于大数据系统的单机组件中。Flink 的 keyed state 本质上来说就是一个键值对,所以与 RocksDB 的数据模型是吻合的。下图分别是 “window state” 和 “value state” 在 RocksDB 中的存储格式,所有存储的 key,value 均被序列化成 bytes 进行存储。

图片 3.png

在 RocksDB 中,每个 state 独享一个 Column Family,而每个 Column family 使用各自独享的 write buffer 和 block cache,上图中的 window state 和 value state实际上分属不同的 column family。

下面介绍一些对 RocksDB 性能比较有影响的参数,并整理了一些相关的推荐配置,至于其他配置项,可以参阅社区相关文档。

状态建议
state.backend.rocksdb.thread.num后台 flush 和 compaction 的线程数. 默认值 ‘1‘. 建议调大
state.backend.rocksdb.writebuffer.count每个 column family 的 write buffer 数目,默认值 ‘2‘. 如果有需要可以适当调大
state.backend.rocksdb.writebuffer.size每个 write buffer 的 size,默认值‘64MB‘. 对于写频繁的场景,建议调大
state.backend.rocksdb.block.cache-size每个 column family 的 block cache大小,默认值‘8MB’,如果存在重复读的场景,建议调大

State best practice:一些使用 state 的心得

Operator state 使用建议

■ 慎重使用长 list

下图展示的是目前 task 端 operator state 在执行完 checkpoint 返回给 job master 端的 StateMetaInfo 的代码片段。

图片 4.png

由于 operator state 没有 key group 的概念,所以为了实现改并发恢复的功能,需要对 operator state 中的每一个序列化后的元素存储一个位置偏移 offset,也就是构成了上图红框中的 offset 数组。

那么如果你的 operator state 中的 list 长度达到一定规模时,这个 offset 数组就可能会有几十 MB 的规模,关键这个数组是会返回给 job master,当 operator 的并发数目很大时,很容易触发 job master 的内存超用问题。我们遇到过用户把 operator state 当做黑名单存储,结果这个黑名单规模很大,导致一旦开始执行 checkpoint,job master 就会因为收到 task 发来的“巨大”的 offset 数组,而内存不断增长直到超用无法正常响应。

■ 正确使用 UnionListState

union list state 目前被广泛使用在 kafka connector 中,不过可能用户日常开发中较少遇到,他的语义是从检查点恢复之后每个并发 task 内拿到的是原先所有operator 上的 state,如下图所示:

图片 5.png

kafka connector 使用该功能,为的是从检查点恢复时,可以拿到之前的全局信息,如果用户需要使用该功能,需要切记恢复的 task 只取其中的一部分进行处理和用于下一次 snapshot,否则有可能随着作业不断的重启而导致 state 规模不断增长。

Keyed state 使用建议

■ 如何正确清空当前的 state

state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法,具体代码片段如下:

图片 6.png

如果你的需求中只是对 state 有过期需求,借助于 state TTL 功能来清理会是一个性能更好的方案。

■ RocksDB 中考虑 value 值很大的极限场景

受限于 JNI bridge API 的限制,单个 value 只支持 2^31 bytes 大小,如果存在很极限的情况,可以考虑使用 MapState 来替代 ListState 或者 ValueState,因为RocksDB 的 map state 并不是将整个 map 作为 value 进行存储,而是将 map 中的一个条目作为键值对进行存储。

■ 如何知道当前 RocksDB 的运行情况

比较直观的方式是打开 RocksDB 的 native metrics ,在默认使用 Flink managed memory 方式的情况下,state.backend.rocksdb.metrics.block-cache-usage ,state.backend.rocksdb.metrics.mem-table-flush-pending,state.backend.rocksdb.metrics.num-running-compactions 以及 state.backend.rocksdb.metrics.num-running-flushes 是比较重要的相关 metrics。

下面这张图是 Flink-1.10 之后,打开相关 metrics 的示例图:

图片 7.png

而下面这张是 Flink-1.10 之前或者关闭 state.backend.rocksdb.memory.managed 的效果:

图片 8.png

■ 容器内运行的 RocksDB 的内存超用问题

在 Flink-1.10 之前,由于一个 state 独占若干 write buffer 和一块 block cache,所以我们会建议用户不要在一个 operator 内创建过多的 state,否则需要考虑到相应的额外内存使用量,否则容易造成在容器内运行时,相关进程被容器环境所杀。对于用户来说,需要考虑一个 slot 内有多少 RocksDB 实例在运行,一个 RocksDB 中有多少 state,整体的计算规则就很复杂,很难真得落地实施。

Flink-1.10 之后,由于引入了 RocksDB 的内存托管机制,在绝大部分情况下, RocksDB 的这一部分 native 内存是可控的,不过受限于 RocksDB 的相关 cache 实现限制(这里暂不展开,后续会有文章讨论),在某些场景下,无法做到完美控制,这时候建议打开上文提到的 native metrics,观察相关 block cache 内存使用是否存在超用情况,可以将相关内存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空间给 native 内存使用。

一些使用 checkpoint 的使用建议

■ Checkpoint 间隔不要太短

虽然理论上 Flink 支持很短的 checkpoint 间隔,但是在实际生产中,过短的间隔对于底层分布式文件系统而言,会带来很大的压力。另一方面,由于检查点的语义,所以实际上 Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的 checkpoint,可能会影响整体的性能。当然,这个建议的出发点是底层分布式文件系统的压力考虑。

■ 合理设置超时时间

默认的超时时间是 10min,如果 state 规模大,则需要合理配置。最坏情况是分布式地创建速度大于单点(job master 端)的删除速度,导致整体存储集群可用空间压力较大。建议当检查点频繁因为超时而失败时,增大超时时间。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516257.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

驻云CEO教你0门槛搭建电商网站,精选产品组合限量神券 低价买

云计算试飞员老蒋开播啦!4月27日 上午10点30分锁定云栖号直播,驻云CEO老蒋手把手教你搭建电商网站,0门槛入门;不同场景下精选产品组合,放心低价买。 观看直播: https://yq.aliyun.com/live/2707 大咖方案力…

uniapp 封装网络请求

文章目录一、前端1. 封装网络请求2. 封装模块请求方法二、后端2.1. 返回对象2.2. 热搜接口三、微信模拟请求3.1. Network3.2. Console一、前端 1. 封装网络请求 创建 utils 文件夹创建 request.js ,封装请求对象 // const BASE_URL https://api.imooc-blog.lgds…

求伯君领衔5代技术人对话,00后浪来袭1024程序员节

10 月 23-25 日,由 CSDN 等多家单位精心筹划的“长沙 中国 1024 程序员节”将盛大举行。程序员节活动以开源为主议题,包括:2 场岳麓尖峰对话,2020 开源技术英雄大会,10 场热门技术分论坛/峰会,创意集市&am…

容器环境自建数据库、中间件一键接入阿里云 Prometheus 监控

阿里云Prometheus服务4月9日发布重大升级,支持容器环境下一键接入MySQL、Redis、MangoDB、ElasticSearch等数据库和Kafka、ZooKeeper等中间件的监控,并提供开箱即用的监控大盘,现在接入更有15天免费试用。 15天免费试用地址,点击这…

状态管理 - 全局状态管理工具

文章目录一、单向数据流1. 理念示意图2. 简述二、什么是全局状态管理模式三、重点概念3.1. 什么是全局状态管理模式?3.2.全局状态管理工具?3.3. 什么是 vuex四、在项目中导入 vuex4.1. 状态管理配置4.2. 注册vuex五、测试 vuex 是否导入成功5.1. 创建搜索…

Nacos SDK for Scala 发布

脱胎于历经阿里巴巴10年生产验证的内部产品,支持具有数百万服务的大规模场景,Nacos作为高性能的动态服务发现、配置管理和服务管理平台从2018年开源以来,版本迭代速度很快,已经发布到1.2.1,已支持企业使用Nacos生产高可…

对话阿里云:开源与自研如何共处?

头图 | CSDN 下载自东方 IC来源 | CSDN(ID:CSDNnews)责编 | 晋兆雨从「鲜为人知」的专业名词,到 2006 年的精准定义,再到如今全面上云时代的「百花齐放」,云计算的发展趟过蛮荒之地,已形成极具规…

Flink 流批一体的实践与探索

自 Google Dataflow 模型被提出以来,流批一体就成为分布式计算引擎最为主流的发展趋势。流批一体意味着计算引擎同时具备流计算的低延迟和批计算的高吞吐高稳定性,提供统一编程接口开发两种场景的应用并保证它们的底层执行逻辑是一致的。对用户来说流批一…

uniapp 用户登录

文章目录一、登录基础1. 登录组件2. 页面引用组件3. 明确登录的实现思路4. 封装 action 调用登录接口5. 保存用户登录状态6. 成已登录的用户视图6. 现退出登录功能一、登录基础 1. 登录组件 对于 登录 功能来说提供了两个登录的入口。 那么大家想一下,现在我们已…

Vue 组件开发 - 数据输入框组件

目录 设计通用组件的一般思路组件效果1. HTML结构2. index.js3. input-number.js3.1 input-number.js代码注解设计通用组件的一般思路 明确需求; 设计API(组件的API:只来自props、events 和 slots); 2.1 确定命名、规则 2.2 编写业务逻辑 即使逻辑的第一版没做好,后续还可…

活动预告 | 2020移动云客户高端峰会即将揭幕,邀您一起携手云端!

一直以来,移动云致力于不断丰富自身产品体系,延伸服务深度和广度,提供“云网一体、贴身服务、随心定制、安全可控”的云服务,致力成为5G时代你身边的智慧云。依托中国移动网络资源与本地化服务团队,移动云为政府、互联…

如何在CDN边缘节点执行你的JavaScript?

Serverless和边缘计算都是目前云计算领域备受瞩目的研究和应用方向。做为拥有超过130Tbps带宽分发能力的阿里云CDN,巨大的边缘网络有超过2800个节点遍布全球,如何将中心源站的计算能力下沉到边缘节点,进一步降低客户端访问延时进而提升用户体…

微信小程序+腾讯地图 获取定位与地图选点插件

文章目录一、思路二、逆地址解析2.1. app.json2.2. 页面加入2.3. 后台代码三、地图插件调用3.1. app.json加入3.2. js页面加入3.3. wxml页面成功截图:腾讯位置服务官网: https://lbs.qq.com一、思路 通过 wx.getLocation 返回经纬度传到后台,后台调用腾…

前端工具安装和运行相关

目录1. nodemon 运行出错:无法运行脚本2. XXX~待续1. nodemon 运行出错:无法运行脚本 无法加载文件 C:\Users\xxx\AppData\Roaming\npm\nodemon.ps1,因为在此系统上禁止运行脚本。 解决办法: 管理员身份打开powerShell&#xf…

友盟+联合EB级云数据 实现友盟域和企业私域数据全面融合

友盟联合EB级云数据 实现友盟域和企业私域数据全面融合 2020-04-28 互联网大数据计算 国内领先的第三方全域数据智能服务商友盟,联合阿里云EB级云数据仓库MaxCompute为企业提供面向分析的,实现友盟域数据与企业私域数据全面融合的自助分析服务“U-DOP…

微信小程序之实现地图定位(使用腾讯位置服务插件)

文章目录一. 腾讯位置服务插件简介1. 完整的地图能力2. 地图插件的优势二. 开通腾讯位置服务三、接入插件四、 开发者密钥配置五、插件的使用一. 腾讯位置服务插件简介 1. 完整的地图能力 腾讯位置服务基于微信提供的小程序插件能力,专注于(围绕&#…

分享16个经典的免费UI素材网站

免费字体 www.fontspace.com 一键AI抠图 www.remove.bg/zh 在线PS工具/可以转换Sketch文件

单人开发场景下的测试环境实践

在软件研发过程中,“测试环境”是部署最频繁、也是开发者使用最频繁的一种运行环境,稳定而易用的测试环境能够极大提高开发者的工作效率和幸福感。为更好的将阿里巴巴在测试环境管理方面的实践和经验跟广大开发者分享,《云效说码》策划了《阿…

函数计算如何访问 PostgreSQL 数据库

函数计算(Function Compute):函数计算 是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准备好计算资源,弹性地可靠地运行任务,并…

智能可穿戴迎来长续航焕新活力 出门问问TicWatch Pro 3即将国内上市

人工智能独角兽公司出门问问将于10月21日面向国内市场正式发布全新一代旗舰级全智能手表TicWatch Pro 3(4G版)。出门问问携手新浪科技举行新品线上发布会,出门问问创始人兼CEO李志飞以及出门问问研发高级总监张博将以直播形式与消费者见证Tic…