pytorch 数据预加载

1. Abstract

本文介绍一个工具 PreDataLoader,它包装 torch.utils.data.DataLoader,接收该类的一个实例 loader,启动一个线程 t,创建一个队列 qtloader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间。代码:

class PreDataLoader(object):"""@Author: Yuwei from https://www.zhihu.com/people/aewil-zheng, with few changes** 包装 torch.utils.data.DataLoader, 接收该类的一个实例 loader, 启动一个线程 t, 创建一个队列 qt 将 loader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间** 若提供了 cuda device, 数据将直接被加载到 GPU 上"""def __init__(self, loader, device=None, queue_size=2):""":param loader: torch.utils.data.DataLoader:param device: torch.device('cuda' or 'cpu'), to use cpu, set None:param queue_size: the number of samples to be preloaded"""self.__loader = loaderself.__device = deviceself.__queue_size = queue_sizeself.__load_stream = torch.cuda.Stream(device=device) \if str(device).startswith('cuda') else None  # 如果提供了 cuda device, 则创建 cuda 流self.__queue = Queue(maxsize=self.__queue_size)self.__idx = 0self.__worker = Thread(target=self._load_loop)self.__worker.setDaemon(True)self.__worker.start()def _load_loop(self):""" 不断的将数据加载到队列里 """if str(self.__device).startswith('cuda'):logging.info(f'>>> data will be preloaded into device \'{self.__device}\'')logging.info(f'>>> this may cost more GPU memory!!!')# The loop that will load into the queue in the backgroundtorch.cuda.set_device(self.__device)while True:for sample in self.__loader:self.__queue.put(self._load_instance(sample))else:while True:for sample in self.__loader:self.__queue.put(sample)def _load_instance(self, sample):""" 将 batch 数据从 CPU 加载到 GPU 中 """if torch.is_tensor(sample):with torch.cuda.stream(self.__load_stream):return sample.to(self.__device, non_blocking=True)elif sample is None or type(sample) == str:return sampleelif isinstance(sample, dict):return {k: self._load_instance(v) for k, v in sample.items()}else:return [self._load_instance(s) for s in sample]def __iter__(self):self.__idx = 0return selfdef __next__(self):# 加载线程挂了if not self.__worker.is_alive() and self.__queue.empty():self.__idx = 0self.__queue.join()self.__worker.join()raise StopIteration# 一个 epoch 加载完了elif self.__idx >= len(self.__loader):self.__idx = 0raise StopIteration# 下一个 batchelse:out = self.__queue.get()self.__queue.task_done()self.__idx += 1return outdef next(self):return self.__next__()def __len__(self):return len(self.__loader)@propertydef sampler(self):return self.__loader.sampler@propertydef dataset(self):return self.__loader.dataset

如果你对实现技术细节不感兴趣,也可直接拿来用。后面我将对相关细节展开讨论,包括:

  • python 中的并发与并行;
  • cuda 流:torch.cuda.Stream(device=device)

2. python 中的并发与并行

总所周知,由于 Global Interpreter Lock (GIL) 的存在,Python 语言中,任何时间点只有一个线程在执行,即便在多核 CPU 上,Python 的多线程也无法实现真正的并行计算。

GIL 的原因在于 Python 的内存管理并不是线程安全的。为了防止多个线程同时操作一个对象,造成数据混乱的问题,Python 设定了 GIL 来限制多线程的并发执行。因此,尽管你可以在 Python 中创建多线程,并且看起来他们是同时运行的,但实质上,在任一时刻,只有一个线程在执行。

既然如此,上面代码使用多线程是如何提高程序的效率的?再看:

然而,如果你的程序是 IO 密集型的,例如大量的网络请求或文件读写操作,那么使用多线程还是能显著提高程序的效率的,因为在等待 IO 的过程中,其他线程还可以继续执行。

数据的预加载应该算是 IO 吧,那模型计算和数据加载能并行吗

2.1 Numpy 和 PyTorch 底层计算是多线程并行的

Numpy 的底层实现是 C 语言,计算速度和并发性远胜于 Python,当我们使用 numpy 进行计算时,特别是复杂的矩阵运算,Python 程序会把这个任务抛给底层的 C 语言进行计算,从而能够使用 CPU 多核。验证:

