1. Definition
- Definition
- Dataset format
- LLaMA-Factory training example
- Getting-started reading
2. Implementation
- Definition
DPO (Direct Preference Optimization) gives precise control over a language model's behavior by optimizing the model directly, without the machinery of reinforcement learning, while still learning human preferences effectively. Compared with RLHF, DPO is easier to implement and train, and it often performs as well or better.
DPO is a fine-tuning method that tunes the model to maximize an implicit reward on preference data. Unlike conventional approaches, it skips the step of explicitly modeling a reward function and instead optimizes the model directly on preference pairs. Relative to the two-stage RLHF pipeline it has several advantages (the objective it optimizes is written out after this list):
(1) Simplicity: DPO is easier to implement and train, which makes it more accessible.
(2) Stability: it is less prone to getting stuck in poor local optima, so the training process is more reliable.
(3) Efficiency: DPO needs less compute and less data than RLHF, making it computationally lightweight.
(4) Effectiveness: experiments show DPO can outperform RLHF on tasks such as sentiment control, summarization, and dialogue generation.
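For reference, the objective DPO minimizes is the standard DPO loss from the original paper, restated here to match the TRL code quoted later (β is the temperature that appears as self.beta, π_ref is the frozen reference model, and y_w / y_l are the chosen / rejected responses):

```latex
\mathcal{L}_{\mathrm{DPO}}(\pi_\theta;\pi_{\mathrm{ref}})
  = -\,\mathbb{E}_{(x,\,y_w,\,y_l)\sim\mathcal{D}}\!\left[
      \log \sigma\!\left(
        \beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\mathrm{ref}}(y_w \mid x)}
        - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\mathrm{ref}}(y_l \mid x)}
      \right)
    \right]
```

Minimizing this increases the log-probability margin of the chosen response over the rejected one relative to the reference model; with reference_free=True in the code below, the π_ref terms drop out entirely.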
- Dataset format
[{"conversations": [{"from": "human","value": "国会的转发\n美国国会由众议院和参议院组成,每两年换届一次(参议员任期为6年,但参议院选举是错位的,使得国会的组成仍然每两年变化一次)。这两年期间按顺序标记,第115届国会发生在2017-2018年。\n\n密歇根大学信息学院的研究人员在这段时间内收集了现任国会议员(我们将“国会议员”缩写为MoC)的Twitter帖子,并对它们进行编码,标记为原创声明或其他用户提交的转发。我们将重点关注转发数据。这些发布的数据不仅包括转发的文本,还包括国会议员的信息和原始推文的帐户。\n#python:\n\nimport pandas as pd\nimport numpy as np\nimport matplotlib.pyplot as plt\nimport seaborn as sb\nimport statsmodels.api as sm\nimport os\nfrom pandas.plotting import register\\_matplotlib\\_converters\nregister\\_matplotlib\\_converters()\n\npd.options.mode.chained\\_assignment = None\n\n# 在接下来的内容中,我们将加载数据,但出于速度原因,我们将排除推文本身的文本。\n\ntweets = pd.read\\_csv(os.path.join(base, \"retweets\\_congress\\_115th.csv.gz\"), \n usecols = lambda x: x != \"full\\_text\",\n dtype = {\"tweet\\_id\\_str\": \"str\"},\n index\\_col = \"tweet\\_id\\_str\")\ntweets.shape\nout: (209856, 21)\n\ntweets.columns\nout:Index(['user\\_id\\_str', 'user\\_screen\\_name', 'user\\_followers\\_count', 'created\\_at',\n 'retweet\\_count', 'retweeted\\_status', 'year', 'bioguide', 'birthday',\n 'gender', 'name', 'state', 'chamber', 'party', 'region', 'rt\\_user\\_id',\n 'rt\\_screen\\_name', 'rt\\_followers\\_count', 'rt\\_bio', 'age', 'age\\_bucket'],\n dtype='object')\n\n第(a)部分\n国会主要由两个政党(民主党和共和党)组成,还有一些独立派。哪个政党负责大部分的转发?\n\n计算推文的政党的边际概率,并将结果保存到party\\_marg。显示这个结果。\n\n第(b)部分\n我们可能会问的一个问题是,“国会成员彼此转发的频率是多少?”\n\n创建一个名为“is\\_congress\\_retweet”的新列,指示转发是否来自MoC。首先使用.unique()方法获取唯一的“user\\_id\\_str”值集合。然后使用这组id,在“rt\\_user\\_id”列上使用.isin(values)方法创建新的布尔列“is\\_congress\\_retweet”。\n\n找到转发MoC的边际概率,并将结果保存到“congress\\_retweet\\_marg”。显示这些结果。是更常见地转发国会议员还是来自国会外部?"}],"chosen": {"from": "gpt","value": "计算推文的政党边际概率,我们可以使用以下代码:\n```python\ntweets['party'] = tweets['party'].replace(0, 'Democrat')\ntweets['party'] = tweets['party'].replace(1, 'Republican')\ntweets['party'] = tweets['party'].replace(2, 'Independent')\n\nparty_counts = tweets.groupby('party').size().sort_values(ascending=False)\n\nparty_marg = party_counts / party_counts.sum()\n\nprint(\"政党的边际概率:\")\nprint(party_marg)\n```\n这段代码将'party'列中的数字值替换为相应的政党名称,然后按政党对推文进行分组并计算每个组的大小。最后,通过将每个组的大小除以总推文数来计算每个政党的边际概率。\n\n输出应该类似于:\n```markdown\n政党的边际概率:\n0.522152 民主党\n0.445541 共和党\n0.032307 独立派\n```\n这表明大多数转发来自民主党,其次是共和党,然后是独立派。\n\n要创建一个名为\"is\\_congress\\_retweet\"的新列,指示转发是否来自国会议员,我们可以使用以下代码:\n```python\ncongress_ids = tweets['user_id_str'].unique()\nis_congress_retweet = tweets['rt_user_id'].isin(congress_ids)\ntweets['is_congress_retweet'] = is_congress_retweet\n\ncongress_retweet_marg = tweets['is_congress_retweet'].mean()\nprint(\"转发国会议员的边际概率:\")\nprint(congress_retweet_marg)\n```\n这段代码首先获取唯一的\"user\\_id\\_str\"值集合,对应于国会议员的集合。然后,在\"rt\\_user\\_id\"列上使用.isin(values)方法创建一个新的布尔列\"is\\_congress\\_retweet\",指示转发是否来自国会议员。最后,通过取\"is\\_congress\\_retweet\"列的平均值来计算转发国会议员的边际概率。\n\n输出应该类似于:\n```markdown\n转发国会议员的边际概率:\n0.775151\n```\n这表明大多数转发不是来自国会议员,而是来自国会之外。"},"rejected": {"from": "gpt","value": "回答问题的第(a)部分,您可以通过首先使用groupby()函数按政党对数据进行分组,然后计算每个组的转发数来计算政党的边际概率。具体来说,您可以使用以下代码:\n```scss\nparty_count = tweets.groupby('party').count()\nparty_count.loc['Democrats','retweet_count'] / party_count.loc['Republicans','retweet_count']\n```\n回答问题的第(b)部分,您可以创建一个新列来指示转发是否来自国会议员,方法是使用“bioguide”列上的.unique()方法获取唯一的国会议员用户ID集合,然后使用该集合创建一个布尔列,指示转发是否来自国会议员,使用“rt_user_id”列上的.isin()方法。最后,使用.sum()方法来计算转发国会议员的边际概率,然后将其与转发来自国会以外的人的概率进行比较。以下是相关代码:\n```makefile\nimport pandas as pd\ntweets = pd.read_csv('retweets_congress_115th.csv')\n\n# 获取唯一的国会议员ID集合\nmovers_ids = tweets['bioguide'].unique()\n\n# 
创建一个新的布尔列,指示转发是否来自国会议员\ntweets['is_congress_retweet'] = tweets['rt_user_id'].isin(movers_ids)\n\n# 计算转发国会议员或来自国会以外的人的边际概率\nmovers_retweets = tweets[tweets['is_congress_retweet'] == True].shape[0]\nnon_movers_retweets = tweets[tweets['is_congress_retweet'] == False].shape[0]\nproportion_movers_retweets = movers_retweets / (movers_retweets + non_movers_retweets)\nprint('Proportion of retweets from MoCs:', proportion_movers_retweets)\n```"}}]
Here, chosen holds the preferred (good) response and rejected holds the dispreferred (bad) response; the conversations field carries the shared prompt.
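A quick sketch of sanity-checking data in this layout before training (the file name is a hypothetical placeholder; the field names match the example record above):

```python
import json

# Hypothetical path; point this at your own preference-pair file.
with open("dpo_demo.json", encoding="utf-8") as f:
    records = json.load(f)

for rec in records:
    # Each record needs the shared conversation plus one good and one bad answer.
    assert {"conversations", "chosen", "rejected"} <= rec.keys()
    assert rec["chosen"]["from"] == "gpt"
    assert rec["rejected"]["from"] == "gpt"
    assert rec["conversations"][0]["from"] == "human"

print(len(records), "preference pairs loaded")
```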
- LLaMA-Factory training example
CUDA_VISIBLE_DEVICES=0 llamafactory-cli train examples/train_lora/llama3_lora_dpo.yaml
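The YAML file referenced above ships with LLaMA-Factory. As a rough orientation only, a sketch of the kind of fields such a DPO config contains; the values are illustrative and key names vary across LLaMA-Factory versions (e.g. older releases use dpo_beta instead of pref_beta), so treat the file in the repository as authoritative:

```yaml
### model
model_name_or_path: meta-llama/Meta-Llama-3-8B-Instruct

### method
stage: dpo                 # run DPO preference training
do_train: true
finetuning_type: lora
lora_target: all
pref_beta: 0.1             # the beta temperature in the DPO loss
pref_loss: sigmoid         # standard DPO objective

### dataset
dataset: dpo_en_demo       # a preference dataset registered in data/dataset_info.json
template: llama3
cutoff_len: 2048

### output / training
output_dir: saves/llama3-8b/lora/dpo
per_device_train_batch_size: 1
gradient_accumulation_steps: 8
learning_rate: 5.0e-6
num_train_epochs: 3.0
```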
- Getting-started reading
https://huggingface.co/docs/trl/dpo_trainer
Code: https://github.com/huggingface/trl/blob/main/examples/scripts/dpo.py
https://github.com/huggingface/trl/tree/main
Data formatting:
def process(row):
    # The prompt is the chosen conversation minus its final (assistant) turn.
    row["prompt"] = tokenizer.apply_chat_template(row["chosen"][:-1], tokenize=False)
    # chosen / rejected keep only the final answer turn, rendered with the chat template.
    row["chosen"] = tokenizer.apply_chat_template([row["chosen"][-1]], tokenize=False)
    row["rejected"] = tokenizer.apply_chat_template([row["rejected"][-1]], tokenize=False)
    return row
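A sketch of how this process function is typically applied (the checkpoint and dataset below are illustrative placeholders; any preference dataset whose chosen/rejected columns are lists of chat messages works the same way):

```python
from datasets import load_dataset
from transformers import AutoTokenizer

# Placeholder checkpoint; use the model you actually intend to fine-tune.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2-0.5B-Instruct")

# Placeholder preference dataset whose "chosen"/"rejected" columns are message lists.
ds = load_dataset("HuggingFaceH4/ultrafeedback_binarized", split="train_prefs")

# process() (defined above) splits each pair into prompt / chosen / rejected strings
# rendered with the tokenizer's chat template.
ds = ds.map(process)
print(ds[0]["prompt"][:200])
```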
Overall flow: a forward pass yields log-probabilities for the chosen and the rejected responses (for both the policy and the reference model) —> reward_accuracies = (chosen_rewards > rejected_rewards).float() —> compute the DPO loss —> backpropagate.
def dpo_loss(
    self,
    policy_chosen_logps: torch.FloatTensor,
    policy_rejected_logps: torch.FloatTensor,
    reference_chosen_logps: torch.FloatTensor,
    reference_rejected_logps: torch.FloatTensor,
    reference_free: bool = True,
) -> Tuple[torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
    """Compute the DPO loss for a batch of policy and reference model log probabilities.

    Args:
        policy_chosen_logps: Log probabilities of the policy model for the chosen responses. Shape: (batch_size,)
        policy_rejected_logps: Log probabilities of the policy model for the rejected responses. Shape: (batch_size,)
        reference_chosen_logps: Log probabilities of the reference model for the chosen responses. Shape: (batch_size,)
        reference_rejected_logps: Log probabilities of the reference model for the rejected responses. Shape: (batch_size,)
        beta: Temperature parameter for the DPO loss, typically something in the range of 0.1 to 0.5. We ignore the reference model as beta -> 0.
        reference_free: If True, we ignore the _provided_ reference model and implicitly use a reference model that assigns equal probability to all responses.

    Returns:
        A tuple of three tensors: (losses, chosen_rewards, rejected_rewards).
        The losses tensor contains the DPO loss for each example in the batch.
        The chosen_rewards and rejected_rewards tensors contain the rewards for the chosen and rejected responses, respectively.
    """
    pi_logratios = policy_chosen_logps - policy_rejected_logps
    ref_logratios = reference_chosen_logps - reference_rejected_logps

    if reference_free:
        ref_logratios = 0

    logits = pi_logratios - ref_logratios
    losses = -F.logsigmoid(self.beta * logits)
    chosen_rewards = self.beta * (policy_chosen_logps - reference_chosen_logps).detach()
    rejected_rewards = self.beta * (policy_rejected_logps - reference_rejected_logps).detach()

    return losses, chosen_rewards, rejected_rewards
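A tiny standalone numerical check of the same arithmetic may help connect the flow above to the code (made-up log-probabilities, beta fixed at 0.1; this re-implements the body of dpo_loss outside the trainer class):

```python
import torch
import torch.nn.functional as F

beta = 0.1
# Made-up per-example log-probabilities for a batch of two preference pairs.
policy_chosen_logps      = torch.tensor([-10.0, -12.0])
policy_rejected_logps    = torch.tensor([-14.0, -11.5])
reference_chosen_logps   = torch.tensor([-11.0, -12.5])
reference_rejected_logps = torch.tensor([-13.0, -11.0])

# Same arithmetic as dpo_loss with reference_free=False.
pi_logratios = policy_chosen_logps - policy_rejected_logps
ref_logratios = reference_chosen_logps - reference_rejected_logps
logits = pi_logratios - ref_logratios

losses = -F.logsigmoid(beta * logits)
chosen_rewards = beta * (policy_chosen_logps - reference_chosen_logps)
rejected_rewards = beta * (policy_rejected_logps - reference_rejected_logps)
reward_accuracies = (chosen_rewards > rejected_rewards).float()

print(losses)             # smaller when the policy favors the chosen response more than the reference does
print(reward_accuracies)  # 1.0 where the implicit reward ranks chosen above rejected
```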
def get_batch_loss_metrics(
    self,
    model,
    batch: Dict[str, Union[List, torch.LongTensor]],
    train_eval: Literal["train", "eval"] = "train",
):
    """Compute the DPO loss and other metrics for the given batch of inputs for train or test."""
    metrics = {}

    (
        policy_chosen_logps,
        policy_rejected_logps,
        policy_chosen_logits,
        policy_rejected_logits,
        policy_chosen_logps_avg,
    ) = self.concatenated_forward(model, batch)

    # if reference_chosen_logps and reference_rejected_logps in batch use them, otherwise use the reference model
    if (
        "reference_chosen_logps" in batch
        and "reference_rejected_logps" in batch
        and self.args.rpo_alpha is not None
    ):
        reference_chosen_logps = batch["reference_chosen_logps"]
        reference_rejected_logps = batch["reference_rejected_logps"]
    else:
        with torch.no_grad():
            if self.ref_model is None:
                with self.null_ref_context():
                    (
                        reference_chosen_logps,
                        reference_rejected_logps,
                        _,
                        _,
                        _,
                    ) = self.concatenated_forward(self.model, batch)
            else:
                (
                    reference_chosen_logps,
                    reference_rejected_logps,
                    _,
                    _,
                    _,
                ) = self.concatenated_forward(self.ref_model, batch)

    losses, chosen_rewards, rejected_rewards = self.dpo_loss(
        policy_chosen_logps,
        policy_rejected_logps,
        reference_chosen_logps,
        reference_rejected_logps,
    )
    reward_accuracies = (chosen_rewards > rejected_rewards).float()

    if self.args.rpo_alpha is not None:
        losses = losses * self.args.rpo_alpha - policy_chosen_logps_avg

    prefix = "eval_" if train_eval == "eval" else ""
    metrics[f"{prefix}rewards/chosen"] = chosen_rewards.mean().cpu()
    metrics[f"{prefix}rewards/rejected"] = rejected_rewards.mean().cpu()
    metrics[f"{prefix}rewards/accuracies"] = reward_accuracies.mean().cpu()
    metrics[f"{prefix}rewards/margins"] = (chosen_rewards - rejected_rewards).mean().cpu()
    metrics[f"{prefix}logps/rejected"] = policy_rejected_logps.detach().mean().cpu()
    metrics[f"{prefix}logps/chosen"] = policy_chosen_logps.detach().mean().cpu()
    metrics[f"{prefix}logits/rejected"] = policy_rejected_logits.detach().mean().cpu()
    metrics[f"{prefix}logits/chosen"] = policy_chosen_logits.detach().mean().cpu()

    return losses.mean(), metrics

def compute_loss(
    self,
    model: Union[PreTrainedModel, nn.Module],
    inputs: Dict[str, Union[torch.Tensor, Any]],
    return_outputs=False,
) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict[str, torch.Tensor]]]:
    if not self.use_dpo_data_collator:
        warnings.warn(
            "compute_loss is only implemented for DPODataCollatorWithPadding, and you passed a datacollator that is different than "
            "DPODataCollatorWithPadding - you might see unexpected behavior. Alternatively, you can implement your own prediction_step method if you are using a custom data collator"
        )

    compute_loss_context_manager = torch.cuda.amp.autocast if self._peft_has_been_casted_to_bf16 else nullcontext

    with compute_loss_context_manager():
        loss, metrics = self.get_batch_loss_metrics(model, inputs, train_eval="train")

    # Make sure to move the loss to the device the original accumulating loss is at back in the `Trainer` class:
    loss = loss.to(self.args.device)

    # force log the metrics
    self.store_metrics(metrics, train_eval="train")

    if return_outputs:
        return (loss, metrics)
    return loss
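To connect the pieces, a minimal end-to-end sketch of launching DPO training with TRL's DPOTrainer (the model and dataset names are placeholders, and constructor arguments shift between TRL versions, e.g. recent releases take processing_class instead of tokenizer, so check the docs of the version you pin):

```python
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DPOConfig, DPOTrainer

model_name = "Qwen/Qwen2-0.5B-Instruct"          # placeholder base model
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Placeholder preference dataset; recent TRL versions accept conversational
# chosen/rejected columns directly, older ones need prompt/chosen/rejected strings
# (e.g. produced by the process() function above).
train_dataset = load_dataset("trl-lib/ultrafeedback_binarized", split="train")

training_args = DPOConfig(
    output_dir="qwen2-dpo",          # placeholder output path
    beta=0.1,                        # same beta as self.beta in dpo_loss above
    per_device_train_batch_size=2,
)

trainer = DPOTrainer(
    model=model,
    ref_model=None,                  # None: TRL builds a frozen reference copy of the model
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,             # newer TRL: pass processing_class=tokenizer instead
)
trainer.train()
```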