spark官方文档_Spark整合Ray思路漫谈

什么是Ray

之前花了大概两到三天把Ray相关的论文,官网文档看了一遍,同时特意去找了一些中文资料看Ray当前在国内的发展情况(以及目前国内大部分人对Ray的认知程度)。

先来简单介绍下我对Ray的认知。

首先基因很重要,所以我们先需要探查下Ray最初是为了解决什么问题而产生的。Ray的论文显示,它最早是为了解决增强学习的挑战而设计的。增强学习的难点在于它是一个需要边学习,边做实时做预测的应用场景,这意味会有不同类型的tasks同时运行,并且他们之间存在复杂的依赖关系,tasks会在运行时动态产生产生新的tasks,现有的一些计算模型肯定是没办法解决的。如果Ray只是为了解决RL事情可能没有那么复杂,但是作者希望它不仅仅能跑增强学习相关的,希望是一个通用的分布式机器学习框架,这就意味着Ray必然要进行分层抽象了,也就是至少要分成系统层应用层

系统层面,既然是分布式的应用,那么肯定需要有一个应用内的resource/task调度和管理。首先是Yarn,K8s等资源调度框架是应用程序级别的的调度,Ray作为一个为了解决具体业务问题的应用,应该要跑在他们上面而不是取代他们,而像Spark/Flink虽然也是基于task级别的资源调度框架,但是因为他们在设计的时候是为了解决一个比较具体的抽象问题,所以系统对task/资源都做了比较高的封装,一般用户是面向业务编程,很难直接操控task以及对应的resource。我们以Spark为例,用户定义好了数据处理逻辑,至于如何将这些逻辑分成多少个Job,Stage,Task,最后占用多少Resource (CPU,GPU,Memory,Disk)等等,都是由框架自行决定,而用户无法染指。这也是我一直诟病Spark的地方。所以Ray在系统层面,是一个通用的以task为调度级别的,同时可以针对每个task控制资源粒度的一个通用的分布式task执行系统。记住,在Ray里,你需要明确定义Task以及Task的依赖,并且为每个task指定合适(数量,资源类型)的资源。比如你需要用三个task处理一份数据,那么你就需要自己启动三个task,并且指定这些task需要的资源(GPU,CPU)以及数量(可以是小数或者整数)。而在Spark,Flink里这是不大可能的。Ray为了让我们做这些事情,默认提供了Python的语言接口,你可以像使用Numpy那样去使用Ray。实际上,也已经有基于Ray做Backend的numpy实现了,当然它属于应用层面的东西了。Ray系统层面很简单,也是典型的master-worker模式。类似spark的driver-executor模式,不同的是,Ray的worker类似yarn的worker,是负责Resource管理的,具体任务它会启动Python worker去执行你的代码,而spark的executor虽然也会启动Python worker执行python代码,但是对应的executor也执行业务逻辑,和python worker有数据交换和传输。

应用层面,你可以基于Ray的系统进行编程,因为Ray默认提供了Python的编程接口,所以你可以自己实现增强学习库(RLLib),也可以整合已有的算法框架,比如tensorflow,让tensorflow成为Ray上的一个应用,并且轻松实现分布式。我记得知乎上有人说Ray其实就是一个Python的分布式RPC框架,这么说是对的,但是显然会有误导,因为这很可能让人以为他只是“Python分布式RPC框架”。

如何和Spark协作

根据前面我讲述的,我们是可以完全基于Ray实现Spark的大部分API的,只是是Ray backend而非Spark core backend。实际上Ray目前正在做流相关的功能,他们现在要做的就是要兼容Flink的API。虽然官方宣称Ray是一个新一代的机器学习分布式框架,但是他完全可以cover住当前大数据和AI领域的大部分事情,但是任重道远,还需要大量的事情。所以对我而言,我看中的是它良好的Python支持,以及系统层面对资源和task的控制,这使得:

1.我们可以轻易的把我们的单机Python算法库在Ray里跑起来(虽然算法自身不是分布式的),但是我们可以很好的利用Ray的资源管理和调度功能,从而解决AI平台的资源管理问题。

2.Ray官方提供了大量的机器学习算法的实现,以及对当前机器学习框架如Tensorflow,Pytorch的整合,而分布式能力则比这些库原生提供的模式更靠谱和易用。毕竟对于这些框架而言,支持他们分布式运行的那些辅助库(比如TensorFlow提供parameter servers)相当简陋。

