Flink 1.10 细粒度资源管理解析

相信不少读者在开发 Flink 应用时或多或少会遇到在内存调优方面的问题,比如在我们生产环境中遇到最多的 TaskManager 在容器化环境下占用超出容器限制的内存而被 YARN/Mesos kill 掉[1],再比如使用 heap-based StateBackend 情况下 State 过大导致 GC 频繁影响吞吐。这些问题对于不熟悉 Flink 内存管理的用户来说十分难以排查,而且 Flink 晦涩难懂的内存配置参数更是让用户望而却步,结果是往往将内存调大至一个比较浪费的阈值以尽量避免内存问题。

对于作业规模不大的普通用户而言,这些通常在可以接受的范围之内,但对于上千并行度的大作业来说,浪费资源的总量会非常可观,而且进程的不稳定性导致的作业恢复时间也会比普通作业长得多,因此阿里巴巴的 Blink 团队针对内存管理机制做了大量的优化,并于近期开始合并到 Flink。本文的内容主要基于阿里团队工程师宋辛童在 Flink Forward Beijing 的分享[2],以及后续相关的几个 FLIP 提案。

Flink 目前(1.9)的内存管理

TaskManager 作为 Master/Slave 架构中的 Slave 提供了作业执行需要的环境和资源,最为重要而且复杂,因此 Flink 的内存管理也主要指 TaskManager 的内存管理。

TaskManager 的资源(主要是内存)分为三个层级,分别是最粗粒度的进程级(TaskManager 进程本身),线程级(TaskManager 的 slot)和 SubTask 级(多个 SubTask 共用一个 slot)。

1-640.png图1.TaskManager 资源层级

在进程级,TaskManager 将内存划分为以下几块:

  • Heap Memory: 由 JVM 直接管理的 heap 内存,留给用户代码以及没有显式内存管理的 Flink 系统活动使用(比如 StateBackend、ResourceManager 的元数据管理等)。
  • Network Memory: 用于网络传输(比如 shuffle、broadcast)的内存 Buffer 池,属于 Direct Memory 并由 Flink 管理。
  • Cutoff Memory: 在容器化环境下进程使用的物理内存有上限,需要预留一部分内存给 JVM 本身,比如线程栈内存、class 等元数据内存、GC 内存等。
  • Managed Memory: 由 Flink Memory Manager 直接管理的内存,是数据在 Operator 内部的物理表示。Managed Memory 可以被配置为 on-heap 或者 off-heap (direct memory)的,off-heap 的 Managed Memory 将有效减小 JVM heap 的大小并减轻 GC 负担。目前 Managed Memory 只用于 Batch 类型的作业,需要缓存数据的操作比如 hash join、sort 等都依赖于它。

根据 Managed Memory 是 on-heap 或 off-heap 的不同,TaskManager 的进程内存与 JVM 内存分区关系分别如下:

640.png图2.TaskManager 内存分区

在线程级别,TaskManager 会将其资源均分为若干个 slot (在 YARN/Mesos/K8s 环境通常是每个 TaskManager 只包含 1 个 slot),没有 slot sharing 的情况下每个 slot 可以运行一个 SubTask 线程。除了 Managed Memory,属于同一 TaskManager 的 slot 之间基本是没有资源隔离的,包括 Heap Memory、Network Buffer、Cutoff Memory 都是共享的。所以目前 slot 主要的用处是限制一个 TaskManager 的 SubTask 数。

从作为资源提供者的 TaskManager 角度看, slot 是资源的最小单位,但从使用者 SubTask 的角度看,slot 的资源还可以被细分,因为 Flink 的 slot sharing 机制。默认情况下, Flink 允许多个 SubTask 共用一个 slot 的资源,前提是这些 SubTask 属于同一个 Job 的不同 Task。以官网的例子来说,一个拓扑为 Source(6)-map(6)-keyby/window/apply(6)-sink(1) 的作业,可以运行在 2 个 slot 数为 3 的 TaskManager 上(见图3)。

640-3.png
图3.TaskManager Slot Sharing

这样的好处是,原本一共需要 19 个 slot 的作业,现在只需要作业中与 Task 最大并行度相等的 slot, 即 6 个 slot 即可运行起来。此外因为不同 Task 通常有不同的资源需求,比如 source 主要使用网络 IO,而 map 可能主要需要 cpu,将不同 Task 的 subtask 放到同一 slot 中有利于资源的充分利用。

可以看到,目前 Flink 的内存管理是比较粗粒度的,资源隔离并不是很完整,而且在不同部署模式下(Standalone/YARN/Mesos/K8s)或不同计算模式下(Streaming/Batch)的内存分配也不太一致,为深度平台化及大规模应用增添了难度。