import time
import numpy as npdef dot():start = time.time()a = np.random.randn(10000, 10000)b = np.random.randn(10000, 10000)np.dot(a, b)end = time.time()print(end - start)dot()

验证代码用 numpy.dot() 计算两个 10000 10000 10000 维的矩阵乘法,观察 CPU 的使用效率(i5-10400,6核心12线程),发现 CPU 使用率很快从不足 20% 提升至 80% 左右。计算时间约为 15s

为了确定是否真的使用了多核,再设计一个 Python 计算程序:

import timedef add():cnt = 1start = time.time()for i in range(500000000):  # 累加cnt += 1end = time.time()print(end - start)add()

五亿次加法运算,耗时约 20s,CPU 使用率全程维持在 20% 以下。如此说来,numpy 确实是在使用多核并行计算。

下面看一看 Python 多线程能不能使它们并行计算:

import threading
import time
import numpy as npdef dot():start = time.time()a = np.random.randn(10000, 10000)b = np.random.randn(10000, 10000)np.dot(a, b)end = time.time()print(end - start)def add():cnt = 1start = time.time()for i in range(500000000):cnt += 1end = time.time()print(end - start)t = threading.Thread(target=dot)s = time.time()
add()
t.start()
t1.join()
e = time.time()
print(e - s)

输出:

15.057043313980103
23.129913806915283
23.13091516494751

如果说整个程序只能同时使用一个 CPU 核,那么整体计算时间应该是两部分计算时间的和 35s 左右,但这里只用了 23s,可见 numpy 底层并行计算是实锤了。而且,这两个函数的计算是并行的,即 np.dot() 在计算的时候,add() 也在计算。为什么 add 计算相比其单独运行时多了 3s?而 np.dot() 计算时间基本没变?

可以排除 CPU 资源不够的可能,否则的话,np.dot() 的计算时间也要加长;再者我观察了 CPU 利用率,全程未达到 100%。我觉得这是线程切换的开销add() 可能不是一直在运行的,多个 Python 线程还是只能使用一个 CPU 核,线程之间交替执行,只不过 np.dot() 线程在离开后,底层运行还在继续,而 add() 线程离开后,其不再运行。即:有那么 3s 时间,add() 没运行,“单核 CPU” 转向了线程 np.dot() 检查计算结果是否已返回。

再增加一个 numpy 计算任务线程:

...
t1 = threading.Thread(target=dot)
t2 = threading.Thread(target=dot)s = time.time()
add()
t1.start()
t2.start()
t1.join()
t2.join()
e = time.time()
print(e - s)

输出:

25.624603986740112
27.81219220161438
30.751672983169556
30.752644538879395

时间增加了不少,基本快赶上计算一次 dot() 时间的两倍了。这大概是由于 CPU 的计算达到了极限:

CPU 利用率长时间维持在 100%。

以上验证对于 PyTorch 也是一样的。

结论:numpy 和 pytorch 的计算不受 GIL 的限制,可以使用 CPU 多核;一个线程中,numpy 和 pytorch 将计算丢给底层的 C/C++ 语言后,“等待计算结果”类似于 IO,会释放 GIL 锁,而计算还在继续,其他 python 线程可以得到执行。
推论:使用 GPU 计算是同样的道理,python 程序将计算丢给 GPU 后,等待计算结果,当前线程阻塞,释放 GIL 锁,其他 python 线程得以执行,从而提高计算效率。

3. torch.cuda.Stream(device=device)


torch.cuda.Stream 是 PyTorch 库中的一个类,用于管理 GPU 上的异步操作。

在 GPU 上执行计算任务时,通常可以使用多个流(stream)来并行执行不同的操作。每个流都有自己的命令队列,可以独立地执行操作,从而提高计算效率。torch.cuda.Stream 就是用来创建和管理这些流的。

使用 torch.cuda.Stream,可以将一系列 GPU 操作放入一个流中,并且可以通过调用流的 synchronize() 方法来等待流中所有操作完成。这对于需要处理多个 GPU 操作的情况非常有用。

以下是一个使用 torch.cuda.Stream 的示例代码:

