Mars 如何分布式地执行

先前,我们已经介绍过 Mars 是什么。如今 Mars 已在 Github 开源并对内上线试用,本文将介绍 Mars 已实现的分布式执行架构,欢迎大家提出意见。

架构

Mars 提供了一套分布式执行 Tensor 的库。该库使用 mars.actors 实现的 Actor 模型编写,包含 Scheduler、Worker 和 Web 服务。

用户向 Mars Web Service 提交的是由 Tensor 组成的 Graph。Web Service 接收这些图并提交到一台 Scheduler。在提交作业到各个 Worker 之前,Mars Scheduler 先将 Tensor 图编译成一张由 Chunk 和 Operand 组成的图,此后对图进行分析和切分。此后,Scheduler 在所有 Scheduler 中根据一致性哈希创建一系列控制单个 Operand 执行的 OperandActor。Operand 以符合拓扑序的顺序进行调度,当所有 Operand 完成执行,整张图将被标记为已完成,客户端能够从 Web 中拉取结果。整个执行过程如下图所述。

作业提交

用户端通过 RESTful API 向 Mars 服务提交作业。用户通过编写 Tensor 上的代码,此后通过 session.run(tensor) 将 Tensor 操作转换为 Tensor 构成的 Graph 并提交到 Web API。此后,Web API 将作业提交到 SessionActor 并在集群中创建一个 GraphActor 用于图的分析和管理。用户端则开始查询图的执行状态,直至执行结束。

在 GraphActor 中,我们首先根据 chunks 设置将 Tensor 图转换为 Operand 和 Chunk 组成的图,这一过程使得图可以被进一步拆分并能够并行执行。此后,我们在图上进行一系列的分析以获得 Operand 的优先级,同时向起始 Operand 指派 Worker,关于这一部分的细节可以参考 准备执行图 章节。此后,每个 Operand 均建立一个 OperandActor 用于控制该 Operand 的具体执行。当 Operand 处于 READY状态(如同在 Operand 状态 章节描述的那样),Scheduler 将会为 Operand 选择目标 Worker,随后作业被提交 Worker 进行实际的执行。

执行控制

当一个 Operand 被提交到 Worker,OperandActor 等待 Worker 上的回调。如果 Operand 执行成功,Operand 的后继将被调度。如果 Operand 执行失败,OperandActor 将会尝试数次,如果仍失败则将此次执行标记为失败。

取消作业

用户端可以使用 RESTful API 取消运行中的作业。取消请求将被写入 Graph 的状态存储中,同时 GraphActor 上的取消接口将被调用。如果作业在准备阶段,它将在检测到停止请求后立即结束,否则请求将被下发到每个 OperandActor,并设置状态为 CANCELLING。如果此时 Operand 没有运行,Operand 状态将被直接置为 CANCELLED。如果 Operand 正在运行,停止请求将被下发到 Worker 中并导致一个 ExecutionInterrupted 错误,该错误将返回给 OperandActor,此时 Operand 的状态将被标记为 CANCELLED。

准备执行图

当一个 Tensor 图被提交到 Mars Scheduler,一张包含更细粒度的,由 Operand 和 Chunk 构成的图将根据数据源中包含的 chunks 参数被生成。

图压缩

当完成 Chunk 图的生成后,我们将会通过合并图中相邻的节点来减小图的规模,这一合并也能让我们充分利用 numexpr 这样的加速库来加速计算过程。目前 Mars 仅会合并形成单条链的 Operand。例如,当执行下面的代码

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars 将会合并 Operand ADD 和 SUM 成为 FUSE 节点。RAND Operand 不会被合并,因为它们并没有和 ADD 及 SUM 组成一条简单的直线。

初始 Worker 分配

为 Operand 分配 Worker 对于图执行的性能而言至关重要。随机分配初始 Operand 可能导致巨大的网络开销,并有可能导致不同 Worker 间作业分配的不平衡。因为非初始节点的分配能够根据其前驱生成数据的物理分布及各个 Worker 的空闲情况方便地确定,在执行图准备阶段,我们只考虑初始 Operand 的分配问题。

