python分布式框架_高性能分布式执行框架——Ray

Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,它使用了和传统分布式计算系统不一样的架构和对分布式计算的抽象方式,具有比Spark更优异的计算性能。

Ray目前还处于实验室阶段,最新版本为0.2.2版本。虽然Ray自称是面向AI应用的分布式计算框架,但是它的架构具有通用的分布式计算抽象。本文对Ray进行简单的介绍,帮助大家更快地了解Ray是什么,如有描述不当的地方,欢迎不吝指正。

一、简单开始

首先来看一下最简单的Ray程序是如何编写的。

# 导入ray,并初始化执行环境

import ray

ray.init()

# 定义ray remote函数

@ray.remote

def hello():

return "Hello world !"

# 异步执行remote函数,返回结果id

object_id = hello.remote()

# 同步获取计算结果

hello = ray.get(object_id)

# 输出计算结果

print hello

在Ray里,通过Python注解@ray.remote定义remote函数。使用此注解声明的函数都会自带一个默认的方法remote,通过此方法发起的函数调用都是以提交分布式任务的方式异步执行的,函数的返回值是一个对象id,使用ray.get内置操作可以同步获取该id对应的对象。熟悉Java里的Future机制的话对此应该并不陌生,或许会有人疑惑这和普通的异步函数调用没什么大的区别,但是这里最大的差异是,函数hello是分布式异步执行的。

remote函数是Ray分布式计算抽象中的核心概念,通过它开发者拥有了动态定制计算依赖(任务DAG)的能力。比如:

@ray.remote

def A():

return "A"

@ray.remote

def B():

return "B"

@ray.remote

def C(a, b):

return "C"

a_id = A.remote()

b_id = B.remote()

c_id = C.remote(a_id, b_id)

print ray.get(c_id)

例子代码中,对函数A、B的调用是完全并行执行的,但是对函数C的调用依赖于A、B函数的返回结果。Ray可以保证函数C需要等待A、B函数的结果真正计算出来后才会执行。如果将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B计算结果的依赖,自由的函数调用方式允许Ray可以自由地定制DAG的结构和计算依赖关系。另外,提及一点的是Python的函数可以定义函数具有多个返回值,这也使得Python的函数更天然具备了DAG节点多入和多出的特点。

405877-20171126235604765-82501554.png

二、系统架构

Ray是使用什么样的架构对分布式计算做出如上抽象的呢,一下给出了Ray的系统架构(来自Ray论文,参考文献1)。

405877-20171126235615625-1165176825.png

作为分布式计算系统,Ray仍旧遵循了典型的Master-Slave的设计:Master负责全局协调和状态维护,Slave执行分布式计算任务。不过和传统的分布式计算系统不同的是,Ray使用了混合任务调度的思路。在集群部署模式下,Ray启动了以下关键组件:

GlobalScheduler:Master上启动了一个全局调度器,用于接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行。

RedisServer:Master上启动了一到多个RedisServer用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务debug信息等。

LocalScheduler:每个Slave上启动了一个本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的Worker进程。

Worker:每个Slave上可以启动多个Worker进程执行分布式任务,并将计算结果存储到ObjectStore。

ObjectStore:每个Slave上启动了一个ObjectStore存储只读数据对象,Worker可以通过共享内存的方式访问这些对象数据,这样可以有效地减少内存拷贝和对象序列化成本。ObjectStore底层由Apache Arrow实现。

Plasma:每个Slave上的ObjectStore都由一个名为Plasma的对象管理器进行管理,它可以在Worker访问本地ObjectStore上不存在的远程数据对象时,主动拉取其它Slave上的对象数据到当前机器。

需要说明的是,Ray的论文中提及,全局调度器可以启动一到多个,而目前Ray的实现文档里讨论的内容都是基于一个全局调度器的情况。我猜测可能是Ray尚在建设中,一些机制还未完善,后续读者可以留意此处的细节变化。

Ray的任务也是通过类似Spark中Driver的概念的方式进行提交的,有所不同的是:

