分布式执行引擎ray入门--(2)Ray Data

目录

一、overview

基础代码

核心API:

二、核心概念

2.1 加载数据

从S3上读

从本地读: 

 其他读取方式

  读取分布式数据(spark)

 从ML libraries 库中读取(不支持并行读取)

从sql中读取

2.2 变换数据

map

flat_map

Transforming batches

Shuffling rows

Repartitioning data

2.3 消费数据

1) 按行遍历

2)按batch遍历

3)遍历batch时shuffle

4)为分布式并行训练分割数据

2.4 保存数据

保存文件

修改分区数

将数据转换为python对象

将数据转换为分布式数据(spark)


今天来带大家一起来学习下ray中对数据的操作,还是非常简洁的。

一、overview

 

基础代码

from typing import Dict
import numpy as np
import ray# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:length = batch["petal length (cm)"]width = batch["petal width (cm)"]batch["petal area (cm^2)"] = length * widthreturn batchtransformed_ds = ds.map_batches(compute_area)# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):print(batch)# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

使用ray.data可以方便地从硬盘、python对象、S3上读取文件

最后写入云端

核心API:

  • 简单变换(map_batches())

  • 全局聚合和分组聚合(groupby())

  • Shuffling 操作 (random_shuffle(), sort(), repartition()).

二、核心概念

2.1 加载数据

  • 从S3上读

import ray#加载csv文件
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds.schema())
ds.show(limit=1)#加载parquet文件
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")#加载image
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")# Text
ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")# binary
ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")#tfrecords
ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
  • 从本地读: 

ds = ray.data.read_parquet("local:///tmp/iris.parquet")
  • 处理压缩文件
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv.gz",arrow_open_stream_args={"compression": "gzip"},
)
  •  其他读取方式

import ray# 从python对象里获取
ds = ray.data.from_items([{"food": "spam", "price": 9.34},{"food": "ham", "price": 5.37},{"food": "eggs", "price": 0.94}
])ds = ray.data.from_items([1, 2, 3, 4, 5])# 从numpy里获取
array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)# 从pandas里获取
df = pd.DataFrame({"food": ["spam", "ham", "eggs"],"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)# 从py arrow里获取table = pa.table({"food": ["spam", "ham", "eggs"],"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)
  •   读取分布式数据(spark)

import ray
import raydpspark = raydp.init_spark(app_name="Spark -> Datasets Example",num_executors=2,executor_cores=2,executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)ds.show(3)

 从ML libraries 库中读取(不支持并行读取)

import ray.data
from datasets import load_dataset# 从huggingface里读取(不支持并行读取)
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)# 从TensorFlow中读取(不支持并行读取)
import ray
import tensorflow_datasets as tfdstf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)print(ds)

从sql中读取

import mysql.connectorimport raydef create_connection():return mysql.connector.connect(user="admin",password=...,host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",connection_timeout=30,database="example",)# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql("SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql("SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

Ray还支持从BigQuery和MongoDB中读取,篇幅问题,不赘述了。

2.2 变换数据

变换默认是lazy,直到遍历、保存、检视数据集时才执行

map

import os
from typing import Any, Dict
import raydef parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:row["filename"] = os.path.basename(row["path"])return rowds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True).map(parse_filename)
)

flat_map

from typing import Any, Dict, List
import raydef duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:return [row] * 2print(ray.data.range(3).flat_map(duplicate_row).take_all()
)# 结果:
# [{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
# 原先的元素都变成2个

Transforming batches

from typing import Dict
import numpy as np
import raydef increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:batch["image"] = np.clip(batch["image"] + 4, 0, 255)return batch# batch_format:指定batch类型,可不加
ds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple").map_batches(increase_brightness, batch_format="numpy")
)

如果初始化较贵,使用类而不是函数,这样每次调用类的时候,进行初始化。类有状态,而函数没有状态。

并行度可以指定(min,max)来自由调整

Shuffling rows

import rayds = (ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple").random_shuffle()
)

Repartitioning data

import rayds = ray.data.range(10000, parallelism=1000)# Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
# data movement during this operation by merging adjacent blocks.
ds = ds.repartition(100, shuffle=False).materialize()# Repartition the data into 200 blocks, and force a full data shuffle.
# This operation will be more expensive
ds = ds.repartition(200, shuffle=True).materialize()

2.3 消费数据

1) 按行遍历

import rayds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")for row in ds.iter_rows():print(row)

2)按batch遍历

numpy、pandas、torch、tf使用不同的API遍历batch

