先来看一下 WeightedRandomSampler：
class WeightedRandomSampler(Sampler[int]):
    r"""Draw indices from ``[0, .., len(weights) - 1]`` with probability proportional to ``weights``.

    Args:
        weights (sequence): per-index weights; they do not have to sum to one.
        num_samples (int): how many indices one pass over the sampler yields.
        replacement (bool): when ``True`` an index may be drawn more than once;
            when ``False`` every index is drawn at most once per pass.
        generator (Generator): random generator handed to :func:`torch.multinomial`.

    Raises:
        ValueError: if ``num_samples`` is not a positive ``int`` (``bool`` is
            rejected), if ``replacement`` is not a ``bool``, or if ``weights``
            is not one-dimensional.

    Example:
        >>> # xdoctest: +IGNORE_WANT("non-deterministic")
        >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
        [4, 4, 1, 4, 5]
        >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
        [0, 1, 4, 3, 2]
    """

    weights: Tensor
    num_samples: int
    replacement: bool

    def __init__(self, weights: Sequence[float], num_samples: int,
                 replacement: bool = True, generator=None) -> None:
        # ``bool`` is a subclass of ``int``, so it has to be excluded explicitly.
        is_positive_int = (
            isinstance(num_samples, int)
            and not isinstance(num_samples, bool)
            and num_samples > 0
        )
        if not is_positive_int:
            raise ValueError(f"num_samples should be a positive integer value, but got num_samples={num_samples}")

        if not isinstance(replacement, bool):
            raise ValueError(f"replacement should be a boolean value, but got replacement={replacement}")

        # Weights are always stored as a double-precision tensor.
        weights_tensor = torch.as_tensor(weights, dtype=torch.double)
        if weights_tensor.dim() != 1:
            raise ValueError("weights should be a 1d sequence but given "
                             f"weights have shape {tuple(weights_tensor.shape)}")

        self.generator = generator
        self.replacement = replacement
        self.num_samples = num_samples
        self.weights = weights_tensor

    def __iter__(self) -> Iterator[int]:
        """Yield ``num_samples`` freshly drawn indices as plain Python ints."""
        drawn = torch.multinomial(
            self.weights,
            self.num_samples,
            self.replacement,
            generator=self.generator,
        )
        yield from drawn.tolist()

    def __len__(self) -> int:
        """Return the number of indices produced by one pass."""
        return self.num_samples
再来看一下分布式的采样器。
class DistributedSampler(Sampler[T_co]):
    r"""Sampler that hands each process its own slice of the dataset indices.

    Intended for use together with
    :class:`torch.nn.parallel.DistributedDataParallel`: every process passes a
    :class:`~torch.utils.data.DistributedSampler` to its
    :class:`~torch.utils.data.DataLoader` and thereby loads a subset of the
    original dataset that is exclusive to it.

    .. note::
        The dataset is assumed to have a constant size, and every instance of
        it is assumed to return the same elements in the same order.

    Args:
        dataset: Dataset used for sampling.
        num_replicas (int, optional): Number of processes taking part in
            distributed training. Defaults to the world size of the current
            distributed group.
        rank (int, optional): Rank of the current process within
            :attr:`num_replicas`. Defaults to the rank in the current
            distributed group.
        shuffle (bool, optional): If ``True`` (default), the indices are
            shuffled.
        seed (int, optional): Random seed used for shuffling when
            :attr:`shuffle=True`. Must be identical across all processes in
            the distributed group. Default: ``0``.
        drop_last (bool, optional): If ``True``, the tail of the data is
            dropped so that it divides evenly across the replicas. If
            ``False``, extra indices are appended to make it divide evenly.
            Default: ``False``.

    .. warning::
        In distributed mode, call :meth:`set_epoch` at the beginning of each
        epoch **before** creating the :class:`DataLoader` iterator; otherwise
        the same ordering is used in every epoch.

    Example::

        >>> # xdoctest: +SKIP
        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        # The process group is only consulted for values the caller left out.
        if num_replicas is None or rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            if num_replicas is None:
                num_replicas = dist.get_world_size()
            if rank is None:
                rank = dist.get_rank()

        if rank < 0 or rank >= num_replicas:
            raise ValueError(f"Invalid rank {rank}, rank should be in the interval [0, {num_replicas - 1}]")

        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last

        dataset_len = len(self.dataset)  # type: ignore[arg-type]
        # With ``drop_last`` and an uneven split, shrink to the nearest length
        # that divides evenly so every rank receives the same amount of data.
        # An evenly divisible dataset never needs anything dropped.
        uneven = dataset_len % self.num_replicas != 0
        usable_len = dataset_len - self.num_replicas if (self.drop_last and uneven) else dataset_len
        self.num_samples = math.ceil(usable_len / self.num_replicas)
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[T_co]:
        """Return an iterator over the indices assigned to this rank."""
        dataset_len = len(self.dataset)  # type: ignore[arg-type]
        if self.shuffle:
            # Deterministic shuffle driven by seed and epoch, so all ranks
            # build the same permutation before taking their own stride.
            rng = torch.Generator()
            rng.manual_seed(self.seed + self.epoch)
            order = torch.randperm(dataset_len, generator=rng).tolist()
        else:
            order = list(range(dataset_len))

        if self.drop_last:
            # Cut the tail so the length divides evenly.
            order = order[:self.total_size]
        else:
            # Pad with indices from the front so the length divides evenly.
            shortfall = self.total_size - len(order)
            if shortfall <= len(order):
                order += order[:shortfall]
            else:
                repeats = math.ceil(shortfall / len(order))
                order += (order * repeats)[:shortfall]
        assert len(order) == self.total_size

        # Every ``num_replicas``-th index, starting at this process's rank.
        shard = order[self.rank:self.total_size:self.num_replicas]
        assert len(shard) == self.num_samples

        return iter(shard)

    def __len__(self) -> int:
        """Return the number of indices this rank yields per epoch."""
        return self.num_samples

    def set_epoch(self, epoch: int) -> None:
        r"""Set the epoch for this sampler.

        When :attr:`shuffle=True`, this makes all replicas use a different
        random ordering for each epoch. Otherwise the next iteration of this
        sampler yields the same ordering.

        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch
最后是 WeightedRandomSamplerDDP：
class WeightedRandomSamplerDDP(torch.utils.data.distributed.DistributedSampler):
    r"""Weighted random sampler for multi-process (distributed) training.

    The index space ``[0, .., len(weights) - 1]`` is split into strided shards:
    the process with rank ``r`` owns indices ``r, r + num_replicas, ...``. Each
    process draws only from its own shard, with probability proportional to
    the weights of that shard, and yields the drawn positions mapped back to
    global indices. Shards are disjoint, so different ranks never yield the
    same index.

    Args:
        data_set: Dataset used for sampling.
        weights (sequence): one weight per index; they do not have to sum to one.
        num_samples (int): total number of samples to draw per epoch across all
            replicas. Every rank draws ``num_samples // num_replicas`` of them.
        num_replicas (int, optional): Number of processes participating in
            distributed training. By default, the world size is retrieved from
            the current distributed group.
        rank (int, optional): Rank of the current process within
            :attr:`num_replicas`. By default, the rank is retrieved from the
            current distributed group.
        shuffle (bool): forwarded to ``DistributedSampler.__init__`` and stored;
            it does not influence the indices drawn by this sampler.
        seed (int): forwarded to ``DistributedSampler.__init__`` and stored;
            it does not influence the indices drawn by this sampler.
        drop_last (bool): forwarded to ``DistributedSampler.__init__`` and
            stored; the per-rank sample count is derived from ``num_samples``.
        replacement (bool): if ``True``, samples are drawn with replacement.
            If not, an index is drawn at most once per pass.
        generator (Generator): Generator used in sampling.

    Raises:
        ValueError: if ``num_samples`` is not a positive ``int`` or is smaller
            than ``num_replicas``, if ``replacement`` is not a ``bool``, or if
            ``weights`` is not one-dimensional.
    """

    weights: torch.Tensor
    num_samples: int
    replacement: bool

    def __init__(self, data_set, weights: Sequence[float], num_samples: int,
                 num_replicas: Optional[int] = None, rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = True, replacement: bool = True, generator=None) -> None:
        # Same argument checks as WeightedRandomSampler; ``bool`` is a subclass
        # of ``int`` and must be rejected explicitly.
        if not isinstance(num_samples, int) or isinstance(num_samples, bool) or num_samples <= 0:
            raise ValueError(f"num_samples should be a positive integer value, but got num_samples={num_samples}")
        if not isinstance(replacement, bool):
            raise ValueError(f"replacement should be a boolean value, but got replacement={replacement}")

        # The parent resolves and validates num_replicas / rank and stores
        # dataset, epoch, shuffle, seed and drop_last.
        super().__init__(data_set, num_replicas, rank, shuffle, seed, drop_last)

        full_weights = torch.as_tensor(weights, dtype=torch.double)
        if full_weights.dim() != 1:
            raise ValueError("weights should be a 1d sequence but given "
                             f"weights have shape {tuple(full_weights.shape)}")

        # Split the requested total evenly; floor division keeps the count
        # identical on every rank.
        per_rank = num_samples // self.num_replicas
        if per_rank == 0:
            raise ValueError(
                "num_samples should be at least num_replicas so every rank draws a sample, "
                f"but got num_samples={num_samples} and num_replicas={self.num_replicas}"
            )

        # Override the dataset-derived values set by the parent so that
        # ``len(self)`` and ``total_size`` reflect the requested sample count.
        self.num_samples = per_rank
        self.total_size = per_rank * self.num_replicas
        self.replacement = replacement
        self.generator = generator
        # Strided shard of the weights owned by this rank.
        self.weights = full_weights[self.rank::self.num_replicas]

    def __iter__(self):
        """Return an iterator over this rank's freshly drawn global indices."""
        local_positions = torch.multinomial(
            self.weights, self.num_samples, self.replacement, generator=self.generator
        )
        # Position ``k`` inside the shard is global index ``rank + k * num_replicas``.
        global_indices = self.rank + local_positions * self.num_replicas
        return iter(global_indices.tolist())

    def __len__(self):
        """Return the number of indices this rank yields per pass."""
        return self.num_samples