Flink 1.10 细粒度的资源管理

为了改进 Flink 内存管理机制,阿里巴巴的工程师结合 Blink 的优化经验分别就进程、线程、SubTask(Operator)三个层面分别提出了 3 个 FLIP,均以 1.10 为目标 release 版本。下面将逐一介绍每个提案的内容。

FLIP-49: 统一 TaskExecutor 的内存配置

■ 背景

TaskExecutor 在不同部署模式下具体负责作业执行的进程,可以简单视为 TaskManager。目前 TaskManager 的内存配置存在不一致以及不够直观的问题,具体有以下几点:

  • 流批作业内容配置不一致。Managed Memory 只覆盖 DataSet API,而 DataStream API 的则主要使用 JVM 的 heap 内存,相比前者需要更多的调优参数且内存消耗更难把控。
  • RocksDB 占用的 native 内存并不在内存管理里,导致使用 RocksDB 时内存需要很多手动调优。
  • 不同部署模式下,Flink 内存计算算法不同,并且令人难以理解。

针对这些问题,FLIP-49[4] 提议通过将 Managed Memory 的用途拓展至 DataStream 以解决这个问题。DataStream 中主要占用内存的是 StateBackend,它可以从管理 Managed Memory 的 MemoryManager 预留部分内存或分配内存。通过这种方式同一个 Flink 配置可以运行 Batch 作业和 Streaming 作业,有利于流批统一。

■ 改进思路

对比1.jpg

可以看到目前 DataStream 作业的内存分配没有经过 MemoryManager 而是直接向 JVM 申请,容易造成 heap OOM 或者物理内存占用过大[3],因此直接的修复办法是让 MemoryManager 了解到 StateBackend 的内存占用。这会有两种方式,一是直接通过 MemoryManager 申请内存,二是仍使用隐式分配的办法,但需要通知 MemoryManager 预留这部分内存。此外 MemoryManager 申请 off-heap 的方式也会有所变化,从 ByteBuffer#allocateDirect() 变为 Unsafe#allocateMemory(),这样的好处是显式管理的 off-heap 内存可以从 JVM 的 -XX:MaxDirectMemorySize 参数限制中分离出来。

另外 MemoryManager 将不只可以被配置为 heap/off-heap,而是分别拥有对应的内存池。这样的好处是在同一个集群可以运行要求不同类型内存的作业,比如一个 FsStateBackend 的 DataStream 作业和一个 RocksDBStateBackend 的 DataStream 作业。heap/off-heap 的比例可以通过参数配置,1/0 则代表了完全的 on-heap 或者 off-heap。

改进之后 TaskManager 的各内存分区如下:

640-4.png

TaskManager 新内存结构

表格 1.jpg
表格2.jpg

值得注意的是有 3 个分区是没有默认值的,包括 Framework Heap Memory、Total Flink Memory 和 Total Process Memory,它们是决定总内存的最关键参数,三者分别满足不同部署模式的需要。比如在 Standalone 默认下,用户可以配置 Framework Heap Memory 来限制用户代码使用的 heap 内存;而在 YARN 部署模式下,用户可以通过配置 YARN container 的资源来间接设置 Total Process Memory。

FLIP-56: 动态 slot 分配

■ 背景

目前 Flink 的资源是预先静态分配的,也就是说 TaskManager 进程启动后 slot 的数目和每个 slot 的资源数都是固定的而且不能改变,这些 slot 的生命周期和 TaskManager 是相同的。Flink Job 后续只能向 TaskManager 申请和释放这些 slot,而没有对 slot 资源数的话语权。

640-5.png

图5. 静态 slot 分配

这种粗粒度的资源分配假定每个 SubTask 的资源需求都是大致相等的,优点是较为简单易用,缺点在于如果出现 SubTask 的资源需求有倾斜的情况,用户则需要按其中某个 SubTask 最大资源来配置总体资源,导致资源浪费且不利于多个作业复用相同 Flink 集群。

■ 改进思路

FLIP-56[5] 提议通过将 TaskManager 的资源改为动态申请来解决这个问题。TaskManager 启动的时候只需要确定资源池大小,然后在有具体的 Flink Job 申请资源时再按需动态分配 slot。Flink Job 申请 slot 时需要附上资源需求,TaskManager 会根据该需求来确定 slot 资源。

640-6.png
图6. 动态 slot 分配

