为pytorch前向和反向的Tensor生成描述性统计
- 代码
在调试Megatron-DeepSpeed的精度时,我们希望对比每一层前向和反向传播的输入输出误差。然而,由于数据量过大,直接保存所有数据不太现实。因此,我们生成了输入输出tensor的描述性统计信息,并等间隔抽样N个数据点,以比较这些点的相对误差,从而查找精度异常的位置。为了准确定位,我们通过类名和对象ID生成唯一的对象名称(形式为[类名-创建的第几个])以及前向和反向传播的次数。通过保存上述信息,我们可以详细记录并回溯当时的实际输入输出数据。
代码
cat > linear_test.py <<-'EOF'
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from datetime import datetime# 设置设备
device = "cpu"if torch.cuda.is_available():device = "cuda:4"def is_tensor(val):# 判断是否为tensor或Parameterreturn isinstance(val, (torch.Tensor, nn.Parameter))def describe_tensor(tensor):# 返回tensor的描述,包括形状和部分数据统计信息shape = list(tensor.shape)tensor_data = tensor.cpu().float().detach().numpy().ravel()num_points = min(16, len(tensor_data))indices = np.linspace(0, len(tensor_data) - 1, num_points, dtype=int)stats = [np.max(tensor_data), np.min(tensor_data), np.mean(tensor_data), np.std(tensor_data)]sample_data = tensor_data[indices]stats_str = ",".join(f"{x:.5f}" for x in stats)sample_str = ",".join(f"{x:.5f}" for x in sample_data)return f"{shape}-{stats_str},{sample_str}"def generate_random_data(shape):# 生成符合指定形状的随机数据max_val, min_val, mean, std = 0.04025, -0.04651, 0.0, 0.00134data = np.random.normal(mean, std, shape)data = (data - data.min()) / (data.max() - data.min()) * (max_val - min_val) + min_valreturn dataindex_counter = 0def log_tensor_data(name, tensor):# 打印tensor的日志数据global index_counterindex_counter += 1timestamp = datetime.now().strftime("%H%M%S%f")if is_tensor(tensor):print(f"{timestamp},{index_counter},{name},0,{describe_tensor(tensor)}")elif isinstance(tensor, (tuple, list)):for idx, t in enumerate(tensor):if is_tensor(t):print(f"{timestamp},{index_counter},{name},{idx},{describe_tensor(t)}")def log_gradient(model):# 打印模型参数梯度信息for name, param in model.named_parameters():if param.grad is not None:log_tensor_data(f"grad-{name}", param.grad)# 对象和类名缓存
object_cache = {}
class_name_count = {}def get_unique_name(class_name, obj_id):# 生成唯一的对象名称if class_name not in class_name_count:class_name_count[class_name] = 0uid = f"{class_name}_{obj_id}"if uid not in object_cache:class_name_count[class_name] += 1object_cache[uid] = {"idx": class_name_count[class_name]}return f'{class_name}-{object_cache[uid]["idx"]}'def initialize_module_attributes(module):# 初始化模块属性if not hasattr(module, 'uuid'):module.uuid = get_unique_name(module.__class__.__name__, id(module))if not hasattr(module, 'backward_step'):module.backward_step = 0if not hasattr(module, 'forward_step'):module.forward_step = 0def forward_decorator():# 包装forward函数的修饰器def decorator(func):def wrapped(*args, **kwargs):module = args[0]initialize_module_attributes(module)module.forward_step += 1log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-input", args)output = func(*args, **kwargs)log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-output", output)return outputreturn wrappedreturn decoratordef pre_backward_hook(module, grad_input):# 反向传播前的钩子函数initialize_module_attributes(module)module.backward_step += 1log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-input", grad_input)def post_backward_hook(module, grad_input, grad_output):# 反向传播后的钩子函数initialize_module_attributes(module)log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-output", grad_output)def register_backward_hooks(module):# 注册反向传播钩子module.register_full_backward_pre_hook(pre_backward_hook)module.register_full_backward_hook(post_backward_hook)class CustomLinear(nn.Module):def __init__(self, shape):super(CustomLinear, self).__init__()weight_data = torch.from_numpy(generate_random_data(shape)).half().to(device)self.weight = nn.Parameter(weight_data)self.register_parameter('bias', None)register_backward_hooks(self)@forward_decorator()def forward(self, input_):return F.linear(input_, self.weight, self.bias)class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.layer1 = CustomLinear((5504, 4096))self.layer2 = CustomLinear((4096, 5504))@forward_decorator()def forward(self, input_):out = self.layer1(input_)out = self.layer2(out)return out
# 设置随机种子
np.random.seed(1)
torch.manual_seed(2)# 创建和训练模型
model = MyModel().half().to(device)
model.train()input_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)
target_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)for _ in range(2):outputs = model(input_data)outputs.backward(target_data) # 使用全一的梯度来反向传播log_gradient(model)
EOF
python3 linear_test.py