蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造

张冯君(远远)

Koupleless PMC

蚂蚁集团技术工程师

就职于蚂蚁集团中间件团队,参与维护与建设蚂蚁 SOFAArk 和 Koupleless 开源项目、内部 SOFAServerless 产品的研发和实践。

本文 3488 字,预计阅读 11 分钟 


业务背景

基于开源 Apache Flink 打造的蚂蚁流式计算引擎在蚂蚁有着广泛的应用,基本覆盖蚂蚁所有实时业务。蚂蚁 Flink 实时计算服务的提交服务主要负责提交、重启、重置、停止作业等一系列运维操作。服务端在处理 Flink 作业提交请求的时候,需要对用户提交上来的 Java 代码或者 SQL 代码进行编译,将用户代码翻译成 Flink 引擎可以识别的执行计划,这部分逻辑需要依赖 Flink 引擎层的代码,同时服务端需要支持 Flink 多版本的编译,为保证不同编译请求的正确性、隔离性和安全性,采用了目前业界常用的进程模型来处理编译请求——服务端每收到一次编译请求,都起一个子进程来执行编译逻辑。

上图是一次编译请求的执行过程,这样的方式具有进程模型的固有缺陷:

  1. 响应速度慢:实际业务中大部分编译请求本身并不是很复杂的操作,但是启动子进程需要经历 VM 冷启动、字节码文件加载,以及 JIT 编译技术对解释执行的字节码进行优化,生成本地执行代码的过程还需加上 JVM 内部垃圾回收所耗费的时间。据统计实际平均请求耗时达到了 15s 及以上,严重影响用户提交任务的体验。

  2. 资源消耗大:启动子进程需要更多内存,CPU 耗时更多,需要消耗大量的资源,导致服务端单机处理能力有限,系统稳定性差,极大影响整体的吞吐率。

实时计算团队对编译任务优化做了很多的探索,包括采用 CDS(Class Data Sharing)进程热启动、复用引擎类加载器的线程模型等方式,但都遇到了不同的资源消耗加大,资源不安全等问题,在生产上基本不可用,无法全量推开。

在进一步探索中,实时计算团队尝试将进程模型改造为线程模型,但面临核心问题:线程执行意味着共享 JVM,在需要支持不同 Flink 引擎版本、不同业务自定义 ODF 包的场景下,需要一个机制来为不同编译任务实现版本隔离、包隔离。并确保在运行时不同的编译任务互相不影响且结果正确。

Koupleless 开源以来,除了致力于多应用合并部署节省资源外,一直在进行轻量化模块研发的探索。其特性高度契合 Flink 编译场景:

  • 轻量化:适合依赖简单、逻辑简单,甚至是代码片段的应用,Flink 编译任务通常轻量级,流量触发运行,运行完即可结束。

  • 原生隔离能力:Koupleless 进行合并部署的底层原理就是通过类加载隔离来实现多应用的代码隔离,这块的类隔离框架正好符合多 Flink 编译任务进行类隔离的诉求。

整体方案