值得注意的是,slot 资源需求可以是 unknown。提案引入了一个新的默认 slot 资源要求配置项,它表示一个 slot 占总资源的比例。如果 slot 资源未知,TaskManager 将按照该比例切分出 slot 资源。为了保持和现有静态 slot 模型的兼容性,如果该配置项没有被配置,TaskManager 会根据 slot 数目均等分资源生成 slot。

目前而言,该 FLIP 主要涉及到 Managed Memory 资源,TaskManager 的其他资源比如 JVM heap 还是多个 slot 共享的。

FLIP-53: 细粒度的算子资源管理

■ 背景

FLIP-56 使得 slot 的资源可以根据实际需求确定,而 FLIP-53 则探讨了 Operator (算子)层面如何表达资源需求,以及如何根据不同 Operator 的设置来计算出总的 slot 资源。

目前 DataSet API 以及有可以指定 Operator 资源占比的方法(TaskConfig 和 ChainedDriver),因此这个 FLIP 只涉及到 DataStream API 和 Table/SQL API (先在 Blink Planner 实现)。不过提案并没有包括用户函数 API 上的变化(类似新增 dataStream.setResourceSpec() 函数),而是主要讨论 DataStream 到 StreamGraph 的翻译过程如何计算 slot 资源。改进完成后,这三个 API 的资源计算逻辑在底层会是统一的。

■ 改进思路

要理解 Flink 内部如何划分资源,首先要对 Flink 如何编译用户代码并部署到分布式环境的过程有一定的了解。

640-7.jpg

图7. Flink 作业编译部署流程

以 DataStream API 为例,用户为 DataStream 新增 Operator 时,Flink 在底层会将以一个对应的 Transform 来封装。比如 dataStream.map(new MyMapFunc()) 会新增一个 OneInputTransformation 实例,里面包括了序列化的 MyMapFunc 实例,以及 Operator 的配置(包括名称、uid、并行度和资源等),并且记录了它在拓扑中的前一个 Transformation 作为它的数据输入。

当 env.execute() 被调用时,在 client 端 StreamGraphGenerator 首先会遍历 Transformation 列表构造出 StreamGraph 对象(每个 Operator 对应一个 StreamNode),然后 StreamingJobGraphGenerator 再将 StreamGraph 翻译成 DataStream/DataSet/Table/SQL 通用的 JobGraph(此时会应用 chaining policy 将可以合并的 Operator 合并为 OperatorChain,每个 OperatorChain 或不能合并的 Operator 对应一个 JobVertex),并将其传给 JobManager。

JobManager 收到 JobGraph 后首先会将其翻译成表示运行状态的 ExecutionGraph,ExecutionGraph 的每个节点称为 ExecutionJobVertex,对应一个 JobVertex。ExecutionJobVertex 有一个或多个并行度且可能被调度和执行多次,其中一个并行度的一次执行称为 Execution,JobManager 的 Scheduler 会为每个 Execution 分配 slot。

细粒度的算子资源管理将以下面的方式作用于目前的流程:

  1. 用户使用 API 构建的 Operator(以 Transformation 表示)会附带 ResourceSpecs,描述该 Operator 需要的资源,默认为 unknown。
  2. 当生成 JobGraph 的时候,StreamingJobGraphGenerator 根据 ResourceSpecs 计算出每个 Operator 占的资源比例(主要是 Managed Memory 的比例)。
  3. 进行调度的时候,Operator 的资源将被加总成为 Task 的 ResourceProfiles (包括 Managed Memory 和根据 Task 总资源算出的 Network Memory)。这些 Task 会被划分为 SubTask 实例被部署到 TaskManager 上。
  4. 当 TaskManager 启动 SubTask 的时候,会根据各 Operator 的资源占比划分 Slot Managed Memory。划分的方式可以是用户指定每个 Operator 的资源占比,或者默认均等分。

值得注意的是,Scheduler 的调度有分 EAGER 模式和 LAZY_FROM_SOURCE 两种模式,分别用于 Stream 作业和 Batch 作业,它们会影响到 slot 的资源计算。Stream 类型的作业要求所有的 Operator 同时运行,因此资源的需求是急切的(EAGER);而 Batch 类型的作业可以划分为多个阶段,不同阶段的 Operator 不需要同时运行,可以等输入数据准备好了再分配资源(LAZY_FROM_SOURCE)。这样的差异导致如果要充分利用 slot,Batch 作业需要区分不同阶段的 Task,同一时间只考虑一个阶段的 Task 资源。

解决的方案是将 slot sharing 的机制拓展至 Batch 作业。默认情况下 Stream 作业的所有 Operator 都属于 default sharing group,所以全部 Operator 都能共用都一个 slot。对于 Batch 作业而言,我们将整个 JobGraph 根据 suffle 划分为一至多个 Region,每个 Region 属于独立的 sharing group,因而不会被放到同一个 slot 里面。

