FATE Flow 源码解析 - 作业提交处理流程

背景介绍

FATE 是隐私计算中最有名的开源项目了,从 star 的数量上来看也可以看出来。截止 2023 年 3 月共收获 4.9k 个 star,但是 FATE 一直被认为代码框架复杂,难以理解,作为一个相关的从业者,后续会持续对 FATE 项目的源码进行解析,方便对隐私计算感兴趣的后来者提供一点点帮助。

本文主要基于 FATE-Flow 2022 年 12 月发布的版本 v1.10.0,后续的版本可能略有差异。针对 FATE-Flow 的代码,基于 v1.10.0 的做了一个代码注解的仓库,方便查看具体的代码 https://github.com/hustyichi/FATE-Flow

Fate-Flow 基础介绍

FATE-Flow 是 FATE 项目的重要组成部分,主要用于实现作业的调度,整体的设计可以查看 官方文档
FATE 中提交的训练作业会提交给 Fate-Flow,由 FATE-Flow 统一进行调度执行,最终生成所需训练结果

Fate-Flow 是作为一个 Web 服务对外提供服务的,对应的初始启动文件为 FATE-Flow/python/fate_flow/fate_flow_server.py ,熟悉 flask 的可以看到,最终就是调用 run_simple 创建了一个 Web 服务,根据 app 可以找到 Web 服务的主要代码都在 FATE-Flow/python/fate_flow/apps 目录下,路由注册的代码如下所示:

# 注册 HTTP 路由,将 Fate-Flow/python/fate_flow/apps 以及 Fate-Flow/python/fate_flow/scheduling_apps 下所有 python 文件client_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent)
]
scheduling_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent.parent / 'scheduling_apps')
]

可以看到注册的路由主要就是 apps 目录与 scheduling_apps 目录下的路由。

一个注意点:FATE-Flow 是没办法独立运行的,需要作为 FATE 的一部分执行。 FATE-Flow 项目部分依赖的代码,比如 fate_arch 是存在于 FATE 工程下,对应的路径为 FATE/python/fate_arch ,找不到代码时可以联合 FATE 代码仓库进行阅读

作业处理流程

作为一个作业调度的服务,最重要的就是完整的处理流程,先厘清这个主线,其他分支就更容易理解了,主要流程如下所示:

作业提交

作业提交是通过 FATE-Flow/python/fate_flow/apps/job_app.py 中的 submit_job 进行提交的,主要的处理都是通过 DAGScheduler.submit() 来完成的,简化版本的代码如下所示:


def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None):# 没有 id 时默认生成唯一 idif not job_id:job_id = job_utils.generate_job_id()submit_result = {"job_id": job_id}job = Job()job.f_job_id = job_idjob.f_dsl = dsljob.f_train_runtime_conf = train_runtime_confjob.f_roles = runtime_conf["role"]job.f_initiator_role = job_initiator["role"]job.f_initiator_party_id = job_initiator["party_id"]job.f_role = job_initiator["role"]job.f_party_id = job_initiator["party_id"]# 通知各个站点 (party) 去创建对应的作业 job 以及对应的任务 tasksstatus_code, response = FederatedScheduler.create_job(job=job)# 更新 job 状态为 WAITINGjob.f_status = JobStatus.WAITING# 将 job 状态同步给各个站点(party)status_code, response = FederatedScheduler.sync_job_status(job=job)return submit_result

可以看到提交作业时,主要是在数据库中的作业表 Job 生成对应的记录,并将作业的数据与状态同步给各个站点,并根据作业 job 的信息初始化生成对应的任务 task,最终实际执行时是以任务为单位进行的

资源申请

提交后作业的状态变为 WAITING,在 DAGScheduler.run_do() 对 WAITING 状态的作业进行了处理,可以看到如下所示:

def run_do(self):# 默认处理 WAITING 状态的第一个创建的 job 进行处理,会分配必要的资源,处理结束状态变为 RUNNINGjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING, order_by="create_time", reverse=False)if len(jobs):job = jobs[0]self.schedule_waiting_jobs(job=job, lock=True)

可以看到实际处理的方法是 schedule_waiting_jobs() 方法,对应的代码如下所示:

def schedule_waiting_jobs(cls, job):job_id, initiator_role, initiator_party_id, = job.f_job_id, job.f_initiator_role, job.f_initiator_party_id,# 检查作业的前置依赖关系dependence_status_code, federated_dependence_response = FederatedScheduler.dependence_for_job(job=job)if dependence_status_code == FederatedSchedulingStatusCode.SUCCESS:# 申请相关资源apply_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.APPLY)if apply_status_code == FederatedSchedulingStatusCode.SUCCESS:# 启动 job 执行,状态更新至 RUNNINGcls.start_job(job_id=job_id, initiator_role=initiator_role, initiator_party_id=initiator_party_id)