读到这里默认大家对 Koupleless 类隔离框架有一定的了解(不了解的话推荐阅读:https://github.com/sofastack/sofa-ark),就不再赘述框架细节,直接分享在原类加载机制下,针对当前编译任务场景做了哪些架构升级的改造和技术特性的支持。

业务改造

Flink 提交作业的核心在于编译用户提交过来的代码,这个过程会解析提交请求,获取提交请求依赖的 Flink 引擎包,Connector/Backend 插件包,以及用户上传的 UDF 包,然后定义环境变量和 Classpath,启动子进程来编译用户代码,获取执行计划。Flink 编译任务改造前的进程模型流程:

编译进程的核心在于正确的构造 Classpath,在进程模型下,很容易做到,而在线程模型下,则是需要保证 ClassLoader 的正确性,需要满足以下需求:

  1. 编译结果准确性:每次编译线程都应该获取正确完整的 ClassLoader,主要包括 Flink Lib 包的 ClassLoader,Flink Connector/Backend 插件的 ClassLoader,用户 UDF 的 ClassLoader。

  2. 资源高效复用:编译请求需要尽可能复用通用 Flink Lib 包,Flink Connector/Backend 包等,以期达到最优的性能。

  3. 多版本类隔离:不同的编译请求会依赖不同版本的引擎,需要对不同版本的 Flink 包进行类隔离,使得多个编译请求同时运行时互不影响。

基于以上诉求,我们设计出了一套类加载的框架

  • 针对 Flink Lib,Flink Connector/Backend 的高频通用的 ClassLoader 会直接构建出来,常驻在内存中,可直接复用,生命周期和服务端进程一致;

  • 针对用户 UDF 代码,构建请求级别的 ClassLoader,生命周期和请求一致。

执行流程如下:

  1. 在接收到编译请求的时候,复用对应的 flink-lib,flink-connector/backend ClassLoader

  2. 基于请求依赖的 UDF 包,构建请求级别的 ClassLoader

  3. 启动线程执行编译逻辑,并且回收 UDF ClassLoader

Flink 编译任务进行 Koupleless 改造后的线程模型:

  1. 服务端收到编译请求

  2. 解析请求参数,得到依赖的 flink-lib 包版本、flink-opt 包版本、UDF 业务自定义包信息等

  3. 根据参数准备本次编译任务需要的所有 Ark Plugin

  4. 若发现存在 Plugin 未安装,动态构造 Classpath 并启动

  5. 所有 Plugin 准备完毕后,构造本地编译任务对应的 Biz

  6. 异步线程启动 Biz 执行编译任务

同时,为了尽可能复用和降低 Plugin、Biz 构建的开销,我们设计整个模式支持运行时动态构建 Plugin、Biz 及其 Classpath,而无需提前准备众多 Plugin FatJar,同时支持运行时动态按需加载 Plugin。

  1. 引擎无关的能力放在 Container 层,也称基座层,实际上就是服务端进程原有的能力;

  2. 引擎相关的能力放在 Plugin 层,将引擎相关的包各自构造独立的 ClassLoader,抽象成 Plugin,实际上就是把上文所说的 flink-lib,flink-connector/backend 抽象成了 Ark Plugin 组件,编译请求来了之后会加载对应的 Plugin,并根据指定顺序来加载类,这一层可以实现 ClassLoader 的复用,注意这一层需要将每个版本的引擎的每个包都抽象成 Plugin,这样可以保证不同的编译请求可以复用正确的 Plugin;

  3. 具体的编译请求由 Biz 层处理,这里的 Biz 层实际上就是针对每一个编译请求,会启动一个新的线程,从 Plugin 层加载需要的 ClassLoader,并构造 Biz ClassLoader 来加载一些特定类,最后使用线程模型来启动编译请求对应的 main 函数,实现线程化编译。

在整个三层结构中,基座层几乎没有特殊的改造,核心设计优化聚焦于 Plugin 层与 Biz 层,设计了更灵活的 Plugin、Biz 构造方式。

由于无法预先得知编译请求需要的 Flink 引擎版本列表,需要提前在服务器中准备好所有版本的 Plugin 供请求来时直接使用,我们支持按需动态构建运行时 Plugin 并动态装载到 JVM 中,因此,我们无需为 Flink 各个 SDK 的所有版本提前构建完整的 Plugin FatJar,同时无需提前做所有版本 Plugin 的预热,只需要在请求到来时,检查所需 Plugin 是否已装载,若没有,按需装载即可。

为了稳定性和该模型的持续运行,我们建设了配套的线程回收逻辑和自愈流程。因为本方案使用了线程模型,不可避免的会存在少量资源泄漏问题,我们设计了一套线程回收逻辑:

  • 定时扫描内存中处于空闲状态的线程池;

  • 判断线程池对应的 ClassLoader,若线程池对应的 ClassLoader 是 Biz CalssLoader 或 Plugin CalssLoader,那么该线程池是编译期间构造的;

  • 此时追踪到对应的编译请求,若请求已经失效,直接强制回收线程池。

此外,为了解决编译线程缓慢的 Meta 增长问题,我们建设了 Meta 检测,超过一定阈值时触发 JVM 重启等自愈流程。

技术特性

阶段一

动态装配 Plugin 及其 Classpath

在 Flink 编译任务中,业务依赖很复杂,不同编译请求可能依赖不同的 flink-lib、flink-opt、UDF 包等,为了编译的正确性需要类隔离,同时因 Flink 引擎包是所有编译任务都需要依赖的,因此对通用的包需要能共享且最大程度提高共享,降低隔离重复加载成本。在 Koupleless 类加载模型下,天然针对这一特性设计了 Plugin、Biz 的加载方案。

每个 Flink、UDF 包都对应一个 Plugin,为了不在服务启动时就加载全量 Plugin,我们支持了动态装配 Plugin 的特性(目前暂无需要动态卸载的场景),根据请求按需加载 Plugin。

同时由于版本很多,为每个包的每个版本都提前构建 Plugin FatJar 也是很大的工程,比如要 flink-version1-plugin、flink-version2-plugin、flink-opt1-plugin、flink-opt2-plugin 等等,我们更轻量化地支持了在运行时根据 Plugin 依赖的 Jar List 动态构造 Classpath 的能力。

如 flink-version1-plugin classpath = common-plugin url + jar1 url + jar2 url + ... + jarN url 动态构成。这样的模式只需要提前构建最简单的 common-plugin 供所有 Flink、Opt Plugin 复用即可。

阶段二

Biz 运行时只对依赖的 Plugin 可见

之前 Ark Container 中的所有 Plugin 对所有 Biz 可见,Biz 进行类加载时,会检索所有的运行时 Plugin Export 列表,查找 export 了当前 class/resource 的 Plugin 进行委托加载。实际 Flink 编译任务中,每次编译请求对应创建一个新 Biz,每次编译请求只会依赖部分指定版本的 Flink、Opt 包,即只依赖部分 Plugin,Biz 运行时进行类加载时只在这些依赖的 Plugin 中查找 Export 信息并进行委托加载。

比如编译任务 Biz1 在进行类加载时,只从依赖的 flink-lib-plugin、flink-opt2-plugin 中委托加载,其余 Plugin 对该编译任务 Biz1 完全不可见。

结果

随机抽取一部分作业,直接测试进程模型和线程模型编译结果的一致性,直接比对生成的执行计划内容。

随机抽取一部分包含 UDF(用户自定义依赖)和不包含 UDF 的作业,直接使用线程模型编译,观察成功率,耗时,机器负载等指标。

整体来说,Flink 编译任务使用线程模型编译从功能上来说,可以正确替代原来进程模型的能力,编译的结果一致,编译出来的执行计划一致。编译任务执行耗时从原来的平均 10s 多降低到 5.6s,平均降低 50%,吞吐从 5~10/min 个编译任务提升到 50/min 及以上,提升 5 倍及以上。

总结

这次 Flink 编译任务,是 Koupleless 在新的实时计算场景中落地的成功探索,以一种新的方式使用类加载框架。在一个大基座上面运行 Job 类模块,流量触发运行,请求完即执行卸载,轻量快捷。欢迎大家碰到相关场景时使用 Koupleless,一起探索 Koupleless 更多的使用场景吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/76257.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用pycharm社区版调试DIFY后端python代码

目录 背景 前置条件 DIFY使用的框架 API服务调试配置步骤(基于tag为0.15.3的版本) 1.配置.env文件 2.关闭docker里面的docker-api-1服务 3.使用DOCKER启动本地环境需要用到的中间件,并暴露端口 注意事项一: 注意事项二&#xff1a…

从 macos 切换到 windows 上安装的工具类软件

起因 用了很多年的macos, 已经习惯了macos上的操作, 期望能在windows上获得类似的体验, 于是花了一些时间来找windows上相对应的软件. 截图软件 snipaste​​​​​​ windows和macos都有的软件, 截图非常好用 文件同步软件 oneDrive: 尝试了不同的同步软件, 还是微软在各…

MySQL体系架构(一)

1.1.MySQL的分支与变种 MySQL变种有好几个,主要有三个久经考验的主流变种:Percona Server,MariaDB和 Drizzle。它们都有活跃的用户社区和一些商业支持,均由独立的服务供应商支持。同时还有几个优秀的开源关系数据库,值得我们了解一下。 1.1.1.Drizzle Drizzle是真正的M…

【项目实训项目博客】prompt初版实践

通过对camel技术的理解,我们向其中添加了市场营销角色的prompt 初版设计如下: chatchainconfig.json { "chain": [ { "phase": "DemandAnalysis", "phaseType": "SimplePhase", "max_turn_step…

[Bond的杂货铺] CKS 证书也到货咯

最近比较忙,忘记写Blog了:) 一年前黑五去官网蹲了一手Cyber Monday,买了英文考试券bundle,当时只考了cka,后来cks差点都忘记了。将近一年后,无意中收到官方的提醒邮件,说考试券本已过期&#x…

【回眸】Linux 内核 (十五) 之 多线程编程 上

前言 进程和线程 区别 线程API 1.创建线程 2.线程退出 3.线程等待 4.线程脱离 5. 线程ID获取及比较 6.创建及销毁互斥锁 7.创建及销毁条件变量 8. 等待 9.触发 多线程编程 后记 前言 高产的几天。 进程和线程 区别 进程——资源分配的最小单位,线…

127.0.0.1本地环回地址(Loopback Address)

127.0.0.1 是计算机网络中的一个特殊IPv4地址,称为本地环回地址(Loopback Address),主要用于以下用途: 1. 基本定义 本地主机(Localhost):该地址始终指向当前正在使用的计算机本身&a…

S7-1200 PLC热电偶和热电阻模拟量模块

热电偶和热电阻模拟量模块 S7-1200 PLC有专用用于对温度进行采集的热电偶模块SM1231 TC和SM 1231RTD。热电偶模块有4AI和8AI两种,下面以SM1231 TC 4AI为例看一下接线图。 该模块一共有4个通道,每个通道有两个接线端子,比如0,0-。…

深度了解向量引论

今天去研究了一个基本数学原理 这个其实需要证明 今天推导了一下这个公式,感觉收获挺大 下面是手工推导过程

Feign修仙指南:声明式HTTP请求的优雅之道

各位在微服务世界摸爬滚打的道友们!今天要解锁的是Spring Cloud的绝世神通——Feign!这货堪称HTTP界的"言出法随",只需定义接口,就能自动生成HTTP请求代码!从此告别手动拼装URL的苦日子,让你的代…

UDP学习笔记(四)UDP 为什么大小不能超过 64KB?

🌐 UDP 为什么大小不能超过 64KB?TCP 有这个限制吗? 在进行网络编程或者调试网络协议时,我们常常会看到一个说法: “UDP 最大只能发送 64KB 数据。” 这到底是怎么回事?这 64KB 是怎么来的?TCP…

LabVIEW 中串口设备与采集卡的同步精度

在 LabVIEW 项目开发中,常涉及多种设备协同工作,如通过串口设备采集温度,利用采集卡(如 NI 6251)采集压力。此时,设备间的同步精度至关重要,它直接影响系统数据的准确性与可靠性。下面&#xff…

DP_AUX辅助通道介绍

DisplayPort(简称DP)是一个由PC及芯片制造商联盟开发,视频电子标准协会(VESA)标准化的数字式视频接口标准。该接口免认证、免授权金,主要用于视频源与显示器等设备的连接,并也支持携带音频、USB…

[GESP202312 五级] 平均分配

文章目录 题目描述输入格式输出格式输入输出样例 #1输入 #1输出 #1 输入输出样例 #2输入 #2输出 #2 提交链接提示解析参考代码 题目描述 小杨认为,所有大于等于 a a a 的完全平方数都是他的超级幸运数。 小杨还认为,所有超级幸运数的倍数都是他的幸运…

[Mysql]buffersize修改

1、找到my.cnf文件位置 ps -ef|grep mysqld 2、编辑my.cnf cd /etc/my.cnf.d vim my.cnf 一般修改为内存的50%~70% 3、重启服务 systemctl restart mysqld

清晰易懂的 Apollo 配置中心安装与使用教程

Apollo 是携程开源的分布式配置管理平台,支持配置实时推送、版本管理、权限控制等功能。本教程将手把手教你完成 Apollo 核心组件安装、基础配置管理及避坑指南,助你快速掌握企业级配置管理能力。 一、环境准备(关键依赖) 1. 基础…

PyTorch池化层详解:原理、实现与示例

池化层(Pooling Layer)是卷积神经网络中的重要组成部分,主要用于降低特征图的空间维度、减少计算量并增强模型的平移不变性。本文将通过PyTorch代码演示池化层的实现原理,并详细讲解最大池化、平均池化、填充(Padding&…

如何构建并优化提示词?

提示词是一个小白最容易上手大模型的方式,提示词就是你告诉大模型应该如何去完成一项工作的系统性的命令,所以写一个好的提示词是比较关键的,那么如何写好一个提示词呢? 要写好提示词,其实就像我们要把一些命令清晰地传…

面向大模型的开发框架LangChain

这篇文章会带给你 如何使用 LangChain:一套在大模型能力上封装的工具框架如何用几行代码实现一个复杂的 AI 应用面向大模型的流程开发的过程抽象 文章目录 这篇文章会带给你写在前面LangChain 的核心组件文档(以 Python 版为例)模型 I/O 封装…

【蓝桥杯】动态规划:线性动态规划

1. 最长上升子序列(LIS) 1.1. 题目 想象你有一排数字,比如:3, 1, 2, 1, 8, 5, 6 你要从中挑出一些数字,这些数字要满足两个条件: 你挑的数字的顺序要和原来序列中的顺序一致(不能打乱顺序) 你挑的数字要一个比一个大(严格递增) 问:最多能挑出多少个这样的数字? …