在小米实习接到的第一个工作,是对MiLM进行长文本SFT。具体而言,部门内部的SFT代码之前并没有适配TP(tensor parallelism), 13B、30B模型训练单卡最多防线22K个token,长文本训练使用PoSE来增强,实际上还是不完全的阉割版。
在我来的时候,TP的框架已经被前一个实习生适配的差不多了,因而我基于此开始进行后续的训练,以及代码理解、代码调整与PR等
TP的概念图还比较容易理解,但是具体代码,影响到方方面面,需要在多处进行修改,我还是花了很大的功夫去进行理解,在此记录自己的路径与一些思考

TP与SP是什么,如何实现?

张量并行与序列并行细节分析

TP+SP流程:

参考:Xtuner

  • 32k数据——序列并行环境初始化(GPU分组,哪些GPU共同处理一个32k)——seq_len切分——embedding层——修改本来的mask、position id, label, input id——切分后的数据输入到attn中——q,k,v proj——all to all算子,使得qkv从seq切分转化为head切分——attn weight(qk)、softmax、attn_output计算(可能要用到flash,则定义lockmask_flash_attention_forward方法)——all to all,将原来的attn output从head切分转化成seq_len切分——attn output的transpose、reshape、o_proj——传入MiFeedForward,因为这个矩阵乘法结果等价,所有feedforward的序列并行也顺带着进行了——巴拉巴拉——进入下一个模块
  • (为什么关于切分之后norm的问题,因为norm是针对hidden dim的,所以每次pre norm时候由于都处于seq切分状态,所以norm不影响)

修改模型还需要注意细节,即dataloader加载序列并行数据的逻辑和原本的逻辑是完全不一样的
根源在于dataloader的_inner_training_loop这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from typing import Iterator, Optional

import math
import torch
from torch.utils.data import Sampler, Dataset

from .utils import (
get_group_num,
get_group_index,
)


class ParallelGroupSampler(Sampler):
def __init__(self, dataset: Dataset, group_num: Optional[int] = None,
group_index: Optional[int] = None, shuffle: bool = True,
seed: int = 42, drop_last: bool = False) -> None:
super().__init__(dataset)

if group_num is None:
group_num = get_group_num()

if group_index is None:
group_index = get_group_index()

self.dataset = dataset
self.group_num = group_num #### 此处!!!!!!!!!!
self.group_index = group_index #### 此处!!!!!!!!!!
self.epoch = 0
self.drop_last = drop_last

if self.drop_last and len(self.dataset) % self.group_num != 0: # type: ignore[arg-type]
self.num_samples = len(self.dataset) // self.group_num # type: ignore[arg-type]

else:
self.num_samples = math.ceil(len(self.dataset) / self.group_num) # type: ignore[arg-type]

self.total_size = self.num_samples * self.group_num
self.shuffle = shuffle
self.seed = seed

def __iter__(self) -> Iterator:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore[arg-type]
else:
indices = list(range(len(self.dataset))) # type: ignore[arg-type]

if not self.drop_last:
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size <= len(indices):
indices += indices[:padding_size]
else:
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
# remove tail of data to make it evenly divisible.
indices = indices[:self.total_size]

assert len(indices) == self.total_size

# subsample
indices = indices[self.group_index:self.total_size:self.group_num] #### 此处!!!!!!!!!!
assert len(indices) == self.num_samples

return iter(indices)

def __len__(self) -> int:
return self.num_samples

def set_epoch(self, epoch: int) -> None:
self.epoch = epoch

理解这个为什么要这么写,需要顺序参考以下开源代码:
Sampler1
Sampler2
distributed env

然后,这个代码替换的是transformer源代码中trainer的一部分:

Deepspeed-Zero入门

官方文档

也可稍微看一下知乎解析

并行的不止模型本身

  • 解决了模型状态,再来看剩余状态,也就是激活值(activation)、临时缓冲区(buffer)以及显存碎片(fragmentation)。
  • 激活值同样使用分片方法,并且配合checkpointing
  • 模型训练过程中经常会创建一些大小不等的临时缓冲区,比如对梯度进行AllReduce啥的,解决办法就是预先创建一个固定的缓冲区,训练过程中不再动态创建,如果要传输的数据较小,则多组数据bucket后再一次性传输,提高效率
  • 显存出现碎片的一大原因是时候gradient checkpointing后,不断地创建和销毁那些不保存的激活值,解决方法是预先分配一块连续的显存,将常驻显存的模型状态和checkpointed activation存在里面,剩余显存用于动态创建和销毁discarded activation

Deepspeed Zero3(最~~~重要的一集)

DeepSpeed ZeroOffload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
for_parallel rank in range(world_size):
initialize_layers()
for batch in dataset:
x = forward(batch)
compute_loss(x,batch).backward()
backward(x.grad)
step()

def _is_owner(i):
return True if rank owns i else False

def initialize_layers():
for i in range(num_layers):
l = layers[i]
allocate_on_gpu l.param_fp16
if _is_owner(i):
allocate_on_cpu l.param_fp32
allocate_on_cpu l.optim_states_fp32
allocate_on_cpu l.cpu_grad

def forward(x):
for i in range(num_layers):
x = layers[i].forward(x)
return x

def backward(dx):
for i in range(num_layers, 0, -1):
dx=layers[i].backward(dx)
reduce(layers[i].grad, dest_rank = _owner_rank(i))
if _is_owner(i) l.cpu_grad.copy(l.grad)
else pass
del layers[i].grad

def step():
for i in range(num_layers):
l=layers[i]
if _is_owner(i):
update_in_cpu(l.optim_states_fp32,
l.cpu_grad,
l.param_fp32)
l.param_fp16.copy(l.param_fp32)
BROADCAST(l.param_fp16, src=_owner_rank(i))

想要理解为什么能够将optimizer states从GPU传到CPU上计算

反向传播过程,会获得当前layer的参数的梯度,最简单的参数更新来说,直接原参数 - lr*grad即可更新
然后,当前获得的梯度,也会传递到前一层,在前一层反向传播的过程中作为链式法则的一部分
那么,为什么offload中可以直接删除GPU上的grad呢?前一层不也要用吗?删除了之后如何传递?
似乎,是在backward中:

1
2
for i in range(num_layers, 0, -1):
dx=layers[i].backward(dx)

此处即完成了梯度传递给前一层。
实际上,获取上一层、计算当前层、传递给下一层,都集成在了这一个里面。