在此阶段,会申请作业执行所需的资源,资源申请时会调用依次调用各个站点对应的接口,分配必要的 CPU 与内存资源,对应的接口为 FATE-Flow/python/fate_flow/scheduling_apps/party_app.py 中的 /<job_id>/<role>/<party_id>/resource/apply 接口,最终调用 FATE-Flow/python/fate_flow/manager/resource_manager.py 中的 resource_for_job() 方法执行资源的获取,此时会基于数据库表 EngineRegistry 去做资源的动态分配限制。具体的分配策略的实现后续专门介绍,这边就不具体展开了。
可以理解为这个阶段结束,作业执行所需的资源就已经被占用,从而保证后续作业的顺利执行

实际执行

实际作业的执行是在 DAGScheduler.run_do()中完成的,处理的状态是在 RUNNING,可以看到如下所示:

def run_do(self):# 默认处理所有 RUNNING 状态的 jobjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.RUNNING, order_by="create_time", reverse=False)for job in jobs:self.schedule_running_job(job=job, lock=True)

可以看到实际的作业执行是在 schedule_running_job() 中完成的,此方法真正的任务执行是通过调用 TaskScheduler.schedule() 完成的,对应的代码如下所示:


def schedule(cls, job, dsl_parser, canceled=False):initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)waiting_tasks = []# 获取就绪的 tasksfor initiator_task in initiator_tasks_group.values():if initiator_task.f_status == TaskStatus.WAITING:waiting_tasks.append(initiator_task)# 执行所有就绪的 tasksfor waiting_task in waiting_tasks:status_code = cls.start_task(job=job, task=waiting_task)def start_task(cls, job, task):# 申请 task 相关的资源apply_status = ResourceManager.apply_for_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"]))if not apply_status:return SchedulingStatusCode.NO_RESOURCE# 更新状态为 RUNNING , 并同步给各个站点task.f_status = TaskStatus.RUNNINGupdate_status = JobSaver.update_task_status(task_info=task.to_human_model_dict(only_primary_with=["status"]))FederatedScheduler.sync_task_status(job=job, task=task)# 实际调用参与方执行 taskstatus_code, response = FederatedScheduler.start_task(job=job, task=task)

可以看到作业的执行事实上是依次获取所有就绪的任务 Task,然后执行 FederatedScheduler.start_task() 去执行 task 做成所需完成的功能,而 FederatedScheduler.start_task() 事实上就是发起一次请求调用 party_app.py 中的 /<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start 接口完成的
实际的任务执行是在 TaskController.start_task() 中完成,对应的代码如下所示:

def start_task(cls, job_id, component_name, task_id, task_version, role, party_id, **kwargs):task_info = {"job_id": job_id,"task_id": task_id,"task_version": task_version,"role": role,"party_id": party_id,}# 根据 id 获取对应的任务task = JobSaver.query_task(task_id=task_id, task_version=task_version, role=role, party_id=party_id)[0]task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine}# 根据执行环境选择对应的 engine,目前主要是 eggroll 和 spark,默认是 eggrollbackend_engine = build_engine(run_parameters.computing_engine)# 实际执行对应的任务,对于 eggroll,会启动新进程执行 python/fate_flow/worker/task_executor.py 脚本run_info = backend_engine.run(task=task,run_parameters=run_parameters,run_parameters_path=run_parameters_path,config_dir=config_dir,log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),user_name=kwargs.get("user_id"))# 更新 task 相关的执行情况,执行正常的情况下状态为 RUNNINGtask_info.update(run_info)task_info["start_time"] = current_timestamp()cls.update_task(task_info=task_info)task_info["party_status"] = TaskStatus.RUNNINGcls.update_task_status(task_info=task_info)

简单理解任务 Task 最终只是根据作业在独立进程中完成特定命令的执行,最终作业就是一系列任务的执行的组合。当所有任务完成时,作业也就完成了

进度更新

前面提到作业 job 的执行事实上仅仅是一系列对应的任务 task 的执行,因此 FATE-Flow 的进度更新也是根据任务 task 的完成的数量占所有 task 的数量来确定的。具体的代码如下:

