用 Mars Remote API 轻松分布式执行 Python 函数

Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。

启动 Mars 分布式环境可以参考:

  1. 命令行方式在集群中部署。
  2. Kubernetes 中部署。
  3. MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。

拿用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。

from typing import List
import numpy as npdef calc_chunk(n: int, i: int):# 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数rs = np.random.RandomState(i)a = rs.uniform(-1, 1, size=(n, 2))d = np.linalg.norm(a, axis=1)return (d < 1).sum()def calc_pi(fs: List[int], N: int):# 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值return sum(fs) * 4 / NN = 200_000_000
n = 10_000_000fs = [calc_chunk(n, i)for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time 下可以看到结果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在单机需要 12.3 s。

要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。

import mars.remote as mr# 函数调用改成 mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))for i in range(N // n)]
# 把 spawn 的列表传入作为参数,再 spawn 新的函数
pi = mr.spawn(calc_pi, args=(fs, N))
# 通过 execute() 触发执行,fetch() 获取结果
print(pi.execute().fetch())

%%time 下看到结果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

结果一模一样,但是却有数倍的性能提升。

可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。

一个例子

为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。

困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。

准备数据

这个例子里我们使用 otto 数据集。

首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_splitdef gen_data():df = pd.read_csv('otto/train.csv')X = df.drop(['target', 'id'], axis=1)y = df['target']label_encoder = LabelEncoder()label_encoder.fit(y)y = label_encoder.transform(y)return train_test_split(X, y, test_size=0.33, random_state=123)X_train, X_test, y_train, y_test = gen_data()

模型

接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。

RandomForest:

from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False,**kw):model = RandomForestClassifier(verbose=verbose, **kw)model.fit(X_train, y_train)return model

接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。

def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in ['gini', 'entropy']:yield {'n_estimators': n_estimators,'max_depth': max_depth,'criterion': criterion}

LogisticRegression 也是这个过程。我们先定义模型。

from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool = False,**kw):model = LogisticRegression(verbose=verbose, **kw)model.fit(X_train, y_train)return model

接着生成供 LogisticRegression 使用的超参。

def gen_lr_parameters():for penalty in ['l2', 'none']:for tol in [0.1, 0.01, 1e-4]:yield {'penalty': penalty,'tol': tol}

XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。

from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool = False,**kw):model = XGBClassifier(verbosity=int(verbose), **kw)model.fit(X_train, y_train)return model

生成一系列超参。

def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in ['gini', 'entropy']:for learning_rate in [0.001, 0.1, 0.5]:yield {'n_estimators': n_estimators,'criterion': criterion,'learning_rate': learning_rate}

验证

接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。

from sklearn.metrics import log_lossdef metric_model(model, X_test: pd.DataFrame,y_test: pd.Series) -> float:if isinstance(model, bytes):model = pickle.loads(model)y_pred = model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool = False):# 把训练和验证封装到一起model = train_func(X_train, y_train, verbose=verbose, **train_params)metric = metric_model(model, X_test, y_test)return model, metric

找出最好的模型

做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。

results = []# -------------
# Random Forest
# -------------for params in gen_random_forest_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(random_forest, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})# -------------------
# Logistic Regression
# -------------------for params in gen_lr_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(logistic_regression, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})# -------
# XGBoost
# -------for params in gen_xgb_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(xgb, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})

运行一下,需要相当长时间,我们省略掉一部分输出内容。

calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}
metric: 0.6912312790832288
# 省略其他模型的输出结果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。

使用 Remote API 分布式加速

现在我们尝试使用 Remote API 通过分布式方式加速整个过程。

集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。

n_cores = 8
mem = 2 * n_cores  # 16G
# o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')

为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。

if not o.exist_resource('otto_train.csv'):with open('otto/train.csv') as f:# 上传资源o.create_resource('otto_train.csv', 'file', fileobj=f)def gen_data():# 改成从资源读取df = pd.read_csv(o.open_resource('otto_train.csv'))X = df.drop(['target', 'id'], axis=1)y = df['target']label_encoder = LabelEncoder()label_encoder.fit(y)y = label_encoder.transform(y)return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。

import mars.remote as mr# n_output 说明是 4 输出
# execute() 执行后,数据会读取到 Mars 集群内部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool = False):model, metric = train_and_metric(train_func, train_params,X_train, y_train, X_test, y_test, verbose=verbose)return pickle.dumps(model), metric

后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。

接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。