但是,我们知道,数据处理它自身有一个很大的生态,比如你的用户画像数据都在数据湖里,你需要把这些数据进行非常复杂的计算才能作为特征喂给你的机器学习算法。而如果这个时候,你还要面向资源编程(或者使用一个还不够成熟的上层应用)而不是面向“业务”编程,这就显得很难受了,比如我就想用SQL处理数据,我只关注处理的业务逻辑,这个当前Ray以及之上的应用显然还是做不到如Spark那么便利的(毕竟Spark就是为了数据处理而生的),所以最好的方式是,数据的获取和加工依然是在Spark之上,但是数据准备好了就应该丢给用户基于Ray写的代码处理了。Ray可以通过Arrow项目读取HDFS上Spark已经处理好的数据,然后进行训练,然后将模型保存为HDFS。当然对于预测,Ray可以自己消化掉或者丢给其他系统完成。我们知道Spark 在整合Python生态方面做出了非常多的努力,比如他和Ray一样,也提供了python 编程接口,所以spark也较为容易的整合譬如Tensorflow等框架,但是没办法很好的管控资源(比如GPU),而且,spark 的executor 会在他所在的服务器上启动python worker,而spark一般而言是跑在yarn上的,这就对yarn造成了很大的管理麻烦,而且通常yarn 和hdfs之类的都是在一起的,python环境还有资源(CPU/GPU)除了管理难度大以外,还有一个很大的问题是可能会对yarn的集群造成比较大的稳定性风险。

所以最好的模式是按如下步骤开发一个机器学习应用:

写一个python脚本,在数据处理部分,使用pyspark,在程序的算法训练部分,使用ray,spark 运行在yarn(k8s)上,ray运行在k8s里

好处显而易见:用户完全无感知他的应用其实是跑在两个集群里的,对他来说就是一个普通python脚本。

从架构角度来讲,复杂的python环境管理问题都可以丢给ray集群来完成,spark只要能跑基本的pyspark相关功能即可,数据衔接通过数据湖里的表(其实就是一堆parquet文件)即可。当然,如果最后结果数据不大,也可以直接通过client完成pyspark到ray的交互。

Spark和Ray的架构和部署

现在我们来思考一个比较好的部署模式,架构图大概类似这样:

537561c22050438ef3007c6e80d9ac34.png

首先,大家可以理解为k8s已经解决一切了,我们spark,ray都跑在K8s上。但是,如果我们希望一个spark 是实例多进程跑的时候,我们并不希望是像传统的那种方式,所有的节点都跑在K8s上,而是将executor部分放到yarn cluster. 在我们的架构里,spark driver 是一个应用,我们可以启动多个pod从而获得多个spark driver实例,对外提供负载均衡,roll upgrade/restart 等功能。也就是k8s应该是面向应用的。但是复杂的计算,我们依然希望留给Yarn,尤其是还涉及到数据本地性,计算和存储放到一起(yarn和HDFS通常是在一起的),避免k8s和HDFS有大量数据交换。

因为Yarn对Java/Scala友好,但是对Python并不友好,尤其是在yarn里涉及到Python环境问题会非常难搞(主要是Yarn对docker的支持还是不够优秀,对GPU支持也不好),而机器学习其实一定重度依赖Python以及非常复杂的本地库以及Python环境,并且对资源调度也有比较高的依赖,因为算法是很消耗机器资源的,必须也有资源池,所以我们希望机器学习部分能跑在K8s里。但是我们希望整个数据处理和训练过程是一体的,算法的同学应该无法感知到k8s/yarn的区别。为了达到这个目标,用户依然使用pyspark来完成计算,然后在pyspark里使用ray的API做模型训练和预测,数据处理部分自动在yarn中完成,而模型训练部分则自动被分发到k8s中完成。并且因为ray自身的优势,算法可以很好的控制自己需要的资源,比如这次训练需要多少GPU/CPU/内存,支持所有的算法库,在做到对算法最少干扰的情况下,算法的同学们有最好的资源调度可以用。

下面展示一段MLSQL代码片段展示如何利用上面的架构:

-- python 训练模型的代码set py_train='''import rayray.init()@ray.remote(num_cpus=2, num_gpus=1)def f(x):    return x * xfutures = [f.remote(i) for i in range(4)]print(ray.get(futures))''';load script.`py_train` as py_train;-- 设置需要的python环境描述set py_env='''''';load script.`py_env` as py_env;-- 加载hive的表load hive.`db1.table1` as table1;-- 对Hive做处理,比如做一些特征工程select features,label from table1 as data;-- 提交Python代码到Ray里,此时是运行在k8s里的train data as PythonAlg.`/tmp/tf/model`where scripts="py_train"and entryPoint="py_train"and condaFile="py_env"and  keepVersion="true"and fitParam.0.fileFormat="json" -- 还可以是parquetand `fitParam.0.psNum`="1";

