分布式执行引擎ray入门--(2)Ray Data

目录

一、overview

基础代码

核心API:

二、核心概念

2.1 加载数据

从S3上读

从本地读: 

 其他读取方式

  读取分布式数据(spark)

 从ML libraries 库中读取(不支持并行读取)

从sql中读取

2.2 变换数据

map

flat_map

Transforming batches

Shuffling rows

Repartitioning data

2.3 消费数据

1) 按行遍历

2)按batch遍历

3)遍历batch时shuffle

4)为分布式并行训练分割数据

2.4 保存数据

保存文件

修改分区数

将数据转换为python对象

将数据转换为分布式数据(spark)


今天来带大家一起来学习下ray中对数据的操作,还是非常简洁的。

一、overview

 

基础代码

from typing import Dict
import numpy as np
import ray# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:length = batch["petal length (cm)"]width = batch["petal width (cm)"]batch["petal area (cm^2)"] = length * widthreturn batchtransformed_ds = ds.map_batches(compute_area)# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):print(batch)# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

使用ray.data可以方便地从硬盘、python对象、S3上读取文件

最后写入云端

核心API:

  • 简单变换(map_batches())

  • 全局聚合和分组聚合(groupby())

  • Shuffling 操作 (random_shuffle(), sort(), repartition()).

二、核心概念

2.1 加载数据

  • 从S3上读

import ray#加载csv文件
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds.schema())
ds.show(limit=1)#加载parquet文件
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")#加载image
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")# Text
ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")# binary
ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")#tfrecords
ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
  • 从本地读: 

ds = ray.data.read_parquet("local:///tmp/iris.parquet")
  • 处理压缩文件
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv.gz",arrow_open_stream_args={"compression": "gzip"},
)
  •  其他读取方式

import ray# 从python对象里获取
ds = ray.data.from_items([{"food": "spam", "price": 9.34},{"food": "ham", "price": 5.37},{"food": "eggs", "price": 0.94}
])ds = ray.data.from_items([1, 2, 3, 4, 5])# 从numpy里获取
array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)# 从pandas里获取
df = pd.DataFrame({"food": ["spam", "ham", "eggs"],"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)# 从py arrow里获取table = pa.table({"food": ["spam", "ham", "eggs"],"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)
  •   读取分布式数据(spark)

import ray
import raydpspark = raydp.init_spark(app_name="Spark -> Datasets Example",num_executors=2,executor_cores=2,executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)ds.show(3)

 从ML libraries 库中读取(不支持并行读取)

import ray.data
from datasets import load_dataset# 从huggingface里读取(不支持并行读取)
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)# 从TensorFlow中读取(不支持并行读取)
import ray
import tensorflow_datasets as tfdstf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)print(ds)

从sql中读取

import mysql.connectorimport raydef create_connection():return mysql.connector.connect(user="admin",password=...,host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",connection_timeout=30,database="example",)# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql("SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql("SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

Ray还支持从BigQuery和MongoDB中读取,篇幅问题,不赘述了。

2.2 变换数据

变换默认是lazy,直到遍历、保存、检视数据集时才执行

map

import os
from typing import Any, Dict
import raydef parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:row["filename"] = os.path.basename(row["path"])return rowds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True).map(parse_filename)
)

flat_map

from typing import Any, Dict, List
import raydef duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:return [row] * 2print(ray.data.range(3).flat_map(duplicate_row).take_all()
)# 结果:
# [{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
# 原先的元素都变成2个

Transforming batches

from typing import Dict
import numpy as np
import raydef increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:batch["image"] = np.clip(batch["image"] + 4, 0, 255)return batch# batch_format:指定batch类型,可不加
ds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple").map_batches(increase_brightness, batch_format="numpy")
)

如果初始化较贵,使用类而不是函数,这样每次调用类的时候,进行初始化。类有状态,而函数没有状态。

并行度可以指定(min,max)来自由调整

Shuffling rows

import rayds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple").random_shuffle()
)

Repartitioning data

import rayds = ray.data.range(10000, parallelism=1000)# Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
# data movement during this operation by merging adjacent blocks.
ds = ds.repartition(100, shuffle=False).materialize()# Repartition the data into 200 blocks, and force a full data shuffle.
# This operation will be more expensive
ds = ds.repartition(200, shuffle=True).materialize()

2.3 消费数据

1) 按行遍历

import rayds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")for row in ds.iter_rows():print(row)

2)按batch遍历

numpy、pandas、torch、tf使用不同的API遍历batch

