Pytorch 多卡并行(1)—— 原理简介和 DDP 并行实践

  • 近年来,深度学习模型的规模越来越大,需要处理的数据也越来越多,单卡训练的显存空间和计算效率都越来越难以满足需求。因此,多卡并行训练成为了一个必要的解决方案
  • 本文主要介绍使用 Pytorch 的 DistributedDataParallel(DDP)库进行分布式数据并行训练的方法

文章目录

  • 1. 多卡并行简介
    • 1.1 两种并行形式
    • 1.2 Pytorch 中的多卡并行
  • 2. 使用 DDP 进行单机多卡训练
    • 2.1 原理概述
    • 2.2 使用 DDP 改写单卡训练代码

1. 多卡并行简介

  • 多卡并行训练主要用于解决以下几个问题:
    1. 相同 batch size 下加速训练:多卡并行可以将数据分为多份同时在不同的GPU上运行,从而大大加快训练速度
    2. 相同速度下使用更大的 batch size:多卡并行可以在多个GPU之间共享显存,允许我们设置更大的 batch size
    3. 增加可训练的模型规模:有些模型参数多到单卡训练无法承受,而多卡并行可以将模型放入多个GPU中,从而扩充可训练模型的规模

1.1 两种并行形式

  • 多卡并行训练有数据并行和模型并行两种形式
    在这里插入图片描述

    1. 数据并行:每个GPU都保存一个模型副本,训练数据划分成多份交给各个GPU计算梯度,然后汇总梯度更新模型参数。根据梯度汇总的方式,数据并行又可以分成 Parameter ServerRing All-Reduce 两种,前者使用一个 master GPU 汇总梯度更新参数,再将参数分发给各个模型;后者以环的形式互相传递梯度,每个GPU都维护一个优化器,各自汇总梯度并自行更新模型参数。Ring All-Reduce 方案能更高效地利用所有卡的上下行带宽,是目前的主流方案
      在这里插入图片描述

    2. 模型并行:将模型切分成多个部分放在不同的GPU上并行运行,每个GPU负责处理一部分模型参数,并将处理后的结果发送到其它GPU进行合并,从而实现整体模型的更新。这种操作目前并不常见,一是因为大部分模型单卡都放得下,二是因为通讯开销比数据并行多。根据模型切分方式,模型并行也可以分成 Pipelined ParallelismTensor Slicing 两种,前者将模型的各个层放到不同的 GPU 上运行,这种做法比较通用,但是效率不高;后者针对模型中各种模块(attention、FFN 等)的张量计算操作进行拆解,把 tensor 计算分块分散到不同的机器上进行并行,效率较高但是通用性差
      在这里插入图片描述

  • 关于各种并行方法的详细说明可以参考:分布式训练、混合精度训练、梯度累加…一文带你优雅地训练大型模型

1.2 Pytorch 中的多卡并行

  • 随着各种深度学习框架的日趋完善,很多并行方法已经被整合其中,这让实现多卡并行加速训练变得相对简单。Pytorch 中提供了 DP(DataParallel) 和 DDP(DistributedDataParallel) 两种数据并行方法,它们的性能对比如下
    在这里插入图片描述
    红色柱子是 DP,绿色柱子是 DDP,蓝色柱子是 DDP + Apex 混合精度训练。注意到 DDP 的表现大幅优于 DP,这是因为
    1. DP 使用 Parameter Server 方式汇聚梯度并更新参数,主卡计算负载和通信带宽需求相比其他卡都显著高,导致主卡的计算能力和上下行带宽成为性能瓶颈;
    2. DDP 使用更高效的 Ring All-Reduce 方案,基本实现了 “使用几块GPU就是几倍加速” 的效果
  • 接下来本文会介绍使用 DDP 进行多卡加速的具体做法,参考自:Pytorch 官方教程

2. 使用 DDP 进行单机多卡训练