640-7.png

图8. 不同作业类型的 Slot Sharing Group

总结

随着 Flink 的越来越大规模地被应用于各种业务,目前资源管理机制的灵活性、易用性不足的问题越发凸显,新的细粒度资源管理机制将大大缓解这个问题。此外,新资源管理机制将统一流批两者在 runtime 层资源管理,这也为将最终的流批统一打下基础。对于普通用户而言,这里的大多数变动是透明的,主要的影响应该是出现新的内存相关的配置项需要了解一下。

参考资料:

1.[[FLINK-13477] Containerized TaskManager killed because of lack of memory overhead](https://issues.apache.org/jira/browse/FLINK-13477)

2.机遇与挑战:Apache Flink 资源管理机制解读与展望

3.[[FLINK-7289] Memory allocation of RocksDB can be problematic in container environments](https://issues.apache.org/jira/browse/FLINK-7289)

4.FLIP-49: Unified Memory Configuration for TaskExecutors

5.FLIP-56: Dynamic Slot Allocation

6.FLIP-53: Fine Grained Operator Resource Management

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515969.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文告诉你雾计算与云计算的区别及对物联网的价值!

作者 | Victoria Puzhevich翻译 | 风车云马,责编 | 晋兆雨出品 | CSDN云计算头图 | 付费下载于视觉中国雾计算是一种分布式计算结构。将数据和应用程序等资源放置在数据源和云之间的逻辑位置。雾计算的优点之一是让许多用户同时连接到互联网上。从本质上说&#xff…

企业微信H5_消息推送概述,发送应用消息示例

文章目录一、阅读和调试1. 文档阅读2. postman发送消息二、实战演练2.1. 发送消息2.2. 前端代码2.3. 后端代码2.4. 发送文本消息2.5. 接收消息三、源码分享3.1. 后端源码3.2. 前端源码一、阅读和调试 1. 文档阅读 文档链接:https://developer.work.weixin.qq.com/…

linux部署springboot项目及后台执行linux命令的两种方式

linux部署springboot项目及后台执行linux命令的两种方式 1.将springboot项目打成jar包 这里推荐两种方法: 第一种:在idea的terminal窗口执行命令:mvn package spring-boot:repackage 第二种:在maven的Lifestyle中点击package打包 打好的jar包会在target目录下. 2.将jar包复…

FAST20 论文学习

BCW: Buffer-Controlled Writes to HDDs for SSD-HDD Hybrid Storage Server 原文地址 为了兼顾访问性能和硬件成本,目前有不少的存储系统都采用了混合存储(Hybrid Storage),使用 SSD 来提供微秒级访问,配合 HDD 来降…

企业微信H5_消息推送接收消息回调配置、内网穿透到本地

文章目录一、环境准备1. 阅读文档2. 登录管控台3. 编辑配置4. 内网穿透5. 测试案例6. 公网访问验证7. 保存配置8. 验证URL有效性二、源码分享2.1. 后端源码2.2. 前端源码一、环境准备 1. 阅读文档 官网文档:https://developer.work.weixin.qq.com/document/path/9…

Serverless 选型:深度解读 Serverless 架构及平台选择

作者 | 悟鹏 阿里巴巴技术专家 导读:本文尝试以日常开发流程为起点,分析开发者在每个阶段要面对的问题,然后组合解决方案,提炼面向 Serverless 的开发模型,并与业界提出的 Serverless 产品形态做对应,为开发…

CSDN 星城大巡礼,长沙“科技之星”年度企业评选正式开启

2020年,长沙市委主要领导发出“软件产业再出发”的号召,颁布了软件三年行动计划。今年5月,CSDN 作为专业的 IT 社区,与长沙高新区签约,将全国总部落户长沙,这一战略决策,让CSDN与长沙的联结进一…

企业微信H5_集成消息解密类,消息推送Get及Post回调处理

文章目录一、 验证URL有效性1. 阅读文档2. 文档分析3. 加解密方案说明4. 下载加解密算法5. 案例分析二、实战集成2.1. 工具类拷贝2.2. 依赖引入2.3. 案例1集成2.4. 参数处理2.5. 重启项目2.6. 验证URL有效性2.7. 验证三、消息接收与处理3.1. 文档阅读3.2. 案例2拷贝3.3. 参数处…

新一代高效Git协同模型AGit-Flow详解

【以下为分享实录,有删节】 Git工作流概述及AGit-Flow的优势 目前,Git已成为源代码管理的标准和基础设施。“为什么Git能这么成功”?Git的创建者Linux在Git十周年的一次采访中,道出了其中的奥秘: The big thing abo…

云原生人物志|APISIX温铭:让API网关“666”

云原生已无处不在,《云原生人物志》是CSDN重磅推出的系列原创采访,我们关注云原生中每一个技术人、公司的身影。知微见著,窥见云原生价值与趋势。 作者 | 宋慧 出品 | CSDN云计算 头图 | 付费下载于IC Photo 第一期,我们采访了唯…

xshell和Xftp连接Linux

xshell和Xftp连接Linux 简单介绍下这两种工具: Xshell :远程连接linux,执行命令行; Xftp :远程连接linux,可视化的实现windows和linux之间的文件传输; 2.关于如何获知linux的ip地址 在虚拟机中登录用户,输入用户名,密码: 此处注意一点:注意区分密码的大小写!!!,因为你在设置密…

企业微信_客户联系,获取客户及客户群列表及详情

文章目录一、调试接口1. 阅读文档2. 权限配置3. 指定应用二、POSTMAN调试接口2.1. 获取配置了客户联系功能的成员列表2.2. 获取客户列表2.3. 获取客户详情2.4. 获取客户群列表2.5. 获取客户群详情三、实战演练代码拆解3.1. 获取配置了客户联系功能的成员列表3.2. 获取客户列表3…

Flink 与 Hive 的磨合期

有不少读者反馈,参考上篇文章《Hive 终于等来了 Flink》部署 Flink 并集成 Hive 时,出现一些 bug 以及兼容性等问题。虽已等来,却未可用。所以笔者增加了这一篇文章,作为姊妹篇。 回顾 在上篇文章中,笔者使用的 CDH 版…

使用arthas排查cpu飙高问题

文章目录一1. 下载arthas2. 启动3. 选择指定jvm进程4. 筛选线程5. 日志分析一 官方文档:https://arthas.aliyun.com/doc 1. 下载arthas curl -O https://arthas.aliyun.com/arthas-boot.jar2. 启动 直接用java -jar的方式启动: java -jar arthas-bo…

oracle 数据库 字符串函数

oracle 数据库 字符串函数 介绍oracle对字符串的操作函数,如图所示,测试字段为:STUDENT 表的 STUNAME 字段 ps:oracle字符串索引从1开始 1.定位索引函数:instr() instr(str,char,begin,n) str:源字符串 char:目标字…

jvm如何排查生产环境cpu飙高的问题

文章目录一、生产环境 cpu 飙高产生的原因?1. CAS 自旋没有控制自旋次数2. 死循环3. 阿里云 Redis 被注入挖矿程序4. 服务器被 DDOS 工具攻击二、windows环境下如何排查cpu飙高问题2.1. 任务管理器2.2. jvisualvm三、环境下如何排查cpu飙高问题3.1. 监控命令3.2. 使…

云原生人物志|华为云CTO张宇昕:云原生已经进入深水区

云原生已无处不在,《云原生人物志》是CSDN重磅推出的系列原创采访,我们关注云原生中每一个技术人、公司的身影。知微见著,窥见云原生价值与趋势。 作者 | 宋慧 出品 | CSDN云计算 头图 | 华为云网站 云原生成为云计算领域当之无愧的最热门技…

开箱即用,Knative 给您极致的容器 Serverless 体验

作者 | 冬岛 阿里巴巴技术专家 导读:托管 Knative 开箱即用,您不需要为这些常驻实例付出任何成本。结合 SLB 云产品提供 Gateway 的能力以及基于突发性能型实例的保留规格功能,极大的节省您的 IaaS 开支,您支付的每一分钱都没有浪…

oracle 11g 数据库cmd修改用户名密码及创建用户

oracle 11g 数据库cmd修改用户名密码及创建用户1. 数据库oracle 11g cmd命令修改用户名和密码1.1. 前言1.2. cmd窗口登录oracle1.3. 更改system用户的密码1.4. 测试修改成果2. 创建新用户并赋予权限2.1. cmd窗口登录oracle2.2.创建用户2.3.分配权限2.4.oracle用户权限等级1. 数…

全国交通智慧升级,阿里云视频上云打造高速公路“视觉中枢”

2019年底,交通运输部办公厅发布《全国高速公路视频联网监测工作实施方案》和《全国高速公路视频联网技术要求》,全面加快推进可视、可测、可控、可服务的高速公路运行监测体系建设。2020年底,基本建立全国高速公路视频联网监测管理机制和制度…