# numpy
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):print(batch)# pandas
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):print(batch)# torch
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(batch_size=2):print(batch)# tf
import rayds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")tf_dataset = ds.to_tf(feature_columns="sepal length (cm)",label_columns="target",batch_size=2
)
for features, labels in tf_dataset:print(features, labels)

3)遍历batch时shuffle

只需要在遍历batch时增加local_shuffle_buffer_size参数即可。

非全局洗牌,但性能更好。

import rayds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")for batch in ds.iter_batches(batch_size=2,batch_format="numpy",local_shuffle_buffer_size=250,
):print(batch)

4)为分布式并行训练分割数据

import ray@ray.remote
class Worker:def train(self, data_iterator):for batch in data_iterator.iter_batches(batch_size=8):passds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
workers = [Worker.remote() for _ in range(4)]
shards = ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

2.4 保存数据

保存文件

非常类似pandas保存文件,唯一的区别保存本地文件时需要加入local://前缀。

注意:如果不加local://前缀,ray则会将不同分区的数据写在不同节点上

import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")# local
ds.write_parquet("local:///tmp/iris/")# s3
ds.write_parquet("s3://my-bucket/my-folder")

修改分区数

import os
import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.repartition(2).write_csv("/tmp/two_files/")print(os.listdir("/tmp/two_files/"))

将数据转换为python对象

import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")df = ds.to_pandas()
print(df)

将数据转换为分布式数据(spark)

import ray
import raydpspark = raydp.init_spark(app_name = "example",num_executors = 1,executor_cores = 4,executor_memory = "512M"
)ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/735848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pandas DataFrame 写入 Excel 的三种场景及方法

一、引言 本文主要介绍如何将 pandas 的 DataFrame 数据写入 Excel 文件中,涉及三个不同的应用场景: 单个工作表写入:将单个 DataFrame 写入 Excel 表中;多个工作表写入:将多个 DataFrame 写入到同一个 Excel 表中的…

Spring boot2.7整合jetcache 本地linkedhashmap缓存方案

好 上文 Spring boot2.7整合jetcache 远程redis缓存方案 我们讲完了 远程实现方案 本文 我们来说说 本地 jetcache解决方案 首先是 application.yml 在jetcache下加上 local:default:type: linkedhashmapkeyConvertor: fastjson我们技术用的 本地缓存 linkedhashmap 这里 我们…

关于安卓子模块打包,aar,jar丢失问题

背景 项目中,三方依赖是一个本地子module,子module中,又引入了一些aar,jar包。这个时候,如果子module要输入一个aar包给业务侧调用,观察打包结果。 问题 如果直接使用gradle进行打包,会导致子…

【大厂AI课学习笔记NO.69】使用开源管理仓库

了解了开源框架,开源项目,今天来学习开源管理仓库。 我们先说Git,开源的版本管理分布式系统。 GitHub,则是世界上最大的代码托管平台,面向开源和私有项目托管。 有的人总是分不清这两个,其实一个是版本管…

批量提取PDF指定区域内容到 Excel 以及根据PDF里面第一页的标题来批量重命名-附思路和代码实现

首先说明下,PDF需要是电子版本的,不能是图片或者无法选中的那种。 需求1:假如我有一批数量比较多的同样格式的PDF电子文档,需要把特定多个区域的数字或者文字提取出来 需求2:我有一批PDF文档,但是文件的名…

创建旅游景点图数据库Neo4J技术验证

文章目录 创建旅游景点图数据库Neo4J技术验证写在前面基础数据建库python3源代码KG效果KG入库效率优化方案PostGreSQL建库 创建旅游景点图数据库Neo4J技术验证 写在前面 本章主要实践内容: (1)neo4j知识图谱库建库。使用导航poi中的公园、景…

【C语言】——详解操作符(下)

【C语言】——详解操作符(下) 前言七、关系操作符八、逻辑操作符8.1、& 与运算符8.2、 | 或运算符 九、条件操作符十、逗号表达式十一、下标引用与函数调用操作符11.1、[ ] 下标引用操作符11.2、( ) 函数调用操作符 十二、 结构成员操作符12.1、…

新版ui周易测算网站H5源码/在线起名网站源码/运势测算网站系统源码,附带系统搭建教程

支持对接第三方支付 安装方法以linux为例 1、建议在服务器上面安装宝塔面板,以便操作,高逼格技术员可以忽略这步操作。 2、把安装包文件解压到根目录,同时建立数据库,把数据文件导入数据库 3、修改核心文件config/inc_config.…