Spark的Driver提交的是任务DAG,一旦提交则不可更改。

而Ray提交的是更细粒度的remote function,任务DAG依赖关系由函数依赖关系自由定制。

论文给出的架构图里并未画出Driver的概念,因此我在其基础上做了一些修改和扩充。

405877-20171126235635515-2119278255.png

Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有以下区别:

Driver上的工作进程DriverProcess一般只有一个,即用户启动的PythonShell。Slave可以根据需要创建多个WorkerProcess。

Driver只能提交任务,却不能接收来自全局调度器分配的任务。Slave可以提交任务,也可以接收全局调度器分配的任务。

Driver可以主动绕过全局调度器给Slave发送Actor调用任务(此处设计是否合理尚不讨论)。Slave只能接收全局调度器分配的计算任务。

三、核心操作

基于以上架构,我们简单讨论一下Ray中关键的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地启动ray,包括Driver、HeadNode(Master)和若干Slave。

import ray

ray.init()

如果是直连已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="")

本地启动Ray得到的输出如下:

>>> ray.init()

Waiting for redis server at 127.0.0.1:58807 to respond...

Waiting for redis server at 127.0.0.1:23148 to respond...

Allowing the Plasma store to use up to 13.7439GB of memory.

Starting object store with directory /tmp and huge page support disabled

Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================

View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5

======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}

>>>

本地启动Ray时,可以看到Ray的WebUI的访问地址。

2. ray.put()

使用ray.put()可以将Python对象存入本地ObjectStore,并且异步返回一个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的对象(远程对象通过查阅Master的对象表获得)。

对象一旦存入ObjectStore便不可更改,Ray的remote函数可以将直接将该对象的ID作为参数传入。使用ObjectID作为remote函数参数,可以有效地减少函数参数的写ObjectStore的次数。

@ray.remote

def f(x):

pass

x = "hello"

# 对象x往ObjectStore拷贝里10次

[f.remote(x) for _ in range(10)]

# 对象x仅往ObjectStore拷贝1次

x_id = ray.put(x)