下面是PySpark的示例代码:

from pyspark.ml.linalg import Vectors, SparseVectorfrom pyspark.sql import SparkSessionimport loggingimport rayfrom pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteTypefrom sklearn.naive_bayes import GaussianNBimport osfrom sklearn.externals import joblibimport pickleimport scipy.sparse as spfrom sklearn.svm import SVCimport ioimport codecsos.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"logger = logging.getLogger(__name__)base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()data = spark.read.format("libsvm").load(base_dir + "/data/mllib/sample_libsvm_data.txt")## 广播数据dataBr = spark.sparkContext.broadcast(data.collect())## 训练模型 这部分代码会在spark executor里的python worker执行def train(row):    import ray    ray.init()    train_data_id = ray.put(dataBr.value)    ## 这个函数的python代码会在K8s里的Ray里执行    @ray.remote    def ray_train(x):        X = []        y = []        for i in ray.get(train_data_id):            X.append(i["features"])            y.append(i["label"])        if row["model"] == "SVC":            gnb = GaussianNB()            model = gnb.fit(X, y)            # 为什么还需要encode一下?            pickled = codecs.encode(pickle.dumps(model), "base64").decode()            return [row["model"], pickled]        if row["model"] == "BAYES":            svc = SVC()            model = svc.fit(X, y)            pickled = codecs.encode(pickle.dumps(model), "base64").decode()            return [row["model"], pickled]    result = ray_train.remote(row)    ray.get(result)   ##训练模型 将模型结果保存到HDFS上rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train)spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),                                              StructField(name="modelBinary", dataType=StringType())])).write.     format("parquet").     mode("overwrite").save("/tmp/wow")

这是一个标准的Python程序,只是使用了pyspark/ray的API,我们就完成了上面所有的工作,同时训练两个模型,并且数据处理的工作在spark中,模型训练的在ray中。

完美结合!最重要的是解决了资源管理的问题!

作者:祝威廉

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/446078.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux加密框架 crypto 算法管理 - 算法查找接口 crypto_find_alg

算法查找接口crypto_find_alg 算法实例tfm是算法的一个可运行的副本,因此在创建算法实例前首先要查找确认算法是否已经注册有效,此时算法查找由函数crypto_find_alg实现。补充: struct crypto_tfm *tfm; crypto_tfm类型指针tfm可以理解为指代…

linux加密框架 crypto 算法管理 - 算法查找接口 crypto_alg_mod_lookup

参考链接 Linux加密框架的算法管理(二)_家有一希的博客-CSDN博客linux加密框架 crypto 算法管理 - 算法查找接口 crypto_find_alg_CHYabc123456hh的博客-CSDN博客 函数介绍 crypto_alg_mod_lookup函数输入参数包括待查找的算法名name、算法类型type和算…

qt triggered信号_Qt之网络编程UDP通信

点击上方“Qt学视觉”,选择“星标”公众号重磅干货,第一时间送达想要学习的同学们还请认真阅读每篇文章,相信你一定会有所收获UDP通信概述UDP(UserDatagramProtocol,用户数据报协议)是轻量的、不可靠的、面向数据报(datagram)、无…

adguard没有核心 core no_面试官:线程池如何按照core、max、queue的执行顺序去执行?...

前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:corePool->workQueue->maxPool,源码…

linux加密框架 crypto 算法管理 - 算法查找接口 crypto_larval_lookup

参考链接 Linux加密框架的算法管理(二)_家有一希的博客-CSDN博客 crypto_larval_lookup函数介绍 crypto_larval_lookup函数的输入参数包括待查找的算法名name、算法类型type和算法类型屏蔽位mask,查找命中时返回查找到的算法或注册用算法幼…

linux加密框架 crypto 算法管理 - 算法查找接口 crypto_alg_lookup函数

