Qwen 微调脚本分析 Qwen/finetune.py
Qwen/finetune.py :
# Adapted from fastchat and tatsu-lab/stanford_alpaca; used to train language models.
# Offers optional LoRA (low-rank adaptation) and quantized (QLoRA) compression,
# plus distributed-training support via DeepSpeed.
# Import the required libraries and modules:
from dataclasses import dataclass, field
import json
import math
import logging
import os
from typing import Dict, Optional, List
import torch
from torch.utils.data import Dataset
from deepspeed import zero
from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus
import transformers
from transformers import Trainer, GPTQConfig, deepspeed
from transformers.trainer_pt_utils import LabelSmoother
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from accelerate.utils import DistributedType# 忽略标记ID,来自LabelSmoother的忽略索引
# Label id that the loss function skips (taken from transformers' LabelSmoother).
IGNORE_TOKEN_ID = LabelSmoother.ignore_index
@dataclass
class ModelArguments:
    """Arguments selecting which pretrained model to fine-tune."""

    # Hugging Face hub id or local path of the base checkpoint.
    model_name_or_path: Optional[str] = "Qwen/Qwen-7B"
@dataclass
class DataArguments:
    """Arguments describing where the training/evaluation data lives."""

    # Path to the supervised training JSON file.
    data_path: str = field(default=None, metadata={"help": "Path to the training data."})
    # Optional path to an evaluation JSON file; falsy disables evaluation.
    eval_data_path: str = field(default=None, metadata={"help": "Path to the evaluation data."})
    # When True, tokenize examples lazily instead of all up front.
    lazy_preprocess: bool = False
@dataclass
class TrainingArguments(transformers.TrainingArguments):
    """transformers TrainingArguments extended with Qwen-specific options."""

    # Where to cache downloaded models/tokenizers.
    cache_dir: Optional[str] = None
    # Optimizer name passed through to the HF Trainer.
    optim: str = "adamw_torch"
    model_max_length: int = field(
        default=8192,
        metadata={
            "help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
        },
    )
    # Enable LoRA fine-tuning instead of full-parameter training.
    use_lora: bool = False
@dataclass
class LoraArguments:
    """Hyper-parameters for LoRA / QLoRA adaptation."""

    lora_r: int = 64          # LoRA rank
    lora_alpha: int = 16      # LoRA scaling factor
    lora_dropout: float = 0.05
    # Qwen attention / MLP projection layers that receive LoRA adapters.
    lora_target_modules: List[str] = field(
        default_factory=lambda: ["c_attn", "c_proj", "w1", "w2"]
    )
    lora_weight_path: str = ""
    lora_bias: str = "none"
    q_lora: bool = False      # use quantized (4-bit) QLoRA
def maybe_zero_3(param):
    """Return a detached CPU copy of ``param``.

    Parameters carrying a ``ds_id`` attribute are managed by DeepSpeed ZeRO-3
    and must be gathered across ranks before their data is readable; plain
    tensors are simply detached, moved to CPU, and cloned.
    """
    if hasattr(param, "ds_id"):
        # ZeRO-3 keeps partitioned params in NOT_AVAILABLE state until gathered.
        assert param.ds_status == ZeroParamStatus.NOT_AVAILABLE
        with zero.GatheredParameters([param]):
            param = param.data.detach().cpu().clone()
    else:
        param = param.detach().cpu().clone()
    return param