import torchstream = torch.cuda.Stream()  # 创建流对象with torch.cuda.stream(stream):  # 在流中执行操作# 执行GPU操作# ...stream.synchronize()  # 等待流中操作完成

在上述示例中,我们首先创建了一个 torch.cuda.Stream 对象 stream。然后,我们使用 with 语句块将一些 GPU 操作放入流中执行。最后,我们调用 stream.synchronize() 来等待流中的操作完成。

通过使用 torch.cuda.Stream,我们可以更灵活地控制 GPU 操作的执行顺序和并行性,以优化计算性能。


以上是 GPT3.5 给出的关于 torch.cuda.Stream 的简介。另外,还可参考教程《如何在 Pytorch 中使用 CUDA 流(CUDA stream)》 讲的不错。我现在将其搬过来:


什么是 CUDA 流(CUDA stream)?

CUDA 流是一种在 GPU 上并行执行操作的机制。在默认情况下,PyTorch 会在默认的流上执行所有的操作,即在主流(default stream)上进行。但是,当我们有一些可以并行执行的操作时,通过将这些操作分配到不同的流上,我们可以在 GPU 上更有效地利用计算资源。

第一句就强调:并行执行操作的机制。

如何创建 CUDA 流?

可以通过 torch.cuda.Stream() 函数来创建 CUDA 流:

stream = torch.cuda.Stream()

使用 torch.cuda.Stream() 函数创建了一个名为 stream 的 CUDA 流。

如何使用 CUDA 流?

通过 with 上下文管理操作,并使用 stream.synchronize() 方法等待操作完成:

import torch# 创建两个CUDA流
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()# 分别将操作记录到两个流上
with torch.cuda.stream(stream1):# 执行操作1# ...with torch.cuda.stream(stream2):# 执行操作2# ...# 等待两个流上的操作完成
torch.cuda.synchronize(stream1)
torch.cuda.synchronize(stream2)

我们创建了两个 CUDA 流 stream1stream2。然后,在两个流上分别记录操作,并使用torch.cuda.synchronize() 方法等待这些操作完成。

如何利用 CUDA 流提高性能?

一种常见的用法是将计算数据传输操作分配到不同的流上,从而实现计算和数据传输的并行执行


3.1 对 PreDataLoader 中 CUDA 流的解释

with torch.cuda.stream(self.__load_stream):return sample.to(self.__device, non_blocking=True)

这一句 sample.to(self.__device, non_blocking=True) 算是数据传输吧,它处在一个数据预加载线程中,想要与模型计算并行。那么按照上面的教程:一个 CUDA 流中的操作是顺序执行的,模型计算使用的是默认流(default stream),平时我们的代码 sample.to(device) 也使用了默认流,这意味着数据的传输和模型计算是串行的。

所以,PreDataLoader 中定义了一个新的 CUDA 流,把 sample.to(self.__device, non_blocking=True) 放入这个新 CUDA 流,就可以和模型计算并行了。

4. @property

@property 是一个装饰器,用于将类的方法转换为属性。通过使用 @property,您可以定义一个方法,并将其作为实例的属性来访问,而不需要使用函数调用的语法。

下面是一个示例,说明如何使用 @property 装饰器:

class Circle:def __init__(self, radius):self.radius = radius@propertydef diameter(self):return 2 * self.radius@diameter.setterdef diameter(self, value):self.radius = value / 2# 创建 Circle 对象
circle = Circle(5)# 访问 diameter 属性(实际上是调用了 diameter 方法)
print(circle.diameter)  # 输出:10# 设置 diameter 属性(实际上是调用了 diameter.setter 方法)
circle.diameter = 14
print(circle.radius)  # 输出:7

在上面的示例中,Circle 类定义了一个 radius 实例变量和一个 diameter 方法(被 @property 装饰)。当我们像访问属性一样访问 circle.diameter 时,实际上是调用了 diameter 方法并返回其结果。

此外,我们还可以使用 @property 创建一个 setter 方法,用于设置属性的值。在示例中,diameter 属性的 setter 方法名为 diameter.setter,它接受一个参数 value,我们可以在 setter 方法中对 self.radius 进行更新。

总结:使用 @property 装饰器可以将一个方法定义为属性,并提供更加方便和易读的方式来访问和设置属性。

