经典神经网络(12)Transformer代码详解及其在Multi30k数据集上的训练
-
论文链接:https://arxiv.org/pdf/1706.03762v2
-
原理可以参考:Self-Attention和Transformer
-
网络架构图如下:
1 Transformer编码器模块
1.1 Embedding+位置编码
- 在实际应用中,我们会事先预训练好各种embedding矩阵,这些embedding矩阵包含常用领域常用单词或字的向量化表示。
- 当模型预测时,接收到一个序列时,可以从对应的embedding矩阵里查找对应的词或字向量,最终把整句输入转换成对应的向量表示。
"""Reference:Post: http://nlp.seas.harvard.edu/annotated-transformerCode: https://github.com/harvardnlp/annotated-transformer
"""
import os
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import math
import copy
import pandas as pd
import altair as alt
import warningswarnings.filterwarnings("ignore")def clones(module, N):"Produce N identical layers."# 复制N层Block,例如:编码器由6个相同block组成return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()# 创建一个词嵌入层,参数为词汇表大小和词嵌入维度self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):# 通过词嵌入层将输入的单词编码为向量,并乘以词嵌入维度的平方根进行缩放# embedding matrix的初始化方式是xavier,这种方式的方差是1/embedding size# 因此乘以embedding size的开方使得embedding matrix的方差是1,在这个scale下更有利于embedding matrix的收敛return self.lut(x) * math.sqrt(self.d_model)
Transformer中绝对位置编码公式如下:
Transformer升级之路:1、Sinusoidal位置编码追根溯源
class PositionalEncoding(nn.Module):"Implement the PE function."def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)# Compute the positional encodings once in log space.pe = torch.zeros(max_len, d_model)# position shape = (max_len, 1)position = torch.arange(0, max_len).unsqueeze(1)# div_term shape = (d_model // 2)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))# 使用正弦和余弦函数生成位置编码,对于d_model的偶数索引,使用正弦函数;# 对于奇数索引,使用余弦函数# (position * div_term)的shape = (max_len, d_model // 2) pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer("pe", pe)def forward(self, x):x = x + self.pe[:, : x.size(1)].requires_grad_(False)return self.dropout(x)
解释下代码中公式: d i v = e 2 i ∗ − l o g ( 10000 ) / d m o d e l = e 2 i / d m o d e l ∗ − l o g ( 10000 ) = e l o g ( 1000 0 d m o d e l / 2 i ) = 1000 0 d m o d e l / 2 i = 1 / 1000 0 2 i / d m o d e l p o s i t i o n ∗ d i v = p o s / 1000 0 2 i / d m o d e l 解释下代码中公式:\\ div=e^{2i*-log(10000)/d_{model}}\\ =e^{2i/d_{model}*-log(10000)}\\ =e^{log(10000^{d_{model}/2i})}\\ =10000^{d_{model}/2i}\\ =1/10000^{2i/d_{model}}\\ position * div=pos/10000^{2i/d_{model}} 解释下代码中公式:div=e2i∗−log(10000)/dmodel=e2i/dmodel∗−log(10000)=elog(10000dmodel/2i)=10000dmodel/2i=1/100002i/dmodelposition∗div=pos/100002i/dmodel
1.2 Multi-Head Attention
1.2.1 多头注意力机制
多头注意力的原理可以参考:https://jalammar.github.io/illustrated-transformer/
class MultiHeadedAttention(nn.Module):# 输入模型的大小(d_model)和注意力头的数量(h)def __init__(self, h, d_model, dropout=0.1):"Take in model size and number of heads."super(MultiHeadedAttention, self).__init__()assert d_model % h == 0# 我们假设d_v(值向量的维度)总是等于 d_k(键向量的维度)self.d_k = d_model // h # 计算每个注意力头的维度self.h = h # 保存注意力头的数量self.linears = clones(nn.Linear(d_model, d_model), 4) # 4个线性层self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):"Implements Figure 2"if mask is not None:# 对所有h个头应用相同的 maskmask = mask.unsqueeze(1)nbatches = query.size(0)# 1) Do all the linear projections in batch from d_model => h x d_k# 批量执行从 d_model 到 h x d_k 的线性投影# (bs, seq_len, d_model) -> (bs, seq_len, h, d_k) -> (bs, h, seq_len, d_k)query, key, value = [lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)for lin, x in zip(self.linears, (query, key, value))]# 2) Apply attention on all the projected vectors in batch.# x shape = (bs, h, seq_len, d_k)x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)# 3) "Concat" using a view and apply a final linear.# 使用view函数进行“拼接concat”,然后做下Linear变换x = (x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k))del querydel keydel value# 4) 拼接后,再投影一次return self.linears[-1](x)
1.2.2 缩放点积注意力
下图左边部分是缩放点积注意力(Scaled Dot-Product Attention)的整体实现步骤
def attention(query, key, value, mask=None, dropout=None):"""计算缩放点积注意力:param query: shape = (bs, h, src_seq_len/tgt_seq_len, d_k):param key: shape = (bs, h, src_seq_len/tgt_seq_len, d_k):param value: shape = (bs, h, src_seq_len/tgt_seq_len, d_k):param mask: encoder的mask(忽略填充):shape=(bs, 1, 1, src_seq_len)decoder的mask(忽略填充+下三角矩阵):训练时shape=(bs, 1, tgt_seq_len, tgt_seq_len);推理时shape=(1, 1, change_size, change_size):param dropout: dropout比例:return: value的加权求和 shape=(bs, h, src_seq_len/tgt_seq_len, d_k)"""# 获取 query 向量的最后一个维度的大小,即词嵌入的维度d_k = query.size(-1)# 计算query和key的点积,并对结果进行缩放,以减少梯度消失或爆炸的可能性# query shape = (bs, h, seq_len, d_k)# key trans shape = (bs, h, d_k, seq_len)# scores shape = (bs, h, seq_len, seq_len)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)# 如果提供了 mask,根据mask对scores进行遮掩# 遮掩的具体方法就是设为一个很大的负数比如-1e9,从而softmax后 对应概率基本为0if mask is not None:# Fills elements of self tensor with value where mask is True.# The shape of mask must be 【broadcastable】 with the shape of the underlying tensor.scores = scores.masked_fill(mask == 0, -1e9)# 对 scores 进行 softmax 操作,得到注意力权重 p_attnp_attn = scores.softmax(dim=-1)# 如果提供了 dropout,对注意力权重 p_attn 进行 dropout 操作if dropout is not None:p_attn = dropout(p_attn)# 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出# (bs, h, seq_len, d_k)return torch.matmul(p_attn, value), p_attn
1.3 LayerNorm和FFN
- 通过对层的激活值的归一化,可以加速模型的训练过程,使其更快的收敛,编码时用 LayerNorm 函数实现
- FFN由简单的2层nn.Linear实现
class LayerNorm(nn.Module):"""构建一个层归一化(layernorm)模块"""# 初始化函数,接收features(特征维度大小)和eps(防止除以零的微小值)作为输入参数def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__() # 调用父类nn.Module的构造函数self.a_2 = nn.Parameter(torch.ones(features)) # 定义一个大小为features的一维张量,初始化为全1,并将其设置为可训练参数self.b_2 = nn.Parameter(torch.zeros(features)) # 定义一个大小为features的一维张量,初始化为全0,并将其设置为可训练参数self.eps = eps # 将防止除以零的微小值eps保存为类实例的属性# 定义前向传播函数,输入参数x是输入张量def forward(self, x):mean = x.mean(-1, keepdim=True) # 计算输入x在最后一个维度上的均值,保持输出结果的维度std = x.std(-1, keepdim=True) # 计算输入x在最后一个维度上的标准差,保持输出结果的维度# 对输入x进行层归一化,使用可训练参数a_2和b_2进行缩放和偏移,最后返回归一化后的结果return self.a_2 * (x - mean) / (std + self.eps) + self.b_2class PositionwiseFeedForward(nn.Module):"Implements FFN equation."def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()# 注意:这里d_ff = d_model * 4self.w_1 = nn.Linear(d_model, d_ff)self.w_2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w_2(self.dropout(self.w_1(x).relu()))
1.4 搭建Encoder
1.4.1 Sublayer
- 这里为了代码的简单性,先进行LayerNorm
- 即:由原始论文的
LayerNorm(x + Sublayer(x))
-> 修改为Sublayer(LayerNorm(x)) + x
- Sublayer层:MultiHead Attention/Masked MultiHead Attention 或 前馈神经网络FFN
class SublayerConnection(nn.Module):"""Sublayer层:MultiHead Attention/Masked MultiHead Attention 或 前馈神经网络FFNNote: 这里为了代码的简单性,先进行LayerNorm即:由原始论文的LayerNorm(x + Sublayer(x)) -> 修改为Sublayer(LayerNorm(x)) + x"""def __init__(self, size, dropout):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(dropout)def forward(self, x, sublayer):# 首先对输入x进行层归一化,然后执行子层操作(如self-attention或前馈神经网络)sub_x = sublayer(self.norm(x))# 接着应用dropout,最后将结果与原始输入x相加return x + self.dropout(sub_x)
1.4.2 EncoderLayer及Encoder
class EncoderLayer(nn.Module):"Encoder is made up of self-attn and feed forward (defined below)"def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):# 1、第一个子层# Sublayer(LayerNorm(x)) + x,其中Sublayer = Multi-Head Attention# Note:query、key、value的传参值均为encoder的xx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))# 2、第二个子层# Sublayer(LayerNorm(x)) + x,其中Sublayer = FFNreturn self.sublayer[1](x, self.feed_forward)
# Encoder是由N=6个相同层组成
class Encoder(nn.Module):"Core encoder is a stack of N layers"def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):"Pass the input (and mask) through each layer in turn."for layer in self.layers:x = layer(x, mask)return self.norm(x)
2 Transformer解码器模块
2.1 DecoderLayer
- Masked Multi-Head Self-attention和第一部分介绍的Multi-Head self-attention基本一致,区别在于加了个mask机制
- decoder的mask在训练时除了忽略填充,还要利用下三角矩阵屏蔽未来词汇
def subsequent_mask(size):"Mask out subsequent positions.屏蔽未来词汇"attn_shape = (1, size, size)# torch.triu : 上三角为1、下三角为0(上三角矩阵)# torch.tril : 上三角为0、下三角为1(下三角矩阵)subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)return subsequent_mask == 0
- DecoderLayer含有三个子层
class DecoderLayer(nn.Module):"Decoder is made of self-attn, src-attn, and feed forward (defined below)"def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, src_mask, tgt_mask):""":param x: 解码端输入,训练时shape=(bs, tgt_seq_len, embedding_dim); 推理时shape=(1, change_size, embedding_dim):param memory: 编码器输出, shape=(bs, src_seq_len, embedding_dim):param src_mask: 编码器mask, shape=(bs, 1, src_seq_len):param tgt_mask: 解码器mask(下三角矩阵), 训练时shape=(bs, tgt_seq_len, tgt_seq_len); 推理时shape=(1, change_size, change_size):return:"""m = memory# 1、第一个子层# Sublayer(LayerNorm(x)) + x,其中Sublayer = Masked Multi-Head Attention# Note:query、key、value的传参值均为decoder的xx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))# 2、第二个子层# Sublayer(LayerNorm(x)) + x,其中Sublayer = Multi-Head Attention# Note:query的传参值为decoder的x;而key、value的传参值encoder输出mx = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))# 3、第三个子层# Sublayer(LayerNorm(x)) + x,其中Sublayer = FFNx = self.sublayer[2](x, self.feed_forward)return x
2.2 Decoder和Generator
- Decoder也是由N=6个相同层组成
- Generator将模型的输出维度映射到词汇表大小
class Decoder(nn.Module):"Generic N layer decoder with masking."def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, src_mask, tgt_mask):for layer in self.layers:x = layer(x, memory, src_mask, tgt_mask)return self.norm(x)class Generator(nn.Module):# 初始化方法,接收模型维度(d_model)和词汇表大小(vocab)作为参数def __init__(self, d_model, vocab):super(Generator, self).__init__()# 定义一个线性层,将模型的输出维度映射到词汇表大小self.proj = nn.Linear(d_model, vocab)def forward(self, x):# 将输入x传入线性层,然后对输出应用log-softmax激活函数(在最后一个维度上)return log_softmax(self.proj(x), dim=-1)
2.3 封装Transformer模型
class EncoderDecoder(nn.Module):"""A standard Encoder-Decoder architecture. Base for this and many other models."""def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):super(EncoderDecoder, self).__init__()self.encoder = encoderself.decoder = decoder# 源输入的embedding层self.src_embed = src_embed# 目标输入的embedding层self.tgt_embed = tgt_embed# 将模型的输出维度映射到词汇表大小self.generator = generatordef forward(self, src, tgt, src_mask, tgt_mask):# 对源序列进行编码memory = self.encode(src, src_mask)# 将编码结果与掩码传递给解码器进行解码return self.decode(memory, src_mask, tgt, tgt_mask)def encode(self, src, src_mask):# 将源序列进行嵌入,然后将嵌入后的序列和源序列掩码传给编码器return self.encoder(self.src_embed(src), src_mask)def decode(self, memory, src_mask, tgt, tgt_mask):# 将目标序列进行嵌入,然后将嵌入后的序列、编码器输出、源序列掩码和目标序列掩码传给解码器return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):"Helper: Construct a model from hyperparameters."c = copy.deepcopy# 1、多头自注意力attn = MultiHeadedAttention(h, d_model)# 2、FFNff = PositionwiseFeedForward(d_model, d_ff, dropout)# 3、位置编码position = PositionalEncoding(d_model, dropout)# 4、EncoderDecodermodel = EncoderDecoder(Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),nn.Sequential(Embeddings(d_model, src_vocab), c(position)),nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),Generator(d_model, tgt_vocab),)# This was important from their code.# Initialize parameters with Glorot / fan_avg.for p in model.parameters():if p.dim() > 1:nn.init.xavier_uniform_(p)return model
2.4 执行前向预测
def inference_test():test_model = make_model(src_vocab=11, tgt_vocab=11, N=2)test_model.eval()# 注意:这里使用的是LongTensorsrc = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])src_mask = torch.ones(1, 1, 10)# 编码器输出 memory shape = (bs, seq_len, embedding_dim)memory = test_model.encode(src, src_mask)ys = torch.zeros(1, 1).type_as(src)for i in range(9):# 下三角矩阵作为tgt_masktgt_mask = subsequent_mask(ys.size(1)).type_as(src.data)# 1、解码器output_shape = (1, ys.size(1), 512)out = test_model.decode(memory, src_mask, ys, tgt_mask)# 2、解码器输出维度映射到目标词汇表大小# prob_shape = [1, tgt_vocab]prob = test_model.generator(out[:, -1])# 取出概率最大值作为预测输出_, next_word = torch.max(prob, dim=1)next_word = next_word.data[0]# 拼接到原ys后ys = torch.cat([ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim=1)print("Example Untrained Model Prediction:", ys)def run_tests():for _ in range(10):inference_test()if __name__ == '__main__':run_tests()
3 Transformer的训练
3.1 通过造一个数据集再次理解Transformer
- 为了方便查看模型训练细节,我们可以先自己造一个数据集进行训练
"""Reference:Post: http://nlp.seas.harvard.edu/annotated-transformerCode: https://github.com/harvardnlp/annotated-transformer
"""
import torch
# 这就是上面的模型代码所在文件
from transformer_model import subsequent_mask, make_modelclass Batch(object):"""Object for holding a batch of data with mask during training."""def __init__(self, src, tgt=None, pad=0):# encoder端序列 shape = (bs, src_seq_len)self.src = src# encoder端序列 mask shape = (bs, 1, src_seq_len)self.src_mask = (src != pad).unsqueeze(-2)if tgt is not None:# decoder端序列 shape = (bs, tgt_seq_len) 去掉最后一个词self.tgt = tgt[:, :-1]# decoder端序列标签 shape = (bs, tgt_seq_len) 去掉第一个词self.tgt_y = tgt[:, 1:]# decoder端序列 mask shape = (bs, tgt_seq_len, tgt_seq_len)self.tgt_mask = self.make_std_mask(self.tgt, pad)# 计算目标语言中非填充词的数量self.ntokens = (self.tgt_y != pad).data.sum()@staticmethoddef make_std_mask(tgt, pad):"Create a mask to hide padding and future words."# 1、创建目标语言的掩码,用于忽略填充部分tgt_mask = (tgt != pad).unsqueeze(-2)# 2、使用子掩码同时屏蔽未来词汇(使用下三角矩阵)tgt_mask = tgt_mask & subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)return tgt_maskdef data_gen(V, batch_size, nbatches):for i in range(nbatches):data = torch.randint(1, V, size=(batch_size, 10))data[:, 0] = 1src = data.requires_grad_(False).clone().detach()tgt = data.requires_grad_(False).clone().detach()yield Batch(src=src, tgt=tgt, pad=0)if __name__ == '__main__':V = 11# 1、创建模型model = make_model(src_vocab=V, tgt_vocab=V, N=2)# 2、测试data_iter = data_gen(V=V, batch_size=2, nbatches=1)for i, batch in enumerate(data_iter):src, tgt, src_mask, tgt_mask = batch.src, batch.tgt, batch.src_mask, batch.tgt_mask# out shape = (bs, tgt_seq_len, embedding_dim)out = model(src, tgt, src_mask, tgt_mask)print(out.shape)
3.2 Transformer在Multi30k数据集上的训练
3.2.1 预处理阶段:创建词汇表
"""Reference:Post: http://nlp.seas.harvard.edu/annotated-transformerCode: https://github.com/harvardnlp/annotated-transformer
"""
import os
import time
import torch
import spacy
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import LambdaLR
import torchtext.datasets as datasets
from torchtext.data.functional import to_map_style_dataset
from torchtext.vocab import build_vocab_from_iterator, Vocab
from torch.nn.functional import log_softmax, pad# 使用DDP进行分布式训练
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp# 导入Transformer模型
from transformer_model import make_model, subsequent_maskdef load_tokenizers():"""1、Python的spacy库是一个现代化的、工业级的NLP库,提供了快速、高效和易于使用的API,适用于构建各种NLP应用。spacy不仅支持多种语言,还包含丰富的预训练模型和工具,能够处理从分词、词性标注、命名实体识别到依存分析等任务。2、安装:pip install spacy==3.2.63、安装完成后,还需要下载预训练模型。下载英文模型的命令:python -m spacy download en_core_web_sm# 下载地址:https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.2.0/en_core_web_sm-3.2.0-py3-none-any.whl下载德语模型的命令:python -m spacy download de_core_news_sm下载地址:https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-3.2.0/de_core_news_sm-3.2.0-py3-none-any.whl4、加载预训练模型后可以处理文本:return:"""try:spacy_de = spacy.load("de_core_news_sm")except IOError:os.system("python -m spacy download de_core_news_sm")spacy_de = spacy.load("de_core_news_sm")try:spacy_en = spacy.load("en_core_web_sm")except IOError:os.system("python -m spacy download en_core_web_sm")spacy_en = spacy.load("en_core_web_sm")return spacy_de, spacy_endef yield_tokens(data_iter, tokenizer, index):for from_to_tuple in data_iter:yield tokenizer(from_to_tuple[index])def build_vocabulary(spacy_de, spacy_en):def tokenize_de(text):return tokenize(text, spacy_de)def tokenize_en(text):return tokenize(text, spacy_en)print("Building German Vocabulary ...")train, val = datasets.Multi30k(root=r'D:\python\deepbule\kaggle\multi', split=('train', 'valid'), language_pair=("de", "en"))"""build_vocab_from_iterator函数用于从iterator中生成词汇表(Vocabulary)。它可以从iterator中读取token,根据min_freq参数来确定token是否添加到词汇表中,同时也可以添加特殊符号(specials)。函数的参数:iterator:用于生成词汇表的迭代器。迭代器中每个元素都是一系列 token。min_freq:词汇表中token的最小出现频率。如果一个token的出现频率低于这个值,它将不会被添加到词汇表中。specials:要添加的特殊符号。这些符号的顺序将被保留。special_first:指示是否将特殊符号添加到词汇表的开始还是结尾。函数的返回值是一个 torchtext.vocab.Vocab 对象。"""vocab_src = build_vocab_from_iterator(iterator=yield_tokens(train + val, tokenize_de, index=0),min_freq=2,specials=["<s>", "</s>", "<blank>", "<unk>"],special_first=True)print("Building English Vocabulary ...")train, val = datasets.Multi30k(root=r'D:\python\deepbule\kaggle\multi',split=('train', 'valid'), language_pair=("de", "en"))vocab_tgt = build_vocab_from_iterator(yield_tokens(train + val, tokenize_en, index=1),min_freq=2,specials=["<s>", "</s>", "<blank>", "<unk>"],special_first=True)# This index will be returned when OOV token is queried.vocab_src.set_default_index(vocab_src["<unk>"])vocab_tgt.set_default_index(vocab_tgt["<unk>"])return vocab_src, vocab_tgtdef load_vocab(spacy_de, spacy_en):if not os.path.exists("vocab.pt"):# 构建翻译任务源端、目标端的词典vocab_src, vocab_tgt = build_vocabulary(spacy_de, spacy_en)torch.save((vocab_src, vocab_tgt), "vocab.pt")else:vocab_src, vocab_tgt = torch.load("vocab.pt")print("Finished.\nVocabulary sizes:")print(f'len(vocab_src) = {len(vocab_src)}')print(f'len(vocab_tgt) = {len(vocab_tgt)}')return vocab_src, vocab_tgtif __name__ == '__main__':# 1、配置信息config = {"batch_size": 32,"distributed": False,"num_epochs": 8,"accum_iter": 10,"base_lr": 1.0,"max_padding": 72,"warmup": 3000,"file_prefix": "multi30k_model_",}# 2、加载tokenizers;构造德语、英语词典spacy_de, spacy_en = load_tokenizers()vocab_src, vocab_tgt = load_vocab(spacy_de, spacy_en)# 3、模型训练train_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config)
3.2.2 训练三部曲:随机初始化、损失函数、反向传播
下面训练代码主要包含下面技巧:
- 梯度累加
- 标签平滑LabelSmoothing
- 自定义学习率调度器、其中含有warm_up阶段
- 使用DDP进行分布式训练
- 使用torchtext进行文本预处理、构建词典等
- 利用spacy库加载的德语、英语预训练模型进行分词等
class TrainState:"""记录训练状态"""step: int = 0 # Steps in the current epochaccum_step: int = 0 # Number of gradient accumulation stepssamples: int = 0 # total of examples usedtokens: int = 0 # total of tokens processed# 定义一个函数run_epoch,用于训练一个epoch的过程
def run_epoch(data_iter, model, loss_compute, optimizer, scheduler, mode="train", accum_iter=1, train_state=TrainState()):"""Train a single epoch"""start = time.time() # 记录训练开始时间total_tokens = 0 # 初始化总token数total_loss = 0 # 初始化总损失tokens = 0 # 初始化当前批次的token数n_accum = 0 # 初始化累积步数# 遍历数据集中的每个批次for i, batch in enumerate(data_iter):# 对每个批次进行前向传播out = model(batch.src, batch.tgt, batch.src_mask, batch.tgt_mask)# 计算每个批次的损失loss, loss_node = loss_compute(out, batch.tgt_y, batch.ntokens)if mode == "train" or mode == "train+log":loss_node.backward() # 反向传播train_state.step += 1 # 更新state中的训练步数train_state.samples += batch.src.shape[0] # 更新state中的样本数train_state.tokens += batch.ntokens # 更新state中的token数# 每accum_iter次更新一下梯度,即【梯度累加】if i % accum_iter == 0:optimizer.step() # 参数更新optimizer.zero_grad(set_to_none=True) # 梯度清零n_accum += 1 # 更新累积步数train_state.accum_step += 1 # 更新state中的累积训练步数scheduler.step() # 更新学习率# 累加损失total_loss += loss # 累加损失total_tokens += batch.ntokens # 累加token数tokens += batch.ntokens # 更新当前批次的token数if i % 40 == 1 and (mode == "train" or mode == "train+log"):lr = optimizer.param_groups[0]["lr"] # 获取当前学习率elapsed = time.time() - start # 计算时间间隔print(("Epoch Step: %6d | Accumulation Step: %3d | Loss: %6.2f "+ "| Tokens / Sec: %7.1f | Learning Rate: %6.1e") % (i, n_accum, loss / batch.ntokens, tokens / elapsed, lr) # 打印训练信息)start = time.time() # 重新记录时间tokens = 0 # 重置当前批次的token数del loss # 释放损失值的内存del loss_node # 释放损失节点的内存# 返回平均损失、训练状态train_statereturn total_loss / total_tokens, train_stateclass SimpleLossCompute(object):"A simple loss compute and train function."def __init__(self, generator, criterion):# 将模型的输出维度映射到词汇表大小self.generator = generator# criterion 为标签平滑(LabelSmoothing)方法self.criterion = criteriondef __call__(self, x, y, norm):# 1、模型的输出维度映射到词汇表大小x = self.generator(x)# x对对应于out,也就是预测的时刻 [batch_size,max_length-1,vocab_size]# y对应于tgt_y,也就是t时刻 [batch_size,max_length-1]# norm为batch.ntokens,即目标语言非填充词数量# x.contiguous().view(-1, x.size(-1)) -> [batch_size*(max_length-1), vocab_size]# y.contiguous().view(-1) -> [batch_size*(max_length-1)]sloss = (self.criterion(x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1)) / norm)return sloss.data * norm, slossclass LabelSmoothing(nn.Module):def __init__(self, size, padding_idx, smoothing=0.1):"""Implement label smoothing:param size: 目标语言词典大小:param padding_idx: 填充索引:param smoothing: 平滑值"""super(LabelSmoothing, self).__init__()# KL散度损失self.criterion = nn.KLDivLoss(reduction="sum")self.padding_idx = padding_idxself.confidence = 1.0 - smoothing # 置信度self.smoothing = smoothing # 平滑系数self.size = sizeself.true_dist = Nonedef forward(self, x, target):"""计算标签平滑损失:通过在label中添加噪声,从而实现对模型的约束。ont-hot标签:0 1 0 0 00 0 1 0 0平滑标签:0.0250, 0.9000, 0.0250, 0.0250, 0.02500.0250, 0.0250, 0.9000, 0.0250, 0.0250目的是:防止模型在训练时过于自信地预测标签,防止过拟合,提高模型的泛化能力。标签平滑的提出主要是为了解决传统中one-hot标签形式存在的问题假设现在是一个5分类的任务,文本的类别标签(y)是鸟,损失函数使用的是分类任务中常用的交叉熵损失函数,最终的loss值就只和y为1的那一维度有关。这样会造成一些问题:问题一:在神经网络学习的过程中,鼓励模型预测为目标类别的概率趋近1,非目标类别的概率趋近0,使得模型向预测正确与错误标签的logit差值无限增大的方向学习,而过大的logit差值会使模型缺乏适应性,对它的预测过于自信。在训练数据不足以覆盖所有情况下,这就会导致网络过拟合,泛化能力差。问题二:面对易混淆的分类任务、有噪音(误打标)的数据集时,更容易受影响:param x: 模型的预测输出:param target: 真实的标签:return: 平滑损失"""assert x.size(1) == self.sizetrue_dist = x.data.clone()# 在每个样本所有预测的类别都填充为平滑值# 一般为填充为:smoothing / (类别数 - 1),这里减2是因为含有词典中含有pad,我们把pad位置的平滑值设置为0true_dist.fill_(self.smoothing / (self.size - 2))# 在每个样本真实类别的位置上设置为confidence# pytorch连接:https://pytorch.org/docs/stable/generated/torch.Tensor.scatter_.html# scatter_函数可以参考:https://blog.csdn.net/lifeplayer_/article/details/111561685true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)# 将padding_idx位置处的平滑值设置为0true_dist[:, self.padding_idx] = 0mask = torch.nonzero(target.data == self.padding_idx)if mask.dim() > 0:true_dist.index_fill_(0, mask.squeeze(), 0.0)self.true_dist = true_dist# 计算KL散度损失return self.criterion(x, true_dist.clone().detach())def rate(step, model_size, factor, warmup):"""we have to default the step to 1 for LambdaLR function to avoid zero raising to negative power.model_size ** (-0.5)是定值C当step < warmup时,step ** (-0.5) > step * warmup ** (-1.5),此时lr = C * step * warmup(定值) ** (-1.5)当step = warmup时,step ** (-0.5) = step * warmup ** (-1.5), 此时lr = C * warmup(定值) ** (-0.5)当step > warmup时,step ** (-0.5) < step * warmup ** (-1.5), 此时lr = C * step ** (-0.5)"""if step == 0:step = 1return factor * (model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5)))def collate_batch(batch, src_pipeline, tgt_pipeline, src_vocab: Vocab, tgt_vocab: Vocab,device, max_padding=128, pad_id=2):bs_id = torch.tensor([0], device=device) # <s> token ideos_id = torch.tensor([1], device=device) # </s> token idsrc_list, tgt_list = [], []for (_src, _tgt) in batch:# 开头加bs_id、结尾加eos_idprocessed_src = torch.cat([bs_id,# tokenize + token2idtorch.tensor(src_vocab(src_pipeline(_src)), dtype=torch.int64, device=device),eos_id,], dim=0)processed_tgt = torch.cat([bs_id,# tokenize + token2idtorch.tensor(tgt_vocab(tgt_pipeline(_tgt)), dtype=torch.int64, device=device),eos_id,],dim=0)src_list.append(# warning - overwrites values for negative values of padding - lenpad(processed_src,(0, max_padding - len(processed_src)),value=pad_id,) # 使用pad函数将序列数据填充到相同的长度,以便于进行批量处理)tgt_list.append(pad(processed_tgt,(0, max_padding - len(processed_tgt)),value=pad_id,) # 使用pad函数将序列数据填充到相同的长度,以便于进行批量处理)# src shape = (bs, max_padding); tgt shape = (bs, max_padding)src = torch.stack(src_list)tgt = torch.stack(tgt_list)return (src, tgt)def tokenize(text, tokenizer):return [tok.text for tok in tokenizer.tokenizer(text)]def create_dataloaders(device, vocab_src, vocab_tgt, spacy_de, spacy_en, batch_size=12000, max_padding=128, is_distributed=False):# 对德语进行分词def tokenize_de(text):return tokenize(text, spacy_de)# 对英文进行分词def tokenize_en(text):return tokenize(text, spacy_en)def collate_fn(batch):return collate_batch(batch,tokenize_de,tokenize_en,vocab_src, # torchtext.vocab.Vocab对象vocab_tgt, # torchtext.vocab.Vocab对象device,max_padding=max_padding,# vocab.get_stoi() 获取词元到索引到映射(字典),vocab.get_itos() 获取索引到词元的映射(列表)pad_id=vocab_src.get_stoi()["<blank>"],)# torchtext是一个用于文本预处理的库# torchtext提供了很多常用的文本数据集,可以直接加载使用# language_pair:包含源语言和目标语言的元组或列表。可用的选项是('de','en')和('en','de')# 可从此地址手动下载:https://github.com/neychev/small_DL_repo/tree/master/datasets/Multi30ktrain_iter, valid_iter = datasets.Multi30k(root=r'D:\python\deepbule\kaggle\multi', split=('train', 'valid'), language_pair=("de", "en"))# 将一个iterable-style数据集通过to_map_style_dataset转换为一个易于操作的map-style数据集# 我们可以通过索引直接访问数据集中的特定样本train_iter_map = to_map_style_dataset(train_iter)"""分布式训练的本质是把数据等分成多份,分别在不同的GPU上面训练,训练完毕后再合成。所以在分布式训练时我们需要使用DistributedSampler封装数据,保证数据平均分到不同的GPU上面。例如:有100份数据,用4张卡训练,则每张卡分到25份数据。如果不加DistributedSampler也能训练,只不过此时每张卡就是拥有全部数据了,也就是4张卡都有100份相同的数据。"""train_sampler = (DistributedSampler(train_iter_map) if is_distributed else None)valid_iter_map = to_map_style_dataset(valid_iter)valid_sampler = (DistributedSampler(valid_iter_map) if is_distributed else None)train_dataloader = DataLoader(train_iter_map,batch_size=batch_size,shuffle=(train_sampler is None), # DDP分布式训练时,不需要设置shufflesampler=train_sampler, # DDP分布式训练时,需要samplercollate_fn=collate_fn,)valid_dataloader = DataLoader(valid_iter_map,batch_size=batch_size,shuffle=(valid_sampler is None), # 分布式训练时,不需要设置shufflesampler=valid_sampler, # DDP分布式训练时,需要samplercollate_fn=collate_fn,)return train_dataloader, valid_dataloaderclass DummyOptimizer(torch.optim.Optimizer):def __init__(self):self.param_groups = [{"lr": 0}]Nonedef step(self):Nonedef zero_grad(self, set_to_none=False):Noneclass DummyScheduler:def step(self):Noneclass Batch(object):"""Object for holding a batch of data with mask during training."""def __init__(self, src, tgt=None, pad=0):# encoder端序列 shape = (bs, src_seq_len)self.src = src# encoder端序列 mask shape = (bs, 1, src_seq_len)self.src_mask = (src != pad).unsqueeze(-2)if tgt is not None:# decoder端序列 shape = (bs, tgt_seq_len) 去掉最后一个词self.tgt = tgt[:, :-1]# decoder端序列标签 shape = (bs, tgt_seq_len) 去掉第一个词self.tgt_y = tgt[:, 1:]# decoder端序列 mask shape = (bs, tgt_seq_len, tgt_seq_len)self.tgt_mask = self.make_std_mask(self.tgt, pad)# 计算目标语言中非填充词的数量self.ntokens = (self.tgt_y != pad).data.sum()@staticmethoddef make_std_mask(tgt, pad):"Create a mask to hide padding and future words."# 1、创建目标语言的掩码,用于忽略填充部分tgt_mask = (tgt != pad).unsqueeze(-2)# 2、使用子掩码同时屏蔽未来词汇(使用下三角矩阵)tgt_mask = tgt_mask & subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)return tgt_maskdef train_worker(gpu, ngpus_per_node, vocab_src, vocab_tgt, spacy_de, spacy_en, config, is_distributed=False):"""模型训练:param gpu: GPU:param ngpus_per_node: 每个节点的gpu个数:param vocab_src: 源端的torchtext.vocab.Vocab(词典)对象:param vocab_tgt: 目标端的torchtext.vocab.Vocab(词典)对象:param spacy_de: 利用spacy库加载的德语预训练模型:param spacy_en: 利用spacy库加载的英语预训练模型:param config: 训练相关配置信息:param is_distributed: 是否利用DDP进行分布式训练:return:"""print(f"Train worker process using GPU: {gpu} for training", flush=True)torch.cuda.set_device(gpu)pad_idx = vocab_tgt["<blank>"] # 获取pad的索引d_model = 512# 创建Transformer模型model = make_model(len(vocab_src), len(vocab_tgt), N=6)model.cuda(gpu)module = modelis_main_process = True# DDP分布式训练if is_distributed:# 初始化进程组,这个函数用来告诉GPU你是第几个进程和总的进程数# backend:进程间通信方式。使用nccl就好了# world_size:每个节点总的进程数# rank:进程编号,当前进程是第几号进程dist.init_process_group("nccl", init_method="env://", rank=gpu, world_size=ngpus_per_node)# 在函数中使用pytorch自带的DDP类封装我们的模型,通过这个操作告诉pytorch我们的模型是一个分布式模型model = DDP(model, device_ids=[gpu])module = model.moduleis_main_process = gpu == 0criterion = LabelSmoothing(size=len(vocab_tgt), padding_idx=pad_idx, smoothing=0.1)criterion.cuda(gpu)# 获取dataloadertrain_dataloader, valid_dataloader = create_dataloaders(gpu,vocab_src,vocab_tgt,spacy_de,spacy_en,batch_size=config["batch_size"] // ngpus_per_node,max_padding=config["max_padding"],is_distributed=is_distributed,)# Adam优化器optimizer = torch.optim.Adam(model.parameters(), lr=config["base_lr"], betas=(0.9, 0.98), eps=1e-9)# 学习率调度器lr_scheduler = LambdaLR(optimizer=optimizer,lr_lambda=lambda step: rate(step, d_model, factor=1, warmup=config["warmup"]),)train_state = TrainState()for epoch in range(config["num_epochs"]):# 分布式训练需要在每个周期开始处、调用sampler.set_epoch(epoch)可以使得数据充分打乱if is_distributed:train_dataloader.sampler.set_epoch(epoch)valid_dataloader.sampler.set_epoch(epoch)model.train()print(f"[GPU{gpu}] Epoch {epoch} Training ====", flush=True)_, train_state = run_epoch((Batch(b[0], b[1], pad_idx) for b in train_dataloader),model,SimpleLossCompute(module.generator, criterion),optimizer,lr_scheduler,mode="train+log",accum_iter=config["accum_iter"],train_state=train_state,)if is_main_process:file_path = "%s%.2d.pt" % (config["file_prefix"], epoch)torch.save(module.state_dict(), file_path)torch.cuda.empty_cache()print(f"[GPU{gpu}] Epoch {epoch} Validation ====", flush=True)model.eval()sloss = run_epoch((Batch(b[0], b[1], pad_idx) for b in valid_dataloader),model,SimpleLossCompute(module.generator, criterion),DummyOptimizer(),DummyScheduler(),mode="eval",)print(sloss)torch.cuda.empty_cache()if is_main_process:file_path = "%sfinal.pt" % config["file_prefix"]torch.save(module.state_dict(), file_path)def train_distributed_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config):ngpus = torch.cuda.device_count()# 指明主机的IP和端口os.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "12356"print(f"Number of GPUs detected: {ngpus}")print("Spawning training processes ...")"""mp.spawn是pytorch内置的多进程创建程序和python自带的Process差不多。使用mp.spwan后会启动多个进程进行分布式训练,一个进程就是一个GPU。mp.spawn的主要参数如下:训练函数:mp.spawn的第一个参数为训练函数,训练函数会在每个GPU上面单独执行。每个GPU都会有单独的模型,优化器,损失函数和Dataloader,互不干扰。一般而言训练函数的格式为train_worker(gpu, args)。nporcs:当前节点的进程数,其实就是当前节点的gpu数。args:用于向训练函数train_worker(gpu, args)传参不过需要注意的是训练函数的第一个参数gpu会自动获取,所以我们只需要传第二个参数args"""mp.spawn(train_worker,nprocs=ngpus,args=(ngpus, vocab_src, vocab_tgt, spacy_de, spacy_en, config, True) # 注意这里没有传参数gpu,会自动获取)def train_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config):if config["distributed"]:# 利用DDP进行分布式训练train_distributed_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config)else:train_worker(gpu=0, ngpus_per_node=1, vocab_src=vocab_src, vocab_tgt=vocab_tgt,spacy_de=spacy_de, spacy_en=spacy_en, config=config, is_distributed=False)
4 Transformer相关问题总结
1、Transformer为何使用多头注意力机制?(为什么不使用一个头)
- 多头保证了transformer可以注意到不同子空间的信息,捕捉到更加丰富的特征信息。可以类比CNN中同时使用多个滤波器的作用。
- 具体可参考:https://www.zhihu.com/question/341222779
2、Transformer为什么Q和K使用不同的权重矩阵生成,为何不能使用同一个值进行自身的点乘?
- K和Q的点乘是为了得到一个attention score 矩阵,用来对V进行加权平均。K和Q使用了不同的 W K , W Q W_K, W_Q WK,WQ来计算,可以理解为是在不同空间上的投影。正因为有了这种不同空间的投影,增加了表达能力,这样计算得到的attention score矩阵的泛化能力更高。
- 具体可参考:https://www.zhihu.com/question/319339652
3、为什么在进行softmax之前需要对attention进行scaled(为什么除以dk的平方根)?
-
这取决于softmax函数的特性,如果softmax内计算数的数量级太大,会输出近似one-hot编码的形式,导致梯度消失的问题,所以需要scale。
-
我们还可以从另一角度来分析:
- 假设q和k各分量都是服从期望为0,方差为1的独立的随机变量,那么 q k qk qk的均值为0,方差为 d k d_k dk;
我们知道对于 n 个独立同分布的随机变量 X 1 , X 2 , . . . , X n ,对于他们的和 S n = X 1 + X 2 + . . . + X n 期望 E [ S n ] = n μ , 方差 V ( S n ) = n σ 2 则 E ( q k ) = d k E [ q i k i ] = d k ∗ 0 = 0 V ( q k ) = d k V a r [ q i k i ] = d k ∗ 1 = d k 我们知道对于n个独立同分布的随机变量X_1,X_2,...,X_n,对于他们的和S_n = X_1+X_2+...+X_n \\ 期望E[S_n] = n\mu ,方差V(S_n)=n\sigma^2 \\ 则E(qk)=d_kE[q_ik_i]=d_k*0=0\\ V(qk)=d_kVar[q_ik_i]=d_k*1=d_k 我们知道对于n个独立同分布的随机变量X1,X2,...,Xn,对于他们的和Sn=X1+X2+...+Xn期望E[Sn]=nμ,方差V(Sn)=nσ2则E(qk)=dkE[qiki]=dk∗0=0V(qk)=dkVar[qiki]=dk∗1=dk
下面是两个随机变量乘积的均值和方差的推导过程:
设 X , Y 两个随机变量是独立的,且均值为 0 ,方差为 1 。 乘积的均值可以计算为: E ( X Y ) = E ( X ) E ( Y ) = 0 ∗ 0 = 0 那么它们的协方差为 0 ,乘积的方差可以计算为: V a r ( X Y ) = E ( X 2 ) E ( Y 2 ) − [ E ( X ) ] 2 [ E ( Y ) ] 2 = E ( X 2 − 0 2 ) E ( Y 2 − 0 2 ) − [ E ( X ) E ( Y ) ] 2 = E ( X 2 − [ E ( X ) 2 ] ) E ( Y 2 − E ( Y ) 2 ) − 0 = [ E ( X 2 ) − [ E ( X ) 2 ] ] [ E ( Y 2 ) − [ E ( Y ) 2 ] ] − 0 均值和方差之间有着紧密的联系,它们之间的关系公式如下: V a r ( X ) = E [ ( X − μ ) 2 ] = E [ X 2 ] − μ 2 = E [ X 2 ] − [ E ( X ) 2 ] V a r ( Y ) = E [ ( Y − μ ) 2 ] = E [ Y 2 ] − μ 2 = E [ Y 2 ] − [ E ( Y ) 2 ] 因此 , V a r ( X Y ) = V a r ( X ) V a r ( Y ) − 0 = 1 ∗ 1 − 0 = 1 因此 E ( X Y ) = 0 , V a r ( X Y ) = 1 , 即 E ( q i k i ) = 0 , V a r ( q i k i ) = 1 设X,Y两个随机变量是独立的,且均值为0,方差为1。\\ 乘积的均值可以计算为:\\ E(XY)=E(X)E(Y)=0*0=0\\ 那么它们的协方差为0,乘积的方差可以计算为:\\ Var(XY)=E(X^2)E(Y^2)-[E(X)]^2[E(Y)]^2\\ =E(X^2-0^2)E(Y^2-0^2)-[E(X)E(Y)]^2\\ =E(X^2-[E(X)^2])E(Y^2-E(Y)^2)-0\\ =[E(X^2)-[E(X)^2]][E(Y^2)-[E(Y)^2]]-0\\ 均值和方差之间有着紧密的联系,它们之间的关系公式如下:\\ Var(X) = E[(X - \mu)^2] = E[X^2] - \mu^2=E[X^2] - [E(X)^2]\\ Var(Y) = E[(Y - \mu)^2] = E[Y^2] - \mu^2=E[Y^2] - [E(Y)^2]\\ 因此,Var(XY)=Var(X)Var(Y)-0\\ =1*1-0\\ =1\\ 因此E(XY)=0,Var(XY)=1,即E(q_ik_i)=0,Var(q_ik_i)=1 设X,Y两个随机变量是独立的,且均值为0,方差为1。乘积的均值可以计算为:E(XY)=E(X)E(Y)=0∗0=0那么它们的协方差为0,乘积的方差可以计算为:Var(XY)=E(X2)E(Y2)−[E(X)]2[E(Y)]2=E(X2−02)E(Y2−02)−[E(X)E(Y)]2=E(X2−[E(X)2])E(Y2−E(Y)2)−0=[E(X2)−[E(X)2]][E(Y2)−[E(Y)2]]−0均值和方差之间有着紧密的联系,它们之间的关系公式如下:Var(X)=E[(X−μ)2]=E[X2]−μ2=E[X2]−[E(X)2]Var(Y)=E[(Y−μ)2]=E[Y2]−μ2=E[Y2]−[E(Y)2]因此,Var(XY)=Var(X)Var(Y)−0=1∗1−0=1因此E(XY)=0,Var(XY)=1,即E(qiki)=0,Var(qiki)=1
- 假设q和k各分量都是服从期望为0,方差为1的独立的随机变量,那么 q k qk qk的均值为0,方差为 d k d_k dk;
-
从统计学计算,【若让 q k qk qk点积的方差控制在1,则需要将其除以 d k d_k dk的平方根】,使得softmax更加平滑
V ( q k d k ) = d k ( d k ) 2 = 1 V(\frac{qk}{\sqrt{d_k}})=\frac{d_k}{(\sqrt{d_k})^2}=1 V(dkqk)=(dk)2dk=1
4、在计算attention score的时候如何对padding做mask操作?
-
padding位置置为负无穷(一般来说-1000就可以)
-
def attention(query, key, value, mask=None, dropout=None):"Compute 'Scaled Dot Product Attention'"d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:# 将mask位置置为-1e9scores = scores.masked_fill(mask == 0, -1e9)# 经过softmax,注意力就变为0p_attn = scores.softmax(dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attn
5、为何在获取输入词向量之后需要对矩阵乘以embedding size的开方?意义是什么?
-
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()# 创建一个词嵌入层,参数为词汇表大小和词嵌入维度self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):# 通过词嵌入层将输入的单词编码为向量,并乘以词嵌入维度的平方根return self.lut(x) * math.sqrt(self.d_model)
-
embedding matrix的初始化方式是xavier,这种方式的方差是1/embedding size,因此乘以embedding size的开方使得embedding matrix的方差是1,在这个scale下更有利于embedding matrix的收敛。
6、简单介绍一下Transformer的位置编码?有什么意义和优缺点?
- 因为self-attention是位置无关的,无论句子的顺序是什么样的,通过self-attention计算的token的hidden embedding都是一样的,这显然不符合人类的思维。因此要有一个办法能够在模型中表达出一个token的位置信息,transformer使用了固定的positional encoding来表示token在句子中的绝对位置信息。
7、你还了解哪些关于位置编码的技术,各自的优缺点是什么?
-
如下代码所示,Transformer使用了基于正弦余弦的1d绝对位置编码、ViT使用了可学习的位置编码、MAE中使用了基于正弦余弦的2d绝对位置编码,是在 x, y 方向上分别独立进行绝对位置编码
-
import torch import torch.nn as nn# 1、Transformer def create_1d_absolute_sincos_embeddings(n_pos_vec, dim):# n_pos_vec: torch.arange(n_pos)# 初始化position_embeddingassert dim % 2 == 0, "wrong dimension"position_embedding = torch.zeros(n_pos_vec.numel(), dim, dtype=torch.float)omega = torch.arange(dim // 2, dtype=torch.float)omega = 2. * omega / dimomega = 1.0 / (10000 ** omega)out = n_pos_vec[:, None] @ omega[None, :] # shape = (n_pos, dim // 2)position_embedding_sin = torch.sin(out)position_embedding_cos = torch.cos(out)# 赋值position_embedding[:, 0::2] = position_embedding_sinposition_embedding[:, 1::2] = position_embedding_cosreturn position_embedding# 2、ViT def create_1d_absolute_learnable_embeddings(n_pos_vec, dim):position_embedding = nn.Embedding(n_pos_vec.numel(), dim)# 初始化nn.init.constant_(position_embedding.weight, 0.)return position_embedding# 3、MAE def create_2d_absolute_sincos_embeddings(height, width, dim):assert dim % 4 == 0, "wrong dimension"position_embedding = torch.zeros(height*width, dim, dtype=torch.float)coords = torch.stack(torch.meshgrid(torch.arange(height, dtype=torch.float),torch.arange(width, dtype=torch.float))) # [2, height, width]height_embedding = create_1d_absolute_sincos_embeddings(torch.flatten(coords[0]), dim// 2)width_embedding = create_1d_absolute_sincos_embeddings(torch.flatten(coords[1]), dim// 2)position_embedding[:, :dim // 2] = height_embeddingposition_embedding[:, dim // 2:] = width_embeddingreturn position_embeddingif __name__ == '__main__':n_pos_vec, dim = torch.arange(4, dtype=torch.float), 4create_1d_absolute_sincos_embeddings(n_pos_vec, dim)create_1d_absolute_learnable_embeddings(n_pos_vec, dim)create_2d_absolute_sincos_embeddings(height=2, width=2, dim=dim)
8、为什么transformer块使用LayerNorm而不是BatchNorm?LayerNorm在Transformer的位置是哪里
- LN:针对每个样本序列进行Norm,没有样本间的依赖。对一个序列的不同特征维度进行Norm
- CV使用BN是认为channel维度的信息对cv方面有重要意义,如果对channel维度也归一化会造成不同通道信息一定的损失。而同理nlp领域认为句子长度不一致,并且各个batch的信息没什么关系,因此只考虑句子内信息的归一化,也就是LN。
9、简答讲一下BatchNorm技术,以及它的优缺点。
-
优点:
-
- 第一个就是可以解决内部协变量偏移,简单来说训练过程中,各层分布不同,增大了学习难度,BN缓解了这个问题。当然后来也有论文证明BN有作用和这个没关系,而是可以使损失平面更加的平滑,从而加快的收敛速度。
- 第二个优点就是缓解了梯度饱和问题(如果使用sigmoid激活函数的话),加快收敛。
-
缺点:
-
- 第一个,batch_size较小的时候,效果差。这一点很容易理解。BN的过程,使用整个batch中样本的均值和方差来模拟全部数据的均值和方差,在batch_size 较小的时候,效果肯定不好。
- 第二个缺点就是 BN 在RNN中效果比较差。
10、Decoder阶段的多头自注意力和encoder的多头自注意力有什么区别?(为什么需要decoder自注意力需要进行 sequence mask)
- 让输入序列只看到过去的信息,不能让看到未来的信息
11、Transformer的并行化提现在哪个地方?Decoder端可以做并行化吗?
- Encoder侧:模块之间是串行的,一个模块计算的结果做为下一个模块的输入,互相之前有依赖关系。从每个模块的角度来说,注意力层和前馈神经层这两个子模块单独来看都是可以并行的,不同单词之间是没有依赖关系的。
- Decode引入sequence mask就是为了并行化训练,Decoder推理过程没有并行,只能一个一个的解码,很类似于RNN,这个时刻的输入依赖于上一个时刻的输出。