def get_peft_state_maybe_zero_3(named_params, bias):
    """Collect the LoRA-related subset of ``named_params`` as a CPU state dict.

    Args:
        named_params: iterable of ``(name, tensor)`` pairs, e.g.
            ``model.named_parameters()``.
        bias: which bias terms to keep alongside the LoRA weights --
            ``"none"``, ``"all"``, or ``"lora_only"``.

    Returns:
        dict mapping parameter name -> CPU tensor (ZeRO-3 shards are gathered).

    Raises:
        NotImplementedError: for any other ``bias`` value.
    """
    if bias == "none":
        to_return = {k: t for k, t in named_params if "lora_" in k}
    elif bias == "all":
        to_return = {k: t for k, t in named_params if "lora_" in k or "bias" in k}
    elif bias == "lora_only":
        to_return = {}
        maybe_lora_bias = {}
        lora_bias_names = set()
        for k, t in named_params:
            if "lora_" in k:
                to_return[k] = t
                # The bias paired with "prefix.lora_X" is named "prefix.bias".
                bias_name = k.split("lora_")[0] + "bias"
                lora_bias_names.add(bias_name)
            elif "bias" in k:
                maybe_lora_bias[k] = t
        # BUG FIX: iterate .items() (the original iterated the dict's keys and
        # failed to unpack them into (k, t)), and test/store each bias under
        # its own key k instead of the stale `bias_name` left over from the
        # previous loop.
        for k, t in maybe_lora_bias.items():
            if k in lora_bias_names:
                to_return[k] = t
    else:
        raise NotImplementedError
    # Materialize every selected tensor on the CPU (gathering ZeRO-3 shards).
    to_return = {k: maybe_zero_3(v) for k, v in to_return.items()}
    return to_return
# Local rank of this process in distributed training; assigned in train().
local_rank = None
def rank0_print(*args):
    """Print only on the rank-0 process to avoid duplicated log lines."""
    if local_rank == 0:
        print(*args)


def safe_save_model_for_hf_trainer(trainer: transformers.Trainer, output_dir: str, bias="none"):
    """Persist the trainer's model to ``output_dir`` in a distribution-safe way.

    Args:
        trainer: the transformers.Trainer whose model state is saved.
        output_dir: target directory for the checkpoint.
        bias: forwarded to get_peft_state_maybe_zero_3 when LoRA is active.

    The state dict is consolidated from ZeRO-3 shards when that mode is
    enabled, or reduced to the LoRA subset when training with LoRA.  The
    actual write happens only on the process with args.local_rank == 0 and
    only when args.should_save is set.
    """
    if deepspeed.is_deepspeed_zero3_enabled():
        # ZeRO-3: gather the sharded fp16 weights into one state dict.
        state_dict = trainer.model_wrapped._zero3_consolidated_16bit_state_dict()
    elif trainer.args.use_lora:
        state_dict = get_peft_state_maybe_zero_3(trainer.model.named_parameters(), bias)
    else:
        state_dict = trainer.model.state_dict()
    if trainer.args.should_save and trainer.args.local_rank == 0:
        trainer._save(output_dir, state_dict=state_dict)


