用 PyTorch 编写分布式应用程序

用 PyTorch 编写分布式应用程序

在这个简短的教程中,我们将介绍 PyTorch 的分布式软件包。 我们将了解如何设置分布式设置,使用不同的交流策略以及如何仔细查看软件包的内部结构。

设定

PyTorch 中包含的分布式软件包(即torch.distributed)使研究人员和从业人员可以轻松地并行化他们在跨进程和机器集群的计算。 为此,它利用了传递消息的语义,从而允许每个进程将数据传递给其他任何进程。 与并行处理(HTG1)包相反,进程可以使用不同的通信后端,而不仅限于在同一台计算机上执行。

为了开始,我们需要能够同时运行多个进程的能力。 如果您有权访问计算群集,则应咨询本地系统管理员或使用您喜欢的协调工具。 (例如 pdsh , clustershell 或其他)。出于本教程的目的,我们将使用以下模板使用一台计算机并分叉多个进程。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Processdef run(rank, size):""" Distributed function to be implemented later. """passdef init_process(rank, size, fn, backend='gloo'):""" Initialize the distributed environment. """os.environ['MASTER_ADDR'] = '127.0.0.1'os.environ['MASTER_PORT'] = '29500'dist.init_process_group(backend, rank=rank, world_size=size)fn(rank, size)if __name__ == "__main__":size = 2processes = []for rank in range(size):p = Process(target=init_process, args=(rank, size, run))p.start()processes.append(p)for p in processes:p.join()

上面的脚本产生了两个进程,每个进程将设置分布式环境,初始化进程组(dist.init_process_group),最后执行给定的run函数。

让我们看一下init_process功能。 它确保每个进程将能够使用相同的 IP 地址和端口通过主机进行协调。 请注意,我们使用了gloo后端,但其他后端也可用。 (请参阅 5.1 节),我们将在本教程的结尾部分介绍dist.init_process_group中发生的魔术,但实际上,它允许进程通过共享位置相互进行通信。

点对点通讯

发送和接收

数据从一个进程到另一个进程的传输称为点对点通信。 这些是通过sendrecv功能或它们的_直接_对应部分isendirecv实现的。

"""Blocking point-to-point communication."""def run(rank, size):tensor = torch.zeros(1)if rank == 0:tensor += 1# Send the tensor to process 1dist.send(tensor=tensor, dst=1)else:# Receive tensor from process 0dist.recv(tensor=tensor, src=0)print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,两个进程都从零张量开始,然后进程 0 递增张量并将其发送到进程 1,以便它们都以 1.0 结尾。 请注意,进程 1 需要分配内存以存储它将接收的数据。

另请注意,send / recv阻塞:两个过程都停止,直到通信完成。 另一方面,无阻塞; 脚本继续执行,方法返回Work对象,我们可以选择wait()对象。

"""Non-blocking point-to-point communication."""def run(rank, size):tensor = torch.zeros(1)req = Noneif rank == 0:tensor += 1# Send the tensor to process 1req = dist.isend(tensor=tensor, dst=1)print('Rank 0 started sending')else:# Receive tensor from process 0req = dist.irecv(tensor=tensor, src=0)print('Rank 1 started receiving')req.wait()print('Rank ', rank, ' has data ', tensor[0])

使用立即数时,我们必须谨慎使用已发送和已接收的张量。 由于我们不知道何时将数据传递给其他进程,因此在req.wait()完成之前,我们既不应该修改发送的张量也不应该访问接收的张量。 换一种说法,

  • dist.isend()之后写入tensor将导致不确定的行为。
  • dist.irecv()之后从tensor读取将导致不确定的行为。

但是,在执行req.wait()之后,我们可以确保进行了通信,并且tensor[0]中存储的值为 1.0。

当我们希望对流程的通信进行精细控制时,点对点通信非常有用。 它们可用于实现精美的算法,例如百度的 DeepSpeech 或 Facebook 的大规模实验中使用的算法。(请参阅 4.1 节)