2.1 原理概述

  • DDP 会在每个 GPU 上运行一个进程,每个进程中都有一套完全相同的 Trainer 副本(包括 model 和 optimizer),各个进程之间通过一个进程池进行通信。这里有几个术语
    1. node:多机多卡运行时,每个机器称为一个 “node”,其中每一张卡都可以运行一个并行进程
    2. world size:所有并行进程的总数,各个 node 上并行的GPU总数
    3. rank:所有 node 的所有进程中,各个进程的标识符号,是从0开始计数的整数
    4. local rank:当前 node 的所有进程中,各个进程的标识符号,是从0开始计数的整数
    5. group:所有并行的进程组成一个 group(进程池),只有组内的进程间才可以相互通信

2.2 使用 DDP 改写单卡训练代码

  • 考虑如何将以下单机单卡代码改为 DDP 单机多卡运行

    # 单 GPU 训练示例
    import torch
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoaderclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int, ) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_everydef _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")for source, targets in self.train_data:source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.state_dict()PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if epoch % self.save_every == 0:self._save_checkpoint(epoch)class MyTrainDataset(Dataset):def __init__(self, size):self.size = sizeself.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]def load_train_objs():train_set = MyTrainDataset(2048)  # load your datasetmodel = torch.nn.Linear(20, 1)  # load your modeloptimizer = torch.optim.SGD(model.parameters(), lr=1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=True)def main(device, total_epochs, save_every, batch_size):dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, device, save_every)trainer.train(total_epochs)if __name__ == "__main__":import argparseparser = argparse.ArgumentParser(description='simple distributed training job')parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')args = parser.parse_args()device = 0  # shorthand for cuda:0main(device, args.total_epochs, args.save_every, args.batch_size)
    
  • 将单卡训练代码改写为 DDP 并行的要点如下

    1. 引入 DDP 相关库
      # 使用 DistributedDataParallel 进行单机多卡训练
      import torch
      import torch.nn.functional as F
      from torch.utils.data import Dataset, DataLoader
      import os# 对 python 多进程的一个 pytorch 包装,用于后续分发进程
      import torch.multiprocessing as mp
      # 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
      from torch.utils.data.distributed import DistributedSampler     
      # 实现分布式数据并行的核心类        
      from torch.nn.parallel import DistributedDataParallel as DDP         
      # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
      from torch.distributed import init_process_group, destroy_process_group 
      
    2. 在程序入口初始化进程池;在程序出口销毁进程池
      def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()
      
    3. 使用 DistributedDataParallel 包装模型,这样模型才能在各个进程间同步参数
      self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下
      
      包装后 model 变成了一个 DDP 对象,要访问其参数得这样写 self.model.module.state_dict()
    4. 构造 Dataloader 时使用 DistributedSampler 作为 sampler,这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上,并保证数据不重叠
      def prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)
      
      注意需要在各 epoch 入口调用该 sampler 对象的 set_epoch() 方法,否则每个 epoch 加载的样本顺序都不变
    5. 运行过程中单独控制某个进程进行某些操作,比如要想保存 ckpt,由于每张卡里都有完整的模型参数,所以只需要控制一个进程保存即可。需要注意的是:使用 DDP 改写的代码会在每个 GPU 上各自运行,因此需要在程序中获取当前 GPU 的 rank(gpu_id),这样才能对针对性地控制各个 GPU 的行为
      if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)
      
    6. 使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行
      # 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
      # 注意不需要传入 fn 的 rank 参数,它由 mp.spawn 自动分配
      world_size = torch.cuda.device_count()
      mp.spawn(fn=main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size
      )
      
  • 完整的修改版代码如下,请参考注释自行对比

    # 使用 DistributedDataParallel 进行单机多卡训练
    import torch
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader
    import os# 对 python 多进程的一个 pytorch 包装
    import torch.multiprocessing as mp# 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
    from torch.utils.data.distributed import DistributedSampler     # 实现分布式数据并行的核心类        
    from torch.nn.parallel import DistributedDataParallel as DDP         # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
    # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
    from torch.distributed import init_process_group, destroy_process_group def ddp_setup(rank, world_size):"""setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes"""# MASTER Node(运行 rank0 进程,多机多卡时的主机)用来协调各个 Node 的所有进程之间的通信os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhostos.environ["MASTER_PORT"] = "12355"     # 任意空闲端口init_process_group(backend="nccl",                     # Nvidia CUDA CPU 用这个 "nccl"rank=rank,                          world_size=world_size)torch.cuda.set_device(rank)class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_every                    # 指定保存 ckpt 的周期self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch)        # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的,这样才能打乱每个 epoch 的样本顺序for source, targets in self.train_data: source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict()            # 由于多了一层 DDP 包装,通过 .module 获取原始参数 PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)class MyTrainDataset(Dataset):def __init__(self, size):self.size = sizeself.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]def load_train_objs():train_set = MyTrainDataset(2048)  # load your datasetmodel = torch.nn.Linear(20, 1)  # load your modeloptimizer = torch.optim.SGD(model.parameters(), lr=1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser(description='simple distributed training job')parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')args = parser.parse_args()world_size = torch.cuda.device_count()# 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args# 注意不需要 fn 的 rank 参数,它由 mp.spawn 自动分配mp.spawn(fn=main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76020.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

