def __iter__(self) -> Iterator:
    if self.shuffle:
        # deterministically shuffle based on epoch and seed
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
    else:
        indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

    if not self.drop_last:
        # add extra samples to make it evenly divisible
        padding_size = self.total_size - len(indices)
        if padding_size <= len(indices):
            indices += indices[:padding_size]
        else:
            indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
    else:
        # remove tail of data to make it evenly divisible.
        indices = indices[:self.total_size]
    assert len(indices) == self.total_size
    # subsample
    indices = indices[self.group_index:self.total_size:self.group_num]  #### here!!!!!!!!!!
    assert len(indices) == self.num_samples

    return iter(indices)
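For intuition, here is a tiny standalone example of that strided subsampling; the values of total_size and group_num below are made up purely for illustration.

# Illustration only: how indices[group_index:total_size:group_num] partitions the index list.
total_size = 8
indices = list(range(total_size))   # stands in for the shuffled/padded index list
group_num = 2                       # hypothetical number of groups
for group_index in range(group_num):
    print(group_index, indices[group_index:total_size:group_num])
# prints:
# 0 [0, 2, 4, 6]
# 1 [1, 3, 5, 7]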
for_parallel rank in range(world_size):  # pseudocode: every rank runs this loop
    initialize_layers()
    for batch in dataset:
        x = forward(batch)
        compute_loss(x, batch).backward()
        backward(x.grad)
        step()
def _is_owner(i):
    return True if rank owns i else False
def initialize_layers():
    for i in range(num_layers):
        l = layers[i]
        allocate_on_gpu l.param_fp16
        if _is_owner(i):
            allocate_on_cpu l.param_fp32
            allocate_on_cpu l.optim_states_fp32
            allocate_on_cpu l.cpu_grad
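As a rough, non-authoritative sketch of what that allocation pattern could look like in PyTorch (assuming one flat parameter buffer per layer and an available CUDA device; initialize_layer_state, num_params, and SimpleNamespace are illustrative stand-ins, not the pseudocode's actual structures):

import torch
from types import SimpleNamespace

def initialize_layer_state(num_params: int, is_owner: bool) -> SimpleNamespace:
    layer = SimpleNamespace()
    # fp16 working parameters live on the GPU for forward/backward
    layer.param_fp16 = torch.empty(num_params, dtype=torch.float16, device="cuda")
    if is_owner:
        # the owning rank keeps fp32 master weights, optimizer state,
        # and a gradient staging buffer in pinned host memory
        layer.param_fp32 = torch.empty(num_params, dtype=torch.float32, pin_memory=True)
        layer.optim_states_fp32 = torch.zeros(num_params, dtype=torch.float32, pin_memory=True)
        layer.cpu_grad = torch.empty(num_params, dtype=torch.float32, pin_memory=True)
    return layer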
def forward(x):
    for i in range(num_layers):
        x = layers[i].forward(x)
    return x
def backward(dx):
    for i in range(num_layers - 1, -1, -1):  # walk the layers in reverse
        l = layers[i]
        dx = l.backward(dx)
        reduce(l.grad, dest_rank=_owner_rank(i))
        if _is_owner(i):
            l.cpu_grad.copy(l.grad)  # offload the reduced gradient to host memory
        else:
            pass
        del l.grad
def step():
    for i in range(num_layers):
        l = layers[i]
        if _is_owner(i):
            # CPU optimizer update on the fp32 master copy, then downcast to fp16
            update_in_cpu(l.optim_states_fp32, l.cpu_grad, l.param_fp32)
            l.param_fp16.copy(l.param_fp32)
        broadcast(l.param_fp16, src=_owner_rank(i))  # every rank receives the owner's updated fp16 params
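For reference, one plausible mapping of the pseudocode's reduce and broadcast onto torch.distributed collectives, assuming the default process group has already been initialized; the wrapper names below are hypothetical.

import torch
import torch.distributed as dist

def reduce_grad_to_owner(grad: torch.Tensor, owner_rank: int) -> None:
    # sum-reduce the layer's gradient onto its owner (the pseudocode's reduce)
    dist.reduce(grad, dst=owner_rank, op=dist.ReduceOp.SUM)

def broadcast_params_from_owner(param_fp16: torch.Tensor, owner_rank: int) -> None:
    # the owner sends its freshly updated fp16 parameters to all other ranks
    # (the pseudocode's broadcast)
    dist.broadcast(param_fp16, src=owner_rank)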