初始 Worker 分配需要遵循几个准则。首先,分配给每个 Worker 执行的 Operand 需要尽量保持平衡满,这能够使计算集群在整个执行阶段都有较高的利用率,这在执行的最后阶段显得尤其重要。其次,初始节点分配需要使后续节点执行时的网络”传输尽量小。也就是说,初始点分配需要充分遵循局部性原则。

需要注意的是,上述准则在某些情况下会彼此冲突。一个网络传输量最小的分配方案可能会非常偏斜。我们开发了一套启发式算法来获取两个目标的平衡,该算法描述如下:

  1. 选择列表中的第一个初始节点和第一台机器;
  2. 从 Operand 图转换出的无向图中自该点开始进行深度优先搜索;
  3. 如果另一个未被分配的初始节点被访问到,我们将其分配给步骤1中选择的机器;
  4. 当访问到的 Operand 总数大于平均每个 Worker 接受的 Operand 个数时,停止分配;
  5. 前往步骤1,如果仍有 Worker 未被分配 Operand,否则结束。

调度策略

当一个 Operand 组成的 Graph 执行时,合适的执行顺序会减少集群中暂存的数据总量,从而减小数据被 Spill 到磁盘的可能性。合适的 Worker 能够减少执行时网络传输的总量。

Operand 选择策略

合适的执行顺序能够显著减小集群中暂存的数据总量。下图中展示了 Tree Reduction 的例子,圆形代表 Operand,方形代表 Chunk,红色代表 Operand 正在执行,蓝色代表 Operand 可被执行,绿色代表 Operand 产生的 Chunk 已被存储,灰色代表 Operand 及其相关数据已被释放。假设我们有两台 Worker,并且每个 Operand 的资源使用量均相等,每张图展示的是不同策略下经过5个时间单元的执行后的状态。左图展示的是节点依照层次分别执行,而右图展示的是依照接近深度优先的顺序执行。左图中,有6个 Chunk 的数据需要暂存,右图只有2个。

因为我们的目标是减少存储在集群中的数据总数,我们为进入 READY 状态的 Operand 设定了一套优先级策略:

  1. 深度更大的 Operand 需要被优先执行;
  2. 被更深的 Operand 依赖的 Operand 需要被优先执行;
  3. 输出规模更小的节点需要被优先执行。

Worker 选择策略

当 Scheduler 准备执行图时,初始 Operand 的 Worker 已被确定。我们选择后续 Operand 分配 Worker 的依据是输入数据所在的 Worker。如果某个 Worker 拥有的输入数据大小最大,则该 Worker 将被选择用于执行后续 Operand。如果这样的 Worker 有多个,则各个候选 Worker 的资源状况将起到决定作用。

Operand 状态

Mars 中的每一个操作符都被一个 OperandActor 单独调度。执行的过程是一个状态转移的过程。在 OperandActor 中,我们为每一个状态的进入过程定义一个状态转移函数。起始 Operand 在初始化时位于 READY 状态,非起始 Operand 在初始化时则位于 UNSCHEDULED 状态。当给定的条件满足,Operand 将转移到另一个状态并执行相应的操作。状态转移的流程可以参考下图:

我们在下面描述每个状态的含义及 Mats 在这些状态下执行的操作。

  • UNSCHEDUED:一个 Operand 位于此状态,当它的上游数据没有准备好。
  • READY:一个 Operand 位于此状态,当所有上游输入数据均已准备完毕。在进入这一状态时,OperandActor 向 AssignerActor 中选择的所有 Worker 提交作业。如果某一 Worker 准备运行作业,它将向 Scheduler 发送消息,Scheduler 将向其他 Worker 发送停止运行的消息,此后向该 Worker 发送消息以启动作业执行。
  • RUNNING:一个 Operand 位于此状态,当它的执行已经启动。在进入此状态时,OperandActor 会检查作业是否已经提交。如果尚未提交,OperandActor 将构造一个由 FetchChunk Operand 和当前 Operand 组成的图,并将其提交到 Worker 中。此后,OperandActor 会在 Worker 中注册一个回调来获取作业执行完成的消息。
  • FINISHED:一个 Operand 位于此状态,当作业执行已完成。当 Operand 进入此状态,且 Operand 无后继,一个消息将被发送到 GraphActor 以决定是否整个 Graph 的执行都已结束。与此同时,OperandActor 向它的前驱和后继发送执行完成的消息。如果一个前驱收到此消息,它将检查是否所有的后继都已执行完成。如是,当前 Operand 上的数据可以被释放。如果一个后继收到此消息,它将检查是否所有的前驱已完成。如是,该后继的状态可以转移到 READY。
  • FREED:一个 Operand 位于此状态,当其上所有数据都已被释放。
  • CANCELLED:一个 Operand 位于此状态,当所有重新执行的尝试均告失败。当 Operand 进入此状态,它将把相同状态传递到后继节点。
  • CANCELLING:一个 Operand 位于此状态,当它正在被取消执行。如果此前作业正在执行,一个取消执行的请求会被发送到 Worker 上。
  • CANCELLED:一个 Operand 位于此状态,当执行已被取消并停止运行。如果执行进入这一状态,OperandActor 会尝试将书友的后继都转为 CANCELLING。