合宙Air724UG LuatOS-Air LVGL API控件-表格(Table)

表格(Table) 示例代码 --创建表格Table1 lvgl.table_create(lvgl.scr_act(),nil)--设置表格为4行5列lvgl.table_set_row_cnt(Table1,4)lvgl.table_set_col_cnt(Table1,5)--给每个单元格赋值lvgl.table_set_cell_value(Table1, 0, 0, "选手")l…

Android之RecyclerView仿ViewPage滑动

文章目录 前言一、效果图二、实现步骤1.xml主布局2.所有用到的drawable资源文件3.xml item布局4.adapter适配器5.javabean实体类6.activity使用 总结 前言 我们都知道ViewPageFragment滑动,但是的需求里面已经有了这玩意,但是在Fragment中还要有类似功能…

基于3D扫描和3D打印的产品逆向工程实战【数字仪表】

逆向工程是一种从物理零件创建数字设计的强大方法,并且可以与 3D 扫描和 3D 打印等技术一起成为原型设计工具包中的宝贵工具。 推荐:用 NSDT编辑器 快速搭建可编程3D场景 3D 扫描仪可以非常快速地测量复杂的物体,并且在涉及现实生活参考时可以…

花生壳内网穿透+Windows系统,如何搭建网站?

1. 准备工作 在百度搜索“Win7下安装ApachePHPMySQL”,根据搜到的教程自行安装WAMP环境。 如果在网页上键入http://127.0.0.1/ 出现以下页面表示您的服务器已经建好,下一步就是关键,如何通过花生壳内网穿透,让外网的用户访问到您…

设计模式 - 责任链

一、前言 ​ 相信大家平时或多或少都间接接触过责任链设计模式,只是可能有些同学自己不知道此处用的是该设计模式,比如说 Java Web 中的 Filter 过滤器,就是非常经典的责任链设计模式的例子。 那么什么是责任链设计模式呢? ​ …

大数据课程L6——网站流量项目的SparkStreaming

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 了解网站流量项目的SparkStreaming概述; ⚪ 掌握网站流量项目的SparkStreaming实现 Wordcount 底层流程; ⚪ 掌握网站流量项目的SparkStreaming实现历史批次的累积处理; ⚪ 掌握网站流…

快速学会git版本管理——上传gitee仓库

首先在gitee右上角有一个新建仓库 创建之后打开自己想要上传的文件 右键打开 Git Bash Here 接下来会弹出git的窗口 首先先初始化仓库 用git命令 git init 然后用git add . 上传所有文件上传到暂存区(上一篇文章说过add是单个文件,add . 是所有文件) 没有显示错误 …

OpenCV_CUDA_VS编译安装

一、OpenCV 我这里是下载的OpenCV4.5.4,但是不知道到在vs里面build时一直报错,后面换了4.7.0的版本测试,安装成功。 Release OpenCV 4.5.4 opencv/opencv GitHub 这个里面有官方预编译好的OpenCV库,可以直接食用。 扩展包&am…

SQL4 查询结果限制返回行数

描述 题目:现在运营只需要查看前2个用户明细设备ID数据,请你从用户信息表 user_profile 中取出相应结果。 示例: iddevice_idgenderageuniversityprovince12138male21北京大学Beijing23214male复旦大学Shanghai36543female20北京大学Beijin…

设计模式(1) - UML类图

