Spark整合Ray思路漫谈

什么是Ray

之前花了大概两到三天把Ray相关的论文,官网文档看了一遍,同时特意去找了一些中文资料看Ray当前在国内的发展情况(以及目前国内大部分人对Ray的认知程度)。

先来简单介绍下我对Ray的认知。

首先基因很重要,所以我们先需要探查下Ray最初是为了解决什么问题而产生的。Ray的论文显示,它最早是为了解决增强学习的挑战而设计的。增强学习的难点在于它是一个需要边学习,边做实时做预测的应用场景,这意味会有不同类型的tasks同时运行,并且他们之间存在复杂的依赖关系,tasks会在运行时动态产生产生新的tasks,现有的一些计算模型肯定是没办法解决的。如果Ray只是为了解决RL事情可能没有那么复杂,但是作者希望它不仅仅能跑增强学习相关的,希望是一个通用的分布式机器学习框架,这就意味着Ray必然要进行分层抽象了,也就是至少要分成系统层应用层

系统层面,既然是分布式的应用,那么肯定需要有一个应用内的resource/task调度和管理。首先是Yarn,K8s等资源调度框架是应用程序级别的的调度,Ray作为一个为了解决具体业务问题的应用,应该要跑在他们上面而不是取代他们,而像Spark/Flink虽然也是基于task级别的资源调度框架,但是因为他们在设计的时候是为了解决一个比较具体的抽象问题,所以系统对task/资源都做了比较高的封装,一般用户是面向业务编程,很难直接操控task以及对应的resource。我们以Spark为例,用户定义好了数据处理逻辑,至于如何将这些逻辑分成多少个Job,Stage,Task,最后占用多少Resource (CPU,GPU,Memory,Disk)等等,都是由框架自行决定,而用户无法染指。这也是我一直诟病Spark的地方。所以Ray在系统层面,是一个通用的以task为调度级别的,同时可以针对每个task控制资源粒度的一个通用的分布式task执行系统。记住,在Ray里,你需要明确定义Task以及Task的依赖,并且为每个task指定合适(数量,资源类型)的资源。比如你需要用三个task处理一份数据,那么你就需要自己启动三个task,并且指定这些task需要的资源(GPU,CPU)以及数量(可以是小数或者整数)。而在Spark,Flink里这是不大可能的。Ray为了让我们做这些事情,默认提供了Python的语言接口,你可以像使用Numpy那样去使用Ray。实际上,也已经有基于Ray做Backend的numpy实现了,当然它属于应用层面的东西了。Ray系统层面很简单,也是典型的master-worker模式。类似spark的driver-executor模式,不同的是,Ray的worker类似yarn的worker,是负责Resource管理的,具体任务它会启动Python worker去执行你的代码,而spark的executor虽然也会启动Python worker执行python代码,但是对应的executor也执行业务逻辑,和python worker有数据交换和传输。

应用层面,你可以基于Ray的系统进行编程,因为Ray默认提供了Python的编程接口,所以你可以自己实现增强学习库(RLLib),也可以整合已有的算法框架,比如tensorflow,让tensorflow成为Ray上的一个应用,并且轻松实现分布式。我记得知乎上有人说Ray其实就是一个Python的分布式RPC框架,这么说是对的,但是显然会有误导,因为这很可能让人以为他只是“Python分布式RPC框架”。

如何和Spark协作

根据前面我讲述的,我们是可以完全基于Ray实现Spark的大部分API的,只是是Ray backend而非Spark core backend。实际上Ray目前正在做流相关的功能,他们现在要做的就是要兼容Flink的API。虽然官方宣称Ray是一个新一代的机器学习分布式框架,但是他完全可以cover住当前大数据和AI领域的大部分事情,但是任重道远,还需要大量的事情。所以对我而言,我看中的是它良好的Python支持,以及系统层面对资源和task的控制,这使得:

1.我们可以轻易的把我们的单机Python算法库在Ray里跑起来(虽然算法自身不是分布式的),但是我们可以很好的利用Ray的资源管理和调度功能,从而解决AI平台的资源管理问题。

2.Ray官方提供了大量的机器学习算法的实现,以及对当前机器学习框架如Tensorflow,Pytorch的整合,而分布式能力则比这些库原生提供的模式更靠谱和易用。毕竟对于这些框架而言,支持他们分布式运行的那些辅助库(比如TensorFlow提供parameter servers)相当简陋。