Worker 中的执行细节

一个 Mars Worker 包含多个进程,以减少全局解释器锁(GIL)对执行的影响。具体的执行在独立的进程中完成。为减少不必要的内存拷贝和进程间通讯,Mars Worker 使用共享内存来存储执行结果。

当一个作业被提交到 Worker,它将首先被置于队列中等待分配内存。当内存被分配后,其他 Worker 上的数据,或者当前 Worker 上已被 spill 到磁盘的数据将会被重新载入内存中。此时,所有计算需要的数据已经都在内存中,真正的计算过程将启动。当计算完成,Worker 将会把作业放到共享存储空间中。这四种执行状态的转换关系见下图。

执行控制

Mars Worker 通过 ExecutionActor 控制所有 Operand 在 Worker 中的执行。该 Actor 本身并不参与实际运算或者数据传输,只是向其他 Actor 提交任务。

Scheduler 中的 OperandActor 通过 ExecutionActor 上的 enqueue_graph 调用向 Worker 提交作业。Worker 接受 Operand 提交并且将其换存在队列中。当作业可以执行时,ExecutionActor 将会向 Scheduler 发送消息,Scheduler 将确定是否将执行该操作。当 Scheduler 确定在当前 Worker 上执行 Operand,它将调用 start_execution 方法,并通过 add_finish_callback注册一个回调。这一设计允许执行结果被多个位置接收,这对故障恢复有价值。

ExecutionActor 使用 mars.promise 模块来同时处理多个 Operand 的执行请求。具体的执行步骤通过 Promise 类的 then 方法相串联。当最终的执行结果被存储,之前注册的回调将被触发。如果在之前的任意执行步骤中发生错误,该错误会被传导到最后 catch 方法注册的处理函数中并得到处理。

Operand 的排序

所有在 READY 状态的 Operand 都被提交到 Scheduler 选择的 Worker 中。因此,在执行的绝大多数时间里,提交到 Operand 的 Worker 个数通常都高于单个 Worker 能够处理的 Operand 总数。因此,Worker 需要对 Operand 进行排序,此后选择一部分 Worker 来执行。这一排序过程在 TaskQueueActor 中进行,该 Actor 中维护一个优先队列,其中存储 Operand 的相关信息。与此同时,TaskQueueActor 定时运行一个作业分配任务,对处于优先队列头部的 Operand 分配执行资源直至没有多余的资源来运行 Operand,这一分配过程也会在新 Operand 提交或者 Operand 执行完成时触发。

内存管理

Mars Worker 管理两部分内存。第一部分是每个 Worker 进程私有的内存空间,由每个进程自己持有。第二部分是所有进程共享的内存空间,由 Apache Arrow 中的 plasma_store 持有。

为了避免进程内存溢出,我们引入了 Worker 级别的 QuotaActor,用于分配进程内存。当一个 Operand 开始执行前,它将为输入和输出 Chunk 向 QuotaActor 发送批量内存请求。如果剩余的内存空间可以满足请求,该请求会被 QuotaActor 接受。否则,请求将排队等待空闲资源。当相关内存使用被释放,请求的资源会被释放,此时,QuotaActor 能够为其他 Operand 分配资源。