集体交流

与点对点通信相反,集合允许跨中所有进程的通信模式。 小组是我们所有过程的子集。 要创建组,我们可以将等级列表传递给dist.new_group(group)。 默认情况下,集合在所有进程(也称为世界)上执行。 例如,为了获得所有过程中所有张量的总和,我们可以使用dist.all_reduce(tensor, op, group)集合。

""" All-Reduce example."""
def run(rank, size):""" Simple point-to-point communication. """group = dist.new_group([0, 1])tensor = torch.ones(1)dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)print('Rank ', rank, ' has data ', tensor[0])

由于我们需要组中所有张量的总和,因此我们将dist.reduce_op.SUM用作化简运算符。 一般来说,任何可交换的数学运算都可以用作运算符。 PyTorch 开箱即用,带有 4 个这样的运算符,它们都在元素级运行:

  • dist.reduce_op.SUM
  • dist.reduce_op.PRODUCT
  • dist.reduce_op.MAX
  • dist.reduce_op.MIN

除了dist.all_reduce(tensor, op, group)之外,PyTorch 中目前共有 6 个集合体。

  • dist.broadcast(tensor, src, group):将tensorsrc复制到所有其他进程。
  • dist.reduce(tensor, dst, op, group):将op应用于所有tensor,并将结果存储在dst中。
  • dist.all_reduce(tensor, op, group):与 reduce 相同,但是结果存储在所有进程中。
    (img/56c0ad0f1ad17f666308d58b7389493c.jpg)]过程。
  • dist.gather(tensor, dst, gather_list, group):从dst中的所有进程复制tensor
  • dist.all_gather(tensor_list, tensor, group):将所有进程中的tensor从所有进程复制到tensor_list
  • dist.barrier(group):阻止<cite>组</cite>中的所有进程,直到每个进程都进入此功能。

分布式训练

**注意:**您可以在此 GitHub 存储库的中找到本节的示例脚本。

现在我们了解了分布式模块的工作原理,让我们用它编写一些有用的东西。 我们的目标是复制 DistributedDataParallel 的功能。 当然,这将是一个教学示例,在现实世界中,您应该使用上面链接的经过官方测试,优化的最佳版本。

很简单,我们想要实现随机梯度下降的分布式版本。 我们的脚本将允许所有进程在其数据批次上计算其模型的梯度,然后平均其梯度。 为了在更改进程数时确保相似的收敛结果,我们首先必须对数据集进行分区。 (您也可以使用 tnt.dataset.SplitDataset 代替下面的代码段。)

""" Dataset partitioning helper """
class Partition(object):def __init__(self, data, index):self.data = dataself.index = indexdef __len__(self):return len(self.index)def __getitem__(self, index):data_idx = self.index[index]return self.data[data_idx]class DataPartitioner(object):def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):self.data = dataself.partitions = []rng = Random()rng.seed(seed)data_len = len(data)indexes = [x for x in range(0, data_len)]rng.shuffle(indexes)for frac in sizes:part_len = int(frac * data_len)self.partitions.append(indexes[0:part_len])indexes = indexes[part_len:]def use(self, partition):return Partition(self.data, self.partitions[partition])

使用上面的代码片段,我们现在可以使用以下几行简单地对任何数据集进行分区:

""" Partitioning MNIST """
def partition_dataset():dataset = datasets.MNIST('./data', train=True, download=True,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))]))size = dist.get_world_size()bsz = 128 / float(size)partition_sizes = [1.0 / size for _ in range(size)]partition = DataPartitioner(dataset, partition_sizes)partition = partition.use(dist.get_rank())train_set = torch.utils.data.DataLoader(partition,batch_size=bsz,shuffle=True)return train_set, bsz

假设我们有 2 个副本,则每个进程的train_set为 60000/2 = 30000 个样本。 我们还将批量大小除以副本数,以使_整体_批量大小保持为 128。

