【深度学习】单机多卡 | DataParallel将计算任务在多个 GPU 上并行执行,可以在多个 GPU 上分摊工作负载,从而加快训练速度
- 写在最前面
- DataParallel (DP) 简介
- 使用 DataParallel 的场景
- 使用 DataParallel 的基本步骤
- 代码部分
- train.py
- 简单的代码示例
- 代码解析
- DataParallel 的局限性
- 小结
写在最前面
希望在单机多卡的模式下运行我的模型代码,加快训练速度。
请教吕博:如何更改代码?
其中,提到模型先用DP方式运行
DP是什么?又被学到了一个知识点。
在深度学习和分布式计算领域,DP 通常指的是 DataParallel
。DataParallel
是一种将计算任务在多个 GPU 上并行执行的方法。它在单机多卡环境中非常有用,可以在多个 GPU 上分摊工作负载,从而加快训练速度。
DataParallel (DP) 简介
torch.nn.DataParallel
是 PyTorch 中的一个工具,可以让模型在多个 GPU 上并行运行。它通过将输入批次拆分成多个子批次,每个子批次发送到不同的 GPU 上,并行执行前向传播和反向传播,然后将每个 GPU 上的梯度聚合到主 GPU 上进行参数更新。
使用 DataParallel 的场景
- 单机多卡训练: 当你有一台机器配备了多块 GPU,并希望利用所有的 GPU 资源来加速模型训练时,
DataParallel
是一个简单而有效的解决方案。 - 简化代码: 相比于更复杂的分布式训练方案,
DataParallel
提供了一种较为简化的方式来实现多 GPU 并行训练,通常只需要对模型进行简单包装。
使用 DataParallel 的基本步骤
- 定义模型: 创建你的神经网络模型。
- 包装模型: 使用
torch.nn.DataParallel
包装你的模型。 - 将模型和数据迁移到 GPU: 使用
.to(device)
将模型和输入数据迁移到合适的设备上。 - 训练模型: 按照常规方式训练模型。
代码部分
train.py
仅展示相关部分
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# import部分省略def evaluate(model, device, dataloader):model.eval()total_loss, total_step = 0.0, 0.0# 使用with torch.no_grad()来禁用梯度计算with torch.no_grad():# 对dataloader中的每个batch进行遍历for step, batch in enumerate(dataloader):# 将batch中的数据移动到指定设备上batch = tuple(t.to(device) for t in batch)input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels = batch# 通过模型进行前向传播,并获取输出结果outputs = model(input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids,decoder_attention_mask=decoder_attention_mask, labels=labels)# 获取模型输出结果中的loss值loss = outputs['loss']# 累加总损失和总步数total_loss += loss.item()total_step += 1# 返回总损失和Nonereturn total_loss / total_step, None# 设置随机数种子、日志路径
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 初始化tokenizer、添加特殊的tokens
# define dataloader 定义数据加载器
# 每批的个数4/梯度累计个数4
batch_size = int(args.batch_size / args.gradient_accumulation_steps)
# from processing.dataset import BartDataset
# return input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels
trian_dataset = BartDataset(tokenizer, args, mode='train')
train_dataloader = DataLoader(dataset=trian_dataset,batch_size=batch_size,shuffle=True,collate_fn=trian_dataset.collate_fn,num_workers=20 # 优化数据加载
)eval_dataset = BartDataset(tokenizer, args, mode='test')
eval_dataloader = DataLoader(dataset=eval_dataset,batch_size=batch_size,shuffle=False,collate_fn=eval_dataset.collate_fn,num_workers=20 # 优化数据加载
)# define model 实例化模型# 检查GPU数量并设置DataParallel
if torch.cuda.device_count() > 1:print(f"Using {torch.cuda.device_count()} GPUs")net = nn.DataParallel(model)
else:print("Using single GPU or CPU")net = model# define criterion 定义损失函数# define optimizer优化器
# 参考:https://blog.csdn.net/hottie_xiaomiao/article/details/124392847
# 打印每一次迭代元素的名字和param
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# 参数组:每组参数可以指定自己的优化器参数,即可使用不同的优化策略
optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},{'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=args.bart_lr)
# 计算总步数
total_steps = int(len(trian_dataset) * args.epochs / args.gradient_accumulation_steps)
# 初始化学习率调整器
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=int(args.warmup_steps * total_steps),num_training_steps=total_steps)# Begin training
# logger部分省略# 定义一个初始的最好评估损失,即正无穷大
best_eval_loss = float('inf')
# 定义当前的步骤和当前损失
current_step, current_loss = 0, 0
# 定义全局步骤数
global_step = 0# 对于每个 epoch 进行循环
for epoch in range(args.epochs):# 将模型设置为训练模式model.train()# 对训练数据集进行循环for step, batch in enumerate(train_dataloader):# 将batch中的每一个tensor都移动到指定的设备上(如GPU)batch = tuple(t.to(device) for t in batch)# 从batch中获取输入,注意这里的命名方式要和模型中forward函数中的输入命名相同input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels = batch# 将输入传递给模型进行前向计算outputs = model(input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids,decoder_attention_mask=decoder_attention_mask, labels=labels)# TODO:1# print(outputs)# 获取模型的损失loss = outputs['loss']# 记录当前损失和步骤数,用于计算平均损失current_loss += loss.item()current_step += 1# 如果使用了梯度累积,则将损失除以累积步骤数if args.gradient_accumulation_steps > 1:loss = loss / args.gradient_accumulation_steps# 反向传播计算梯度loss.backward()# 将梯度进行裁剪,以防止梯度爆炸clip_grad_norm_(model.parameters(), args.max_clip_norm)# 如果达到了梯度累积的步骤数,则进行一次优化更新if (step + 1) % args.gradient_accumulation_steps == 0:optimizer.step()scheduler.step()optimizer.zero_grad()global_step += 1# 如果当前步骤是一个log间隔的倍数,则打印日志信息,清空当前步骤和当前损失# 在训练完一个 epoch 后,对模型在验证集上进行评估eval_loss, _ = evaluate(model, device, eval_dataloader)logger.info("Eval loss: {:.6f}, the best loss: {:.6f}".format(eval_loss, best_eval_loss))# 如果当前的评估损失比之前的最好评估损失更小,则更新最好评估损失if eval_loss < best_eval_loss:best_eval_loss = eval_loss# 创建一个输出目录,用于存储模型的输出# 在日志中输出检查点保存路径,将模型、tokenizer、args的设置保存到检查点路径中
简单的代码示例
以下是使用 DataParallel
在多 GPU 上运行模型的一个简单示例:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset# 设置环境变量,指定使用的GPU
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,6,7"# 定义设备
globalDevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 定义一个简单的CNN模型
class CNN(nn.Module):def __init__(self):super(CNN, self).__init__()self.conv = nn.Conv2d(3, 16, 3, 1)self.fc = nn.Linear(16 * 26 * 26, 10)def forward(self, x):x = self.conv(x)x = torch.relu(x)x = x.view(x.size(0), -1)x = self.fc(x)return x# 实例化模型
cnn = CNN().to(globalDevice)# 检查GPU数量并设置DataParallel
if torch.cuda.device_count() > 1:print(f"Using {torch.cuda.device_count()} GPUs")net = nn.DataParallel(cnn)
else:print("Using single GPU or CPU")net = cnn# 定义数据集和数据加载器
class SimpleDataset(Dataset):def __init__(self, size):self.size = sizedef __len__(self):return self.sizedef __getitem__(self, idx):return torch.randn(3, 28, 28), torch.tensor(1)dataset = SimpleDataset(1000)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=20)# 定义优化器和损失函数
optimizer = optim.SGD(net.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()# 简单的训练过程
for epoch in range(args.epochs):for inputs, labels in dataloader:inputs, labels = inputs.to(globalDevice), labels.to(globalDevice)optimizer.zero_grad()outputs = net(inputs)loss = criterion(outputs, labels)loss.backward()optimizer.step()print(f"Epoch {epoch+1}, Loss: {loss.item()}")
代码解析
-
环境变量设置:
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,6,7"
指定要使用的 GPU。
-
定义设备:
globalDevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")
根据是否有可用的 GPU 设置设备。
-
定义模型:
class CNN(nn.Module):...
-
包装模型:
if torch.cuda.device_count() > 1:net = nn.DataParallel(cnn) else:net = cnn
如果检测到多个 GPU,使用
DataParallel
包装模型。 -
数据加载器:
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=20)
使用
num_workers
参数优化数据加载。 -
训练过程:
for epoch in range(2):...
DataParallel 的局限性
- 数据并行粒度:
DataParallel
进行的是数据并行操作,每个 GPU 处理一部分数据批次。这可能导致 GPU 利用率不均衡,尤其是在有计算负载差异的情况下。 - 单节点限制:
DataParallel
主要用于单节点多 GPU。如果需要跨节点并行(分布式训练),应该考虑使用torch.nn.parallel.DistributedDataParallel
。
小结
DataParallel
是 PyTorch 提供的一种简单易用的多 GPU 并行方法,适合单节点多卡训练。通过这种方法,可以在多个 GPU 上分摊计算任务,提高训练速度和效率。对于更复杂的分布式计算任务,可以考虑使用 DistributedDataParallel
。
欢迎大家添加好友交流。