How to launch single-node multi-GPU DDP training with torchrun
This is something I needed for a recent project: the new dataset is on the order of 400k samples, and the raw features loaded for each sample are fairly large (5~7 MB), so I decided to try DistributedDataParallel.
There are two main goals:
- First, to cut training time through parallel training; before this I had used single-GPU training as well as single-node multi-GPU DataParallel.
- Second, to multiply the effective batch_size across processes and make better use of each GPU's memory, again in the interest of speed.
You can find similar implementations on many technical blogs; what follows is the main code from my own practice.
Code changes from single GPU to single-node multi-GPU
# import libraries
import os
import copy
import math
import random

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.distributed as dist  ## DDP
from torch.utils.data.distributed import DistributedSampler  ## DDP
from torch.nn.parallel import DistributedDataParallel as DDP  ## DDP
from torch.distributed import init_process_group, destroy_process_group  ## DDP

# set the visible GPUs
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# ignore the potential problem of duplicated runtime libraries
# os.environ["KMP_DUPLICATE_LIB_OK"] = "True"


def train():
    # setup
    local_rank = int(os.environ["LOCAL_RANK"])  ## DDP
    init_process_group(backend="nccl")
    torch.cuda.set_device(local_rank)
    # ...
    # local_rank = dist.get_rank()
    total_rank = dist.get_world_size()
    device = torch.device("cuda", local_rank)

    # reset the random seeds so every process starts from an identical model
    seed = 1234
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    np.random.seed(seed)

    #
    model = Model().to(device)
    if local_rank <= 0:
        print(model)
    # model.load(local_model_path)
    lr *= math.sqrt(total_rank)
    # optimizer = ...
    # scheduler = ...

    #
    train_datas = Dataset("train")
    sampler = DistributedSampler(train_datas)
    # or a custom sampler
    # sampler = MyDistributedSampler(train_data_path)
    train_dataloader = DataLoader(
        train_datas,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=(sampler is None),
        sampler=sampler,
        pin_memory=True,
        drop_last=True,
    )
    train_dataloaders = [train_dataloader]

    # wrap the model
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
    model = DDP(model, device_ids=[local_rank], output_device=local_rank, find_unused_parameters=True)

    for epoch in range(max_epoch):
        e_INDEX = train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)

    dist.destroy_process_group()


def train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):
    model.train()
    if local_rank >= 0:
        torch.distributed.barrier()

    # if you use a custom Sampler, remember to implement (and call here) a method that plays the role of set_epoch
    if local_rank >= 0:
        for i, train_dataloader in enumerate(train_dataloaders):
            # train_dataloader.sampler.set_epoch(epoch)
            train_dataloader.sampler.shuffle_data_on_ranks(seed=epoch + i)

    e_loss = 0
    e_num = 0
    # ...
    for train_dataloader in train_dataloaders:
        for data in train_dataloader:
            loss = train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)
            e_loss += loss[0]
            e_num += loss[1]

    if local_rank >= 0:
        torch.distributed.barrier()
        total = torch.tensor(
            copy.deepcopy([e_loss, e_num]),
            dtype=torch.float32,
            device=device,
        )
        torch.distributed.all_reduce(total, torch.distributed.ReduceOp.SUM, async_op=False)
        e_loss, e_num = total.tolist()
    e_loss /= e_num

    # if local_rank <= 0:
    if isinstance(model, torch.nn.parallel.DistributedDataParallel):
        model.module.save(opt.notes)
    else:
        model.save(opt.notes)

    scheduler.step(e_loss)  # if e_loss is needed
    # scheduler.step()

    # test: the first-layer parameters printed by every GPU process should be identical
    for param in model.parameters():
        print(" GPU{} Model param layer1=>".format(local_rank), param)
        break

    e_INDEX = eval_epoch(model, val_dataloader, epoch, local_rank)
    return e_INDEX


def train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):
    data_a, label_a = data
    data_a = data_a.to(device)
    label_a = label_a.to(device)
    # Note: if you go through a custom API such as compute_loss here (the method runs both the forward
    # pass and the loss computation inside the model), you must call it via model.module;
    # if you only need the forward pass, call the model directly, e.g. model(data_a)
    if isinstance(model, torch.nn.parallel.DistributedDataParallel):
        loss, _ = model.module.compute_loss(data_a, label_a)
    else:
        loss, _loss_dict_ = model.compute_loss(data_a, label_a)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item(), label_a.shape[0]
For evaluation on the validation/test set you do not need a Sampler (the validation/test set is identical in every GPU process); it is enough to run the full evaluation on a single GPU.
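To make this concrete, here is a minimal sketch of what such an eval_epoch could look like; the body, the accuracy metric and the final broadcast are my own assumptions rather than the project's actual implementation. Rank 0 runs the full validation loop, then broadcasts the result so that every process returns the same value:

def eval_epoch(model, val_dataloader, epoch, local_rank):
    # sketch: evaluate on rank 0 only, then share the metric with the other ranks
    device = next(model.parameters()).device
    metric = torch.zeros(1, device=device)
    if local_rank <= 0:
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for data_a, label_a in val_dataloader:
                data_a, label_a = data_a.to(device), label_a.to(device)
                pred = model(data_a).argmax(dim=1)
                correct += (pred == label_a).sum().item()
                total += label_a.shape[0]
        metric[0] = correct / max(total, 1)
        model.train()
    if local_rank >= 0:
        # the other ranks wait here and receive rank 0's result
        torch.distributed.broadcast(metric, src=0)
    return metric.item()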
How to launch:
torchrun -m --nnodes=1 --nproc_per_node=4 main
nohup torchrun -m --nnodes=1 --nproc_per_node=4 main 127.0.0.1 5301 > /dev/null 2>&1 & disown
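In the second command the trailing 127.0.0.1 5301 are extra positional arguments that torchrun passes through to the main module itself. If you prefer torchrun to handle the rendezvous endpoint directly, the standard flags look roughly like this (single-node example values, adjust to your own setup):

# single-node run, letting torchrun pick a free local port
torchrun --standalone --nnodes=1 --nproc_per_node=4 -m main

# or with an explicit rendezvous endpoint
torchrun --nnodes=1 --nproc_per_node=4 --rdzv_backend=c10d --rdzv_endpoint=127.0.0.1:5301 -m main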
Testing the first-layer parameters of the model (partial output), you can see they are identical across the GPU processes:
GPU0 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],
GPU3 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],
GPU1 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],
GPU2 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],
A bit of experience
The core code really is not complicated, but one piece of experience is worth mentioning: if the model parameters printed after each epoch (once the synchronized gradient update has finished) are not identical across processes, and your DDP code is essentially the same as the code above, then the problem is most likely in your model itself (not that the model is incorrect, just that it cannot be handled by DDP as-is).
That is exactly the problem my project ran into: a relatively simple model works well with DDP, while another, much more complex model fails the test (after each epoch its parameters drift apart across the GPU processes). I will follow up with the details later.
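A small check I use for this kind of debugging (my own sketch, not part of the project code): after an optimizer step, compare a flattened copy of the parameters on every rank against rank 0's copy; any non-zero gap means the replicas have drifted apart.

def check_param_sync(model, device):
    # compare this rank's parameters against rank 0's; the result should be 0.0 while DDP keeps the replicas in sync
    with torch.no_grad():
        flat = torch.cat([p.detach().reshape(-1) for p in model.parameters()]).to(device)
        reference = flat.clone()
        torch.distributed.broadcast(reference, src=0)  # every rank receives rank 0's parameters
        max_diff = (flat - reference).abs().max()
        torch.distributed.all_reduce(max_diff, op=torch.distributed.ReduceOp.MAX)
    if torch.distributed.get_rank() == 0:
        print("max parameter difference across ranks: {:.3e}".format(max_diff.item()))
    return max_diff.item()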
About MyDistributedSampler
import random

import numpy as np
import pandas as pd
import torch
import torch.distributed as dist
from torch.utils.data.sampler import Sampler


class MyDistributedSampler(Sampler):
    def __init__(self, data_path, m, batch_size, local_rank, total_rank, device, distribute=False):
        # read the raw data the same way the Dataset does (our files are excel)
        data_lines = pd.read_excel(data_path)
        self.distribute = distribute
        self.total_rank = -1
        self.local_rank = -1
        self.device = device
        if distribute:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            self.total_rank = total_rank
            self.local_rank = local_rank
        self._m_per_class = m
        self._batch_size = batch_size
        # all data, identical to the single-GPU case
        self._labels_to_indices = self._get_labels_to_indices(data_lines)
        # all index data, identical to the single-GPU case
        self._global_labels = (
            self._labels_to_indices.reset_index()["a_cls"].drop_duplicates().values.tolist()
        )
        if not self.distribute:
            # single GPU: the data to work on is all of the index data
            self.labels = self._global_labels
        else:
            # multi-GPU: split according to the current local_rank
            self.labels = self._split_label_randoms(seed=0)[self.local_rank]
        assert (self._batch_size % self._m_per_class) == 0, "m_per_class must divide batch_size without any remainder"
        # number of samples drawn by this sampler
        self._sample_length = self._get_sample_length()
        print("Init Sampler with Mper with {} items, and m = {}, batch_num = {}"
              "\n".format(self._sample_length, m, self.num_iters()))
        return

    def __iter__(self):
        # create an index list whose length equals the sample count, initialized to 0
        idx_list = [0] * self._sample_length
        i = 0
        num_iters = self.num_iters()
        for _ in range(num_iters):
            random.shuffle(self.labels)
            # to compute HardTripletLoss we pick a fixed number of classes here; with batch_size=32 and m=4 that is 8 classes
            # for our dataset the minimum per-class count is 6, so m=4 is a reasonable choice
            curr_label_set = self.labels[: self._batch_size // self._m_per_class]
            for label in curr_label_set:
                if len(self._labels_to_indices.loc[label]) >= self._m_per_class:
                    items = self._labels_to_indices.loc[label].sample(self._m_per_class)
                else:
                    items = self._labels_to_indices.loc[label].sample(self._m_per_class, replace=True)
                t = []
                for item in items.index.values:
                    cur_id = np.random.choice(
                        self._labels_to_indices.loc[label, item]["ids"],
                        1,
                        replace=False,
                    )[0]
                    t.append(cur_id)
                # this already forms a complete, valid index sequence
                # note: the corresponding DataLoader must NOT use shuffle
                idx_list[i : i + self._m_per_class] = t[: self._m_per_class]
                i += self._m_per_class
        return iter(idx_list)

    # the set_epoch equivalent: only needed in the multi-GPU case
    def shuffle_data_on_ranks(self, seed, display_details=False):
        # at every epoch the data is re-split across the GPU processes (the splits do not overlap)
        self.labels = self._split_label_randoms(seed)[self.local_rank]
        self._sample_length = self._get_sample_length()
        if display_details:
            display = sorted(self.labels)[:5]
            print(
                "Shuffle sampler with {} items, rank = {}, total rank = {}, batch_num = {}, label of head{} is {}".format(
                    self._sample_length,
                    self.local_rank,
                    self.total_rank,
                    self.num_iters(),
                    len(display),
                    display,
                )
            )

    def num_iters(self):
        # sample count floor-divided by batch_size, i.e. how many iterations one pass over the training set takes
        return self._sample_length // self._batch_size

    def _get_sample_length(self):
        # the actual number of samples to load; self.labels has already been split by local_rank,
        # so this is the number of samples assigned to this GPU process
        sample_length = sum([self._labels_to_indices.loc[k]["count"].sum() for k in self.labels])
        # gradient synchronization requires every GPU process to have data, so this sync makes all
        # processes sample the same number of items (the minimum across processes)
        # (if your data is well balanced this sync is effectively a no-op)
        if self.total_rank >= 0:
            torch.distributed.barrier()
            total = torch.tensor(
                [sample_length],
                dtype=torch.float32,
                device=self.device,
            )
            # must be synchronous, since the result is read right below
            torch.distributed.all_reduce(total, torch.distributed.ReduceOp.MIN, async_op=False)
            sample_length = int(total.tolist()[0])
        sample_length -= sample_length % self._batch_size
        return sample_length

    def _split_label_randoms(self, seed):
        split_label = []
        global_label = self._global_labels.copy()
        random.Random(seed).shuffle(global_label)
        for i in range(self.total_rank):
            # the index data is split across total_rank processes by striding
            split_label.append(global_label[i :: self.total_rank])
        return split_label

    # @staticmethod
    def _get_labels_to_indices(self, data_lines):
        df = data_lines.groupby(["a_cls", "a_item"]).agg(count=("a_id", "count"), ids=("a_id", "unique"))
        return df

    def __len__(self):
        return self._sample_length
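For completeness, a hedged usage sketch of wiring the sampler into the DataLoader (train_data_path, train_datas, num_workers and the other surrounding variables are placeholders from the code above): because the sampler already emits a complete, ordered index sequence, the DataLoader must be created with shuffle=False, and shuffle_data_on_ranks takes over the role of set_epoch at the start of every epoch.

# hypothetical wiring, assuming train_datas indexes the same excel rows the sampler was built from
sampler = MyDistributedSampler(
    data_path=train_data_path,  # the same excel file the Dataset reads
    m=4,                        # samples drawn per class in each batch
    batch_size=32,
    local_rank=local_rank,
    total_rank=total_rank,
    device=device,
    distribute=True,
)
train_dataloader = DataLoader(
    train_datas,
    batch_size=32,
    num_workers=num_workers,
    shuffle=False,              # the sampler already defines the order
    sampler=sampler,
    pin_memory=True,
    drop_last=True,
)
# at the start of every epoch, in place of DistributedSampler.set_epoch:
sampler.shuffle_data_on_ranks(seed=epoch)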