目录
一、trainer保存模型的问题
1.1、在trainer训练时,怎么控制模型保存的数量的同时,还可以保存最优的模型参数呢?
1.2、使用trainer与deepspeed ZeRO3时,怎么保存模型为huggingface格式呢?
二、 huggingface PreTrained Model中的from_pretrained()
三、 huggingface Trainer 中的train()
四、 huggingface Trainer 中的 _load_best_model() 函数
五、 huggingface Trainer 中的_save_checkpoint() 函数
六、 huggingface Trainer 中的save_model () 函数
七、 huggingface Trainer 中的_save () 函数
八、 huggingface Trainer 中的compute_loss () 函数
九、 huggingface Trainer 中的训练过程函数 _inner_training_loop()
本文主要简单介绍 huggingface 中常见函数的使用
torch版本:2.1
Transformers 版本:4.30
一、trainer保存模型的问题
1.1、在trainer训练时,怎么控制模型保存的数量的同时,还可以保存最优的模型参数呢?
在使用Hugging Face的
Trainer
进行模型训练时,通过设置TrainingArguments
中的一些参数,你可以控制保存模型的数量,并确保最优模型不会被删除。主要涉及以下几个参数:
save_strategy
:定义了模型保存的策略,可以是"no"(不保存模型),"epoch"(每个epoch结束时保存),或"steps"(每指定步数后保存)。save_total_limit
:指定同时保存的模型检查点的最大数量。如果设置了这个参数,当保存新的检查点时,超出这个数量限制的最旧的检查点将被删除。evaluation_strategy
:定义了评估的策略,与save_strategy
类似,可以是"no","epoch"或"steps"。load_best_model_at_end
:如果设置为True
,在训练结束时将加载评价指标最好的模型(需要同时设置evaluation_strategy
)。metric_for_best_model
:指定哪个评价指标用于评估最佳模型。这个设置要求evaluation_strategy
不为"no"。这里有一个设置示例:
from transformers import Trainer, TrainingArgumentstraining_args = TrainingArguments(output_dir="./results",per_device_train_batch_size=8,num_train_epochs=3,save_strategy="epoch",save_total_limit=2, # 同时保存的最大检查点数量evaluation_strategy="epoch",eval_steps=100, # 如果不是每个epoch评估,可以设置评估步数load_best_model_at_end=True,metric_for_best_model="accuracy", # 假设使用的评估指标是准确率 )trainer = Trainer(model=model,args=training_args,train_dataset=train_dataset,eval_dataset=eval_dataset,compute_metrics=compute_metrics, # 需要定义一个函数来计算指标 )
在这个设置中,
Trainer
将在每个epoch结束时保存模型,并保持最多两个最新的检查点。同时,它会在训练结束时加载表现最优的模型检查点,基于所指定的评价指标(在这个例子中是accuracy
)。这样,即使save_total_limit
导致了一些检查点的删除,最优模型依然会被保留,而且它在训练结束时会被重新加载(用的是下文中的_load_best_model()函数,具体请看下文),只要在trainer执行结束后再保存一次模型即可。请确保
compute_metrics
函数正确定义并返回一个包含metric_for_best_model
指定指标的字典。记得根据具体的模型和任务调整这些参数。这些设置将帮助控制检查点的数量,同时确保最优模型得到保存。
1.2、使用trainer与deepspeed ZeRO3时,怎么保存模型为huggingface格式呢?
使用Hugging Face的
Trainer
调用DeepSpeed特别是ZeRO阶段3(Zero Redundancy Optimizer)时,需要注意的是,DeepSpeed插件管理其参数的分布式状态,这会影响直接保存整个预训练模型的方式。以下是如何在这样的设置下,保存为Hugging Face Transformer库兼容格式的模型的步骤:当使用
Trainer
和DeepSpeed ZeRO阶段3时,我们可以需要设计一个自定义的函数,可以通过以下思路去保存模型,确保它与Hugging Face Transformers兼容。
- 确保所有参数都在一个设备上:在保存之前,可能需要确保所有分片的参数都聚集到一个设备(通常是CPU)上,以确保模型完整。
- 去除模型的“外包装器”,提取最里面的
PreTrainedModel
类- 使用
PreTrainedModel
类中通用的save_pretrained
方法:与通常直接使用torch.save
不同,为了确保模型与Hugging Face Transformers兼容,应使用PreTrainedModel
类的save_pretrained
方法。但是,在使用DeepSpeed ZeRO 3时,由于模型参数被优化器分片管理,直接调用此方法在标准情况下可能会遇到问题。
二、 huggingface PreTrained Model中的from_pretrained()
def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):# 模型参数,也就是模型文件目录下的config文件config = kwargs.pop("config", None) state_dict = kwargs.pop("state_dict", None)cache_dir = kwargs.pop("cache_dir", None)from_tf = kwargs.pop("from_tf", False)from_flax = kwargs.pop("from_flax", False)ignore_mismatched_sizes = kwargs.pop("ignore_mismatched_sizes", False)force_download = kwargs.pop("force_download", False)resume_download = kwargs.pop("resume_download", False)proxies = kwargs.pop("proxies", None)output_loading_info = kwargs.pop("output_loading_info", False)local_files_only = kwargs.pop("local_files_only", False)use_auth_token = kwargs.pop("use_auth_token", None)revision = kwargs.pop("revision", None)# 是否接受远端自定义模块trust_remote_code = kwargs.pop("trust_remote_code", None) _ = kwargs.pop("mirror", None)from_pipeline = kwargs.pop("_from_pipeline", None)from_auto_class = kwargs.pop("_from_auto", False)_fast_init = kwargs.pop("_fast_init", True)# 模型dtypetorch_dtype = kwargs.pop("torch_dtype", None) # 参数先加入cpu中,节省显存, 与 DeepSpeed Zero-3 冲突,选其一low_cpu_mem_usage = kwargs.pop("low_cpu_mem_usage", None)# device_map, 一般为 auto,节省显存, 与 DeepSpeed Zero-3 冲突,选其一device_map = kwargs.pop("device_map", None)max_memory = kwargs.pop("max_memory", None)offload_folder = kwargs.pop("offload_folder", None)offload_state_dict = kwargs.pop("offload_state_dict", False)load_in_8bit = kwargs.pop("load_in_8bit", False)load_in_4bit = kwargs.pop("load_in_4bit", False)# 量化的configquantization_config = kwargs.pop("quantization_config", None) subfolder = kwargs.pop("subfolder", "")commit_hash = kwargs.pop("_commit_hash", None)variant = kwargs.pop("variant", None)use_safetensors = kwargs.pop("use_safetensors", None if is_safetensors_available() else False)
三、 huggingface Trainer 中的train()
这个
train
函数是 Hugging Face Transformers 库中Trainer
类的主要训练入口点。它负责设置训练环境、加载模型检查点、配置超参数搜索、寻找可执行的批次大小,并最终执行内部训练循环。以下是对该函数的详细解释:
- 如果
resume_from_checkpoint
参数是False
,则将其设置为None
。- 启动内存跟踪器,用于监控训练过程中的内存使用情况。
- 获取训练参数
args
。- 设置训练状态标志
self.is_in_train
为True
。- 如果启用了全精度 FP16 或 BF16 评估且不进行训练,则将模型移动到指定的设备上。
- 处理旧版本的 "model_path" 参数,并发出警告,建议使用
resume_from_checkpoint
参数。- 检查是否有未预期的关键字参数,如果有则引发
TypeError
。- 设置超参数搜索试验 (
optuna.Trial
或Dict[str, Any]
)。- 设置训练批次大小
self._train_batch_size
。- 重新初始化模型(如果给定
model_init
的情况下,一般不给定):
- 如果设置了
self.model_init
,则首先根据self.args.seed
设置随机种子。- 调用
self.call_model_init(trial)
实例化模型。- 将优化器和学习率调度器重置为
None
。- 判断并获得模型检查点路径:
- 如果未找到有效的检查点,则引发
ValueError
。- 如果
resume_from_checkpoint
是True
,则从args.output_dir
中加载最后一个检查点。- 加载模型检查点:
- 如果
resume_from_checkpoint
不为None
,且未启用 SageMaker 模型并行或 DeepSpeed,则加载检查点。也就是说train函数默认的断点续传不支持
SageMaker 模型或 DeepSpeed模型- 如果模型已被重新初始化,则将其移动到指定的设备上,并更新
self.model_wrapped
。- 使用
find_executable_batch_size
函数寻找可执行的批次大小,该函数会根据给定的批次大小和是否启用了自动寻找批次大小功能进行计算。- 执行内部训练循环
self._inner_training_loop
,传入训练参数、检查点路径、超参数搜索试验和评估时忽略的键列表等参数。总的来说,这个函数负责完成训练前的所有准备工作,包括加载检查点、设置超参数搜索、配置训练环境等,最终调用内部训练循环执行实际的训练过程。它提供了一些可配置选项,如从检查点恢复训练、超参数搜索和忽略评估时的某些键等,以满足不同的训练需求。
def train(self,resume_from_checkpoint: Optional[Union[str, bool]] = None,trial: Union["optuna.Trial", Dict[str, Any]] = None,ignore_keys_for_eval: Optional[List[str]] = None,**kwargs,
):"""主要的训练入口点。参数:resume_from_checkpoint (`str` 或 `bool`, *可选*):如果是 `str`,则表示之前由 [`Trainer`] 实例保存的本地检查点路径。如果是 `bool` 且等于 `True`,则加载之前由 [`Trainer`] 实例在 *args.output_dir* 中保存的最后一个检查点。如果设置了该参数,训练将从加载的模型/优化器/调度器状态继续进行。trial (`optuna.Trial` 或 `Dict[str, Any]`, *可选*):超参数搜索的试验运行或超参数字典。ignore_keys_for_eval (`List[str]`, *可选*):在训练期间进行评估时收集预测时应忽略的模型输出字典中的键列表。kwargs:用于隐藏已弃用参数的其他关键字参数。"""# 1. 如果 resume_from_checkpoint 是 False,则将其设置为 Noneif resume_from_checkpoint is False:resume_from_checkpoint = None# 2. 内存指标 - 必须尽早设置self._memory_tracker.start()# 3. 获取训练参数args = self.args# 4. 设置训练状态标志self.is_in_train = True# 5. 如果启用了 FP16 或 BF16 全精度评估且不进行训练,则将模型移动到指定设备上if (args.fp16_full_eval or args.bf16_full_eval) and not args.do_train:self._move_model_to_device(self.model, args.device)# 6. 处理旧版本的 "model_path" 参数if "model_path" in kwargs:resume_from_checkpoint = kwargs.pop("model_path")warnings.warn("`model_path` 已弃用,将在未来版本中被移除。请改用 `resume_from_checkpoint`。",FutureWarning,)# 7. 检查是否有未预期的关键字参数if len(kwargs) > 0:raise TypeError(f"train() 收到意外的关键字参数: {', '.join(list(kwargs.keys()))}。")# 8. 设置超参数搜索self._hp_search_setup(trial)# 9. 设置训练批次大小self._train_batch_size = self.args.train_batch_size# 10. 重新初始化模型model_reloaded = Falseif self.model_init is not None:# 10.1 在使用 model_init 实例化模型之前,设置随机种子enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)# 10.2 调用 model_init 函数实例化模型self.model = self.call_model_init(trial)model_reloaded = True# 10.3 重新初始化优化器和调度器self.optimizer, self.lr_scheduler = None, None# 11. 加载模型检查点if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:# 11.1 如果 resume_from_checkpoint 是 True,则从 args.output_dir 中加载最后一个检查点resume_from_checkpoint = get_last_checkpoint(args.output_dir)if resume_from_checkpoint is None:raise ValueError(f"在输出目录 ({args.output_dir}) 中未找到有效的检查点")# 12. 如果 resume_from_checkpoint 不为 None,且未启用 SageMaker 模型并行或 DeepSpeed,则加载检查点if resume_from_checkpoint is not None and not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled:self._load_from_checkpoint(resume_from_checkpoint)# 13. 如果模型已被重新初始化,则将其移动到指定设备上并更新 self.model_wrappedif model_reloaded:if self.place_model_on_device:self._move_model_to_device(self.model, args.device)self.model_wrapped = self.model# 14. 寻找可执行的批次大小inner_training_loop = find_executable_batch_size(self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size)# 15. 执行内部训练循环return inner_training_loop(args=args,resume_from_checkpoint=resume_from_checkpoint,trial=trial,ignore_keys_for_eval=ignore_keys_for_eval,)
四、 huggingface Trainer 中的 _load_best_model() 函数
这个
_load_best_model
函数是 Hugging Face Transformers 库中Trainer
类的一个内部方法,用于从指定的检查点路径加载最佳模型权重。以下是对该函数的详细解释:
- 记录日志,表示将从给定路径加载最佳模型及其得分。
- 构造不同权重文件的路径,包括普通权重文件、安全张量权重文件和适配器权重文件
- 获取要加载的模型实例,如果启用了 SageMaker 模型并行,则使用
self.model_wrapped
,否则使用self.model
。- 检查上述权重文件是否存在。
- 如果启用了 DeepSpeed,则使用
deepspeed_load_checkpoint
函数加载检查点。- 如果启用了 SageMaker 模型并行:
- 如果存在 "user_content.pt" 文件,则使用新的 SMP API 从检查点恢复模型权重。
- 如果不存在 "user_content.pt" 文件,则使用旧的 SMP API 加载检查点。首先加载状态字典,然后将其加载到模型中。
- 如果启用了 FSDP,则使用 FSDP 插件加载模型。
- 如果使用了 PEFT 和 LoRA:
- 如果适配器权重文件存在,则加载适配器权重。
- 如果适配器权重文件不存在,则发出警告。
- 如果不使用 PEFT 和 LoRA,则加载普通模型权重:
- 首先在 CPU 上加载模型状态字典,以避免 OOM 错误。
- 然后将状态字典加载到模型中,同时处理 FSDP 的一个 bug。
- 如果未启用 SageMaker 模型并行且已成功加载,则发出加载后的警告。
- 如果存在分片检查点索引文件,则加载分片检查点。
- 如果无法找到最佳模型权重文件,则发出警告,建议在分布式训练时启用
--save_on_each_node
选项。总的来说,这个函数的主要作用是从指定的检查点路径加载最佳模型权重,支持不同的环境和配置,如 DeepSpeed、SageMaker 模型并行、FSDP、PEFT 和 LoRA 等。它通过检查不同的权重文件路径,并根据具体情况选择合适的加载方式来完成模型权重的恢复。这个功能在模型训练和评估中非常有用,可以加载先前保存的最佳模型,从而继续训练或进行推理。
def _load_best_model(self):# 1. 记录日志,表示将从给定路径加载最佳模型logger.info(f"Loading best model from {self.state.best_model_checkpoint} (score: {self.state.best_metric}).")# 2. 构造不同权重文件的路径best_model_path = os.path.join(self.state.best_model_checkpoint, WEIGHTS_NAME)best_safe_model_path = os.path.join(self.state.best_model_checkpoint, SAFE_WEIGHTS_NAME)best_adapter_model_path = os.path.join(self.state.best_model_checkpoint, ADAPTER_WEIGHTS_NAME)best_safe_adapter_model_path = os.path.join(self.state.best_model_checkpoint, ADAPTER_SAFE_WEIGHTS_NAME)# 3. 获取要加载的模型实例model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model# 4. 检查权重文件是否存在if (os.path.exists(best_model_path)or os.path.exists(best_safe_model_path)or os.path.exists(best_adapter_model_path)or os.path.exists(best_safe_adapter_model_path)):# 5. 如果启用了 DeepSpeed,则使用 deepspeed_load_checkpoint 加载检查点if self.is_deepspeed_enabled:deepspeed_load_checkpoint(self.model_wrapped, self.state.best_model_checkpoint)else:has_been_loaded = True# 6. 如果启用了 SageMaker 模型并行if is_sagemaker_mp_enabled():# 6.1 如果存在 "user_content.pt" 文件,则使用新的 SMP API 加载检查点if os.path.isfile(os.path.join(self.state.best_model_checkpoint, "user_content.pt")):smp.resume_from_checkpoint(path=self.state.best_model_checkpoint,tag=WEIGHTS_NAME,partial=False,load_optimizer=False,)# 6.2 如果不存在 "user_content.pt" 文件,则使用旧的 SMP API 加载检查点else:if self.args.save_safetensors and os.path.isfile(best_safe_model_path):state_dict = safetensors.torch.load_file(best_safe_model_path, device="cpu")else:state_dict = torch.load(best_model_path, map_location="cpu")state_dict["_smp_is_partial"] = Falseload_result = model.load_state_dict(state_dict, strict=True)# 7. 如果启用了 FSDP,则使用 FSDP 插件加载模型elif self.is_fsdp_enabled:self.accelerator.state.fsdp_plugin.load_model(self.accelerator, model, self.state.best_model_checkpoint)# 8. 如果使用了 PEFT 和 LoRAelse:if is_peft_available() and isinstance(model, PeftModel):# 8.1 如果模型使用了 PEFT 和 LoRA,并且适配器权重文件存在,则加载适配器权重if hasattr(model, "active_adapter") and hasattr(model, "load_adapter"):if os.path.exists(best_adapter_model_path) or os.path.exists(best_safe_adapter_model_path):model.load_adapter(self.state.best_model_checkpoint, model.active_adapter)# Load_adapter has no return value present, modify it when appropriate.from torch.nn.modules.module import _IncompatibleKeysload_result = _IncompatibleKeys([], [])# 8.2 如果适配器权重文件不存在,则发出警告else:logger.warning("The intermediate checkpoints of PEFT may not be saved correctly, "f"using `TrainerCallback` to save {ADAPTER_WEIGHTS_NAME} in corresponding folders, ""here are some examples https://github.com/huggingface/peft/issues/96")has_been_loaded = Falseelse:logger.warning("Could not load adapter model, make sure to have `peft>=0.3.0` installed")has_been_loaded = False# 9. 如果不使用 PEFT 和 LoRA,则加载普通模型权重else:# 9.1 在 CPU 上加载模型状态字典,以避免 OOM 错误if self.args.save_safetensors and os.path.isfile(best_safe_model_path):state_dict = safetensors.torch.load_file(best_safe_model_path, device="cpu")else:state_dict = torch.load(best_model_path, map_location="cpu")# 9.2 如果模型在 GPU 上,仍然可以正常加载# 为了解决 FSDP 的一个 bug (https://github.com/pytorch/pytorch/issues/82963)load_result = model.load_state_dict(state_dict, False)# 10. 如果未启用 SageMaker 模型并行且已成功加载,则发出加载后的警告if not is_sagemaker_mp_enabled() and has_been_loaded:self._issue_warnings_after_load(load_result)# 11. 如果存在分片检查点索引文件,则加载分片检查点elif os.path.exists(os.path.join(self.state.best_model_checkpoint, WEIGHTS_INDEX_NAME)):load_result = load_sharded_checkpoint(model, self.state.best_model_checkpoint, strict=is_sagemaker_mp_enabled())if not is_sagemaker_mp_enabled():self._issue_warnings_after_load(load_result)# 12. 如果无法找到最佳模型权重文件,则发出警告else:logger.warning(f"Could not locate the best model at {best_model_path}, if you are running a distributed training ""on multiple nodes, you should activate `--save_on_each_node`.")
五、 huggingface Trainer 中的_save_checkpoint() 函数
总的来说,
_save_checkpoint
函数的作用是保存模型检查点、优化器状态、学习率调度器状态、随机数生成器状态等,以便在需要时可以恢复训练或进行推理。它支持多种训练模式和配置,如 DeepSpeed、FSDP、SageMaker 模型并行、TPU 等,并根据具体情况采取不同的保存策略。该函数的主要步骤包括:
- 构造检查点文件夹名称。
- 如果需要,存储浮点运算数 (FLOPs)。
- 获取输出目录。
- 保存模型权重。
- 如果启用相关功能,则保存 DeepSpeed 检查点、FSDP 优化器状态等。当deepspeed使用self.model_wrapped.save_checkpoint 保存state3检查点时,需要设置stage3_gather_16bit_weights_on_model_save = True,否则模型不一定会被保存,另外,该函数还保存了deepspeed的优化器状态等组件。
- 保存优化器状态、学习率调度器状态,并根据不同的训练环境 (TPU、SageMaker 模型并行等) 采用不同的保存方式。
- 如果使用梯度缩放,保存梯度缩放器状态。
- 根据指定的指标更新最佳指标和最佳模型检查点路径。
- 保存 Trainer 状态。
- 保存各种随机数生成器 (RNG) 状态,包括 Python、NumPy、CPU、CUDA 和 XLA (如果使用 TPU)。
- 如果启用,将当前检查点推送到 Hugging Face Hub。
- 可能删除一些旧的检查点,以控制磁盘使用量。
def _save_checkpoint(self, model, trial, metrics=None):# 1. self.model 是我们要保存的模型的引用,# 但对于 FullyShardedDDP 情况有一个例外。# FullyShardedDDP 是 PyTorch 中的一种并行化训练策略,它将模型权重分散到多个 GPU 上,# 因此在这种情况下,self.model 可能不是完整的模型引用。# assert unwrap_model(model) is self.model, "internal model should be a reference to self.model"# 2. 构造检查点文件夹名称。# 检查点文件夹名称由前缀 "checkpoint" 和当前的全局步数组成,例如 "checkpoint-1234"。# 全局步数 (self.state.global_step) 表示模型已经训练了多少步,# 它是一个计数器,用于跟踪模型训练的进度。checkpoint_folder = f"{PREFIX_CHECKPOINT_DIR}-{self.state.global_step}"# 3. 如果没有超参数搜索后端和试验,则存储浮点运算数。# 浮点运算数 (FLOPs) 是一个指标,用于衡量模型运行时所需的计算量。# 存储 FLOPs 可以帮助我们了解模型的计算复杂度,并用于优化和比较不同模型的性能。# 这个操作只在没有进行超参数搜索的情况下执行,因为超参数搜索通常会涉及多个模型配置。if self.hp_search_backend is None and trial is None:self.store_flos()# 4. 获取输出目录。# 输出目录是保存检查点文件的位置。如果指定了 trial (超参数搜索的一次试验),# 则输出目录会包含试验的相关信息;否则,输出目录就是默认的输出目录。run_dir = self._get_output_dir(trial=trial)output_dir = os.path.join(run_dir, checkpoint_folder)# 5. 保存模型。# 调用 self.save_model 方法将模型权重保存到指定的输出目录。# _internal_call=True 表示这是一个内部调用,而不是用户主动调用保存模型。self.save_model(output_dir, _internal_call=True)# 6. 如果启用了 DeepSpeed,则保存 DeepSpeed 检查点。# DeepSpeed 是一个用于加速大型模型训练的库,它提供了自己的检查点保存机制。# 在 zero3 模式下,模型文件本身不一定会被保存,除非启用了一个特定的配置选项 `stage3_gather_16bit_weights_on_model_save`。# 这是因为在 zero3 模式下,模型权重被分割成多个部分,需要通过特殊的方式重新组装。if self.is_deepspeed_enabled:self.model_wrapped.save_checkpoint(output_dir)# 7. 保存优化器和学习率调度器状态。# 如果启用了 ShardedDDPOption.SIMPLE,则先调用 optimizer.consolidate_state_dict() 方法来合并优化器状态字典。# 这是因为在某些情况下,优化器状态字典可能被分散在多个GPU上,需要先合并才能正确保存。if self.sharded_ddp == ShardedDDPOption.SIMPLE:self.optimizer.consolidate_state_dict()# 8. 如果启用了 FSDP (Fully Sharded Data Parallelism),则使用 FSDP 接口获取完整的优化器状态字典。# FSDP 是 PyTorch 中的另一种并行化训练策略,它将模型权重和优化器状态分散到多个GPU上。# 因此,需要使用 FSDP 提供的特殊接口来获取完整的优化器状态字典。# 注意,full_optim_state_dict 在 PyTorch 2.2 之后将被弃用。if self.fsdp:full_osd = self.model.__class__.full_optim_state_dict(self.model, self.optimizer)# 9. 如果使用 TPU,则通过 xm 模块保存优化器和调度器状态。# TPU (Tensor Processing Unit) 是谷歌开发的专用于机器学习的加速器。# 在 TPU 上训练时,需要使用 xm 模块提供的特殊接口来保存和加载模型、优化器和调度器状态。# xm.rendezvous 确保所有 TPU 核心都准备好保存状态。# xm.save 将状态保存到指定的文件中。# warnings.catch_warnings 用于捕获和处理可能出现的警告。if is_torch_tpu_available():xm.rendezvous("saving_optimizer_states")xm.save(self.optimizer.state_dict(), os.path.join(output_dir, OPTIMIZER_NAME))with warnings.catch_warnings(record=True) as caught_warnings:xm.save(self.lr_scheduler.state_dict(), os.path.join(output_dir, SCHEDULER_NAME))reissue_pt_warnings(caught_warnings)# 10. 如果启用了 SageMaker 模型并行,则保存优化器和调度器状态。# SageMaker 是亚马逊提供的机器学习服务,它支持模型并行训练。# 在 SageMaker 模型并行中,优化器状态可能被分散在多个节点上,因此需要使用特殊的方式保存和加载。# 首先,获取本地优化器状态字典 (不包括其他节点的状态)。# 然后,使用 smp.barrier() 确保所有节点都准备好进行保存操作。# 如果当前节点是主节点或配置为要分片优化器状态,则使用 smp.save 保存优化器状态。# partial=True 表示只保存部分状态,v3 选项指示是否使用新的分片格式。# 如果需要保存,还会保存学习率调度器和梯度缩放器 (如果使用了梯度缩放) 的状态。elif is_sagemaker_mp_enabled():opt_state_dict = self.optimizer.local_state_dict(gather_if_shard=False)smp.barrier()if smp.rdp_rank() == 0 or smp.state.cfg.shard_optimizer_state:smp.save(opt_state_dict,os.path.join(output_dir, OPTIMIZER_NAME),partial=True,v3=smp.state.cfg.shard_optimizer_state,)if self.args.should_save:with warnings.catch_warnings(record=True) as caught_warnings:torch.save(self.lr_scheduler.state_dict(), os.path.join(output_dir, SCHEDULER_NAME))reissue_pt_warnings(caught_warnings)if self.do_grad_scaling:torch.save(self.scaler.state_dict(), os.path.join(output_dir, SCALER_NAME))# 11. 如果应该保存且未启用 DeepSpeed,则保存优化器、调度器和梯度缩放器 (如果需要) 的状态。# 对于 FSDP,使用之前获取的完整优化器状态字典。# 对于其他情况,直接保存优化器的状态字典。# 同时,还会保存学习率调度器的状态字典,并捕获和处理可能出现的警告。# 如果使用了梯度缩放,还会保存梯度缩放器的状态字典。elif self.args.should_save and not self.is_deepspeed_enabled:if self.fsdp:torch.save(full_osd, os.path.join(output_dir, OPTIMIZER_NAME))else:torch.save(self.optimizer.state_dict(), os.path.join(output_dir, OPTIMIZER_NAME))with warnings.catch_warnings(record=True) as caught_warnings:torch.save(self.lr_scheduler.state_dict(), os.path.join(output_dir, SCHEDULER_NAME))reissue_pt_warnings(caught_warnings)if self.do_grad_scaling:torch.save(self.scaler.state_dict(), os.path.join(output_dir, SCALER_NAME))# 12. 确定新的最佳指标和最佳模型检查点。# 如果提供了指标 (metrics) 并且指定了用于确定最佳模型的指标名称 (self.args.metric_for_best_model),# 则根据指定的指标值和比较操作符 (大于或小于) 来更新最佳指标和最佳模型检查点路径。# 比较操作符由 self.args.greater_is_better 决定,如果为 True,则使用 np.greater 比较,否则使用 np.less。# 如果当前指标值比之前的最佳指标更优,则更新最佳指标和最佳模型检查点路径。if metrics is not None and self.args.metric_for_best_model is not None:metric_to_check = self.args.metric_for_best_modelif not metric_to_check.startswith("eval_"):metric_to_check = f"eval_{metric_to_check}"metric_value = metrics[metric_to_check]operator = np.greater if self.args.greater_is_better else np.lessif (self.state.best_metric is Noneor self.state.best_model_checkpoint is Noneor operator(metric_value, self.state.best_metric)):self.state.best_metric = metric_valueself.state.best_model_checkpoint = output_dir# 13. 保存 Trainer 状态。# 如果需要保存,则将 Trainer 的当前状态保存到 JSON 文件中。# Trainer 状态包括一些配置信息、计数器和其他元数据,用于在恢复训练时重建 Trainer 的状态。if self.args.should_save:self.state.save_to_json(os.path.join(output_dir, TRAINER_STATE_NAME))# 14. 在非分布式训练中保存随机数生成器 (RNG) 状态。# RNG 状态包括 Python、NumPy、CPU 和 CUDA (如果可用) 的随机数生成器状态。# 保存 RNG 状态可以确保在恢复训练时,模型可以从相同的随机种子继续训练,保证结果的可重复性。# 如果使用 TPU,还会保存 XLA 的随机数生成器状态。rng_states = {"python": random.getstate(),"numpy": np.random.get_state(),"cpu": torch.random.get_rng_state(),}if torch.cuda.is_available():if self.args.parallel_mode == ParallelMode.DISTRIBUTED:# 在分布式训练中,保存所有 GPU 的 CUDA RNG 状态,以确保随机性一致。rng_states["cuda"] = torch.cuda.random.get_rng_state_all()else:# 在非分布式训练中,只需保存单个 GPU 的 CUDA RNG 状态。rng_states["cuda"] = torch.cuda.random.get_rng_state()if is_torch_tpu_available():rng_states["xla"] = xm.get_rng_state()# 15. 确保输出目录存在。# 如果输出目录不存在,则创建该目录。exist_ok=True 确保即使目录已经存在也不会报错。os.makedirs(output_dir, exist_ok=True)# 16. 保存 RNG 状态。# 如果是单机训练 (self.args.world_size <= 1),则将所有 RNG 状态保存到一个文件中。# 如果是分布式训练,则每个进程将 RNG 状态保存到单独的文件中,文件名包含进程索引。# 这样做是为了避免不同进程之间的竞争条件,确保每个进程可以安全地保存和加载自己的 RNG 状态。if self.args.world_size <= 1:torch.save(rng_states, os.path.join(output_dir, "rng_state.pth"))else:torch.save(rng_states, os.path.join(output_dir, f"rng_state_{self.args.process_index}.pth"))# 17. 如果启用了推送到 Hub,则从检查点推送。# Hub 是 Hugging Face 提供的模型托管和共享平台。# 如果设置了 self.args.push_to_hub,则调用 self._push_from_checkpoint 方法将当前检查点推送到 Hub。# 这样,用户就可以在 Hub 上共享和管理模型及其检查点。if self.args.push_to_hub:self._push_from_checkpoint(output_dir)# 18. 可能删除一些旧的检查点。# 如果需要保存,则调用 self._rotate_checkpoints 方法删除一些旧的检查点。# use_mtime=True 表示使用文件的修改时间而不是创建时间来排序检查点文件。# output_dir=run_dir 表示在整个运行目录中搜索和删除旧的检查点,而不是只在当前检查点文件夹中搜索。# 这个操作可以帮助控制磁盘使用量,防止检查点文件无限制地积累。if self.args.should_save:self._rotate_checkpoints(use_mtime=True, output_dir=run_dir)
六、 huggingface Trainer 中的save_model () 函数
该函数用于保存模型,以便可以使用
from_pretrained()
重新加载它。它支持多种训练环境和配置,包括 TPU、SageMaker 模型并行、ShardedDDP、FSDP 和 DeepSpeed。根据不同的配置,该函数采取不同的保存策略:
- 如果使用 TPU,则调用
_save_tpu
方法保存模型。- 如果启用了 SageMaker 模型并行,则在所有进程中获取模型状态字典,然后在主进程中保存该状态字典。如果使用较新版本的 SageMaker,还会创建一个空文件
user_content.pt
作为标记。- 如果启用了 ShardedDDP 的 ZERO_DP_2 或 ZERO_DP_3 选项、使用了 FSDP 或启用了 FSDP,则根据具体情况使用不同的保存方法:
- 如果启用了 FSDP,则使用
accelerator.state.fsdp_plugin.save_model
保存 FSDP 模型。- 如果未启用 FSDP,则获取模型状态字典,并在主进程中保存该状态字典。
- 如果启用了 DeepSpeed,则执行以下操作:
- 如果zero 0 1 2 阶段需要保存,则调用
_save
方法保存模型(文章下面将介绍 _save() 函数)。- 如果启用了 DeepSpeed 的 zero3 模式,则执行以下步骤:
a. 删除输出目录中的WEIGHTS_NAME
文件,因为在 zero3 模式下,该文件是无效的。
b. 如果 DeepSpeed 配置中的stage3_gather_16bit_weights_on_model_save
为 True,则在所有进程中调用model_wrapped.save_16bit_model
保存 16 位精度模型权重。如果保存失败,则打印警告信息,并调用model_wrapped.save_checkpoint
保存完整的检查点。- 如果需要保存且未使用上述任何特殊情况,则在主进程中调用
_save
方法保存模型。- 如果由用户调用
save_model
方法并设置了push_to_hub
参数,则将模型推送到 Hugging Face Hub,并使用提交消息 "Model save"。
def save_model(self, output_dir: Optional[str] = None, _internal_call: bool = False):"""# 1. 这个函数用于保存模型,以便可以使用 `from_pretrained()` 重新加载它。# 2. 只在主进程中保存模型。"""# 3. 如果没有提供输出目录,则使用 self.args.output_dir 作为输出目录。if output_dir is None:output_dir = self.args.output_dir# 4. 如果使用 TPU,则调用 _save_tpu 方法保存模型。if is_torch_tpu_available():self._save_tpu(output_dir)# 5. 如果启用了 SageMaker 模型并行,则执行以下步骤:# 1. 在所有进程中调用 model_wrapped.state_dict() 获取模型状态字典。# 2. 在主进程中创建输出目录。# 3. 如果需要保存,则调用 _save 方法保存模型状态字典。# 4. 如果使用的是较新版本的 SageMaker (>= 1.10),则在输出目录中创建一个空文件 'user_content.pt',# 表示使用了新版本的 SageMaker 保存模型状态字典。elif is_sagemaker_mp_enabled():os.makedirs(output_dir, exist_ok=True)state_dict = self.model_wrapped.state_dict()if self.args.should_save:self._save(output_dir, state_dict=state_dict)if IS_SAGEMAKER_MP_POST_1_10:Path(os.path.join(output_dir, "user_content.pt")).touch()# 6. 如果启用了 ShardedDDP 的 ZERO_DP_2 或 ZERO_DP_3 选项、使用了 FSDP 或启用了 FSDP,则执行以下步骤:# 1. 如果启用了 FSDP,则在主进程中:# a. 创建输出目录。# b. 使用 accelerator.state.fsdp_plugin.save_model 保存 FSDP 模型。# 2. 如果未启用 FSDP,则获取模型状态字典,并在主进程中调用 _save 方法保存模型状态字典。elif (ShardedDDPOption.ZERO_DP_2 in self.args.sharded_ddpor ShardedDDPOption.ZERO_DP_3 in self.args.sharded_ddpor self.fsdp is not Noneor self.is_fsdp_enabled):if self.is_fsdp_enabled:os.makedirs(output_dir, exist_ok=True)self.accelerator.state.fsdp_plugin.save_model(self.accelerator, self.model, output_dir)else:state_dict = self.model.state_dict()if self.args.should_save:self._save(output_dir, state_dict=state_dict)# 7. 如果启用了 DeepSpeed,则执行以下步骤:# 1. 如果需要保存,则调用 _save 方法保存模型。# 2. 如果启用了 DeepSpeed 的 zero3 模式,则执行以下操作:# a. 删除输出目录中的 WEIGHTS_NAME 文件,因为在 zero3 模式下,该文件是无效的。# b. 如果 DeepSpeed 配置中的 stage3_gather_16bit_weights_on_model_save 为 True,# 则在所有进程中调用 model_wrapped.save_16bit_model 保存 16 位精度模型权重。# 如果保存失败,则打印警告信息,并调用 model_wrapped.save_checkpoint 保存完整的检查点。elif self.is_deepspeed_enabled:if self.args.should_save:# deepspeed ZeRO 0 1 2 没有对模型进行切片,所以可以使用 save_pretrained 相关的模型保存方法self._save(output_dir)# deepspeed ZeRO 3 则使用 self.model_wrapped.save_checkpoint 进行保存if is_deepspeed_zero3_enabled():if self.args.should_save:file = os.path.join(output_dir, WEIGHTS_NAME)if os.path.isfile(file):os.remove(file)if not self.model_wrapped.save_16bit_model(output_dir, WEIGHTS_NAME):logger.warning("deepspeed.save_16bit_model didn't save the model, since"" stage3_gather_16bit_weights_on_model_save=false. Saving the full checkpoint instead, use"" zero_to_fp32.py to recover weights")self.model_wrapped.save_checkpoint(output_dir)# 8. 如果需要保存且未使用上述任何特殊情况,则在主进程中调用 _save 方法保存模型。elif self.args.should_save:self._save(output_dir)# 9. 如果由用户调用 save_model 方法并设置了 push_to_hub 参数,则将模型推送到 Hugging Face Hub,并使用提交消息 "Model save"。if self.args.push_to_hub and not _internal_call:self.push_to_hub(commit_message="Model save")
七、 huggingface Trainer 中的_save () 函数
该函数用于保存模型检查点、tokenizer 和训练参数。它执行以下主要步骤:
- 确定输出目录,如果未提供则使用
self.args.output_dir
。- 创建输出目录,如果该目录已经存在则不会报错。
- 记录正在保存模型检查点的信息。
- 获取支持的模型类型列表,包括
PreTrainedModel
和可选的PeftModel
。PeftModel
是一种用于微调大型预训练语言模型的技术,它通过引入新的可训练参数来修改模型权重,从而显著降低所需的参数数量。- 如果模型不属于支持的类型,则执行以下操作:
- 获取模型的状态字典 (如果未提供)。
- 如果模型的内部模型属于支持的类型,则调用内部模型的
save_pretrained
方法保存模型权重,并指定是否使用安全的序列化格式 (SafeTensors)。- 如果内部模型也不属于支持的类型,则打印警告信息,并根据
safe_serialization
参数,使用safetensors.torch.save_file
或torch.save
保存模型状态字典。- 如果模型属于支持的类型,则直接调用模型的
save_pretrained
方法保存模型权重,并指定是否使用安全的序列化格式 (SafeTensors)。- 如果存在 tokenizer,则调用
tokenizer.save_pretrained
方法将其保存到输出目录。- 将训练参数保存到输出目录中的
training_args.bin
文件中。这是一个良好的实践,可以保存用于训练模型的参数配置,以便在将来加载模型时了解其训练细节。
def _save(self, output_dir: Optional[str] = None, state_dict=None):# 1. 如果执行这个函数,则表示当前是进程 0,因此不需要检查是否为进程 0。# 2. 如果没有提供输出目录,则使用 self.args.output_dir 作为输出目录。output_dir = output_dir if output_dir is not None else self.args.output_dir# 3. 创建输出目录,如果该目录已经存在则不会报错。os.makedirs(output_dir, exist_ok=True)# 4. 将正在保存模型检查点的信息记录到日志中。logger.info(f"Saving model checkpoint to {output_dir}")# 5. 获取支持的模型类型列表。# 如果 PEFT (Parameter-Efficient Fine-Tuning) 可用,则支持 PreTrainedModel 和 PeftModel。# 否则,只支持 PreTrainedModel。# PEFT 是一种用于微调大型预训练语言模型的技术,它通过引入新的可训练参数来修改模型权重,从而显著降低所需的参数数量。supported_classes = (PreTrainedModel,) if not is_peft_available() else (PreTrainedModel, PeftModel)# 6. 如果模型不属于支持的类型,则执行以下步骤:# 1. 如果未提供状态字典,则获取模型的状态字典。# 2. 如果模型的内部模型属于支持的类型,则调用内部模型的 save_pretrained 方法保存模型权重。# safe_serialization 参数指示是否使用安全的序列化格式 (SafeTensors)。# 3. 如果内部模型也不属于支持的类型,则打印警告信息,并根据 safe_serialization 参数,# 使用 safetensors.torch.save_file 或 torch.save 保存模型状态字典。if not isinstance(self.model, supported_classes):if state_dict is None:state_dict = self.model.state_dict()if isinstance(unwrap_model(self.model), supported_classes):unwrap_model(self.model).save_pretrained(output_dir, state_dict=state_dict, safe_serialization=self.args.save_safetensors)else:logger.info("Trainer.model is not a `PreTrainedModel`, only saving its state dict.")if self.args.save_safetensors:safetensors.torch.save_file(state_dict, os.path.join(output_dir, SAFE_WEIGHTS_NAME))else:torch.save(state_dict, os.path.join(output_dir, WEIGHTS_NAME))# 7. 如果模型属于支持的类型,则直接调用模型的 save_pretrained 方法保存模型权重。# safe_serialization 参数指示是否使用安全的序列化格式 (SafeTensors)。else:self.model.save_pretrained(output_dir, state_dict=state_dict, safe_serialization=self.args.save_safetensors)# 8. 如果存在 tokenizer,则调用 tokenizer.save_pretrained 方法将其保存到输出目录。if self.tokenizer is not None:self.tokenizer.save_pretrained(output_dir)# 9. 将训练参数保存到输出目录中的 training_args.bin 文件中。# 这是一个良好的实践,可以保存用于训练模型的参数配置,以便在将来加载模型时了解其训练细节。torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
八、 huggingface Trainer 中的compute_loss () 函数
该函数定义了 Trainer 计算损失的方式。它执行以下主要步骤:
- 如果启用了标签平滑 (
label_smoother
) 并且输入中包含 "labels" 键,则将其从输入中弹出并保存为labels
变量。标签平滑是一种正则化技术,可以缓解过拟合并提高模型的泛化能力。- 将输入传递给模型,获取模型输出。
- 如果
args.past_index >= 0
,则保存过去的状态 (_past
)。这是用于处理序列生成任务,其中需要保留之前的隐藏状态以生成下一个令牌。- 如果提供了标签 (
labels
),则计算损失:
- 如果模型属于因果语言模型 (Causal Language Model),则使用
self.label_smoother
计算损失,并将shift_labels
设置为 True,表示需要将标签向右移动一个位置。- 对于其他模型,直接使用
self.label_smoother
计算损失。- 如果没有提供标签,且模型输出是一个字典且不包含 "loss" 键,则引发
ValueError
异常。这是因为在这种情况下,我们无法从模型输出中获取损失值。- 如果模型输出是一个字典并包含 "loss" 键,则将
loss
设置为outputs["loss"]
。如果模型输出是一个元组或列表,则将loss
设置为第一个元素。这是因为某些模型可能会返回元组或列表作为输出,而不是ModelOutput
对象。- 如果
return_outputs
为 True,则返回一个元组(loss, outputs)
。否则,只返回loss
。
def compute_loss(self, model, inputs, return_outputs=False):"""# 1. 这个函数定义了 Trainer 计算损失的方式。默认情况下,所有模型都会在第一个输出元素中返回损失值。# 2. 可以通过子类化和重写这个函数来实现自定义的损失计算行为。"""# 3. 如果启用了标签平滑 (label smoothing),并且输入中包含 "labels" 键,则将其从输入中弹出并保存为 labels 变量。# 标签平滑是一种正则化技术,可以缓解过拟合并提高模型的泛化能力。if self.label_smoother is not None and "labels" in inputs:labels = inputs.pop("labels")else:labels = None# 4. 将输入传递给模型,获取模型输出。outputs = model(**inputs)# 5. 如果 args.past_index >= 0,则保存过去的状态 (past state)。# 这是用于处理序列生成任务,其中需要保留之前的隐藏状态以生成下一个令牌。# TODO: 这部分代码需要在将来进行修复和优化,使其更加清晰。if self.args.past_index >= 0:self._past = outputs[self.args.past_index]# 6. 如果提供了标签 (labels),则计算损失:# 1. 如果模型属于因果语言模型 (Causal Language Model),则使用 self.label_smoother 计算损失,# 并将 shift_labels 设置为 True,表示需要将标签向右移动一个位置。# 2. 对于其他模型,直接使用 self.label_smoother 计算损失。if labels is not None:if unwrap_model(model)._get_name() in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values():loss = self.label_smoother(outputs, labels, shift_labels=True)else:loss = self.label_smoother(outputs, labels)# 7. 如果没有提供标签,且模型输出是一个字典且不包含 "loss" 键,则引发 ValueError 异常。# 这是因为在这种情况下,我们无法从模型输出中获取损失值。else:if isinstance(outputs, dict) and "loss" not in outputs:raise ValueError("The model did not return a loss from the inputs, only the following keys: "f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}.")# 8. 如果模型输出是一个字典并包含 "loss" 键,则将 loss 设置为 outputs["loss"]。# 如果模型输出是一个元组或列表,则将 loss 设置为第一个元素。# 这是因为某些模型可能会返回元组或列表作为输出,而不是 ModelOutput 对象。loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]# 9. 如果 return_outputs 为 True,则返回一个元组 (loss, outputs)。否则,只返回 loss。return (loss, outputs) if return_outputs else loss
九、 huggingface Trainer 中的训练过程函数 _inner_training_loop()
该函数是 Transformers 库中用于内部训练循环的主要函数。它执行以下主要步骤:
- 初始化训练环境,包括设置批次大小、获取数据加载器、计算训练步数等。
- 根据配置,决定是否需要延迟创建优化器和学习率调度器。
- 如果启用了 DeepSpeed,则使用
deepspeed_init
初始化优化器和学习率调度器。- 创建优化器和学习率调度器 (如果尚未创建)。
- 包装模型,并根据配置使用
accelerator.prepare
方法准备模型、优化器和学习率调度器。- 从检查点加载模型、优化器和学习率调度器状态 (如果提供)。
- 记录训练信息,如样本数量、epochs 数量、批次大小等。
- 初始化训练状态,包括从检查点加载状态 (如果提供)。
- 更新回调处理器的引用,以便在训练过程中调用相应的回调函数。
- 初始化损失张量和梯度缩放器 (如果使用混合精度训练)。
- 调用回调处理器的
on_train_begin
方法。- 如果不忽略数据跳过,则跳过已训练的 epochs。
- 进入训练循环,遍历所有 epochs 和批次:
- 对于每个 epoch,设置数据加载器的采样器。
- 获取 epoch 迭代器,并重置过去的状态 (如果需要)。
- 调用回调处理器的
on_epoch_begin
方法。- 如果从检查点恢复训练,则加载随机数生成器状态。
- 对于每个批次:
- 跳过已训练的批次 (如果需要)。
- 调用回调处理器的
on_step_begin
方法。- 使用
accelerator.accumulate
Context Manager 执行训练步骤。- 计算损失,并根据配置处理 NaN 或 Inf 值。
- 更新浮点运算数。
- 如果是梯度累积的最后一批或当前 epoch 的最后一批:
- 执行梯度裁剪 (如果需要)。
- 执行优化器步骤。
- 如果优化器步骤成功运行,则更新学习率调度器、清除梯度、更新全局步数和 epoch。
- 调用回调处理器的
on_step_end
方法。- 记录和保存模型 (如果需要)。
- 否则,调用回调处理器的
on_substep_end
方法。- 检查是否需要停止当前 epoch 或整个训练过程。
- 如果在当前 epoch 没有迭代任何批次,则打印警告并停止训练。
- 调用回调处理器的
on_epoch_end
方法。- 记录和保存模型 (如果需要)。
- 如果启用了 TPU 指标调试选项,则打印指标报告。
- 检查是否需要停止整个训练过程。
- 清理过去的状态 (如果存在)。
- 如果启用了
load_best_model_at_end
选项,则加载最佳模型检查点。- 计算总训练损失。
- 计算训练指标并记录。
- 获取输出目录并排序检查点。
- 如果启用了
save_total_limit
选项,则删除旧的检查点。- 调用回调处理器的
on_train_end
方法。- 返回
TrainOutput
对象,包含训练的全局步数、损失和其他指标。
def _inner_training_loop(self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None):# 这是Trainer的内部训练循环方法,用于执行模型的训练过程# batch_size: 训练批次大小# args: 包含训练相关参数的对象,如优化器、学习率等# resume_from_checkpoint: 如果从检查点恢复训练,则指定检查点路径# trial: 如果是超参数搜索,则传入超参数试验对象# ignore_keys_for_eval: 在评估期间需要忽略的键# 1. 释放显存# Accelerator提供了一些工具来优化内存使用# free_memory()方法释放未使用的内存,避免内存占用过高self.accelerator.free_memory()# 2. 设置当前训练批次大小self._train_batch_size = batch_sizelogger.debug(f"Currently training with a batch size of: {self._train_batch_size}")# 将当前训练批次大小记录到日志中,方便调试# 3. 获取训练数据加载器train_dataloader = self.get_train_dataloader()# 通过实现的get_train_dataloader方法获取训练数据加载器# 4. 设置训练控制变量# 根据命令行参数和数据集长度,计算以下变量:# total_train_batch_size: 总训练批次大小(考虑并行、分布式和梯度累积)# len_dataloader: 数据加载器长度(如果可用)# num_update_steps_per_epoch: 每个epoch的更新步数# num_examples: 训练样本总数# max_steps: 总的训练步数# num_train_epochs: 总的训练周期数# num_train_samples: 总的训练样本数total_train_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size# 总训练批次大小 = 每个GPU的批次大小 * 梯度累积步数 * GPU数量len_dataloader = Noneif has_length(train_dataloader):# 如果数据加载器提供了长度信息len_dataloader = len(train_dataloader)num_update_steps_per_epoch = len_dataloader // args.gradient_accumulation_stepsnum_update_steps_per_epoch = max(num_update_steps_per_epoch, 1)# 每个epoch的更新步数 = 数据加载器长度 / 梯度累积步数# 确保至少有1个更新步骤num_examples = self.num_examples(train_dataloader)# 获取训练样本总数if args.max_steps > 0:# 如果指定了最大训练步数max_steps = args.max_stepsnum_train_epochs = args.max_steps // num_update_steps_per_epoch + int(args.max_steps % num_update_steps_per_epoch > 0)# 总的训练周期数 = 最大步数 / 每个epoch的更新步数 + 是否需要额外的一个epochnum_train_samples = args.max_steps * total_train_batch_size# 总的训练样本数 = 最大步数 * 总训练批次大小else:# 如果没有指定最大步数,则根据epoch数计算max_steps = math.ceil(args.num_train_epochs * num_update_steps_per_epoch)# 最大步数 = 训练epoch数 * 每个epoch的更新步数(向上取整)num_train_epochs = math.ceil(args.num_train_epochs)# 总的训练周期数向上取整num_train_samples = self.num_examples(train_dataloader) * args.num_train_epochs# 总的训练样本数 = 训练样本总数 * 训练epoch数elif args.max_steps > 0: # 如果数据加载器长度无法获取,则依赖max_stepsmax_steps = args.max_stepsnum_train_epochs = sys.maxsize # 设置一个很大的epoch数量num_update_steps_per_epoch = max_stepsnum_examples = total_train_batch_size * args.max_stepsnum_train_samples = args.max_steps * total_train_batch_sizeelse:raise ValueError("args.max_steps must be set to a positive value if dataloader does not have a length, was"f" {args.max_steps}")# 如果数据加载器长度无法获取,并且也没有指定最大步数,则引发错误# 5. 计算日志、评估和保存的绝对步数# 如果使用比率表示,将比率转换为绝对步数if args.logging_steps and args.logging_steps < 1:args.logging_steps = math.ceil(max_steps * args.logging_steps)# 日志步数 = 最大步数 * 日志步数比率(向上取整)if args.eval_steps and args.eval_steps < 1:args.eval_steps = math.ceil(max_steps * args.eval_steps)# 评估步数 = 最大步数 * 评估步数比率(向上取整)if args.save_steps and args.save_steps < 1:args.save_steps = math.ceil(max_steps * args.save_steps)# 保存步数 = 最大步数 * 保存步数比率(向上取整)# 6. 检查是否启用了underflow/overflow调试选项if DebugOption.UNDERFLOW_OVERFLOW in self.args.debug:# 如果启用了underflow/overflow调试选项if self.args.n_gpu > 1:# 如果使用多GPU,则引发错误,因为目前此选项不支持数据并行raise ValueError("Currently --debug underflow_overflow is not supported under DP. Please use DDP"" (torch.distributed.launch).")else:# 如果只使用单GPU,则创建DebugUnderflowOverflow对象用于调试debug_overflow = DebugUnderflowOverflow(self.model) # noqa# 7. 判断是否延迟优化器创建# 在某些情况下(如使用Sharded DDP、SageMaker等),需要延迟优化器创建delay_optimizer_creation = (self.sharded_ddp is not Noneand self.sharded_ddp != ShardedDDPOption.SIMPLEor is_sagemaker_mp_enabled()or self.fsdp is not None)# 8. 如果使用DeepSpeed,初始化优化器和学习率调度器if self.is_deepspeed_enabled:self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps)# 使用deepspeed_init函数初始化DeepSpeed优化器和学习率调度器# 9. 如果不需要延迟,则创建优化器和学习率调度器if not delay_optimizer_creation:self.create_optimizer_and_scheduler(num_training_steps=max_steps)# 调用create_optimizer_and_scheduler方法创建优化器和学习率调度器# 10. 初始化TrainerStateself.state = TrainerState()# 创建TrainerState对象,用于存储训练状态信息self.state.is_hyper_param_search = trial is not None# 如果提供了trial对象,则表示进行超参数搜索# 11. 如果启用了梯度检查点,则启用模型的梯度检查点if args.gradient_checkpointing:self.model.gradient_checkpointing_enable()# 调用模型的gradient_checkpointing_enable方法启用梯度检查点# 梯度检查点可以减少内存使用,但会增加计算开销# 12. 包装模型model = self._wrap_model(self.model_wrapped)# 调用_wrap_model方法包装模型# 可能会进行数据并行处理、DeepSpeed包装等操作# 13. 如果使用SageMaker MP并从检查点恢复,则从检查点加载模型if is_sagemaker_mp_enabled() and resume_from_checkpoint is not None:self._load_from_checkpoint(resume_from_checkpoint, model)# 如果使用SageMaker模型并行,并且提供了检查点路径# 则调用_load_from_checkpoint方法从检查点加载模型权重# 14. 判断是否使用accelerator.prepare# 对于某些特殊情况(如Fairscale Sharded DDP、FSDP-XLA、SageMaker MP/DP、DataParallel、IPEX),不能使用accelerator.prepareuse_accelerator_prepare = True if model is self.model else False# 如果model是原始模型对象,则可以使用accelerator.prepare# 否则不能使用,需要手动处理优化器等if delay_optimizer_creation:self.create_optimizer_and_scheduler(num_training_steps=max_steps)# 如果之前延迟了优化器创建,则在这里创建# 15. 使用accelerator.prepare准备模型和优化器if use_accelerator_prepare:# 如果可以使用accelerator.prepareif hasattr(self.lr_scheduler, "step"):# 如果学习率调度器有step方法if self.use_apex:# 如果使用Apex混合精度训练model = self.accelerator.prepare(self.model)# 只需要准备模型else:# 否则同时准备模型和优化器model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer)else:# 如果学习率调度器没有step方法(例如DummyScheduler)# 则同时准备模型、优化器和学习率调度器model, self.optimizer, self.lr_scheduler = self.accelerator.prepare(self.model, self.optimizer, self.lr_scheduler)# 16. 如果启用了FSDP,则更新模型对象if self.is_fsdp_enabled:self.model = model# 17. 更新model_wrapped属性if model is not self.model:self.model_wrapped = model# 18. 如果使用DeepSpeed,则更新deepspeed属性if self.is_deepspeed_enabled:self.deepspeed = self.model_wrapped# 19. 如果从检查点恢复并且使用DeepSpeed,则从检查点加载模型if resume_from_checkpoint is not None and self.is_deepspeed_enabled:deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint)# 调用deepspeed_load_checkpoint函数从检查点加载DeepSpeed模型# 20. 从检查点加载优化器和学习率调度器状态(如果有)self._load_optimizer_and_scheduler(resume_from_checkpoint)# 重要: 到这一步# self.model 是原始的Transformers模型# self.model_wrapped 是包装后的模型(DDP、DeepSpeed等)# 开始训练!logger.info("***** Running training *****")logger.info(f" Num examples = {num_examples:,}")logger.info(f" Num Epochs = {num_train_epochs:,}")logger.info(f" Instantaneous batch size per device = {self._train_batch_size:,}")logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size:,}")logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}")logger.info(f" Total optimization steps = {max_steps:,}")logger.info(f" Number of trainable parameters = {get_model_param_count(model, trainable_only=True):,}")# 记录训练相关信息到日志# 一些其他设置...self.state.epoch = 0start_time = time.time()epochs_trained = 0steps_trained_in_current_epoch = 0steps_trained_progress_bar = None# 21. 检查是否需要从检查点继续训练if resume_from_checkpoint is not None and os.path.isfile(os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME)):# 如果提供了检查点路径,并且检查点中包含TrainerState文件self.state = TrainerState.load_from_json(os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME))# 则从该文件加载TrainerStateepochs_trained = self.state.global_step // num_update_steps_per_epoch# 计算已经训练的epoch数if not args.ignore_data_skip:# 如果不忽略已经训练过的数据steps_trained_in_current_epoch = self.state.global_step % (num_update_steps_per_epoch)steps_trained_in_current_epoch *= args.gradient_accumulation_steps# 计算当前epoch已经训练的步数else:steps_trained_in_current_epoch = 0logger.info(" Continuing training from checkpoint, will skip to saved global_step")logger.info(f" Continuing training from epoch {epochs_trained}")logger.info(f" Continuing training from global step {self.state.global_step}")if not args.ignore_data_skip:if skip_first_batches is None:logger.info(f" Will skip the first {epochs_trained} epochs then the first"f" {steps_trained_in_current_epoch} batches in the first epoch. If this takes a lot of time,"" you can install the latest version of Accelerate with `pip install -U accelerate`.You can"" also add the `--ignore_data_skip` flag to your launch command, but you will resume the"" training on data already seen by your model.")else:logger.info(f" Will skip the first {epochs_trained} epochs then the first"f" {steps_trained_in_current_epoch} batches in the first epoch.")if self.is_local_process_zero() and not args.disable_tqdm and skip_first_batches is None:steps_trained_progress_bar = tqdm(total=steps_trained_in_current_epoch)steps_trained_progress_bar.set_description("Skipping the first batches")# 22. 更新callback_handler中的引用self.callback_handler.model = self.modelself.callback_handler.optimizer = self.optimizerself.callback_handler.lr_scheduler = self.lr_schedulerself.callback_handler.train_dataloader = train_dataloaderif self.hp_name is not None and self._trial is not None:self.state.trial_name = self.hp_name(self._trial)if trial is not None:assignments = trial.assignments if self.hp_search_backend == HPSearchBackend.SIGOPT else trialself.state.trial_params = hp_params(assignments)else:self.state.trial_params = Noneself.state.max_steps = max_stepsself.state.num_train_epochs = num_train_epochsself.state.is_local_process_zero = self.is_local_process_zero()self.state.is_world_process_zero = self.is_world_process_zero()# 23. 初始化损失tr_loss = torch.tensor(0.0).to(args.device)self._total_loss_scalar = 0.0self._globalstep_last_logged = self.state.global_stepmodel.zero_grad()# 24. 调用callback的on_train_begin方法self.control = self.callback_handler.on_train_begin(args, self.state, self.control)# 25. 如果不忽略已经训练过的数据,则跳过之前训练过的epoch和batchif not args.ignore_data_skip:for epoch in range(epochs_trained):is_random_sampler = hasattr(train_dataloader, "sampler") and isinstance(train_dataloader.sampler, RandomSampler)if is_torch_less_than_1_11 or not is_random_sampler:for _ in train_dataloader:breakelse:_ = list(train_dataloader.sampler)total_batched_samples = 0for epoch in range(epochs_trained, num_train_epochs):# 26. 在分布式训练中,设置每个epoch的数据采样器if isinstance(train_dataloader, DataLoader) and isinstance(train_dataloader.sampler, DistributedSampler):train_dataloader.sampler.set_epoch(epoch)elif hasattr(train_dataloader, "dataset") and isinstance(train_dataloader.dataset, IterableDatasetShard):train_dataloader.dataset.set_epoch(epoch)# 27. 如果使用TPU,则使用PyTorch XLA的ParallelLoaderif is_torch_tpu_available():parallel_loader = pl.ParallelLoader(train_dataloader, [args.device]).per_device_loader(args.device)epoch_iterator = parallel_loaderelse:epoch_iterator = train_dataloader# 28. 在每个epoch开始时重置past状态(如果需要)if args.past_index >= 0:self._past = Nonesteps_in_epoch = (len(epoch_iterator)if len_dataloader is not Noneelse args.max_steps * args.gradient_accumulation_steps)# 计算当前epoch的总步数# 29. 调用callback的on_epoch_begin方法self.control = self.callback_handler.on_epoch_begin(args, self.state, self.control)# 30. 如果从检查点恢复,并且是当前epoch的第一个batch,则加载RNG状态if epoch == epochs_trained and resume_from_checkpoint is not None and steps_trained_in_current_epoch == 0:self._load_rng_state(resume_from_checkpoint)rng_to_sync = Falsesteps_skipped = 0if skip_first_batches is not None and steps_trained_in_current_epoch > 0:# 31. 如果需要跳过当前epoch的一些batch,则调用skip_first_batchesepoch_iterator = skip_first_batches(epoch_iterator, steps_trained_in_current_epoch)steps_skipped = steps_trained_in_current_epochsteps_trained_in_current_epoch = 0rng_to_sync = Truestep = -1for step, inputs in enumerate(epoch_iterator):# 32. 计算总的批次样本数total_batched_samples += 1if rng_to_sync:# 33. 如果需要同步RNG状态,则从检查点加载self._load_rng_state(resume_from_checkpoint)rng_to_sync = False# 34. 跳过已经训练过的批次if steps_trained_in_current_epoch > 0:steps_trained_in_current_epoch -= 1if steps_trained_progress_bar is not None:steps_trained_progress_bar.update(1)if steps_trained_in_current_epoch == 0:self._load_rng_state(resume_from_checkpoint)continueelif steps_trained_progress_bar is not None:steps_trained_progress_bar.close()steps_trained_progress_bar = None# 35. 在梯度累积步开始时,调用callback的on_step_begin方法if step % args.gradient_accumulation_steps == 0:self.control = self.callback_handler.on_step_begin(args, self.state, self.control)# 36. 使用gradient_accumulation_context包装训练步骤with self.accelerator.accumulate(model):tr_loss_step = self.training_step(model, inputs)# 37. 处理NaN和inf损失值if (args.logging_nan_inf_filterand not is_torch_tpu_available()and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step))):tr_loss += tr_loss / (1 + self.state.global_step - self._globalstep_last_logged)else:tr_loss += tr_loss_step# 38. 计算当前步骤的浮点运算数self.current_flos += float(self.floating_point_ops(inputs))# 39. 如果是梯度累积步的最后一步或者当前epoch的最后一步,则进行以下操作:if total_batched_samples % args.gradient_accumulation_steps == 0 or (steps_in_epoch <= args.gradient_accumulation_stepsand (step + 1) == steps_in_epoch):# 40. 梯度裁剪if args.max_grad_norm is not None and args.max_grad_norm > 0:if self.do_grad_scaling:if is_torch_tpu_available():gradients = xm._fetch_gradients(self.optimizer)xm.all_reduce("sum", gradients, scale=1.0 / xm.xrt_world_size())self.scaler.unscale_(self.optimizer)if is_sagemaker_mp_enabled() and args.fp16:self.optimizer.clip_master_grads(args.max_grad_norm)elif hasattr(self.optimizer, "clip_grad_norm"):self.optimizer.clip_grad_norm(args.max_grad_norm)elif hasattr(model, "clip_grad_norm_"):model.clip_grad_norm_(args.max_grad_norm)elif self.use_apex:nn.utils.clip_grad_norm_(amp.master_params(self.optimizer),args.max_grad_norm,)else:self.accelerator.clip_grad_norm_(model.parameters(),args.max_grad_norm,)# 41. 优化器步骤optimizer_was_run = Trueif is_torch_tpu_available():if self.do_grad_scaling:self.scaler.step(self.optimizer)self.scaler.update()else:xm.optimizer_step(self.optimizer)elif self.do_grad_scaling:scale_before = self.scaler.get_scale()self.scaler.step(self.optimizer)self.scaler.update()scale_after = self.scaler.get_scale()optimizer_was_run = scale_before <= scale_afterelse:self.optimizer.step()optimizer_was_run = not self.accelerator.optimizer_step_was_skippedif optimizer_was_run:# 42. 更新学习率调度器(如果不是ReduceLROnPlateau)if not isinstance(self.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):self.lr_scheduler.step()# 43. 梯度清零model.zero_grad()self.state.global_step += 1self.state.epoch = epoch + (step + 1 + steps_skipped) / steps_in_epoch# 44. 调用callback的on_step_end方法self.control = self.callback_handler.on_step_end(args, self.state, self.control)# 45. 可能会记录日志、保存模型和评估模型self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval)else:# 46. 调用callback的on_substep_end方法self.control = self.callback_handler.on_substep_end(args, self.state, self.control)# 47. 检查是否需要提前停止当前epoch或整个训练if self.control.should_epoch_stop or self.control.should_training_stop:break# 48. 如果在epoch迭代器中没有任何样本,则提前终止训练if step < 0:logger.warning("There seems to be not a single sample in your epoch_iterator, stopping training at step"f" {self.state.global_step}! This is expected if you're using an IterableDataset and set"f" num_steps ({max_steps}) higher than the number of available samples.")self.control.should_training_stop = True# 49. 调用callback的on_epoch_end方法self.control = self.callback_handler.on_epoch_end(args, self.state, self.control)self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval)# 50. 如果启用了TPU指标调试,则打印相关指标if DebugOption.TPU_METRICS_DEBUG in self.args.debug:if is_torch_tpu_available():xm.master_print(met.metrics_report())else:logger.warning("You enabled PyTorch/XLA debug metrics but you don't have a TPU ""configured. Check your training configuration if this is unexpected.")if self.control.should_training_stop:break# 51. 清理past状态(如果有)if args.past_index and hasattr(self, "_past"):delattr(self, "_past")logger.info("\n\nTraining completed. Do not forget to share your model on huggingface.co/models =)\n\n")# 52. 如果配置了在结束时加载最佳模型,则加载最佳模型if args.load_best_model_at_end and self.state.best_model_checkpoint is not None:if is_torch_tpu_available():xm.rendezvous("load_best_model_at_end")elif args.parallel_mode == ParallelMode.DISTRIBUTED:dist.barrier()elif is_sagemaker_mp_enabled():smp.barrier()self._load_best_model()# 53. 计算总的训练损失self._total_loss_scalar += tr_loss.item()train_loss = self._total_loss_scalar / self.state.global_step# 54. 计算训练速度指标metrics = speed_metrics("train", start_time, num_samples=num_train_samples, num_steps=self.state.max_steps)self.store_flos()metrics["total_flos"] = self.state.total_flos# 55. 将总的浮点运算数存储到metrics字典中metrics["train_loss"] = train_loss# 56. 将训练损失存储到metrics字典中self.is_in_train = False# 57. 设置is_in_train标志为False,表示训练过程结束self._memory_tracker.stop_and_update_metrics(metrics)# 58. 停止内存跟踪器,并将内存使用情况更新到metrics字典中self.log(metrics)# 59. 记录metrics信息到日志中run_dir = self._get_output_dir(trial)checkpoints_sorted = self._sorted_checkpoints(use_mtime=False, output_dir=run_dir)# 60. 获取输出目录,并对检查点文件进行排序# 61. 如果设置了只保留一个检查点,并且最佳模型检查点存在,则删除旧的检查点if self.args.should_save and self.state.best_model_checkpoint is not None and self.args.save_total_limit == 1:for checkpoint in checkpoints_sorted:if checkpoint != self.state.best_model_checkpoint:logger.info(f"Deleting older checkpoint [{checkpoint}] due to args.save_total_limit")shutil.rmtree(checkpoint)self.control = self.callback_handler.on_train_end(args, self.state, self.control)# 62. 调用callback的on_train_end方法return TrainOutput(self.state.global_step, train_loss, metrics)# 63. 返回TrainOutput对象,包含全局步数、训练损失和指标
待更新..............