def preprocess(
    sources,
    tokenizer: transformers.PreTrainedTokenizer,
    max_len: int,
    system_message: str = "You are a helpful assistant."
) -> Dict:
    """Tokenize ChatML-style conversations into model-ready tensors.

    Args:
        sources: list of conversations; each conversation is a list of dicts
            with "from" ("user"/"assistant") and "value" (text) keys.
        tokenizer: tokenizer exposing im_start_id / im_end_id special tokens
            (assumes the Qwen tokenizer -- TODO confirm for other models).
        max_len: length every sequence is padded/truncated to.
        system_message: system prompt prepended to every conversation.

    Returns:
        dict with ``input_ids``, ``labels`` (user turns masked with
        IGNORE_TOKEN_ID), and ``attention_mask`` (False on padding).
    """
    role_tags = {"user": "<|im_start|>user", "assistant": "<|im_start|>assistant"}

    im_start = tokenizer.im_start_id
    im_end = tokenizer.im_end_id
    nl_tokens = tokenizer('\n').input_ids
    _system = tokenizer('system').input_ids + nl_tokens
    _user = tokenizer('user').input_ids + nl_tokens            # kept for parity; unused below
    _assistant = tokenizer('assistant').input_ids + nl_tokens  # kept for parity; unused below

    input_ids, targets = [], []
    for i, source in enumerate(sources):
        # Drop a leading non-user message so every conversation starts with the user.
        if role_tags[source[0]["from"]] != role_tags["user"]:
            source = source[1:]

        input_id, target = [], []
        # <|im_start|>system\n{system_message}<|im_end|>\n
        system = [im_start] + _system + tokenizer(system_message).input_ids + [im_end] + nl_tokens
        input_id += system
        # Mask everything in the system turn except the framing tokens.
        target += [im_start] + [IGNORE_TOKEN_ID] * (len(system) - 3) + [im_end] + nl_tokens
        assert len(input_id) == len(target)

        for j, sentence in enumerate(source):
            role = role_tags[sentence["from"]]
            _input_id = tokenizer(role).input_ids + nl_tokens + \
                tokenizer(sentence["value"]).input_ids + [im_end] + nl_tokens
            input_id += _input_id
            if role == '<|im_start|>user':
                # User turns contribute no training signal: mask the content.
                _target = [im_start] + [IGNORE_TOKEN_ID] * (len(_input_id) - 3) + [im_end] + nl_tokens
            elif role == '<|im_start|>assistant':
                # Assistant turns: mask only the role header, learn the reply.
                _target = [im_start] + [IGNORE_TOKEN_ID] * len(tokenizer(role).input_ids) + \
                    _input_id[len(tokenizer(role).input_ids) + 1:-2] + [im_end] + nl_tokens
            else:
                raise NotImplementedError
            target += _target
        assert len(input_id) == len(target)

        # Right-pad to max_len, then truncate in case the dialogue overran it.
        input_id += [tokenizer.pad_token_id] * (max_len - len(input_id))
        target += [IGNORE_TOKEN_ID] * (max_len - len(target))
        input_ids.append(input_id[:max_len])
        targets.append(target[:max_len])

    input_ids = torch.tensor(input_ids, dtype=torch.int)
    targets = torch.tensor(targets, dtype=torch.int)

    return dict(
        input_ids=input_ids,
        labels=targets,
        # True everywhere except the padding positions.
        attention_mask=input_ids.ne(tokenizer.pad_token_id),
    )


class SupervisedDataset(Dataset):
    """Dataset for supervised fine-tuning."""

    def __init__(self, raw_data, tokenizer: transformers.PreTrainedTokenizer, max_len: int):
        super(SupervisedDataset, self).__init__()
        rank0_print("Formatting inputs...")
        # Eagerly tokenize every conversation once, up front.
        conversations = [example["conversations"] for example in raw_data]
        data_dict = preprocess(conversations, tokenizer, max_len)
        self.input_ids = data_dict["input_ids"]
        self.labels = data_dict["labels"]
        self.attention_mask = data_dict["attention_mask"]

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        return dict(
            input_ids=self.input_ids[i],
            labels=self.labels[i],
            attention_mask=self.attention_mask[i],
        )
def make_supervised_data_module(
    tokenizer: transformers.PreTrainedTokenizer,  # tokenizer used to encode the text
    data_args,  # DataArguments: data paths and the lazy_preprocess flag
    max_len,  # maximum input sequence length
) -> Dict:
    """Build the train/eval datasets for supervised fine-tuning.

    Returns:
        dict with ``train_dataset`` and ``eval_dataset`` keys, suitable for
        splatting into ``Trainer(**data_module)``; ``eval_dataset`` is None
        when ``data_args.eval_data_path`` is not provided.
    """
    dataset_cls = (
        LazySupervisedDataset if data_args.lazy_preprocess else SupervisedDataset
    )
    rank0_print("Loading data...")

    # BUG FIX: open the files via context managers so the handles are closed
    # (the original used json.load(open(...)) and leaked them); JSON is UTF-8
    # by specification, so decode explicitly.
    with open(data_args.data_path, "r", encoding="utf-8") as f:
        train_json = json.load(f)
    train_dataset = dataset_cls(train_json, tokenizer=tokenizer, max_len=max_len)

    if data_args.eval_data_path:
        with open(data_args.eval_data_path, "r", encoding="utf-8") as f:
            eval_json = json.load(f)
        eval_dataset = dataset_cls(eval_json, tokenizer=tokenizer, max_len=max_len)
    else:
        eval_dataset = None

    return dict(train_dataset=train_dataset, eval_dataset=eval_dataset)