# numpy
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):print(batch)# pandas
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):print(batch)# torch
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(batch_size=2):print(batch)# tf
import rayds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")tf_dataset = ds.to_tf(feature_columns="sepal length (cm)",label_columns="target",batch_size=2
)
for features, labels in tf_dataset:print(features, labels)

3)遍历batch时shuffle

只需要在遍历batch时增加local_shuffle_buffer_size参数即可。

非全局洗牌,但性能更好。

import rayds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")for batch in ds.iter_batches(batch_size=2,batch_format="numpy",local_shuffle_buffer_size=250,
):print(batch)

4)为分布式并行训练分割数据

import ray@ray.remote
class Worker:def train(self, data_iterator):for batch in data_iterator.iter_batches(batch_size=8):passds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
workers = [Worker.remote() for _ in range(4)]
shards = ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

2.4 保存数据

保存文件

非常类似pandas保存文件,唯一的区别保存本地文件时需要加入local://前缀。

注意:如果不加local://前缀,ray则会将不同分区的数据写在不同节点上

import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")# local
ds.write_parquet("local:///tmp/iris/")# s3
ds.write_parquet("s3://my-bucket/my-folder")

修改分区数

import os
import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.repartition(2).write_csv("/tmp/two_files/")print(os.listdir("/tmp/two_files/"))

将数据转换为python对象

import rayds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")df = ds.to_pandas()
print(df)

将数据转换为分布式数据(spark)

import ray
import raydpspark = raydp.init_spark(app_name = "example",num_executors = 1,executor_cores = 4,executor_memory = "512M"
)ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/735848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pandas DataFrame 写入 Excel 的三种场景及方法

一、引言 本文主要介绍如何将 pandas 的 DataFrame 数据写入 Excel 文件中,涉及三个不同的应用场景: 单个工作表写入:将单个 DataFrame 写入 Excel 表中;多个工作表写入:将多个 DataFrame 写入到同一个 Excel 表中的…

Spring boot2.7整合jetcache 本地linkedhashmap缓存方案

好 上文 Spring boot2.7整合jetcache 远程redis缓存方案 我们讲完了 远程实现方案 本文 我们来说说 本地 jetcache解决方案 首先是 application.yml 在jetcache下加上 local:default:type: linkedhashmapkeyConvertor: fastjson我们技术用的 本地缓存 linkedhashmap 这里 我们…

【大厂AI课学习笔记NO.69】使用开源管理仓库

了解了开源框架,开源项目,今天来学习开源管理仓库。 我们先说Git,开源的版本管理分布式系统。 GitHub,则是世界上最大的代码托管平台,面向开源和私有项目托管。 有的人总是分不清这两个,其实一个是版本管…

批量提取PDF指定区域内容到 Excel 以及根据PDF里面第一页的标题来批量重命名-附思路和代码实现

首先说明下,PDF需要是电子版本的,不能是图片或者无法选中的那种。 需求1:假如我有一批数量比较多的同样格式的PDF电子文档,需要把特定多个区域的数字或者文字提取出来 需求2:我有一批PDF文档,但是文件的名…

【C语言】——详解操作符(下)

【C语言】——详解操作符(下) 前言七、关系操作符八、逻辑操作符8.1、& 与运算符8.2、 | 或运算符 九、条件操作符十、逗号表达式十一、下标引用与函数调用操作符11.1、[ ] 下标引用操作符11.2、( ) 函数调用操作符 十二、 结构成员操作符12.1、…

新版ui周易测算网站H5源码/在线起名网站源码/运势测算网站系统源码,附带系统搭建教程

支持对接第三方支付 安装方法以linux为例 1、建议在服务器上面安装宝塔面板,以便操作,高逼格技术员可以忽略这步操作。 2、把安装包文件解压到根目录,同时建立数据库,把数据文件导入数据库 3、修改核心文件config/inc_config.…

“ReferenceError: AMap is not defined“

问题 笔者进行web开发&#xff0c;引入高德地图&#xff0c;控制台报错 "ReferenceError: AMap is not defined"详细问题 vue.runtime.esm.js:4662 [Vue warn]: Error in mounted hook: "ReferenceError: AMap is not defined"found in---> <Map&…

React-嵌套路由

1.概念 说明&#xff1a;在一级路由中又内嵌了其他路由&#xff0c;这种关系就叫做嵌套路由&#xff0c;嵌套至一级路由内的路由又称作二级路由。 2.实现步骤 说明&#xff1a;使用childen属性配置路由嵌套关系&#xff0c;使用<Outlet/>组件配置二级路由渲染的位置。…