但是,我们知道,数据处理它自身有一个很大的生态,比如你的用户画像数据都在数据湖里,你需要把这些数据进行非常复杂的计算才能作为特征喂给你的机器学习算法。而如果这个时候,你还要面向资源编程(或者使用一个还不够成熟的上层应用)而不是面向“业务”编程,这就显得很难受了,比如我就想用SQL处理数据,我只关注处理的业务逻辑,这个当前Ray以及之上的应用显然还是做不到如Spark那么便利的(毕竟Spark就是为了数据处理而生的),所以最好的方式是,数据的获取和加工依然是在Spark之上,但是数据准备好了就应该丢给用户基于Ray写的代码处理了。Ray可以通过Arrow项目读取HDFS上Spark已经处理好的数据,然后进行训练,然后将模型保存为HDFS。当然对于预测,Ray可以自己消化掉或者丢给其他系统完成。我们知道Spark 在整合Python生态方面做出了非常多的努力,比如他和Ray一样,也提供了python 编程接口,所以spark也较为容易的整合譬如Tensorflow等框架,但是没办法很好的管控资源(比如GPU),而且,spark 的executor 会在他所在的服务器上启动python worker,而spark一般而言是跑在yarn上的,这就对yarn造成了很大的管理麻烦,而且通常yarn 和hdfs之类的都是在一起的,python环境还有资源(CPU/GPU)除了管理难度大以外,还有一个很大的问题是可能会对yarn的集群造成比较大的稳定性风险。

所以最好的模式是按如下步骤开发一个机器学习应用:

写一个python脚本,
在数据处理部分,使用pyspark,
在程序的算法训练部分,使用ray,
spark 运行在yarn(k8s)上,
ray运行在k8s里

好处显而易见:用户完全无感知他的应用其实是跑在两个集群里的,对他来说就是一个普通python脚本。

从架构角度来讲,复杂的python环境管理问题都可以丢给ray集群来完成,spark只要能跑基本的pyspark相关功能即可,数据衔接通过数据湖里的表(其实就是一堆parquet文件)即可。当然,如果最后结果数据不大,也可以直接通过client完成pyspark到ray的交互。

Spark和Ray的架构和部署

现在我们来思考一个比较好的部署模式,架构图大概类似这样:

首先,大家可以理解为k8s已经解决一切了,我们spark,ray都跑在K8s上。但是,如果我们希望一个spark 是实例多进程跑的时候,我们并不希望是像传统的那种方式,所有的节点都跑在K8s上,而是将executor部分放到yarn cluster. 在我们的架构里,spark driver 是一个应用,我们可以启动多个pod从而获得多个spark driver实例,对外提供负载均衡,roll upgrade/restart 等功能。也就是k8s应该是面向应用的。但是复杂的计算,我们依然希望留给Yarn,尤其是还涉及到数据本地性,计算和存储放到一起(yarn和HDFS通常是在一起的),避免k8s和HDFS有大量数据交换。

因为Yarn对Java/Scala友好,但是对Python并不友好,尤其是在yarn里涉及到Python环境问题会非常难搞(主要是Yarn对docker的支持还是不够优秀,对GPU支持也不好),而机器学习其实一定重度依赖Python以及非常复杂的本地库以及Python环境,并且对资源调度也有比较高的依赖,因为算法是很消耗机器资源的,必须也有资源池,所以我们希望机器学习部分能跑在K8s里。但是我们希望整个数据处理和训练过程是一体的,算法的同学应该无法感知到k8s/yarn的区别。为了达到这个目标,用户依然使用pyspark来完成计算,然后在pyspark里使用ray的API做模型训练和预测,数据处理部分自动在yarn中完成,而模型训练部分则自动被分发到k8s中完成。并且因为ray自身的优势,算法可以很好的控制自己需要的资源,比如这次训练需要多少GPU/CPU/内存,支持所有的算法库,在做到对算法最少干扰的情况下,算法的同学们有最好的资源调度可以用。

下面展示一段MLSQL代码片段展示如何利用上面的架构:

-- python 训练模型的代码
set py_train='''
import ray
ray.init()
@ray.remote(num_cpus=2, num_gpus=1)
def f(x):return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
''';
load script.`py_train` as py_train;-- 设置需要的python环境描述
set py_env='''
''';
load script.`py_env` as py_env;-- 加载hive的表
load hive.`db1.table1` as table1;-- 对Hive做处理,比如做一些特征工程
select features,label from table1 as data;-- 提交Python代码到Ray里,此时是运行在k8s里的
train data as PythonAlg.`/tmp/tf/model`
where scripts="py_train"
and entryPoint="py_train"
and condaFile="py_env"
and  keepVersion="true"
and fitParam.0.fileFormat="json" -- 还可以是parquet
and `fitParam.0.psNum`="1";