[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通过ObjectID获取ObjectStore内的对象并将之转换为Python对象。对于数组类型的对象,Ray使用共享内存机制减少数据的拷贝成本。而对于其它对象则需要将数据从ObjectStore拷贝到进程的堆内存中。

如果调用ray.get()操作时,对象尚未创建好,则get操作会阻塞,直到对象创建完成后返回。get操作的关键流程如下:

Driver或者Worker进程首先到ObjectStore内请求ObjectID对应的对象数据。

如果本地ObjectStore没有对应的对象数据,本地对象管理器Plasma会检查Master上的对象表查看对象是否存储其它节点的ObjectStore。

如果对象数据在其它节点的ObjectStore内,Plasma会发送网络请求将对象数据拉到本地ObjectStore。

如果对象数据还没有创建好,Master会在对象创建完成后通知请求的Plasma读取。

如果对象数据已经被所有的ObjectStore移除(被LRU策略删除),本地调度器会根据任务血缘关系执行对象的重新创建工作。

一旦对象数据在本地ObjectStore可用,Driver或者Worker进程会通过共享内存的方式直接将对象内存区域映射到自己的进程地址空间中,并反序列化为Python对象。

另外,ray.get()可以一次性读取多个对象的数据:

result_ids = [ray.put(i) for i in range(10)]

ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用注解@ray.remote可以声明一个remote function。remote函数时Ray的基本任务调度单元,remote函数定义后会立即被序列化存储到RedisServer中,并且分配了一个唯一的ID,这样就保证了集群的所有节点都可以看到这个函数的定义。

不过,这样对remote函数定义有了一个潜在的要求,即remote函数内如果调用了其它的用户函数,则必须提前定义,否则remote函数无法找到对应的函数定义内容。

remote函数内也可以调用其它的remote函数,Driver和Slave每次调用remote函数时,其实都是向集群提交了一个计算任务,从这里也可以看到Ray的分布式计算的自由性。

Ray中调用remote函数的关键流程如下:

调用remote函数时,首先会创建一个任务对象,它包含了函数的ID、参数的ID或者值(Python的基本对象直接传值,复杂对象会先通过ray.put()操作存入ObjectStore然后返回ObjectID)、函数返回值对象的ID。

任务对象被发送到本地调度器。

本地调度器决定任务对象是在本地调度还是发送给全局调度器。如果任务对象的依赖(参数)在本地的ObejctStore已经存在且本地的CPU和GPU计算资源充足,那么本地调度器将任务分配给本地的WorkerProcess执行。否则,任务对象被发送给全局调度器并存储到任务表(TaskTable)中,全局调度器根据当前的任务状态信息决定将任务发给集群中的某一个本地调度器。

本地调度器收到任务对象后(来自本地的任务或者全局调度分配的任务),会将其放入一个任务队列中,等待计算资源和本地依赖满足后分配给WorkerProcess执行。

Worker收到任务对象后执行该任务,并将函数返回值存入ObjectStore,并更新Master的对象表(ObjectTable)信息。

@ray.remote注解有一个参数num_return_vals用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。

@ray.remote(num_return_vals=2)

def f():

return 1, 2

x_id, y_id = f.remote()

ray.get(x_id) # 1

ray.get(y_id) # 2

@ray.remote注解的另一个参数num_gpus可以为任务指定GPU的资源。使用内置函数ray.get_gpu_ids()可以获取当前任务可以使用的GPU信息。

@ray.remote(num_gpus=1)

def gpu_method():

return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支持批量的任务等待,基于此可以实现一次性获取多个ObjectID对应的数据。

# 启动5个remote函数调用任务

results = [f.remote(i) for i in range(5)]

# 阻塞等待4个任务完成,超时时间为2.5s

ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5个ObjectID,使用ray.wait操作可以一直等待有4个任务完成后返回,并将完成的数据对象放在第一个list类型返回值内,未完成的ObjectID放在第二个list返回值内。如果设置了超时时间,那么在超时时间结束后仍未等到预期的返回值个数,则已超时完成时的返回值为准。

6. ray.error_info()

使用ray.error_info()可以获取任务执行时产生的错误信息。

>>> import time

>>> @ray.remote

>>> def f():

>>> time.sleep(5)

>>> raise Exception("This task failed!!")

>>> f.remote()

Remote function __main__.f failed with:

Traceback (most recent call last):

File "", line 4, in f

Exception: This task failed!!

You can inspect errors by running

ray.error_info()

If this driver is hanging, start a new one with

ray.init(redis_address="127.0.0.1:65452")

>>> ray.error_info()

[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函数只能处理无状态的计算需求,有状态的计算需求需要使用Ray的Actor实现。在Python的class定义前使用@ray.remote可以声明Actor。

@ray.remote

class Counter(object):

def __init__(self):

self.value = 0

def increment(self):

self.value += 1

return self.value

使用如下方式创建Actor对象。

a1 = Counter.remote()

a2 = Counter.remote()

Ray创建Actor的流程为:

Master选取一个Slave,并将Actor创建任务分发给它的本地调度器。

创建Actor对象,并执行它的构造函数。

从流程可以看出,Actor对象的创建时并行的。

通过调用Actor对象的方法使用Actor。

a1.increment.remote() # ray.get returns 1

a2.increment.remote() # ray.get returns 1

调用Actor对象的方法的流程为:

首先创建一个任务。

该任务被Driver直接分配到创建该Actor对应的本地执行器执行,这个操作绕开了全局调度器(Worker是否也可以使用Actor直接分配任务尚存疑问)。

返回Actor方法调用结果的ObjectID。

为了保证Actor状态的一致性,对同一个Actor的方法调用是串行执行的。

四、安装Ray

如果只是使用Ray,可以使用如下命令直接安装。

pip intall ray

如果需要编译Ray的最新源码进行安装,按照如下步骤进行(MaxOS):

# 更新编译依赖包

brew update

brew install cmake pkg-config automake autoconf libtool boost wget

pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six

# 下载源码编译安装

git clone https://github.com/ray-project/ray.git

cd ray/python

python setup.py install

# 测试

python test/runtest.py

# 安装WebUI需要的库[可选]

pip install jupyter ipywidgets bokeh

# 编译Ray文档[可选]

cd ray/doc

pip install -r requirements-doc.txt

make html

open _build/html/index.html

我在MacOS上安装jupyter时,遇到了Python的setuptools库无法升级的情况,原因是MacOS的安全性设置问题,可以使用如下方式解决:

重启电脑,启动时按住Command+R进入Mac保护模式。

打开命令行,输入命令csrutils disable关闭系统安全策略。

重启电脑,继续安装jupyter。

安装完成后,重复如上的方式执行csrutils enable,再次重启即可。

进入PythonShell,输入代码本地启动Ray:

import ray

ray.init()

浏览器内打开WebUI界面如下:

405877-20171126235654640-1767073624.png

参考资料

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/431010.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原 hibernate与mysql字段类型对应关系

原 hibernate与mysql字段类型对应关系 发表于8个月前(2015-04-17 08:56) 阅读(1102) | 评论(0) 2人收藏此文章, 我要收藏赞01月16日厦门 OSC 源创会火热报名中,奖品多多哦 摘要 hibernate与mysql字段类型对应关系 …

下拉选择框 其他_列表框 vs 下拉列表,哪个更好?

许多UI控件允许用户选择选项,它们包括复选框、单选按钮、切换开关、步进器、列表框和下拉列表。 在本文中,作者对列表框和下拉列表进行了定义,讨论何时使用各个元素,以及各个情况下使用哪一种更加合适。摘要列表框和下拉列表是紧凑…

springboot整合elasticsearch_Spring Boot学习10_整合Elasticsearch

一、Elasticsearch概念•以 员工文档 的形式存储为例:一个文档代表一个员工数据。存储数据到 ElasticSearch 的行为叫做 索引 ,但在索引一个文档之前,需要确定将文档存储在哪里。•一个 ElasticSearch 集群可以 包含多个 索引 ,相…

php制作图片轮播_图片轮播效果实现方法

图片轮播效果如何实现呢本文主要介绍了JQuery实现图片轮播效果的制作原理以及实现代码,文章末尾附上源码下载,具有很好的参考价值。下面跟着小编一起来看下吧,希望能帮助到大家。用JQuery操作DOM确实很方便,并且JQuery提供了非常人…

python有趣的面试题_一道3行代码的Python面试题,我懵逼了...|python基础教程|python入门|python教程...

https://www.xin3721.com/eschool/pythonxin3721/ 前言 本文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理。 今天来说说交流群里一位群友问的Python题目。刚开始由于没有电脑,我也没有运行出来&…

个人应用开发详记. (三)

好久没来更新了... IM即时通讯已进入最后阶段. 各个功能模块 框架基本写好. 剩下的就是细节上的优化了 由于内容上并没有什么大幅度的变动 . 就不上图了 . 元旦回家 放假3天~ 争取年前搞定此APP 转载于:https://www.cnblogs.com/ImyFen/p/5089968.html

r语言清除变量_如何优雅地计算多变量 | R语言进阶

社会科学研究经常会遇到“超多变量”的情况——多量表、多维度、多题项,以及复杂的正反计分题……如何更高效地计算量表总分?如何更简洁地进行反向计分?传统的统计工具(Excel、SPSS等)虽然也能解决这些问题&#xff0c…

php模板初级教程,风格模板初级不完全修改教程

风格模板初级不完全修改教程更新时间:2006年10月09日 00:00:00 作者:就自己的一点点经验,希望能给初接触模版修改的朋友有个参考。关于模版修改, 引用星星签名里的一句话“学好HTML很重要” :)一个风格,…

语音对讲软件_微信语音转播软件是哪个?怎样一键转发?

文末送社群运营资料有一句话说得好,好马配好鞍,如果经验丰富的社群工作人员想要看到良好的社群运营效果,那单单凭借个人的力量是远远不够的,建议将希望寄托在第三方工具的身上,比如微信语音转播软件就是绝佳选择。按照…

php if require,关于php:required_if Laravel 5验证

我有一个表格,用户可以填写出售房屋的信息。 对于其中一项投入,用户必须选择"待售"或"待租"天气。 如果是For Sale,则会出现两个价格输入字段,如果是For Rent,则会基于jQuery显示一些其他价格输入…

asp.net 安装element ui_不用上官网,自己部署一套Element官方最新文档

ElementUI官方的访问速度一直很慢,公司内网也无法进行外网访问。故研究了下最新的ElementUI API(2.13.2)部署教程。先上效果图ElementUI文档部署过程到github下载最新的elementui源码,这里我使用git下载到本地git clone https://github.com/ElemeFE/elem…

如何写一个计算器?

考虑这样一个问题,给定一个字符串,“11(34)-2*38/2”,如何将它转化为如下形式: “112” “347” “279” “2*36” “9-63” “8/24” “347” 换句话说,就是如何将字符串按照四则运算计算出来,如何写一个计…

由于在客户端检测到一个协议错误_HTTP协议,你了解多少?

HTTP简介HTTP协议是Hyper Text Transfer Protocol(超文本传输协议)的缩写,是用于从万维网(WWW:World Wide Web )服务器传输超文本到本地浏览器的传送协议。HTTP是一个基于TCP/IP通信协议来传递数据(HTML 文件, 图片文件, 查询结果等)。HTTP是一个属于应用层的面向对象的协议&am…

idea中没有j2ee_idea神器功能大全

IDEA 全称 IntelliJ IDEA,是java语言开发的集成环境,IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手、代码自动提示、重构、J2EE支持、各类版本工具(git、svn、github等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面…

linux 固定ip_linux固定IP

在新安装的Linux系统命令行下,敲入:ifconfig,显示如下界面。上面这张图显示网卡没有启动,那么我们敲入代码:ifup eth0启动网卡。网卡启动后,我们可以看出,IP地址和网关等其他信息都已经出现。但是我们需要的…

php编译称opcode文件,PHP源码保护和性能加速

什么是Opcache?每一次执行 PHP 脚本的时候,该脚本都需要被编译成字节码,而 Opcache 可以对该字节码进行缓存,这样,下次请求同一个脚本的时候,该脚本就不需要重新编译,这极大节省了脚本的执行时间&#xff…

9553下载站java,java se development kit11最新版 64位

java se development kit11,简称java11,是一款专门进行java开发的编程软件,这款软件还拥有applet和组件的开发环境等操作,是程序员们进行java开发的飞铲不错软件,如果你喜欢这款软件,那就来下载基本介绍自从…

java sleep方法_一文搞懂 Java 线程中断!

在之前的一文《如何”优雅”地终止一个线程》详细说明了 stop 终止线程的坏处及如何优雅地终止线程,那么还有别的可以终止线程的方法吗?答案是肯定的,它就是我们今天要分享的——线程中断。下面的这断代码大家应该再熟悉不过了,线…

java 观察者模式_图解Java设计模式之观察者模式

图解Java设计模式之观察者模式天气预报项目需求天气预报设计方案 1 - 普通方案观察者模式(Observer)原理观察者模式解决天气预报需求观察者模式在JDK应用的源码分析天气预报项目需求1)气象站可以将每天测量到的湿度、温度、气压等等以公告的形…

怎么在同一页中分页_分库分表业界难题,跨库分页的几种常见方案

为什么需要研究跨库分页?互联网很多业务都有分页拉取数据的需求,例如:(1)微信消息过多时,拉取第N页消息;(2)京东下单过多时,拉取第N页订单;(3)浏览58同城,查看第N页帖子;…