如何使用torchrun启动单机多卡DDP并行训练

如何使用torchrun启动单机多卡DDP并行训练

这是一个最近项目中需要使用的方式,新近的数据集大概在40w的规模,而且载入的原始特征都比较大(5~7M),所以准备尝试DistributedDataParallel;

主要目的有两个:

  • 1是通过并行训练有效降低训练耗时,之前也使用过单机单卡,以及单机多卡DataParallel的方式;
  • 2是通过使用并行训练的方式,倍增有效batch_size的数量,提高单卡对显存的利用率,仍然是为了加速;

类似的实现代码你可以在许多技术博客上检索到,以下是我实际实践的主要代码;

单机单卡到单机多卡的代码修改

# 导入库
import math
# import torch
from torch.utils.data import Dataset, DataLoader
import torch.distributed as dist ## DDP
from torch.utils.data.distributed import DistributedSampler ## DDP
from torch.nn.parallel import DistributedDataParallel as DDP ## DDP
from torch.distributed import init_process_group, destroy_process_group ## DDP# 设置可见的GPU
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# 忽略冗余数据存在的潜在问题
# os.environ["KMP_DUPLICATE_LIB_OK"] = "True"def train():# setuplocal_rank = int(os.environ["LOCAL_RANK"]) ## DDP   init_process_group(backend="nccl")torch.cuda.set_device(local_rank)# ...# local_rank = dist.get_rank()total_rank = dist.get_world_size()device = torch.device("cuda", local_rank)# random resetseed = 1234torch.manual_seed(seed)torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed)random.seed(seed)np.random.seed(seed)# model = Model().to(device)if local_rank <= 0:print(model)# model.load(local_model_path)lr *= math.sqrt(total_rank)# optimizer = ...# scheduler = ...## train_datas = Dataset("train")sampler = DistributedSampler(train_datas)# 或者自定义# sampler = MyDistributedSampler(train_data_path)train_dataloader = DataLoader(train_datas,batch_size=batch_size,num_workers=num_workers,shuffle=(sampler is None),sampler=sampler,pin_memory=True,drop_last=True,)train_dataloaders = [train_dataloader]# warp modelmodel = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)model = DDP(model, device_ids=[local_rank], output_device=local_rank, find_unused_parameters=True)for epoch in range(max_epoch):e_INDEX = train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)dist.destroy_process_group()def train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):model.train()if local_rank >= 0:torch.distributed.barrier()# 如果使用了自定义的Sampler 记得实现一个对应set_epoch方法功能的新方法,并由采样器进行调用if local_rank >= 0:for i, train_dataloader in enumerate(train_dataloaders):# train_dataloader.sampler.set_epoch(epoch)train_dataloader.sampler.shuffle_data_on_ranks(seed=epoch + i)e_loss = 0e_num = 0# ...for train_dataloader in train_dataloaders:for data in train_dataloader:loss = train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)e_loss += loss[0]e_num += loss[1]if local_rank >= 0:torch.distributed.barrier()total = torch.tensor(copy.deepcopy([e_loss, e_num]),dtype=torch.float32,device=device,)torch.distributed.all_reduce(total, torch.distributed.ReduceOp.SUM, async_op=False)e_loss, e_num = total.tolist()e_loss /= e_num# if local_rank <= 0:if isinstance(model, torch.nn.parallel.DistributedDataParallel):model.module.save(opt.notes)   else:model.save(opt.notes)   scheduler.step(e_loss) # if need e_loss# scheduler.step()# 测试:每个GPU进程输出的模型第一层参数是相同的for param in model.parameters():print("    GPU{} Model param layer1=>".format(local_rank), param)breake_INDEX = eval_epcoh(model, val_dataloader, epoch, local_rank)return e_INDEXdef train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):data_a,label_a = datadata_a = data_a.to(device)label_a = label_a.to(device)# 这里要注意,如果使用的是自定义api 如这里的compute_loss该方法中 包含了模型的推理和损失计算过程,那么就需要使用model.module进行调用# 如果直接使用模型推理 那么直接调用 forward函数即可,如model(data_a)if isinstance(model, torch.nn.parallel.DistributedDataParallel):loss, _ = model.module.compute_loss(data_a, label_a)else:loss, _loss_dict_ = model.compute_loss(data_a, label_a)   optimizer.zero_grad() loss.backward() optimizer.step() return loss.item(), label_a.shape[0]

对于测试集的验证,可以不使用Sampler(验证集/测试集在每个GPU进程中都相同),仅在某一个GPU中,完整测试即可;

启动方式:

  • torchrun -m --nnodes=1 --nproc_per_node=4 main
  • nohup torchrun -m --nnodes=1 --nproc_per_node=4 main 127.0.0.1 5301 > /dev/null 2>&1 & disown