import numpy as nptasks = []
models = []
metrics = []# -------------
# Random Forest
# -------------for params in gen_random_forest_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric,args=(random_forest, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# -------------------
# Logistic Regression
# -------------------for params in gen_lr_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric,args=(logistic_regression, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# -------
# XGBoost
# -------for params in gen_xgb_parameters():# fixed random_stateparams['random_state'] = 123# 再指定并发为核的个数params['n_jobs'] = n_corestask = mr.spawn(distributed_train_and_metric,args=(xgb, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# 把顺序打乱,目的是能分散到 worker 上平均一点
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代码几乎一致。

运行查看结果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。

细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。

以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。

print(models[0].fetch_log())

输出我们简略一部分。

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
# 中间省略
building tree 49 of 50
building tree 50 of 50

要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。

想要了解 fetch_log 接口,可以查看 文档。

还有更多

Mars Remote API 的能力其实不止这些,举个例子,在 remote 内部可以 spawn 新的函数;也可以调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容,读者们可以先行探索,后续我们再写别的文章介绍。

总结

Mars Remote API 通过并行和分布式 Python 函数,用很小的修改代价,极大提升了执行效率。

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514794.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IIoT 安防保卫战一触即发,Fortinet 亮剑

在工业 4.0 和数字经济高速发展双浆下&#xff0c;工业互联网作为连接工业经济的重要要素&#xff0c;不断推动实体经济数字化转型的重要载体和基础设施。 《中国制造 2025》战略指出&#xff0c;工业互联网是新一轮工业革命和产业变革的重点发展行业&#xff0c;其应用及发展…

linux共享软件_为什么 linux 要用 tar.gz,很少用 7z 或 zip?

因为 7z 和 zip 压缩格式都不能保留 unix 风格的文件权限&#xff0c;比如解压出个可执行文件要重新 chmod chown 才能恢复正常。而 tar 格式可以。而 tar 本身不提供压缩&#xff0c;无非就是把包括所有文件的內容和权限拼成一个文件而己&#xff0c;所以用另外如 gzip 格式压…

端应用研发进入云原生时代

简介&#xff1a; 随着技术的发展和各种用户端场景的涌现&#xff0c;业务前台形式变得更加多样&#xff0c;“面向多样化的端场景提供无缝的、一致的数字用户旅程”已经成为了新时代企业应用架构的关键目标&#xff0c;同时它也是当下大前端技术发展背后的核心业务牵引。基于阿…

关于卫星定位,你想知道的一切

简介&#xff1a; 本文将简要介绍卫星定位的原理和应用情况&#xff0c;方便大家对北斗、卫星定位有更多的了解。 5G和北斗&#xff0c;是国之重器。北斗作为卫星定位系统&#xff0c;目前在国际上已处于领先地位&#xff0c;而且已经渗透到我们工作和生活的方方面面。本文将简…

拯救运维工程师,数据链 DNA 来袭!

在《凤凰项目——一个IT运维的传奇故事》一书中讲述运维工程师的常态&#xff1a; A工程师&#xff1a;“是的&#xff0c;我们复制了你给的那个文件……是的&#xff0c;就是1.0.13版……你说那个版本是错的&#xff0c;这话是什么意思……什么&#xff1f;你什么时候把它改了…

jre for mac 删除_在 Mac 的 Docker Desktop 中运行 K8s

Docker Desktop for Mac 从 Docker Community Edition 18.06.0-ce-mac70 2018-07-25 版本起&#xff0c;添加了对 Kubernetes 的支持&#xff0c;可以方便的在 Mac 上运行一个单节点的 K8s 集群。在 Docker Desktop 的 Preferences 中的 Kubernetes 页面里&#xff0c;提供了一…

阳振坤:OceanBase 数据库七亿 tpmC 的关键技术

OB君&#xff1a;2020年9月25日&#xff0c;OceanBase在外滩大会举办的“数据库&#xff0c;新标杆&#xff0c;新征途”分论坛正式落幕&#xff0c;内容涵盖数据库的趋势探讨、分布式数据库的技术创新与行业应用&#xff0c;及国内数据库的发展与生态。欢迎持续关注本系列内容…

数百万台车联网设备同时在线0故障,中瑞集团的云原生探索之路 | 云原生Talk

简介&#xff1a; 在保持对业界趋势调度关注的同时&#xff0c;始终选用最适合自身的技术&#xff0c;这可能是中瑞能在车联网领域引领行业的重要原因之一&#xff0c;正如中瑞CTO所说“阿里云云原生产品体系带给我们的&#xff0c;不是单纯的IT工具&#xff0c;而是整个团队战…

mysql主键重复会覆盖还是_mysql如果主键重复了会发生什么情况