下面是PySpark的示例代码:

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import SparkSession
import logging
import rayfrom pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteType
from sklearn.naive_bayes import GaussianNB
import os
from sklearn.externals import joblib
import pickle
import scipy.sparse as sp
from sklearn.svm import SVC
import io
import codecsos.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"
logger = logging.getLogger(__name__)base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"
spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()data = spark.read.format("libsvm").load(base_dir + "/data/mllib/sample_libsvm_data.txt")## 广播数据
dataBr = spark.sparkContext.broadcast(data.collect())## 训练模型 这部分代码会在spark executor里的python worker执行
def train(row):import rayray.init()train_data_id = ray.put(dataBr.value)## 这个函数的python代码会在K8s里的Ray里执行@ray.remotedef ray_train(x):X = []y = []for i in ray.get(train_data_id):X.append(i["features"])y.append(i["label"])if row["model"] == "SVC":gnb = GaussianNB()model = gnb.fit(X, y)# 为什么还需要encode一下?pickled = codecs.encode(pickle.dumps(model), "base64").decode()return [row["model"], pickled]if row["model"] == "BAYES":svc = SVC()model = svc.fit(X, y)pickled = codecs.encode(pickle.dumps(model), "base64").decode()return [row["model"], pickled]result = ray_train.remote(row)ray.get(result)##训练模型 将模型结果保存到HDFS上
rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train)
spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),StructField(name="modelBinary", dataType=StringType())])).write. \format("parquet"). \mode("overwrite").save("/tmp/wow")

这是一个标准的Python程序,只是使用了pyspark/ray的API,我们就完成了上面所有的工作,同时训练两个模型,并且数据处理的工作在spark中,模型训练的在ray中。

完美结合!最重要的是解决了资源管理的问题!


原文链接
本文为阿里云原创内容,未经允许不得转载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517134.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

建设数据中台之前,建议先看这份企业数据能力测评 | 大咖说中台

作者 | 耿立超来源 | 《大数据平台架构与原型实现:数据中台建设实战》“我的企业目前在数据应用上处于什么水平?接下来应该朝哪个方向努力?”本文试图帮助企业决策者和IT负责人解答这一问题。今天,数据之于企业的重要性已经勿须多…

如何让 python 处理速度翻倍?内含代码

阿里妹导读:作为在日常开发生产中非常实用的语言,有必要掌握一些python用法,比如爬虫、网络请求等场景,很是实用。但python是单线程的,如何提高python的处理速度,是一个很重要的问题,这个问题的…

Zipkin 存储追踪数据至 MySQL

下载zipkin-mysql数据库脚本 https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/mysql-v1/src/main/resources 创建数据库名称为zipkin,字符集编码:utf8mb4 初始化脚本 -- -- Copyright 2015-2019 The OpenZipkin Authors -- -- Licen…

Spring Cloud Alibaba 新一代微服务解决方案

本篇是「跟我学 Spring Cloud Alibaba」系列的第一篇, 每期文章会在公众号「架构进化论」进行首发更新,欢迎关注。 1、Spring Cloud Alibaba 是什么 Spring Cloud Alibaba 是阿里巴巴提供的微服务开发一站式解决方案,是阿里巴巴开源中间件…

它估值25亿!被马云领投,是华为“老战友”,网友:也许股价能超茅台!

最近一条新闻被炒的沸沸扬扬:十年以来中国最大IPO,中芯国际将融资532亿元!何为IPO?翻译即为一家公司第一次向全社会公开售出它的股份。买的人越多,代表着社会对其信心越大。为什么2020年,能爆发这样1场最大…

RabbitMQ 最新版安装 (Linux环境)

文章目录一、Erlang1. Erlang下载2. Erlang 上传并解压3. 验证rabbitmq依赖是否安装4. 安装rabbitmq依赖5. Erlang 编译、安装6. Erlang 配置环境变量7. Erlang 验证二、RabbitMQ2.1. RabbitMQ 下载2.2. RabbitMQ 上传并解压2.3. RabbitMQ 配置2.4. 配置环境变2.5. 启动 Rabbit…

双11 背后的全链路可观测性:阿里巴巴鹰眼在“云原生时代”的全面升级

导读:作为一支深耕多年链路追踪技术 (Tracing) 与性能管理服务 (APM) 的团队,阿里巴巴中间件鹰眼团队的工程师们见证了阿里巴巴基础架构的多次升级,每一次的架构升级都会对系统可观测性能力 (Observability) 带来巨大挑战,而这次的…

一切转型始于数据和模型 | 2020 MATLAB EXPO 中国线上用户大会:即将上线

2020 MATLAB EXPO 中国线上用户大会一切转型始于数据和模型2020 年 7 月 21-24 日 | 线上直播MATLAB 和 Simulink,作为业界普遍使用的科学计算与模型仿真软件,已被全球的工程师和科学家们广泛应用于加快汽车、航空、电子、金融服务、生物医药以及其他行业…

Dubbo 如何成为连接异构微服务体系的最佳服务开发框架

从编程开发的角度来说,Apache Dubbo (以下简称 Dubbo )首先是一款 RPC 服务框架,它最大的优势在于提供了面向接口代理的服务编程模型,对开发者屏蔽了底层的远程通信细节。同时 Dubbo 也是一款服务治理框架,…