共享内存由 plasma_store 管理,通常会占据整个内存的 50%。由于不存在溢出的可能,这部分内存无需经过 QuotaActor 而是直接通过 plasma_store 的相关方法进行分配。当共享内存使用殆尽,Mars Worker 会尝试将一部分不在使用的 Chunk spill 到磁盘中,以腾出空间容纳新的 Chunk。

从共享内存 spill 到磁盘的 Chunk 数据可能会被未来的 Operand 重新使用,而从磁盘重新载入共享内存的操作可能会非常耗费 IO 资源,尤其在共享内存已经耗尽,需要 spill 其他 Chunk 到磁盘以容纳载入的 Chunk 时。因此,当数据共享并不需要时,例如该 Chunk 只会被一个 Operand 使用,我们会将 Chunk 直接载入进程私有内存中,而不是共享内存,这可以显著减少作业总执行时间。

未来工作

Mars 目前正在快速迭代,近期将考虑实现 Worker 级别的 failover 及 shuffle 支持,Scheduler 级别的 failover 也在计划中。

 


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519997.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

maven编译 Process terminated【已解决】

在idea中打开了settings文件,找到提示的报错位置,删除或者调整即可

揭秘人工智能(系列):深度学习是否过分夸大?

2012年左右,多伦多大学的研究人员首次使用深度学习来赢下了ImageNet,它是一项非常受欢迎的计算机图像识别竞赛。对于那些参与AI行业的人来说,这是一个大问题,因为计算机视觉是使计算机能够理解图像背景的学科,也是人工…

互联网诞生记:风起于青萍之末

戳蓝字“CSDN云计算”关注我们哦!作者 | 老姜出品 | CSDN云计算(ID:CSDNcloud)“起初阿帕创造阿帕网络。阿帕网络是空虚混沌。渊面黑暗。阿帕的灵运行在网络里面。阿帕说:‘要有一个协议。’就有了一个协议。阿帕看它是…

powerpc 汇编linux,PowerPc下的寻址模式

本篇文章主要描述了Powerpc的寻址模式,让自己对内存映射、寻址的概念理解深入些。在开始讨论寻址模式之前,让我们首先来回顾一下计算机内存的概念。可能之前已经了解了关于内存和编程的一些事实,但是由于现代编程语言正试图淡化计算机中的一些…

2018最佳GAN论文回顾(上)

我很高兴今年参加了一个研究项目,这要求我必须熟悉大量用于计算机视觉方面的深度学习领域的资料。我对过去两、三年内取得的进展感到惊讶,这真的非常令人兴奋和鼓舞,所有不同的子领域,如图像修复、对抗性样本、超分辨率或是三维重…

被神话的大数据——从大数据(big data)到深度数据(deep data)思维转变

自从阿法狗战胜人类顶级棋手之后,深度学习、人工智能变得再一次火热起来。有些人认为,深度学习的再一次兴起是源于硬件的提升、数据量的增多以及高效算法的研究。这并不完全精确,有一个基本的误解是更大的数据会产生更好的机器学习结果。然而…

spring整合rabbitMQ最新版

文章目录一、简单对象1. 依赖2. 生产者3. 消费者4. 配置文件5. spring版本二、复杂对象2.1. 生产者2.2. 消费者一、简单对象 1. 依赖 <!--spring整合rabbitmq--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-ra…

搞定面试算法系列 | 分治算法三步走

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 江子抑转自 | 编程拯救世界主要思想分治算法&#xff0c;即分而治之&#xff1a;把一个复杂问题分成两个或更多的相同或相似子问题&#xff0c;直到最后子问题可以简单地直接求解&#xff0c;最后将子问题的解合并为原问题的…

通过FD耗尽实验谈谈使用HttpClient的正确姿势

一段问题代码实验 在进行网络编程时&#xff0c;正确关闭资源是一件很重要的事。在高并发场景下&#xff0c;未正常关闭的资源数逐渐积累会导致系统资源耗尽&#xff0c;影响系统整体服务能力&#xff0c;但是这件重要的事情往往又容易被忽视。我们进行一个简单的实验&#xf…

与“十“俱进 阿里数据库运维10年演进之路