测试模型的第一层输出(part),可以看到它们都是相同的:

    GPU0 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],[ 0.0715, -0.1204, -0.0295],[-0.0790,  0.0723,  0.0963],GPU3 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],[ 0.0715, -0.1204, -0.0295],[-0.0790,  0.0723,  0.0963],GPU1 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],[ 0.0715, -0.1204, -0.0295],[-0.0790,  0.0723,  0.0963],GPU2 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],[ 0.0715, -0.1204, -0.0295],[-0.0790,  0.0723,  0.0963],                    

一点经验

其实主要的代码并不复杂,但是这里我要提到的一点经验是:如果你的模型参数在每个epoch结束之后(梯度同步更新完成)的打印结果,并不一致,并且你的DDP主要代码也与上述代码大同小异,那么很有可能是你的模型有问题(并不是不正确,只不过无法得到DDP的支持);

这也是我的项目所面临的问题:我使用了一个相对简单的模型,可以很好的实践DDP,但另外一个复杂许多的模型就无法通过测试(每个epoch后模型参数在各个GPU进程间会变的不同),具体的后续补充;

关于MyDistributedSampler

from torch.utils.data.sampler import Samplerclass MyDistributedSampler(Sampler):def __init__(self, data_path, m, batch_size, local_rank, total_rank, device, distribute=False):# 这里的读取 与 Dataset中 读取原始数据是一致的(我们使用的文件是excel)data_lines = pd.read_excel(data_path)self.distribute = distributeself.total_rank = -1self.local_rank = -1self.device = deviceif distribute:if not dist.is_available():raise RuntimeError("Requires distributed package to be available")self.total_rank = total_rankself.local_rank = local_rankself._m_per_class = mself._batch_size = batch_size# 拿到与单卡一致的所有数据self._labels_to_indices = self._get_labels_to_indices(data_lines)# 拿到与单卡一致的索引数据self._global_labels = (self._labels_to_indices.reset_index()["a_cls"].drop_duplicates().values.tolist())if not self.distribute:# 单卡情况 待操作数据 就是 所有的索引数据self.labels = self._global_labelselse:# 多卡时,则要按照 当前的local_rank 进行划分self.labels = self._split_label_randoms(seed=0)[self.local_rank]assert (self._batch_size % self._m_per_class) == 0, "m_per_class must divide batch_size without any remainder"# 采样器的样本数self._sample_length = self._get_sample_length()print("Init Sampler with Mper with {} items, and m = {}, batch_num = {}""\n".format(self._sample_length, m, self.num_iters()))returndef __iter__(self):# 创建一个迭代器:迭代长度 与样本数量一致 并初始化为0idx_list = [0] * self._sample_lengthi = 0num_iters = self.num_iters()for _ in range(num_iters):random.shuffle(self.labels)# 为使用HardTripletLoss计算损失,这里随机 固定个数的分类 以batch_size=32 m=4为例,则这里去的个数为8# 对于我们的数据集:实际count的最小关联数是6 因此将m设为4是合理的curr_label_set = self.labels[: self._batch_size // self._m_per_class]for label in curr_label_set:if len(self._labels_to_indices.loc[label]) >= self._m_per_class:items = self._labels_to_indices.loc[label].sample(self._m_per_class) else:items = self._labels_to_indices.loc[label].sample(self._m_per_class,replace=True)  t = []for item in items.index.values:cur_id = np.random.choice(self._labels_to_indices.loc[label, item]["ids"],1,replace=False,)[0]t.append(cur_id)# 确定采样规范 并返回,可以看到这里已经是一个符合要求的完整序列# 注意:对应的 Dataloader 不可以使用shuffleidx_list[i : i + self._m_per_class] = t[: self._m_per_class]i += self._m_per_classreturn iter(idx_list)# set_epoch的实现: 仅多卡的时候才需要调用def shuffle_data_on_ranks(self, seed, display_details=False):# 每个epoch都要重新划分各个GPU进程所分的的数据(它们彼此不重复)self.labels = self._split_label_randoms(seed)[self.local_rank]self._sample_length = self._get_sample_length()if display_details:display = sorted(self.labels)[:5]print("Shuffle sampler with {} items, rank = {}, total rank = {}, batch_num = {}, label of head{} is {}".format(self._sample_length,self.local_rank,self.total_rank,self.num_iters(),len(display),display,))def num_iters(self):# 样本数 整除 batch_size的 值,即整个训练集被迭代一次 所需要的次数return self._sample_length // self._batch_sizedef _get_sample_length(self):# 这就是实际要加载的样本数,由于这里的self.labels 已经根据local_rank完成了划分,因此这个sample_len就是每个GPU进程所分得得样本数sample_length = sum([self._labels_to_indices.loc[k]["count"].sum() for k in self.labels])# 由于梯度同步时,需要每个GPU进程都有数据,因此如下这段同步只要是为了 保证所有GPU进程中的采样数量是一致的(与最小值相同)(如果你的数据很规整 这段同步实际用不到)if self.total_rank >= 0:torch.distributed.barrier()total = torch.tensor([sample_length],dtype=torch.float32,device=self.device,)torch.distributed.all_reduce(total, torch.distributed.ReduceOp.MIN, async_op=True)sample_length = int(total.tolist()[0])sample_length -= sample_length % self._batch_sizereturn sample_lengthdef _split_label_randoms(self, seed):split_label = []global_label = self._global_labels.copy()random.Random(seed).shuffle(global_label)for i in range(self.total_rank):# 可以看到 索引数据会根据 total_rank 间隔取值split_label.append(global_label[i :: self.total_rank])return split_label# @staticmethoddef _get_labels_to_indices(self, data_lines):df = data_lines.groupby(["a_cls", "a_item"]).agg(count=("a_id", "count"), ids=("a_id", "unique"))return dfdef __len__(self):return self._sample_length

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/180457.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt 自定义标题栏