现在,我们可以编写通常的向前-向后优化训练代码,并添加一个函数调用以平均模型的梯度。 (以下内容主要是受 PyTorch MNIST 官方示例的启发)。

""" Distributed Synchronous SGD Example """
def run(rank, size):torch.manual_seed(1234)train_set, bsz = partition_dataset()model = Net()optimizer = optim.SGD(model.parameters(),lr=0.01, momentum=0.5)num_batches = ceil(len(train_set.dataset) / float(bsz))for epoch in range(10):epoch_loss = 0.0for data, target in train_set:optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)epoch_loss += loss.item()loss.backward()average_gradients(model)optimizer.step()print('Rank ', dist.get_rank(), ', epoch ',epoch, ': ', epoch_loss / num_batches)

仍然需要执行average_gradients(model)函数,该函数只需要一个模型并在整个世界上平均其梯度即可。

""" Gradient averaging. """
def average_gradients(model):size = float(dist.get_world_size())for param in model.parameters():dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)param.grad.data /= size

! 我们成功实现了分布式同步 SGD,并且可以在大型计算机集群上训练任何模型。

**注意:**尽管从技术上来说最后一句话是是正确的,但要实现同步 SGD 的生产级实现,还需要更多技巧。 同样,请使用经过测试和优化的。

我们自己的环减少

另一个挑战是,假设我们要实现 DeepSpeech 的高效环网减少。 使用点对点集合很容易实现。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):rank = dist.get_rank()size = dist.get_world_size()send_buff = th.zeros(send.size())recv_buff = th.zeros(send.size())accum = th.zeros(send.size())accum[:] = send[:]left = ((rank - 1) + size) % sizeright = (rank + 1) % sizefor i in range(size - 1):if i % 2 == 0:# Send send_buffsend_req = dist.isend(send_buff, right)dist.recv(recv_buff, left)accum[:] += recv[:]else:# Send recv_buffsend_req = dist.isend(recv_buff, right)dist.recv(send_buff, left)accum[:] += send[:]send_req.wait()recv[:] = accum[:]

在上面的脚本中,allreduce(send, recv)函数的签名与 PyTorch 中的签名略有不同。 它需要一个recv张量,并将所有send张量的总和存储在其中。 作为练习留给读者,我们的版本与 DeepSpeech 中的版本之间仍然有一个区别:它们的实现将梯度张量划分为_个块_,以便最佳地利用通信带宽。 (提示: torch.chunk)

进阶主题

现在,我们准备发现torch.distributed的一些更高级的功能。 由于涉及的内容很多,因此本节分为两个小节:

  1. 通讯后端:我们在这里学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通讯。
  2. 初始化方法:我们了解如何最好地设置dist.init_process_group()中的初始协调阶段。

通讯后端

torch.distributed最优雅的方面之一是它具有抽象能力,并且可以在不同的后端之上构建。 如前所述,目前在 PyTorch 中实现了三个后端:Glo,NCCL 和 MPI。 它们各自具有不同的规格和权衡,具体取决于所需的用例。 可以在中找到支持功能的对照表。

Gloo 后端

到目前为止,我们已经广泛使用 Gloo 后端。 它作为开发平台非常方便,因为它已包含在预编译的 PyTorch 二进制文件中,并且可在 Linux(自 0.2 开始)和 macOS(自 1.3 开始)上运行。 它支持 CPU 上的所有点对点和集合操作,以及 GPU 上的所有集合操作。 CUDA 张量的集体运算的实现未像 NCCL 后端提供的那样优化。

如您所知,如果将model放在 GPU 上,我们的分布式 SGD 示例将无法正常工作。 为了使用多个 GPU,让我们还进行以下修改:

  1. 使用device = torch.device("cuda:{}".format(rank))
  2. model = Net() [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t748GofX-1692404941159)(img/84b92171e855642e5bbfa912f0cdb093.jpg)] model = Net().to(device)
  3. 使用data, target = data.to(device), target.to(device)