“ReferenceError: AMap is not defined“

问题 笔者进行web开发&#xff0c;引入高德地图&#xff0c;控制台报错 "ReferenceError: AMap is not defined"详细问题 vue.runtime.esm.js:4662 [Vue warn]: Error in mounted hook: "ReferenceError: AMap is not defined"found in---> <Map&…

React-嵌套路由

1.概念 说明&#xff1a;在一级路由中又内嵌了其他路由&#xff0c;这种关系就叫做嵌套路由&#xff0c;嵌套至一级路由内的路由又称作二级路由。 2.实现步骤 说明&#xff1a;使用childen属性配置路由嵌套关系&#xff0c;使用<Outlet/>组件配置二级路由渲染的位置。…

吴恩达机器学习-可选实验室:逻辑回归,决策边界(Logistic Regression,Decision Boundary))

文章目录 目标数据集图数据逻辑回归模型复习逻辑回归和决策边界绘图决策边界恭喜 目标 在本实验中&#xff0c;你将:绘制逻辑回归模型的决策边界。这会让你更好地理解模型的预测。 import numpy as np %matplotlib widget import matplotlib.pyplot as plt from lab_utils_co…

Day41| 416 分割等和子集

目录 416 分割等和子集 416 分割等和子集 class Solution { public:bool canPartition(vector<int>& nums) {int sum 0;vector<int> dp(10010, 0);for (int i 0; i < nums.size(); i) {sum nums[i];}if (sum % 2 1) return false;int target sum /…

软考笔记--信息系统架构

一.架构风格 信息系统架构设计的一个核心问题是能否使用重复的信息系统架构模式&#xff0c;即能否达到架构级别的软件重用。信息系统架构风格是描述某个特定应用领域中系统组织方式的惯用模式&#xff0c;架构风格定义了一个系统家族&#xff0c;即一个架构定义一个词汇表和一…

pytorch单机多卡训练 logger日志记录和wandb可视化

PyTorch 单机多卡训练示例 1、工具&#xff1a;2、代码3、启动 1、工具&#xff1a; wandb&#xff1a;云端保存训练记录&#xff0c;可实时刷新logging&#xff1a;记录训练日志argparse&#xff1a;设置全局参数 2、代码 import os import time import torch import wandb…

elementPlus的坑

记录由 element ui 到element plus的过程 el-form v-model与:model v-model就不用说了&#xff0c;这个:model类似于内置的API接口&#xff0c;用的时候这两个值一样就行 不一样的话会出现&#xff0c;如下奇怪的情况 能输入&#xff0c;但是只能文本框中只显示1个字符&#x…

jmeter快速使用

文章目录 前言一、安装jmeter二、插件安装三、添加常用监听器参考 前言 Apache JMeter may be used to test performance both on static and dynamic resources, Web dynamic applications. It can be used to simulate a heavy load on a server, group of servers, network…

Redis核心数据结构之整数集合

整数集合 概述 整数集合(intset)是集合键的底层实现之一&#xff0c;当一个集合只包含整数值元素&#xff0c;并且这个结合的元素数量不多时&#xff0c;Redis就会使用整数集合作为集合键的底层实现。 例子 举个例子&#xff0c;如果创建一个只包含五个元素的集合键&#x…

MySQL 8.0 架构 之 慢查询日志(Slow query log)(2)流程图:查询记录到慢查询日志中的条件

文章目录 MySQL 8.0 架构 之 慢查询日志&#xff08;Slow query log&#xff09;&#xff08;2&#xff09;流程图&#xff1a;查询记录到慢查询日志中的条件确定查询是否会记录在慢查询日志中的流程图参考 【声明】文章仅供学习交流&#xff0c;观点代表个人&#xff0c;与任何…

JavaScript数组方法常用方法大全

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1. push()2. pop()3. unshift()4. shift()5. isArray()6. map()7. filter()8. every()9. some()10. splice()11. slice()12. indexOf()13. includes()14. concat()1…

RK3588 Android 12 系统内核开发+Native层脚本自启动+SELinux配置

前言 开发板型号&#xff1a;RK_EVB7_RK3588_LP4…_V11目标&#xff1a;在开发板上随开机自启动脚本&#xff0c;带起二进制程序&#xff0c;并完备一些其他系统功能。简介&#xff1a;本文自启动脚本run.sh唯一的作用就是拉起二进制程序demo&#xff1b;demo是简单的hello_wo…