def schedule_running_job(cls, job: Job, force_sync_status=False):# 调度 job 进行执行task_scheduling_status_code, auto_rerun_tasks, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal)# 更新 job 执行的进度以及状态tasks_status = dict([(task.f_component_name, task.f_status) for task in tasks])new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status.values())# 根据 job 中已完成 task 的数量与总 task 的数量确定完成的进度total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)new_progress = float(finished_count) / total * 100if new_job_status != job.f_status or new_progress != job.f_progress:# 通知参与方更新 job 执行的进度信息if int(new_progress) - job.f_progress > 0:job.f_progress = new_progressFederatedScheduler.sync_job(job=job, update_fields=["progress"])cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])# 有状态变化时通知相关方更新 job 状态信息if new_job_status != job.f_status:job.f_status = new_job_statusFederatedScheduler.sync_job_status(job=job)cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])# 处理结束,执行必要资源回收if EndStatus.contains(job.f_status):cls.finish(job=job, end_status=job.f_status)

可以看到最终就是调用 calculate_job_progress() 计算特定作业 job 中任务 task 完成的数量,最终确定完成的进度。
所有的处理处理结束时,调用 finish() 执行必要的资源回收

总结

本文对 FATE-Flow 的作业的完整执行流程进行了梳理,为了简化删除了大量异常分支的处理,有兴趣的可以结合实际的 FATE-Flow v1.10.0 的源码进行查看,应该会更有裨益

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44414.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React@16.x(56)Redux@4.x(5)- 实现 createStore

目录 1&#xff0c;分析2&#xff0c;实现2.1&#xff0c;基础实现2.2&#xff0c;优化2.2.1&#xff0c;随机字符串2.2.2&#xff0c;action 的判断2.2.2&#xff0c;监听器的优化 2.3&#xff0c;最终形态 1&#xff0c;分析 createStore()&#xff0c;参数1为 reducer&…

0601STM32TIM

TOC 分为四部分&#xff0c;八小节 一部分&#xff1a;主要讲定时器基本定时的功能&#xff0c;也就是定一个事件&#xff0c;让定时器每隔这个时间产生一个中断&#xff0c;来实现每隔一个固定时间来执行一段程序的目的&#xff0c;比如做一个时钟、秒表&#xff0c;或者使用一…

【Linux】1w详解如何实现一个简单的shell

目录 实现思路 1. 交互 获取命令行 2. 子串分割 解析命令行 3. 指令的判断 内建命令 4. 普通命令的执行 补充&#xff1a;vim 文本替换 整体代码 重点思考 1.getenv和putenv是什么意思 2.代码extern char **environ; 3.内建命令是什么 4.lastcode WEXITSTATUS(sta…

Java-final关键字详解

Java-final关键字详解 一、引言 二、什么是 final 关键字&#xff1f; 三、final 变量 final 局部变量 final 实例变量 final 静态变量 四、final 方法 五、final 类 六、final 关键字的实际应用 1. 定义常量 2. 防止方法被重写 3. 创建不可变类 4. 优化性能 七、…

切割01串(牛客小白月赛98)

题意&#xff1a; 给三个整数n&#xff0c;l&#xff0c;r&#xff0c;和一个字符串s&#xff0c;满足l<|c0-c1|<r就可以切成字符串a和字符串b&#xff0c;c0为字符串a左侧出现0的次数&#xff0c;c1为字符串b右侧出现1的次数&#xff0c;求最多切割次数 知识点&#x…

Onnx 1-深度学习-概述1

Onnx 1-深度学习-概述1 一: Onnx 概念1> Onnx 介绍2> Onnx 的作用3> Onnx 应用场景4> Onnx 文件格式1. Protobuf 特点2. onnx.proto3协议3> Onnx 模型基本操作二:Onnx API1> 算子详解2> Onnx 算子介绍三: Onnx 模型1> Onnx 函数功能

昇思学习打卡-8-计算机视觉/FCN图像语义分割

目录 FCN介绍FCN所用的技术训练数据的可视化模型训练模型推理FCN的优点和不足优点不足 FCN介绍 FCN主要用于图像分割领域&#xff0c;是一种端到端的分割方法&#xff0c;是深度学习应用在图像语义分割的开山之作。通过进行像素级的预测直接得出与原图大小相等的label map。因…

【C++基础】初识C++(2)--引用、const、inline、nullptr

目录 一、引用 1.1 引用的概念和定义 1.2 引用的特性 1.3引用的使用 1.4 const引用 1.5 指针和引用的关系 二、inline 三、nullptr 一、引用 1.1 引用的概念和定义 引⽤不是新定义⼀个变量&#xff0c;⽽是给已存在变量取了⼀个别名&#xff0c;编译器不会为引⽤…

微软的人工智能语音生成器在测试中达到与人类同等水平