既然担心 Python 线程的 GIL 问题,为何不直接用多进程?

:多进程没那么好用,进程是重量级的,有独立的内存管理,共享内存是比较麻烦的:

import multiprocessingclass Int(object):def __init__(self, i):self.__int = idef add(self):self.__int += 1def print(self):print(self.__int)def add(integer: Int):integer.add()integer.print()print(id(integer))if __name__ == '__main__':a_integer = Int(0)p1 = multiprocessing.Process(target=add, args=(a_integer,))p2 = multiprocessing.Process(target=add, args=(a_integer,))p3 = multiprocessing.Process(target=add, args=(a_integer,))p1.start()p2.start()p3.start()add(a_integer)a_integer.print()

输出:

1
1839132811024
1
1
2091010788944
1
1721319788112
1
2095109213776

可见,各进程操作的 Int 对象不是同一个,即,创建子进程时传入参数会是参数的一份拷贝

如果将 multiprocessing.Process 换成 threading.Thread,则输出:

1
2691328945888
2
2691328945888
3
2691328945888
4
2691328945888
4

创建线程时传入参数会是参数对象本身

此外,子进程不能访问主线程的变量,如果:

def add(integer: Int):integer.add()integer.print()b_integer.add()  # 加一个主进程中的变量print(id(integer))

则会报错。而线程则可以

可以看到,PreDataLoader 中的线程是访问了主程序的数据了的,如果用进程,一是编程比较麻烦,二是效率也未必就高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/197503.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pg数据库备库为什么要在线恢复

在线恢复是 PostgreSQL 和 pgpool-II 环境中一种重要的功能,它允许你在不中断服务的情况下,重新同步或恢复一个陈旧或者损坏的备库(副本)。在线恢复特别重要,因为它能够保持高可用性和最小化停机时间。这在大型数据库系…

3.5毫米音频连接器接线方式

3.5毫米音频连接器接线方式 耳机插头麦克风插头 绘制电路图注意事项 3.5毫米音频连接器分为单声道开关型和无开关型如下图: sleeve(套筒) tip(尖端) ring(环) 耳机插头 麦克风插头 绘制电路图…

【重点】【滑动窗口】76.最小覆盖子串

