2023年的深度学习入门指南(19) - LLaMA 2源码解析
上一节我们学习了LLaMA 2的补全和聊天两种API的使用方法。本节我们来看看LLaMA 2的源码。
补全函数text_completion源码解析
上一节我们讲了LLaMA 2的编程方法。我们来复习一下:
generator = Llama.build(ckpt_dir=ckpt_dir,tokenizer_path=tokenizer_path,max_seq_len=max_seq_len,max_batch_size=max_batch_size,)prompts = ["上下五千年,英雄万万千。黄沙百战穿金甲,不破楼兰终不还",]results = generator.text_completion(prompts,max_gen_len=max_gen_len,temperature=temperature,top_p=top_p,)
我们先来看看text_completion函数的参数是什么意思,该函数的原型为:
def text_completion(
self,
prompts: List[str],
temperature: float = 0.6,
top_p: float = 0.9,
max_gen_len: Optional[int] = None,
logprobs: bool = False,
echo: bool = False,
) -> List[CompletionPrediction]:
我们来看下这些参数的含义:
- prompts:这是一个字符串列表,每个字符串都是一个用于生成文本的提示。
- temperature(默认值为0.6):这是一个控制生成文本随机性的参数。温度值越高,生成的文本就越随机;温度值越低,生成的文本就越倾向于最可能的输出。
- top_p(默认值为0.9):这是一个控制生成文本多样性的参数,它设定了从最高概率的词开始,累计到总概率超过top_p的词为止,然后从这些词中随机选择一个词作为生成的词。这种方法也被称为nucleus sampling或top-p sampling。
- max_gen_len:可选参数,表示生成的文本的最大长度。如果未指定,那么将使用模型参数中的最大序列长度减1。
- logprobs(默认值为False):如果为True,那么在返回的结果中会包含生成的每个词的对数概率。
- echo(默认值为False):这是一个控制是否在生成的文本中包含输入提示的参数。
参数明白了之后我们看text_completion完整实现:
def text_completion(self,prompts: List[str],temperature: float = 0.6,top_p: float = 0.9,max_gen_len: Optional[int] = None,logprobs: bool = False,echo: bool = False,) -> List[CompletionPrediction]:if max_gen_len is None:max_gen_len = self.model.params.max_seq_len - 1prompt_tokens = [self.tokenizer.encode(x, bos=True, eos=False) for x in prompts]generation_tokens, generation_logprobs = self.generate(prompt_tokens=prompt_tokens,max_gen_len=max_gen_len,temperature=temperature,top_p=top_p,logprobs=logprobs,echo=echo,)if logprobs:return [{"generation": self.tokenizer.decode(t),"tokens": [self.tokenizer.decode(x) for x in t],"logprobs": logprobs_i,}for t, logprobs_i in zip(generation_tokens, generation_logprobs)]return [{"generation": self.tokenizer.decode(t)} for t in generation_tokens]
总结起来就三步,这个text_completion其实就是generate的包装函数:
- 编码:调用tokenizer.encode
- 生成:调用generate
- 解码:调用tokenizer.decode
分词
import os
from logging import getLogger
from typing import Listfrom sentencepiece import SentencePieceProcessorlogger = getLogger()class Tokenizer:def __init__(self, model_path: str):# reload tokenizerassert os.path.isfile(model_path), model_pathself.sp_model = SentencePieceProcessor(model_file=model_path)logger.info(f"Reloaded SentencePiece model from {model_path}")# BOS / EOS token IDsself.n_words: int = self.sp_model.vocab_size()self.bos_id: int = self.sp_model.bos_id()self.eos_id: int = self.sp_model.eos_id()self.pad_id: int = self.sp_model.pad_id()logger.info(f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}")assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()def encode(self, s: str, bos: bool, eos: bool) -> List[int]:assert type(s) is strt = self.sp_model.encode(s)if bos:t = [self.bos_id] + tif eos:t = t + [self.eos_id]return tdef decode(self, t: List[int]) -> str:return self.sp_model.decode(t)
首先是用到了分词组件SentencePieceProcessor。SentencePieceProcessor是SentencePiece库中的一个组件,它实现了子词(subword)tokenize和detokenize的功能。
其主要作用包括:
- 将文本tokenize成子词(subword)。SentencePiece 使用的数据驱动方法,可以学习文本的词汇表并将文本tokenize成子词单元。
- 将子词detokenize合并成原始文本。可以将tokenize后的子词序列重新合并为原始文本。
- 提供vocab管理。可以获得tokenize的子词词汇表等信息。
- 支持多种语言文本的tokenize和detokenize。
- 提供高效的实现。底层使用C++实现,可以快速处理大规模文本。
- 提供多种模型选择,如BPE、unigram等。
- 支持自定义训练子词化模型。
好,下面我们回到这段代码本身。这段代码实现了一个基于SentencePiece的Tokenizer类,可以进行文本的tokenize和detokenize。
主要逻辑:
- 在初始化时加载SentencePiece模型文件model_path。
- 获取模型的词汇表大小n_words,以及特殊token的id(bos_id,eos_id,pad_id)。
- encode方法可以将字符串文本s tokenize成id列表。可以选择在开始加入bos_id,结尾加入eos_id。
- decode方法可以将id列表解码还原为字符串文本。
这样就构建了一个封装SentencePiece tokenize/detokenize的Tokenizer类。可以加载自定义的SentencePiece模型,然后就可以方便地对文本进行子词化处理。
这种方式可以重复使用已训练好的SentencePiece模型,为下游NLP任务提供可靠的tokenize和detokenize功能。
最后我们再讲一讲几个特殊的符号bos_id、eos_id和pad_id:
- bos_id: 开始符(Beginning of Sentence)的id。用于表示一个序列的开始。
- eos_id: 结束符(End of Sentence)的id。用于表示一个序列的结束。
- pad_id: 填充符(Padding)的id。当需要将多个序列长度对齐时,可以使用pad_id在较短序列后面填充。
聊天函数chat_completion
在进入generate函数之前,我们再看看chat_completion是如何实现的。
def chat_completion(self,dialogs: List[Dialog],temperature: float = 0.6,top_p: float = 0.9,max_gen_len: Optional[int] = None,logprobs: bool = False,) -> List[ChatPrediction]:if max_gen_len is None:max_gen_len = self.model.params.max_seq_len - 1prompt_tokens = []for dialog in dialogs:if dialog[0]["role"] != "system":dialog = [{"role": "system","content": DEFAULT_SYSTEM_PROMPT,}] + dialogdialog = [{"role": dialog[1]["role"],"content": B_SYS+ dialog[0]["content"]+ E_SYS+ dialog[1]["content"],}] + dialog[2:]assert all([msg["role"] == "user" for msg in dialog[::2]]) and all([msg["role"] == "assistant" for msg in dialog[1::2]]), ("model only supports 'system', 'user' and 'assistant' roles, ""starting with 'system', then 'user' and alternating (u/a/u/a/u...)")dialog_tokens: List[int] = sum([self.tokenizer.encode(f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} ",bos=True,eos=True,)for prompt, answer in zip(dialog[::2],dialog[1::2],)],[],)assert (dialog[-1]["role"] == "user"), f"Last message must be from user, got {dialog[-1]['role']}"dialog_tokens += self.tokenizer.encode(f"{B_INST} {(dialog[-1]['content']).strip()} {E_INST}",bos=True,eos=False,)prompt_tokens.append(dialog_tokens)generation_tokens, generation_logprobs = self.generate(prompt_tokens=prompt_tokens,max_gen_len=max_gen_len,temperature=temperature,top_p=top_p,logprobs=logprobs,)if logprobs:return [{"generation": {"role": "assistant","content": self.tokenizer.decode(t),},"tokens": [self.tokenizer.decode(x) for x in t],"logprobs": logprobs_i,}for t, logprobs_i in zip(generation_tokens, generation_logprobs)]return [{"generation": {"role": "assistant", "content": self.tokenizer.decode(t)}}for t in generation_tokens]
我们先看一下参数:
- dialogs:一个对话列表,其中每个对话都是一个字典列表,表示一段对话。
- temperature:一个浮点数,表示生成文本时使用的温度。默认值为 0.6。
- top_p:一个浮点数,表示生成文本时使用的 top-p 采样。默认值为 0.9。
- max_gen_len:一个可选的整数,表示生成文本的最大长度。如果未指定,则使用模型参数中的最大序列长度减一。
- logprobs:一个布尔值,表示是否返回生成文本的对数概率。默认值为 False。
函数返回一个 ChatPrediction 列表,其中每个元素都是一个字典,包含生成的回复和相关信息。
函数首先检查 max_gen_len 是否为 None,如果是,则将其设置为模型参数中的最大序列长度减一。然后,对于每个对话,函数执行以下操作:
- 如果第一条消息的角色不是 “system”,则在对话的开头添加一条默认的系统提示。
- 将第一条和第二条消息合并为一条消息,并更新对话。
- 检查对话中消息的角色是否符合要求(即以 “system” 开始,然后交替出现 “user” 和 “assistant”)。
- 对于每一组相邻的提示和回答(即每两条消息),使用 tokenizer 对其进行编码,并将编码后的 token 连接起来。
- 检查最后一条消息是否来自用户。
- 对最后一条消息进行编码,并将编码后的 token 添加到 token 列表中。
接下来,函数调用 generate 方法生成回复,并根据 logprobs 参数的值返回相应的结果。如果 logprobs 为 True,则返回包含生成回复、token 和对数概率的字典列表;否则,返回仅包含生成回复的字典列表。这些生成回复都具有 “assistant” 角色,并使用 tokenizer 进行解码。
总体来说,只是增加了对于对话角色的业务逻辑处理,核心还是调用generate函数。
温度与top p采样
在进入讲解generate函数之前,我们先讲一个小知识点,就是温度temperature的作用。我们看下面的代码:
if temperature > 0:probs = torch.softmax(logits[:, -1] / temperature, dim=-1)next_token = sample_top_p(probs, top_p)else:next_token = torch.argmax(logits[:, -1], dim=-1)
temperature 是一个超参数,用于控制生成文本的多样性。当 temperature 较高时,概率分布更加平坦,因此采样出的标记更具多样性。当 temperature 较低时,概率分布更加尖锐,因此采样出的标记更倾向于概率最大的那个。当 temperature 等于 0 时,直接选择概率最大的标记。
那么,sample_top_p是如何实现的呢?我把解说写在代码注释里面了:
def sample_top_p(probs, p):# 这行代码将输入的概率 probs 按照降序排序。probs_sort 是排序后的概率,probs_idx 是对应的索引。probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)# 这行代码计算 probs_sort 的累积和。累积和是从第一个元素开始,依次将序列中的每个元素与前面所有元素的和相加得到的。probs_sum = torch.cumsum(probs_sort, dim=-1)# 这行代码生成一个布尔掩码,用于指示哪些累积和减去当前概率的值大于 p。这用于确定哪些概率应该被设为0,以保证被抽样的概率和不超过 p。mask = probs_sum - probs_sort > p# 这行代码使用上述生成的掩码,将那些使累积和减去当前概率的值大于 p 的 probs_sort 中的元素设为0。probs_sort[mask] = 0.0# 这行代码将 probs_sort 中的每个元素除以它们的和,以便重新归一化概率分布。probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))# 这行代码从归一化的 probs_sort 中抽取一个样本。torch.multinomial 是PyTorch中的多项式分布抽样函数,它根据每个元素的权重抽取样本。next_token = torch.multinomial(probs_sort, num_samples=1)# 这行代码使用 torch.gather 函数从 probs_idx 中收集对应 next_token 的索引,这样就能得到原始概率 probs 中对应的索引。next_token = torch.gather(probs_idx, -1, next_token)return next_token
总的来说,sample_top_p保留了按概率高低排序的大致分布,但过滤了长尾部分的低概率噪声。然后从重归一化的分布中采样,既保证了质量,又增加了适当的随机性。
generate函数
好,我们终于开始探索最核心的生成函数上了:
@torch.inference_mode()def generate(self,prompt_tokens: List[List[int]],max_gen_len: int,temperature: float = 0.6,top_p: float = 0.9,logprobs: bool = False,echo: bool = False,) -> Tuple[List[List[int]], Optional[List[List[float]]]]:
首先是这个函数的参数,其实我们已经比较熟悉了。包括输入的提示 tokens(prompt_tokens),最大生成长度(max_gen_len),温度参数(temperature,影响生成文本的随机性), top_p(用于决定采样过程中保留的 token 集合的概率阈值,也被称为 “nucleus sampling”),是否返回每个 token 的对数概率(logprobs),以及是否将输入的提示返回(echo)。
params = self.model.paramsbsz = len(prompt_tokens)assert bsz <= params.max_batch_size, (bsz, params.max_batch_size)min_prompt_len = min(len(t) for t in prompt_tokens)max_prompt_len = max(len(t) for t in prompt_tokens)assert max_prompt_len <= params.max_seq_lentotal_len = min(params.max_seq_len, max_gen_len + max_prompt_len)pad_id = self.tokenizer.pad_idtokens = torch.full((bsz, total_len), pad_id, dtype=torch.long, device="cuda")for k, t in enumerate(prompt_tokens):tokens[k, : len(t)] = torch.tensor(t, dtype=torch.long, device="cuda")if logprobs:token_logprobs = torch.zeros_like(tokens, dtype=torch.float)prev_pos = 0eos_reached = torch.tensor([False] * bsz, device="cuda")input_text_mask = tokens != pad_id
接着,根据提供的 prompt_tokens 初始化一个 tokens 张量,长度为 total_len,并填充模型的 pad_id。然后,将 prompt_tokens 的内容复制到 tokens 张量的对应位置。
for cur_pos in range(min_prompt_len, total_len):logits = self.model.forward(tokens[:, prev_pos:cur_pos], prev_pos)if logprobs:token_logprobs[:, prev_pos + 1 : cur_pos + 1] = -F.cross_entropy(input=logits.transpose(1, 2),target=tokens[:, prev_pos + 1 : cur_pos + 1],reduction="none",ignore_index=pad_id,)if temperature > 0:probs = torch.softmax(logits[:, -1] / temperature, dim=-1)next_token = sample_top_p(probs, top_p)else:next_token = torch.argmax(logits[:, -1], dim=-1)next_token = next_token.reshape(-1)# only replace token if prompt has already been generatednext_token = torch.where(input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token)tokens[:, cur_pos] = next_tokeneos_reached |= (~input_text_mask[:, cur_pos]) & (next_token == self.tokenizer.eos_id)prev_pos = cur_posif all(eos_reached):break
然后,对于 tokens 张量中的每一个位置,计算下一个 token 的 logits,并基于这些 logits 生成下一个 token。如果 logprobs 参数为真,则计算每个 token 的对数概率。如果温度大于 0,则使用 softmax 函数和温度参数对 logits 进行缩放,然后使用 top-p 采样生成下一个 token。否则,直接选择 logits 最大的 token。新生成的 token 会替换 tokens 张量中的对应位置。
如果生成的 token 是结束标记(eos_id),则更新 eos_reached 标记。如果所有的序列都已经生成了结束标记,则停止生成。
if logprobs:token_logprobs = token_logprobs.tolist()out_tokens, out_logprobs = [], []for i, toks in enumerate(tokens.tolist()):# cut to max gen lenstart = 0 if echo else len(prompt_tokens[i])toks = toks[start : len(prompt_tokens[i]) + max_gen_len]probs = Noneif logprobs:probs = token_logprobs[i][start : len(prompt_tokens[i]) + max_gen_len]# cut to eos tok if anyif self.tokenizer.eos_id in toks:eos_idx = toks.index(self.tokenizer.eos_id)toks = toks[:eos_idx]probs = probs[:eos_idx] if logprobs else Noneout_tokens.append(toks)out_logprobs.append(probs)return (out_tokens, out_logprobs if logprobs else None)
最后,如果 logprobs 参数为真,则将 token_logprobs 转换为列表。然后,对于 tokens 张量中的每一行(即每一个生成的序列),如果 echo 参数为假,则去掉提示部分。然后,如果存在结束标记,则去掉结束标记之后的部分。最后,返回生成的 tokens 和对数概率(如果 logprobs 参数为真)。
这个函数返回的是一个元组,第一个元素是一个列表,包含每一个生成的 token 序列。第二个元素是一个列表,包含每一个生成的对数概率序列(如果 logprobs 参数为真)。
build构造函数
最后我们再说下构造Llama的部分:
@staticmethoddef build(ckpt_dir: str,tokenizer_path: str,max_seq_len: int,max_batch_size: int,model_parallel_size: Optional[int] = None,) -> "Llama":if not torch.distributed.is_initialized():torch.distributed.init_process_group("nccl")if not model_parallel_is_initialized():if model_parallel_size is None:model_parallel_size = int(os.environ.get("WORLD_SIZE", 1))initialize_model_parallel(model_parallel_size)local_rank = int(os.environ.get("LOCAL_RANK", 0))torch.cuda.set_device(local_rank)# seed must be the same in all processestorch.manual_seed(1)if local_rank > 0:sys.stdout = open(os.devnull, "w")start_time = time.time()checkpoints = sorted(Path(ckpt_dir).glob("*.pth"))assert len(checkpoints) > 0, f"no checkpoint files found in {ckpt_dir}"assert model_parallel_size == len(checkpoints), f"Loading a checkpoint for MP={len(checkpoints)} but world size is {model_parallel_size}"ckpt_path = checkpoints[get_model_parallel_rank()]checkpoint = torch.load(ckpt_path, map_location="cpu")with open(Path(ckpt_dir) / "params.json", "r") as f:params = json.loads(f.read())model_args: ModelArgs = ModelArgs(max_seq_len=max_seq_len,max_batch_size=max_batch_size,**params,)tokenizer = Tokenizer(model_path=tokenizer_path)model_args.vocab_size = tokenizer.n_wordstorch.set_default_tensor_type(torch.cuda.HalfTensor)model = Transformer(model_args)model.load_state_dict(checkpoint, strict=False)print(f"Loaded in {time.time() - start_time:.2f} seconds")return Llama(model, tokenizer)
虽然这么一大段,但其实都是一些初始化的工作。
-
分布式设置:首先,这段代码检查是否已经初始化了 PyTorch 的分布式环境,如果没有则进行初始化。然后,检查是否已经初始化了模型并行环境,如果没有,则获取环境变量 WORLD_SIZE 的值作为模型并行的大小,并进行初始化。
-
设备设置:获取环境变量 LOCAL_RANK 的值作为本地排名,并设置当前设备为该排名对应的 GPU。
-
随机种子设置:为了确保所有进程生成的随机数相同,设置随机种子为 1。
-
标准输出设置:如果本地排名大于 0,则将标准输出重定向到空设备,即不显示任何输出。
-
加载模型检查点:找到检查点目录中的所有检查点文件,并按照文件名排序。然后,根据模型并行的排名选择一个检查点文件,并加载该检查点。然后,加载模型参数。
-
构建模型和分词器:使用加载的模型参数和提供的 max_seq_len 和 max_batch_size 构建模型参数对象。然后,加载分词器,并设置模型参数的词汇表大小为分词器的词汇表大小。然后,设置默认的张量类型为半精度浮点型(以节省内存和计算资源)。然后,构建 Transformer 模型,并加载模型检查点。
-
最后,构建一个 Llama 对象,包含加载的模型和分词器,并返回该对象。
小结
本节我们学习了LLaMA 2的源码,包括补全函数text_completion和聊天函数chat_completion的实现,以及它们的真正实现generate函数的原理。我们还学习了温度temperature和top p采样的原理。
对于没有搞到深度学习生成的同学,可能有一点难度。
LLaMA的代码还差一部分就是模型的部分,我们放到下一节来讲。要不然知识点太多大家容易大脑缺氧:)