经过上述修改,我们的模型现在可以在两个 GPU 上训练,您可以使用watch nvidia-smi监视其使用情况。

MPI 后端

消息传递接口(MPI)是来自高性能计算领域的标准化工具。 它允许进行点对点和集体通信,并且是torch.distributed API 的主要灵感。 存在几种针对不同目的而优化的 MPI 实现(例如 Open-MPI , MVAPICH2 , Intel MPI)。 使用 MPI 后端的优势在于 MPI 在大型计算机群集上的广泛可用性和高水平的优化。 一些 最近的 实现也能够利用 CUDA IPC 和 GPU Direct 技术来避免通过 CPU 进行内存复制。

不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,我们将不得不手动对其进行重新编译。 幸运的是,鉴于编译后,PyTorch 会单独查看以查找可用的 MPI 实现,因此此过程相当简单。 以下步骤通过从源安装 PyTorch 来安装 MPI 后端。

  1. 创建并激活您的 Anaconda 环境,按照指南的要求安装所有先决条件,但是尚未运行。
  2. 选择并安装您喜欢的 MPI 实现。 请注意,启用支持 CUDA 的 MPI 可能需要一些其他步骤。 在我们的情况下,我们将坚持不支持 GPU 的 Open-MPI conda install -c conda-forge openmpi
  3. 现在,转到克隆的 PyTorch 存储库并执行python setup.py install

为了测试我们新安装的后端,需要进行一些修改。

  1. if __name__ == '__main__':下的内容替换为init_process(0, 0, run, backend='mpi')
  2. 运行mpirun -n 4 python myscript.py

这些更改的原因是,MPI 需要在生成流程之前创建自己的环境。 MPI 也将生成自己的进程,并执行初始化方法中描述的握手,使init_process_groupranksize参数多余。 实际上,这非常强大,因为您可以将附加参数传递给mpirun,以便为每个进程定制计算资源。 (诸如每个进程的内核数量,将计算机手动分配给特定等级,以及等之类的东西。)这样做,您应该获得与其他通信后端相同的熟悉输出。

NCCL 后端

NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。 如果仅将 CUDA 张量用于集体操作,请考虑使用此后端以获得最佳性能。 NCCL 后端包含在具有 CUDA 支持的预构建二进制文件中。

初始化方法

为了完成本教程,我们来谈谈我们称为的第一个功能:dist.init_process_group(backend, init_method)。 特别是,我们将介绍负责每个过程之间初始协调步骤的不同初始化方法。 这些方法使您可以定义协调方式。 根据您的硬件设置,这些方法之一自然应该比其他方法更合适。 除了以下各节之外,您还应该查看官方文档。

环境变量

在本教程中,我们一直在使用环境变量初始化方法。 通过在所有计算机上设置以下四个环境变量,所有进程将能够正确连接到主服务器,获取有关其他进程的信息,最后与它们握手。

  • MASTER_PORT:计算机上的空闲端口,它将托管等级为 0 的进程。
  • MASTER_ADDR:将以等级 0 托管进程的计算机的 IP 地址。
  • WORLD_SIZE:进程总数,这样主机可以知道要等待多少个工人。
  • RANK:每个进程的等级,因此他们将知道它是否是工人的主人。

共享文件系统

共享文件系统要求所有进程都有权访问共享文件系统,并将通过共享文件进行协调。 这意味着每个进程都将打开文件,写入文件信息,然后等到每个人都打开文件。 之后,所有必需的信息将可用于所有过程。 为了避免争用情况,文件系统必须通过 fcntl 支持锁定。

dist.init_process_group(init_method='file:///mnt/nfs/sharedfile',rank=args.rank,world_size=4)

TCP

通过提供等级 0 和可访问的端口号的进程的 IP 地址,可以实现通过 TCP 进行初始化。 在这里,所有工作人员都可以连接到等级为 0 的流程,并交换有关如何相互联系的信息。