题目 思路参考《算法小抄》 class Solution {public String minWindow(String s, String t) {int startIndex -1, endIndex s.length(), valid 0, left 0, right 0;char[] sArray s.toCharArray();char[] tArray t.toCharArray();int[] need new int[256];int[] windo…

【软考S01计算机系统知识】E01 中央处理单元

E01 中央处理单元 计算机系统硬件基本组成中央处理单元组成功能 多核 CPU 计算机系统硬件基本组成 计算机系统由硬件和软件组成,基本硬件系统由 运算器、控制器、存储器、输入设备 和 输出设备 5大部件组成; 中央处理单元: 运算器、控制器等…

el-table分页时多选数据的保存和回显

大致思路: 把所有选择的数据全部存到一个大数组中,切页的时候匹配原数据利用ref节点的.toggleRowSelection方法进行回显 具体步骤: 1、勾选和全选时需要判断是选中还是取消,然后更新大数组数据。 2、分页获取新数据之后匹配当…

2023Q4 私有化版本发布,和鲸 ModelWhale 持续赋能大科研、高校教改的 AI for Science

作为数据科学多人协同平台,和鲸 ModelWhale 从一而终地为各级用户提供完备而周全的解决方案,覆盖数据研究、算法探索、模型调优、Python 案例教学等多个场景。特别地,如果对研究分析平台有更高的安全合规要求、希望兼容原有业务系统&#xff…

不懂编程,如何获取全面海量的重要数据?

在大数据和人工智能时代,数据的重要性变得更加突出。以下是数据在这个时代的重要性所体现的几个方面: 决策依据 模型训练 个性化服务 创新驱动 智能决策支持 本文,将介绍两个获取数据的方法 1、利用爬虫框架写采集程序 在前面&#xff…

JIRA 重建索引

JIRA为了增快搜索速度,为所有的问题的字段生成一个索引文件。这个索引文件存在磁盘的一个文件里面, 并且会实时更新。但是有时候某些操作后(例如增加自定义字段),需要重新建索引。 详情请见 Re-indexing after major c…

pg_stat_replication.state 含义

在PostgreSQL中,pg_stat_replication视图提供了有关连接到主服务器的流式复制进程(备用服务器)的信息。该视图中的一个列是state,它指示复制进程的当前状态。 state列可以具有各种值: startup: This WAL sender 刚开始运行 catc…

(华为)网络工程师教程笔记(网工教程)网工入门——3、静态路由路由表的配置

参考文章:【全236集】网络工程师从基础入门到进阶必学教程!通俗易懂,2023最新版,学完即可就业!网工入门_华为认证_HCIA_HCIP_数据通信_网工学习路线 文章目录 13. 网工入门10-静态路由(路由表的配置&#x…

spark3.x 写入hudi报错

报错信息如下: Exception in thread "main" org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20231201202516518 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) at org.apa…

GORM 多对多many2many 自定义连接表

文章目录 多对多 many2many表结构搭建多对多添加多对多查询多对多的删除、更新 自定义连接表生成表结构操作案例添加文章并添加标签,并自动关联添加文章,关联已有标签给已有文章关联标签替换已有文章的标签查询文章列表,显示标签 自定义连接…

在 Mac 上使用浅色或深色外观

在 Mac 上,选取苹果菜单 >“系统设置”,然后点按边栏中的“外观” 。(你可能需要向下滚动。)选择右侧的“浅色”、“深色”或“自动”。 “浅色”表示不会发生变化的浅色外观。 “深色”表示不会发生变化的深色外观。“深色模式…

JVM中 Minor GC 和 Full GC 的区别

Java中的垃圾回收(Garbage Collection, GC)是自动内存管理的一部分,其主要职责是识别并清除程序中不再使用的对象来释放内存。Java虚拟机(JVM)在运行时进行垃圾回收,主要分为两种类型:Minor GC和…

uniapp 之 短信验证码登录

一、需求 输入手机号码&#xff0c;可以获取验证码。 二、实现效果 点击前&#xff1a; 点击后&#xff1a; 三、代码实现 <template><view class"login"><view class"infobox"><view class"item"><input type…

跟着GPT学习shell脚本,理论与实践相结合的学习计划。(二)

第9周&#xff1a;项目实战 - 实现一个完整的Shell脚本项目 学习目标 应用所学的Shell脚本知识来实现一个实际项目。从规划到实现&#xff0c;经历完整的项目开发流程。 项目建议&#xff1a;自动化服务器健康检查脚本 项目描述&#xff1a; 开发一个Shell脚本&#xff0c;…

使用Java语言判断一个数据类型是奇数还是偶数

判断一个数字类型是奇数&#xff0c;还是偶数&#xff0c;只需要引入Scanner类&#xff0c;然后按照数据类型的定义方式进行定义&#xff0c;比较是按照与2进行整除后的结果&#xff1b;如果余数为零&#xff0c;则代表为偶数&#xff0c;否则为奇数。 import java.util.Scann…

一起学习云计算

目录 前言 一、云计算是什么&#xff1f; 二、云计算的组成 三、交付模型 四、 云部署模式 前言 随着经济社会的迅速发展&#xff0c;人们对于网络资源的要求量也越来越高&#xff0c;随之出现的一系列网络平台及服务也越来越多&#xff0c;对于云计算的出现提供了必…

★136. 只出现一次的数字(位运算)

136. 只出现一次的数字 这个题主要考察的知识点是位运算&#xff08;这里是异或&#xff09; 如果不要求空间复杂度为O&#xff08;1&#xff09;&#xff0c;那有很多方法。但是这里有这样的要求。 可以通过位运算 的方法来实现。 异或运算 ⊕有以下三个性质&#xff1a; 任…

Mysql中的正经行锁、间隙锁和临键锁

行锁、间隙锁和临键锁是数据库中的三种不同类型的锁&#xff0c;三者都属于行锁&#xff0c;第一个一般叫他正经的行锁&#xff08;《Mysql是怎样运行的》一书中的说法&#xff09;。 行锁&#xff08;Row Lock&#xff09;&#xff1a;行锁是指对数据表中的某一行进行的锁定操…