首先创建一个person表&#xff1a;create TABLE person(id int not null auto_increment,name VARCHAR(255) ,age int,PRIMARY key (id))同时打开两个sql窗口set autocommitoff;set id-1;SELECTauto_increment into idFROMinformation_schema.TABLESWHEREtable_name personAND…

终止中台乱象 《2021年中国中台市场研究报告》隆重发布

2015年&#xff0c;阿里提出“大中台&#xff0c;小前台”的战略&#xff0c;帮助一线业务更敏捷地适应市场变化。随后&#xff0c;多家互联网巨头纷纷布局中台战略&#xff0c;中台概念由此全面打响。 通过中台&#xff0c;可以打通数据孤岛&#xff0c;实现快速响应、智能预…

Flink State 误用之痛,你中招了吗?

简介&#xff1a; 本文主要讨论一个问题&#xff1a;ValueState 中存 Map 与 MapState 有什么区别&#xff1f;如果不懂这两者的区别&#xff0c;而且使用 ValueState 中存大对象&#xff0c;生产环境很可能会出现以下问题&#xff1a;CPU 被打满、吞吐上不去。 本文主要讨论一…

Dubbo-go 源码笔记(一)Server 端开启服务过程

简介&#xff1a; 随着微服务架构的流行&#xff0c;许多高性能 rpc 框架应运而生&#xff0c;由阿里开源的 dubbo 框架 go 语言版本的 dubbo-go 也成为了众多开发者不错的选择。本文将介绍 dubbo-go 框架的基本使用方法&#xff0c;以及从 export 调用链的角度进行 server 端源…

mysql怎么多重查询_mysql基于值的多重查询

{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":4,"count":4}]},"card":[{"des":"阿里云数据库专家保驾护航&#xff0c;为用户…

华为在中国建立其全球最大的网络安全透明中心

2021年6月9日&#xff0c;华为最大的网络安全透明中心今天在中国东莞正式启用&#xff0c;来自GSMA、阿联酋、印尼的监管机构及英国标准协会、SUSE等机构代表出席并在活动上发言。借此机会&#xff0c;华为发布了《华为产品安全基线》白皮书&#xff0c;首次将产品安全需求基线…

浅析云控平台画面传输的视频流方案

简介&#xff1a; 本文将小结本次云控平台画面传输的视频流方案。 背景 ARC&#xff08;高德车机云控平台&#xff09;是一个基于车载设备业务深度定制的云控平台&#xff0c;通过该平台我们能够实现远程使用不同类型的车载设备。为了让远程使用者像在本地一样使用车载设备&am…

解读云原生基础设施

简介&#xff1a; 云原生是云计算领域的热点之一。就像 “一千个人眼里有一千个哈姆雷特”&#xff0c;大家对"云原生"的定义也见仁见智。本文将介绍云原生应用架构和生命周期管理的进化方向。 作者 | 易立 阿里云资深技术专家 导读&#xff1a;云原生是云计算领域的…

mysql al32utf8_Oracle 11g更改字符集AL32UTF8为ZHS16GBK

Oracle 9i更改字符集AL32UTF8为ZHS16GBKSQLgt; conn /as sysdba SQLgt; shutdown immediate; SQLgt; startup mount SQLgt; A首页 → 数据库技术背景&#xff1a;阅读新闻Oracle 11g更改字符集AL32UTF8为ZHS16GBK[日期&#xff1a;2011-04-26]来源&#xff1a;Linux社区作者&am…

共筑全场景智慧生态,华为HMS全球应用创新大赛火热开启

6月10日&#xff0c;2021华为HMS全球应用创新大赛&#xff08;Apps UP&#xff09;正式启动。此次大赛以“HMS Innovate For All”为主题&#xff0c;激励全球开发者集成华为HMS Core开放能力开发创新应用&#xff0c;打造全场景数字创新体验&#xff0c;为全球消费者带来全场景…

2020-11-06

一、背景介绍 &#xff08;一&#xff09;流平台通用框架 目前流平台通用的架构一般来说包括消息队列、计算引擎和存储三部分&#xff0c;通用架构如下图所示。客户端或者 web 的 log 日志会被采集到消息队列&#xff1b;计算引擎实时计算消息队列的数据&#xff1b;实时计算…

移动端堆栈关键行定位的新思路

简介&#xff1a; 崩溃堆栈是我们日常应用问题排查中的重要辅助手段&#xff0c;在移动开发上也不例外&#xff0c;为了支持用户在堆栈上的快速定位&#xff0c;我们面临一个看似比较简单问题&#xff1a;高亮崩溃中的关键行, 辅助用户快速定位问题。 阿里云 云原生应用研发平台…