dist.init_process_group(init_method='tcp://10.1.1.20:23456',rank=args.rank,world_size=4)

致谢

我要感谢 PyTorch 开发人员在实现,文档和测试方面做得如此出色。 当代码不清楚时,我总是可以依靠文档或测试来找到答案。 我要特别感谢 Soumith Chintala,Adam Paszke 和 Natalia Gimelshein 提供的有见地的评论并回答了有关初稿的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/45357.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++中的运算符总结(4):逻辑运算符(上)

C中的运算符总结&#xff08;4&#xff09;&#xff1a;逻辑运算符&#xff08;上&#xff09; 8、逻辑运算 NOT、 AND、 OR 和 XOR 逻辑 NOT 运算用运算符!表示&#xff0c;用于单个操作数。表 1是逻辑 NOT 运算的真值表&#xff0c;这种运算将提供的布尔标记反转&#xff1…

【LLM评估篇】Ceval | rouge | MMLU等指标

note 一些大模型的评估模型&#xff1a;多轮&#xff1a;MTBench关注评估&#xff1a;agent bench长文本评估&#xff1a;longbench&#xff0c;longeval工具调用评估&#xff1a;toolbench安全评估&#xff1a;cvalue&#xff0c;safetyprompt等 文章目录 note常见评测benchm…

ubuntu20搭建环境使用的一下指令

1.更新源 sudo vim etc/apt/sources.listdeb http://mirrors.aliyun.com/ubuntu/ xenial main deb-src http://mirrors.aliyun.com/ubuntu/ xenial maindeb http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates…

Redis实现共享Session

Redis实现共享Session 分布式系统中&#xff0c;sessiong共享有很多的解决方案&#xff0c;其中托管到缓存中应该是最常用的方案之一。 1、引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM…

设计模式详解-迭代器模式

类型&#xff1a;行为型模式 实现原理&#xff1a;提供一种方法顺序访问一个聚合对象中各个元素, 而又无须暴露该对象的内部表示。 作用&#xff1a;用于顺序访问集合对象的元素&#xff0c;不需要知道集合对象的底层表示 解决的问题&#xff1a;不同的方式遍历整个整合对象…

openGauss学习笔记-44 openGauss 高级数据管理-存储过程

文章目录 openGauss学习笔记-44 openGauss 高级数据管理-存储过程44.1 语法格式44.2 参数说明44.3 示例 openGauss学习笔记-44 openGauss 高级数据管理-存储过程 存储过程是能够完成特定功能的SQL语句集。用户可以进行反复调用&#xff0c;从而减少SQL语句的重复编写数量&…

SpringBoot 学习(03): 弱语言的注解和SpringBoot注解的异同

弱语言代表&#xff1a;Hyperf&#xff0c;一个基于 PHP Swoole 扩展的常驻内存框架 注解概念的举例说明&#xff1b; 说白了就是&#xff0c;你当领导&#xff0c;破烂事让秘书帮你去安排&#xff0c;你只需要批注一下&#xff0c;例如下周要举办一场活动&#xff0c;秘书将方…

sql server安装报错 合成活动模板库(ATL) 失败

错误 “合成活动模板库(ATL) 规则失败“ 解决办法&#xff1a; 进入SQL Server 2008R2安装包目录找到文件&#xff1a;sqlsupport_msi&#xff0c;安装此文件之后&#xff0c;再安装SQL Server&#xff0c;便可解决该问题。C:\SQL Server 2008R2\new\SQL Server 2008R2\2052_CH…

java Spring Boot yml多环境拆分文件管理优化

上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…

web在线编辑器(vue版)

目录 前言一、monaco-editor1、源码2、体积优化 二、ace-editor&#xff1f;1、源码2、体积优化 总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff1a;随着人工智能的不断发展&#xff0c;机器学习这门技术也越来越重要&#xff0c;很多…

Android 广播发送流程分析