吴恩达机器学习-可选实验室:逻辑回归,决策边界(Logistic Regression,Decision Boundary))

文章目录 目标数据集图数据逻辑回归模型复习逻辑回归和决策边界绘图决策边界恭喜 目标 在本实验中&#xff0c;你将:绘制逻辑回归模型的决策边界。这会让你更好地理解模型的预测。 import numpy as np %matplotlib widget import matplotlib.pyplot as plt from lab_utils_co…

Redis核心数据结构之整数集合

整数集合 概述 整数集合(intset)是集合键的底层实现之一&#xff0c;当一个集合只包含整数值元素&#xff0c;并且这个结合的元素数量不多时&#xff0c;Redis就会使用整数集合作为集合键的底层实现。 例子 举个例子&#xff0c;如果创建一个只包含五个元素的集合键&#x…

MySQL 8.0 架构 之 慢查询日志(Slow query log)(2)流程图:查询记录到慢查询日志中的条件

文章目录 MySQL 8.0 架构 之 慢查询日志&#xff08;Slow query log&#xff09;&#xff08;2&#xff09;流程图&#xff1a;查询记录到慢查询日志中的条件确定查询是否会记录在慢查询日志中的流程图参考 【声明】文章仅供学习交流&#xff0c;观点代表个人&#xff0c;与任何…

JavaScript数组方法常用方法大全

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1. push()2. pop()3. unshift()4. shift()5. isArray()6. map()7. filter()8. every()9. some()10. splice()11. slice()12. indexOf()13. includes()14. concat()1…

【大厂AI课学习笔记NO.76】人工智能人才金字塔

人工智能领域&#xff0c;分为源头创新人才、产业研发人才、应用开发人才和实用技能人才。 人工智能领域的人才结构呈现多样化特点&#xff0c;主要可以分为源头创新人才、产业研发人才、应用开发人才和实用技能人才四大类。这四大类人才在人工智能领域的发展中各自扮演着不可或…

Python刘诗诗

写在前面 刘诗诗在电视剧《一念关山》中饰演了女主角任如意&#xff0c;这是一个极具魅力的女性角色&#xff0c;她既是一位有着高超武艺和智慧的女侠士&#xff0c;也曾经是安国朱衣卫前左使&#xff0c;身怀绝技且性格坚韧不屈。剧中&#xff0c;任如意因不满于朱衣卫的暴行…

P1948 [USACO08JAN] Telephone Lines S

Here 典中之典&#xff01;&#xff01; 解题思路 可选k条边代价为0如何决策&#xff1f; 将到当前位置选择了几条代价为0的边放入状态&#xff0c;即若当前状态选的边数小于&#xff0c;则可以进行决策&#xff0c;是否选择当前边&#xff0c;若选&#xff0c;则&#xff0c…

基于智慧灯杆的智慧城市解决方案(2)

功能规划 智慧照明功能 智慧路灯的基本功能仍然是道路照明, 因此对照明功能的智慧化提升是最基本的一项要求。 对道路照明管理进行智慧化提升, 实施智慧照明, 必然将成为智慧城市中道路照明发展的主要方向之一。 智慧照明是集计算机网络技术、 通信技术、 控制技术、 数据…

uniapp:小程序数字键盘功能样式实现

代码如下&#xff1a; <template><view><view><view class"money-input"><view class"input-container" click"toggleBox"><view class"input-wrapper"><view class"input-iconone"…

C++ 队列

目录 队列的应用场景 1、429. N 叉树的层序遍历 2、 103. 二叉树的锯齿形层序遍历 3、662. 二叉树最大宽度 4、515. 在每个树行中找最大值 队列的应用场景 广度优先搜索&#xff08;BFS&#xff09;&#xff1a;队列是广度优先搜索算法的核心数据结构。在BFS中&#xff…

C语言:深入补码计算原理

C语言&#xff1a;深入补码计算原理 有符号整数存储原码、反码、补码转换规则数据与内存的关系 补码原理 有符号整数存储 原码、反码、补码 有符号整数的2进制表示方法有三种&#xff0c;即原码、反码和补码 三种表示方法均有符号位和数值位两部分&#xff0c;符号位用0表示“…

Linux:kubernetes(k8s)lable和selecto标签和选择器的使用(11)

通过标签是可以让我们的容器和容器之间相互认识&#xff0c;简单来说一边打了标签&#xff0c;一边使用选择器去选择就可以快速的让他们之间耦合 定义标签有两种办法&#xff0c;一个是文件中&#xff0c;一个是命令行里 我们在前几章编进文件的时候里面都有lable比如 这个就是…