微软公司开发了一种新的神经编解码语言模型 Vall-E&#xff0c;在自然度、语音鲁棒性和说话者相似性方面都超越了以前的成果。它是同类产品中第一个在两个流行基准测试中达到人类同等水平的产品&#xff0c;而且显然非常逼真&#xff0c;以至于微软不打算向公众开放。 VALL-E …

Node.js 模块系统

Node.js 模块系统 Node.js 的模块系统是其核心特性之一,它允许开发者将代码组织成可重用的模块。这种系统促进了代码的模块化,使得大型应用程序的构建和管理变得更加容易。本文将深入探讨 Node.js 的模块系统,包括其工作原理、如何创建和使用模块,以及模块系统的优势和局限…

【每日一练】python类和对象现实举例详细讲解

""" 本节课程目的&#xff1a; 1.掌握类描述现实世界实物思想 2.掌握类和对象的关系 3.理解什么事面向对象 """ #比如设计一个闹钟&#xff0c;在这里就新建一个类 class Clock:idNone #闹钟的序列号&#xff0c;也就是类的属性priceNone #闹…

Git最常用操作速查表

Git常用操作 文章目录 Git常用操作1. 克隆/拉取2. 分支操作1. 查看分支2. 创建分支3. 切换到分支4. 删除分支5. 删除远程分支6. 推送分支到远程 3. 暂存库操作4. Git团队规范1. 原则2. 分支设计3. commit备注一般规范 1. 克隆/拉取 git clone xxx 从远程仓库克隆 git rebase…

【开源之美】:WinMerge Files

一、引言 强大的windows端文件比较工具&#xff0c;跟Beyond Compare相比&#xff0c;更为强大。但是这里我们推荐他的原因&#xff0c;不仅是因为作为一个使用的工具&#xff0c;主要是因为他开源&#xff0c;可以通过调试优秀的源代码&#xff0c;进一步的提升C项目设计和编…

Alternative to Receptive field in Transformers and what factors impact it

题意&#xff1a;Transformer中感受野的替代概念及其影响因素 问题背景&#xff1a; I have two transformer networks. One with 3 heads per attention and 15 layers in total and second one with 5 heads per layer and 30 layers in total. Given an arbitrary set of d…

什么是数据模型?数据模型与数据治理有什么关系?

在企业数据治理的广阔领域中&#xff0c;首要且关键的一步是明确沟通数据治理的需求。这包括对企业所持有的数据种类、数据存储位置、以及当前数据管理的具体情况有一个清晰的了解和记录。了解企业的数据资产是制定有效数据治理策略的基础。企业需要识别和盘点所有类型的数据资…

AIGC产品经理学习路径

基础篇&#xff08;课时 2 &#xff09; AIGC 行业视角 AIGC 的行业发展演进&#xff1a;传统模型/深度学习/大模型 AIGC 的产品设计演进&#xff1a;AI Embedded / AI Copilot / AI Agen AIGC 的行业产业全景图 AIGC 的产品应用全景图 AIGC 职业视角 AI 产品经理/ AIGC…

2974.最小数字游戏

1.题目描述 你有一个下标从 0 开始、长度为 偶数 的整数数组 nums &#xff0c;同时还有一个空数组 arr 。Alice 和 Bob 决定玩一个游戏&#xff0c;游戏中每一轮 Alice 和 Bob 都会各自执行一次操作。游戏规则如下&#xff1a; 每一轮&#xff0c;Alice 先从 nums 中移除一个 …

Spring MVC 全面指南:从入门到精通的详细解析

引言&#xff1a; Spring MVC&#xff0c;作为Spring框架的一个重要模块&#xff0c;为构建Web应用提供了强大的功能和灵活性。无论是初学者还是有一定经验的开发者&#xff0c;掌握Spring MVC都将显著提升你的Web开发技能。本文旨在为初学者提供一个全面且易于理解的学习路径…

数据建设实践之大数据平台(五)安装hive

安装hive 上传安装包到/opt/software目录并解压 [bigdata@node101 software]$ tar -zxvf hive-3.1.3-with-spark-3.3.1.tar.gz -C /opt/services [bigdata@node101 services]$ mv apache-hive-3.1.3-bin apache-hive-3.1.3 配置环境变量 export JAVA_HOME=/opt/services…

Debezium系列之:验证mysql、mariadb等兼容mysql协议数据库账号权限

Debezium系列之:验证mysql、mariadb等兼容mysql协议数据库账号权限 一、数据库需要开启binlog二、创建账号和账号需要赋予的权限三、账号具有权限查看日志信息四、验证账号权限五、验证账号能否执行show master status六、验证数据库是否开启binlog一、数据库需要开启binlog …