1、前言 从这一节开始,我们将一起学习设计模式。我们的学习目标是什么呢? 了解常用设计模式以及它们的使用场景;分析实际工程中设计模式的使用,揣摩实际意图,了解作者设计思路;尝试运用设计模式迭代、重构…

css transition 指南

css transition 指南 在本文中&#xff0c;我们将深入了解 CSS transition&#xff0c;以及如何使用它们来创建丰富、精美的动画。 基本原理 我们创建动画时通常需要一些动画相关的 CSS。 下面是一个按钮在悬停时移动但没有动画的示例&#xff1a; <button class"…

MySQL下载安装环境变量配置,常用命令

一、下载安装 mysql官网 下载连接 这个是下载图形安装 https://dev.mysql.com/downloads/installer/ 这个是下载免图形安装 https://dev.mysql.com/downloads/mysql/ 担心个别宝宝没有账号&#xff0c;这边也提供一下&#xff0c;方便下载&#xff1a; 账户&#xff1a;1602404…

算法基础-数学知识-容斥原理、博弈论

容斥原理、博弈论 容斥原理890. 能被整除的数&#xff08;二进制状态压缩版本&#xff0c;复杂度多一个Om&#xff09;890. 能被整除的数&#xff08;dfs版本&#xff09; 博弈论无限制nim游戏AcWing 891. Nim游戏AcWing 892. 台阶-Nim游戏&#xff08;待补&#xff09; 集合版…

Linux中防火墙的简单使用方法

目录 前言 ​编辑 一、概念 1、防火墙的分类&#xff1a; 2、防火墙性能 3、硬件防火墙的品牌、软件防火墙的品牌 4、硬件防火墙与软件防火墙比较 二、linux中的防火墙 1、iptables 2.netfilter/iptables功能 3、四表 iptables中表的优先级 4、五链 三、iptables…

数字化转型背景下企业知识管理能力提升路径

近年来&#xff0c;科技不断进步&#xff0c;颠覆性技术&#xff08;例如 5G、云计算、物联网、大数据分析和人工智能等&#xff09;正在重新定义企业如何管理项目和运营效率。知识管理体系亦需要随着科技的进步而改变&#xff0c;以适应新的数字时代环境&#xff0c;并且高效知…

说说MySQL回表查询与覆盖索引

分析&回答 什么是回表查询&#xff1f; 通俗的讲就是&#xff0c;如果索引的列在 select 所需获得的列中&#xff08;因为在 mysql 中索引是根据索引列的值进行排序的&#xff0c;所以索引节点中存在该列中的部分值&#xff09;或者根据一次索引查询就能获得记录就不需要…

从零开始搭建vite4.0-vue3.0项目

目录 前言 项目地址 项目初始化 git初始化 别名配置 解决vscode报错 vue-router安装 pinia安装 环境配置 axios安装 element-plus按需引入 eslint与prettier安装 scss安装 stylelint配置 代码提交规范配置 husky与lint-stage配置 前言 pnpm和npm的命令行完全一…

FastChat

Fast Chat是一个用于训练/部署和评估基于大型语言模型的聊天机器人的开发平台。其核心功能包括&#xff1a; 最先进模型的权重/训练代码和评估代码(例如Vicuna/FastChat-T5)基于分布式多模型的服务系统&#xff0c;具有Web界面和与OpenAI兼容的RESTful API。 安装 pip instal…

在Cisco设备上配置接口速度和双工

默认情况下&#xff0c;思科交换机将自动协商速度和双工设置。将设备&#xff08;交换机、路由器或工作站&#xff09;连接到 Cisco 交换机上的端口时&#xff0c;将发生协商过程&#xff0c;设备将就传输参数达成一致&#xff0c;当今的大多数网络适配器都支持此功能。 在本文…

八路DI八路DO,开关量远程IO模块,Modbus TCP数据采集模块 YL90-RJ45

特点&#xff1a; ● 八路开关量输入&#xff0c;八路开关量输出 ● DI状态变化自动发送状态数据&#xff0c;可以捕获脉冲 ● 采用Socket自由协议编程简单、轻松应用 ● 开关量毫秒级响应速度适应多种场合 ● 内置网页功能&#xff0c;可以通过网页查询与控制 ● 同时也…