在上一篇文章中Android 广播阻塞、延迟问题分析方法讲了广播阻塞的分析方法&#xff0c;但是分析完这个问题&#xff0c;自己还是有一些疑问&#xff1a; 广播为啥会阻塞呢&#xff1f;发送给接收器就行了&#xff0c;为啥还要等着接收器处理完才处理下一个&#xff1f;由普通…

esp-idf的电源管理——sleep功能

目录 1 light sleep的时间补偿2 light sleep前的时钟校准3 为什么Flash下电比较特殊4 light sleep流程概览4.1 电源域的控制4.2 唤醒源的设置4.3 esp_light_sleep_start浏览4.4 esp_sleep_start浏览4.5 rtc sleep概览4.6 light sleep的时间调整5 最后看一下deep sleep1 light s…

2308C++协程流程5

参考 #include <协程> #include "简异中.cpp" //用来中文定义的.构 任务{构 承诺型{任务 取中(){中{协柄<承诺型>::从承诺(*本)};}从不挂起 初挂起(){中{};}从不挂起 终挂起()无异{中{};}空 中空(){ 输出<<"取协程结果\n"; }//7空 对异…

《Go 语言第一课》课程学习笔记(三)

构建模式&#xff1a;Go 是怎么解决包依赖管理问题的&#xff1f; Go 项目的布局标准是什么&#xff1f; 首先&#xff0c;对于以生产可执行程序为目的的 Go 项目&#xff0c;它的典型项目结构分为五部分&#xff1a; 放在项目顶层的 Go Module 相关文件&#xff0c;包括 go.…

kotlin的列表

在 kotlin中&#xff0c;列表是一种常见的数据结构&#xff0c;用于存储有序的元素集合。 kotlin的标准库提供了 List 接口及其实现类 ArrayList、LinkedList 等&#xff0c;以及一些扩展函数来操作和处理列表。 1.创建列表 // 创建一个可变列表 val mutableList mutableLis…

JVM前世今生之JVM内存模型

JVM内存模型所指的是JVM运行时区域&#xff0c;该区域分为两大块 线程共享区域 堆内存、方法区&#xff0c;即所有线程都能访问该区域&#xff0c;随着虚拟机和GC创建和销毁 线程独占区域 虚拟机栈、本地方法栈、程序计数器&#xff0c;即每个线程都有自己独立的区域&#…

帆软大屏2.0企业制作

&#xfffc; 数字化观点中心 / 当前页 如何从0-1制作数据大屏&#xff0c;我用大白话给你解释清楚了 文 | 商业智能BI相关文章 阅读次数&#xff1a;18,192 次浏览 2023-06-08 11:51:49 好莱坞大片《摩天营救》中有这么一个场景&#xff1a; &#xfffc; 你可以看见反派大b…

flink1.17 实现 udf scalarFunctoin get_json_object 支持 非标准化json

特色 相比官方的json_value,该函数支持非标准化json,比如v是个object,但是非标准json会外套一层引号,内部有反引号. eg: {"kkkk2": "{\"kkkk1\":\"vvvvvvv\"}" } 支持value为 100L 这种java格式的bigint. {"k":999L…

使用Nginx调用网关,然后网关调用其他微服务

问题前提&#xff1a;目前我的项目是已经搭建了网关根据访问路径路由到微服务&#xff0c;然后现在我使用了Nginx将静态资源都放在了Nginx中&#xff0c;然后我后端定义了一个接口访问一个html页面&#xff0c;但是html页面要用到静态资源&#xff0c;这个静态资源在我的后端是…

Http 状态码汇总

文章目录 Http 状态码汇总1xx&#xff08;信息性状态码&#xff09;2xx&#xff08;成功状态码&#xff09;3xx&#xff08;重定向状态码&#xff09;4xx&#xff08;客户端错误状态码&#xff09;5xx&#xff08;服务器错误状态码&#xff09; Http 状态码汇总 1xx&#xff08…