# The train() function defined below is the main entry point that drives the
# whole fine-tuning pipeline.
def train():
    """Entry point: parse arguments, build model/tokenizer/data, and run training."""
    global local_rank  # set below so rank0_print sees the real rank

    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments, LoraArguments)
    )
    (
        model_args,
        data_args,
        training_args,
        lora_args,
    ) = parser.parse_args_into_dataclasses()

    # Single-GPU runs with a DeepSpeed config still need the DEEPSPEED
    # distributed type so accelerate routes through DeepSpeed.
    if getattr(training_args, 'deepspeed', None) and int(os.environ.get("WORLD_SIZE", 1)) == 1:
        training_args.distributed_state.distributed_type = DistributedType.DEEPSPEED

    local_rank = training_args.local_rank

    device_map = None
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    ddp = world_size != 1
    if lora_args.q_lora:
        # QLoRA pins each replica to its own device under DDP.
        device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)} if ddp else "auto"
        if len(training_args.fsdp) > 0 or deepspeed.is_deepspeed_zero3_enabled():
            logging.warning("FSDP or ZeRO3 are incompatible with QLoRA.")

    is_chat_model = 'chat' in model_args.model_name_or_path.lower()
    if (
        training_args.use_lora
        and not lora_args.q_lora
        and deepspeed.is_deepspeed_zero3_enabled()
        and not is_chat_model
    ):
        raise RuntimeError("ZeRO3 is incompatible with LoRA when finetuning on base model.")

    model_load_kwargs = {
        # ZeRO-3 manages parameter placement itself.
        'low_cpu_mem_usage': not deepspeed.is_deepspeed_zero3_enabled(),
    }

    # Model configuration; caching is disabled for training.
    config = transformers.AutoConfig.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        trust_remote_code=True,
    )
    config.use_cache = False

    # Load the base model (4-bit GPTQ-quantized when running QLoRA).
    model = transformers.AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        config=config,
        cache_dir=training_args.cache_dir,
        device_map=device_map,
        trust_remote_code=True,
        quantization_config=GPTQConfig(bits=4, disable_exllama=True)
        if training_args.use_lora and lora_args.q_lora
        else None,
        **model_load_kwargs,
    )
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=False,
        trust_remote_code=True,
    )
    # Qwen uses the end-of-document token for padding.
    tokenizer.pad_token_id = tokenizer.eod_id

    if training_args.use_lora:
        # Base (non-chat, non-QLoRA) models also train the embeddings/head so
        # newly added special tokens are learned.
        if lora_args.q_lora or is_chat_model:
            modules_to_save = None
        else:
            modules_to_save = ["wte", "lm_head"]
        lora_config = LoraConfig(
            r=lora_args.lora_r,
            lora_alpha=lora_args.lora_alpha,
            target_modules=lora_args.lora_target_modules,
            lora_dropout=lora_args.lora_dropout,
            bias=lora_args.lora_bias,
            task_type="CAUSAL_LM",
            modules_to_save=modules_to_save,  # parameters kept trainable for new tokens
        )
        if lora_args.q_lora:
            model = prepare_model_for_kbit_training(
                model, use_gradient_checkpointing=training_args.gradient_checkpointing
            )
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()
        if training_args.gradient_checkpointing:
            model.enable_input_require_grads()

    data_module = make_supervised_data_module(
        tokenizer=tokenizer, data_args=data_args, max_len=training_args.model_max_length
    )

    trainer = Trainer(model=model, tokenizer=tokenizer, args=training_args, **data_module)
    trainer.train()
    trainer.save_state()
    safe_save_model_for_hf_trainer(
        trainer=trainer, output_dir=training_args.output_dir, bias=lora_args.lora_bias
    )


if __name__ == "__main__":
    train()