在Qt中&#xff0c;如果你想要自定义窗口的标题栏&#xff0c;你可以通过覆盖窗口的windowTitleChanged信号来实现。然而&#xff0c;直接修改Qt的标题栏可能会带来一些问题&#xff0c;因为Qt的设计是尽量使窗口系统的行为标准化。 以下是一个基本的示例&#xff0c;如何在Qt…

Java中的集合

Java中的集合 java.util 包中的集合 Java 集合框架提供了各种集合类&#xff0c;用于存储和管理对象。以下是 Java 集合框架中常见的集合类&#xff1a; List 接口表示一个有序的集合&#xff0c;其中的元素可以重复。List 接口有以下实现类&#xff1a; ArrayList&#xff1…

人工智能_机器学习053_支持向量机SVM目标函数推导_SVM条件_公式推导过程---人工智能工作笔记0093

然后我们再来看一下支持向量机SVM的公式推导情况 来看一下支持向量机是如何把现实问题转换成数学问题的. 首先我们来看这里的方程比如说,中间的黑线我们叫做l2 那么上边界线我们叫l1 下边界线叫做l3 如果我们假设l2的方程是上面这个方程WT.x+b = 0 那么这里 我们只要确定w和…

<Linux> 文件理解与操作

目录 前言&#xff1a; 一、关于文件的预备知识 二、C语言文件操作 1. fope 2. fclose 3. 文件写入 3.1 fprintf 3.2 snprintf 三、系统文件操作 1. open 2. close 3. write 4. read 四、C文件接口与系统文件IO的关系 五、文件描述符 1. 理解文件描述符 2. 文…

时延抖动和通信的本质

先从网络时延抖动的根源说起。 信息能否过去取决于信道容量&#xff0c;而信道利用率则取决于编码。这是香农定律决定的。 考虑到主机处理非常快&#xff0c;忽略处理时延&#xff0c;端到端时延就是信息传播时延&#xff0c;但现实中通信信道利用率非常不均匀&#xff0c;统…

一则 MongoDB 副本集迁移实操案例

文中详细阐述了通过全量 增量 Oplog 的迁移方式&#xff0c;完成一套副本集 MongoDB 迁移的全过程。 作者&#xff1a;张然&#xff0c;DBA 数据库技术爱好者~ 爱可生开源社区出品&#xff0c;原创内容未经授权不得随意使用&#xff0c;转载请联系小编并注明来源。 本文约 900…

python炒股自动化(1),量化交易接口区别

要实现股票量化程序化自动化&#xff0c;就需要券商提供的API接口&#xff0c;重点是个人账户小散户可以申请开通&#xff0c;上手要简单&#xff0c;接口要足够全面&#xff0c;功能完善&#xff0c;首先&#xff0c;第一步就是要找对渠道和方法&#xff0c;这里我们不讨论量化…

linux 内核等待队列

等待队列在Linux内核中用来阻塞或唤醒一个进程&#xff0c;也可以用来同步对系统资源的访问&#xff0c;还可以实现延迟功能 在软件开发中任务经常由于某种条件没有得到满足而不得不进入睡眠状态&#xff0c;然后等待条件得到满足的时候再继续运行&#xff0c;进入运行状态。这…

网络安全--基于Kali的网络扫描基础技术

文章目录 1. 标准ICMP扫描1.1使用Ping命令1.1.1格式1.1.2实战 1.2使用Nmap工具1.2.1格式1.2.2实战1.2.2.1主机在线1.2.2.2主机不在线 1.3使用Fping命令1.3.1格式1.3.2实战 2. 时间戳查询扫描2.1格式2.2实战 3. 地址掩码查询扫描3.1格式3.2实战 2. TCP扫描2.1TCP工作机制2.2TCP …