Zipkin 基于MQ存 储链路信息至 MySQL

RabbitMQ 最新版安装 (Linux环境) https://gblfy.blog.csdn.net/article/details/120498390 启动rabbitmq 队列是空的 数据库表是无数据的 启动nacos 应用集成rabbitMQ 父工程导入依赖 <!-- 消息队列通用依赖 --><dependency><groupId>org.springframewo…

标签编辑新工具:如何使用控制台标签编辑器(Tag editor)

创建阿里云资源时&#xff0c;您可以给资源绑定标签。已经创建的资源&#xff0c;也可以在资源列表页面或者通过API&#xff0c;批量的添加、更改和删除标签。当遇到如下更为复杂问题和场景&#xff0c;该如何快速解决标签问题呢&#xff1f; 资源跨度大&#xff0c;需要跨资源…

炸裂!这些大厂跪求的人才太牛了!

今年所有的互联网公司都在ALL in AI&#xff0c;百度、腾讯、阿里巴巴、京东等互联网巨头都在四处挖掘AI人才。AI的岗位需求很多&#xff0c;几乎每天都有数百个JD放出。而亿欧智库发布的《2020全球人工智能人才培养研究报告》提到&#xff0c;至今为止AI的人才储备仍跟不上需求…

60TB 数据量的作业从 Hive 迁移到 Spark 在 Facebook 的实践

Facebook 经常使用分析来进行数据驱动的决策。在过去的几年里&#xff0c;用户和产品都得到了增长&#xff0c;使得我们分析引擎中单个查询的数据量达到了数十TB。我们的一些批处理分析都是基于 Hive 平台&#xff08;Apache Hive 是 Facebook 在2009年贡献给社区的&#xff09…

阿里主管通知我试用期延期……

阿里妹导读&#xff1a;接下来的文章是一篇发布在阿里内网里的文章。花木是一位走出体制的博士&#xff0c;讲述自己Landing的经历。今天&#xff0c;她将这段经历分享给大家&#xff0c;告诉我们&#xff1a;脸先着地又怎样&#xff0c;哪有那么多坦途&#xff1b;最美的&…

使用hbuilder的maps模块调起百度地图导航

首先需要在百度地图开放平台&#xff0c;创建应用拿到appid&#xff0c;然后在hbuilder进行如下配置&#xff1a; hbuilder的manifest.json的配置如下&#xff1a; permissions下添加如下代码&#xff1a; "Maps": {"description": "地图"} 然…

SpringBoot2.x RabbitMQ Nacos Nacos-Config

文章目录一、依赖配置1. 引入依赖2. 配置文件3. 主配置二、生产者代码代码Conding2.1. 发送客户端2.2. 确认机制2.3. 消息 return机制2.4. controller2.5. MQ工具类2.6. 常量类三、消费端3.2. 消费者代码3.2. RabbitMQ常用命令一、依赖配置 1. 引入依赖 <!--服务注册发现--…

考拉海购技术支持的前世今生

本文来自考拉海购技术支持中心负责人--书渊的分享&#xff0c;想和大家聊一聊考拉技术支持的前世今生&#xff0c;在这个发展历程的介绍当中&#xff0c;大家也可以此对考拉窥一斑而知全豹。当然&#xff0c;既然是聊我们的家常(“黑历史”)&#xff0c;我会从这几年在考拉供应…

PHP 依赖镜像出问题后,阿里工程师的一顿“神操作“令人叫绝!

阿里妹导读&#xff1a;上个月&#xff0c;PHP开发者在网上纷纷反映出现 Composer 镜像无法访问的问题。阿里云内部一位 90 后工程师顾咏连夜开工排查&#xff0c;快速解决问题后&#xff0c;他在问题群里收到了一大波来自用户的红包。顾咏最后谢绝了红包&#xff0c;接受了阿里…

Elasticsearch SkyWalking 分布式链路追踪

文章目录1. 安装包下载2. 解压3. 修改配置文件4. 启动5. 测试验证1. 安装包下载 https://www.apache.org/dyn/closer.cgi/skywalking/8.8.0/apache-skywalking-apm-8.8.0.tar.gzhttps://archive.apache.org/dist/skywalking 2. 解压 tar -zxvf apache-skywalking-apm-8.8.0…

为什么HR 20分钟就淘汰了一个前端高级工程师?

最近HR小姐姐面了一个前端开发&#xff0c;4 年经验&#xff0c; 应聘的是前端高级开发工程师。他的简历中提到很多技术点&#xff0c;从 HTML、CSS、JavaScript 再到 Vue.js 和 React 一个都不缺&#xff0c;跨平台PC、移动端、小程序也都经历过&#xff0c;看着像个实战派。深…