参考链接 Linux加密框架的算法管理(二)_家有一希的博客-CSDN博客 函数介绍 static struct crypto_alg *crypto_alg_lookup(const char *name, u32 type,u32 mask) {struct crypto_alg *alg;u32 test 0;if (!((type | mask) & CRYPTO_ALG_TESTED))…

linux加密框架 crypto 算法管理 - 动态和静态算法管理

参考链接 Linux加密框架的算法管理(三)_家有一希的博客-CSDN博客 动态和静态算法管理 静态算法 加密框架中的算法分为静态算法和动态算法两种,其中静态算法指的是以"算法名.ko"形式存在的静态编译的算法模块,如aes.k…

linux加密框架 crypto 算法管理 - 算法检测

参考链接 Linux加密框架的算法管理(四)_家有一希的博客-CSDN博客 函数介绍 如前所述,无论是静态算法还是动态算法,算法注册的最后一步都是进行算法正确性检验,一般流程是先调用__crypto_register_alg函数进行通用的算…

select选中的值_selenium下拉框处理(select)

前言 web自动化中,常见的场景还有一个下拉框的选择,哪么在selenium中如何做下拉框的操作呢?selectselect在HTML中表示元素名,可创建单选或多选菜单。HTML中select长什么样子:select在HTML中元素名,下面有选…

linux加密框架 crypto 算法管理 - 创建哈希算法实例

crypto_alloc_ahash函数 加密框架中的哈希算法可以是同步方式实现的也可以是异步方式实现的,但是算法应用不关注哈希算法的实现方式,关注的是哈希算法提供的算法接口。为实现统一管理,加密框架默认哈希算法的实现方式为异步方式,…

发票管理软件_企业为什么需要ERP企业管理软件?

对于一个制造企业来说,生产是企业最大的动力,而生产也需要进行优化管理,一个好的生产管理方式会带给企业巨大的发展空间和利润价值。对于一个制造企业来说,生产是企业最大的动力,而生产也需要进行优化管理,…

linux加密框架 crypto 算法管理 - 应用角度讲解加密框架的运行流程

参考链接 Linux加密框架的应用示例(一)_家有一希的博客-CSDN博客 本文大纲 本节将从应用角度说明加密框架的运行流程,包括加密框架如何管理算法、如何动态创建算法,应用模块如何创建算法实例、如何通过算法实例调用算法接口等。…

java 累进计费率计算_设计费400万,缴纳所得税100万,如何筹划

很多公司老板都会把利润放在第一位,照理说这是没错的,公司要盈利才能继续经营下去。我国有很多针对小微企业的政策,盈利不高的情况下,基本不会去考虑纳税问题,也没有多少税收压力。但是对一些暴利的服务型行业、软件设…

linux加密框架 crypto 算法管理 - 哈希算法应用实例

参考链接 Linux加密框架应用示例(二)_家有一希的博客-CSDN博客linux加密框架 crypto 算法管理 - 应用角度讲解加密框架的运行流程_CHYabc123456hh的博客-CSDN博客 在应用模块中创建并初始化哈希算法实例 假设某个SA配置使用的认证算法为"hmac(md5…

Linux加密框架 crypto crypto_larval | crypto_larval_alloc | __crypto_register_alg 介绍

参考链接 Lniux加密框架中的主要数据结构(五)_家有一希的博客-CSDN博客crypto_larval struct crypto_larval {struct crypto_alg alg;struct crypto_alg *adult;struct completion completion;u32 mask; };结构体名叫 crypto_larval (算法幼…

好玩的脚本代码大全_Github | 推荐一个Python脚本集合项目

点击上方"蓝字"关注我们Python大数据分析记录 分享 成长用python写小脚本是一件好玩的事情,因为不是个大活儿,而且能解决眼边前十分繁琐的事情,这种轻松且便宜的代码颇受人民群众的欢迎~有点生活小妙招的意味大家较为熟知的脚本…

linux加密框架 crypto 算法管理 - 算法查找接口

参考链接 Linux加密框架的算法管理(二)_家有一希的博客-CSDN博客linux加密框架 crypto 算法管理 - 算法查找接口 crypto_find_alg_CHYabc123456hh的博客-CSDN博客linux加密框架 crypto 算法管理 - 算法查找接口 crypto_alg_mod_lookup_CHYabc123456hh的…

xml模糊查询语句_2Mybatis学习笔记07:动态SQL语句(原创,转载请注明来源)

开发环境:硬件环境:Windows10JDK 1.8; 软件环境:JavaEclipseMybatismaven3.6tomcat8.0Postgresql 10.6; 用到的jar包: asm-3.3.1.jar cglib-2.2.2.jar commons-logging-1.1.1.jar javassist-3.17.1-GA.jar …

硬件密码组件的硬件结构、作用及实现应用设计

引 言 1 硬件密码组件的概念 密码技术是解决信息安全问题的核心技术。要实现信息的保密性、完整性、可控性和不可否认性等安全要求,都离不开密码技术的运用。在具体的信息安全系统中,密码技术的运用可以基于软件密码组件(简称为SCM&#xf…

sql倒序查询语句_SQL丨1.基本查询语句复习

此为自用查询语句1.selectSELECT column1,column2 FROM table1;常用的格式惯例:大写了SELECT和FROM,而将表名和列名小写;通常在列名中使用下划线,避免使用空格;在每个语句末尾添加分号;SQL不区分大小写。2.…