导语 阿里巴巴集团拥有超大的数据库实例规模&#xff0c;在快速发展的过程中我们在运维管理方面也在不断的面临变化&#xff0c;从物理器到容器、从独占到混布、从本地盘到存储计算分离、从集团内到大促云资源&#xff0c;从开源的MySQL到自研分布式数据库&#xff0c;运维管控…

jmeter 压测 RabbitMQ_单机

文章目录一、MQ压测1. 资料列表2. jmeter软件包3. 插件列表二、远程服务器监控2.1. 监控声明2.2. 监控场景的区别2.3. 软件列表2.4. 插件操作2.5. 软件操作三、jmeter编写MQ脚本3.1.创建线程组3.2. 创建MQ生产者3.3. 创建MQ消费者四、监听器4.1. 聚合报告4.2. 观察树4.3. 监控五…

云+X案例展 | 民生类:纷享销客助力沃得农机构筑智能化、信息化之路

本案例由纷扬科技投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。​​​​“2004年到20…

如何“神还原”数据中心? 阿里联合NTU打造了工业级精度的仿真沙盘!

如何保障数据中心的稳定运行&#xff0c;是多年来一直困扰业界的难题。机房环境如果发生未预期变化&#xff0c;可能造成难以估计的损失。所以我们希望能构建一个“变更沙盘”&#xff0c;在真实变更之前&#xff0c;操作人员可以先在沙盘中进行试变更&#xff0c;若变更效果在…

RabbitMQ 手动签收

下面这基础地方都必须设置&#xff0c;不然无效 // 同一时刻服务器只会发一条消息给消费者channel.basicQos(1); // 消息的标识&#xff0c;false只确认当前一个消息收到&#xff0c;true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeli…

把16进制转换为ascii字符c语言,ASCII转16进制C语言

满意答案u2gseftj278推荐于 2016.03.01采纳率&#xff1a;56% 等级&#xff1a;11已帮助&#xff1a;14340人以前引别人的&#xff0c;自己懒得再写了呵呵。原理就是这样的&#xff0c;你可以直接用的//函 数 名&#xff1a;AscToHex()//功能描述&#xff1a;把ASCII转换为1…

四大维度全景揭秘阿里巴巴智能对话开发平台

在阿里巴巴智能服务事业部的X蜂会上&#xff0c;小蜜北京团队的高级算法专家李永彬&#xff08;水德&#xff09;分享了小蜜智能对话开发平台的构建&#xff0c;围绕平台来源、设计理念、核心技术、业务落地情况四大维度讲述了一个较为完整的智能任务型对话开发平台的全景。以下…

2019年技术盘点云数据库篇(二):阿里云携手MongoDB率先上线4.2数据库 云上数据库已是大势所趋...

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 刘丹出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;随着技术的飞速发展&#xff0c;云数据库在云计算的大背景下&#xff0c;作为一种新兴的共享基础架构方法逐渐发展起来&#xff0c;它极大地增强了数据…

Oracle 查看表空间的大小及使用情况sql语句

SELECT a.tablespace_name "表空间名称", total / (1024 * 1024) "表空间大小(M)", free / (1024 * 1024) "表空间剩余大小(M)", (total - free) / (1024 * 1024 ) "表空间使用大小(M)", total / (1024 * 1024 * 1024) "表空…

高可用、弹性动态的金融级移动架构在蚂蚁金服的演进之路

本文基于重岳在 2018 年 Arch Summit 北京站的分享内容进行总结&#xff0c;希望通过本篇文章介绍近些年来支付宝在移动端架构的上演进和思考&#xff0c;期冀能给读者们带来些许帮助。 支付宝作为国民级应用&#xff0c;当前全球用户已经超过 10 亿&#xff0c;提供了超过 200…

Android代码混淆方法,Android 代码混淆零基础入门

内容提要本篇文章主要有三个部分&#xff0c;让读者读完后能自己写规则混淆项目对Android代码怎么开启混淆做一个简单的介绍。对混淆规则做一个简单介绍&#xff1b;在混淆过后Crash日志反推代码工具retrace.bat、可视化反推工具GUI说明。对混淆的一个简单介绍&#xff1a;Andr…