MySQL 索引类型

什么是索引&#xff1f; 索引是一种用于提高数据库查询性能的数据结构。它是在表中一个或多个列上创建的&#xff0c;可以加快对这些列的数据检索速度。 索引的作用是通过创建一个额外的数据结构&#xff0c;使得数据库可以更快地定位和访问数据。当执行查询语句时&#xff0c…

【数据库设计和SQL基础语法】--SQL语言概述--SQL的基本结构和语法规则(一)

一、SQL的基本结构 2.1 SQL语句的组成要素 SQL语句的组成要素 关键字&#xff08;Keywords&#xff09;: 定义&#xff1a;SQL语句的基本操作命令&#xff0c;表示要执行的动作。例子&#xff1a;SELECT、INSERT、UPDATE、DELETE等。 标识符&#xff08;Identifiers&#xf…

位运算总结

文章目录 &#x1f348;1. 基础位运算&#x1f34c;2. 给一个数n&#xff0c;确定它的二进制表示中的第x位是0还是1&#x1f34f;3. 将一个数n的二进制表示的第x位修改成1&#x1f353;4. 将一个数的n的二进制表示的第x位修改成0&#x1f954;5. 位图的思想&#x1fad2;6. 提前…

医疗智能化:人工智能的助力与隐患

文章目录 引言&#xff1a;积极影响风险和挑战 结尾&#xff1a; 引言&#xff1a; 医疗领域正处于人工智能技术革新的前沿。人工智能的涌现为医疗保健带来了前所未有的变革&#xff0c;同时也潜藏着一系列积极影响和潜在挑战。探索人工智能在医疗领域中的影响将有助于我们更深…

医保移动支付程序开发

作为公司最苦命的开发&#xff0c;年初接到任务开发医保移动支付程序&#xff08;微信小程序和支付宝小程序&#xff09;&#xff0c;为医疗机构提供线上医保结算。好家伙&#xff0c;我一看解压后资料大于一个G&#xff0c;内心无比的惊慌。 一、技术流程图 图太大了显示不全需…

0-1背包的初始化问题

题目链接 这道题的状态转移方程比较易于确定。dp[i][j]表示能放前i个物品的情况下&#xff0c;容量为j时能放物品的数量&#xff08;这道题歌曲数量对应物品数量&#xff0c;容量对应时间&#xff09;。 技巧&#xff08;收获&#xff09; 二维dp数组可以视情况优化为一维dp数组…

【创建一个组件并通过npm让其他人安装和调用】

创建一个组件并通过npm让其他人安装和调用 步骤一&#xff1a;创建一个组件步骤二&#xff1a;准备发布步骤三&#xff1a;注册npm账号并登录步骤四&#xff1a;发布组件步骤五&#xff1a;安装和使用组件 步骤一&#xff1a;创建一个组件 在本地创建一个新的文件夹来存放你的组…

Linux——vim编辑文件时——.swp文件解决方案

test.cpp样例 当我们vim test.cpp进入编辑文件。 却忘记了保存退出 再次进入就会出现一下画面 当你摁下Enter键位 出现以下几个选项 O——是只读不写 E——是正常打开文件但不会载入磁盘内容 R——覆盖——是加载存储磁盘的文件(当我们忘记保存时&#xff0c;系统会自动帮我…

事件代理?

1.什么是事件代理&#xff1f; 事件代理也叫事件委托&#xff0c;只指定一个事件处理程序&#xff0c;就可以管理某一类型得事件。 可以简单理解为&#xff0c;事件代理就是将本应该绑定子元素事件绑定给父元素代理。它的优点就是&#xff1a;减少事件得执行&#xff0c;减少浏…

CentOS 7 部署 MariaDB 的 2 种方法

有两种安装 MariaDB 服务器的方法。您可以安装 CentOS 7 存储库中可用的默认版本&#xff0c;也可以通过手动添加 MariaDB 存储库来安装最新版本。 如果安装过MariaDB或MySQL&#xff0c;使用以下命令彻底删除它们: yum remove mariadb* yum remove mysql* 方法一: 使用 Yum…

[ Vue3 ] 三种方式实现组件数据双向绑定

Vue3 三种方式实现组件数据双向绑定 在 Vue 中&#xff0c;组件数据双向绑定是一项非常重要的特性&#xff0c;它使得我们能够轻松地在组件中处理数据的变化并将其同步到视图 比如我们想要在父组件中修改数据能够同步给子组件&#xff0c;并且子组件修改数据也能同步给父组件…