课程链接:Mindspore 技术公开课
Transformer 论文地址,建议看完课程以后简单看看论文
前言
Transformer是一种神经网络结构,由Vaswani等人在2017年的论文“Attention Is All You Need” 中提出,用于处理机器翻译、语言建模和文本生成等自然语言处理任务。
Transformer 与传统 NLP 特征提取类模型的区别主要在以下两点:
- Transformer 是一个纯基于注意力机制的结构,并将自注意力机制和多头注意力机制的概念运用到模型中;
- 由于缺少RNN模型的时序性,Transformer引入了位置编码,在数据上而非模型中添加位置信息;
以上的处理带来了几个优点:
- 更容易并行化,训练更加高效;
- 在处理长序列的任务中表现优秀,可以快速捕捉长距离中的关联信息。
注意力机制 (Attention)
如同阅读时,视线只会集中在正在阅读的部分;自然语言处理中,根据任务内容的不同,句子中需要更加关注的部分也会不同。
注意力机制便是在判断 词在句子中的重要性,我们通过 注意力分数 来表达某个词在句子中的重要性,分数越高,说明该词对完成该任务的重要性越大。
计算注意力分数时,我们主要参考三个因素:query、key 和 value。
query
:任务内容key
:索引/标签(帮助定位到答案)value
:答案
在上面的例子中,如“情感分类”、“电影名字”、“中译英”等为 query
,每次对于任务内容的回答即为 value
。至于什么是 key
, 用一个比较直观的举例来说,每次登录视频网站搜索视频时,搜索的内容为 query
,搜索结果中显示的视频名称为 key
,它与任务内容相关,并可以引导我们至具体的视频内容(value
)。
一般在文本翻译中,我们希望翻译后的句子的意思和原始句子相似,所以进行注意力分数计算时, query
一般和目标序列,即翻译后的句子有关,key
则与源序列,即翻译前的原始句子有关。
计算注意力分数,即为计算 query
与 key
的相似度。常用的计算注意力分数的方式有两种:additive attention
和 scaled dot-product attention
。
以 scaled dot-product attention
为例,在几何角度,点积(dot product)表示一个向量在另一个向量方向上的投影。换句话说,从几何角度上解读,点积代表了某个向量和另一个向量的相似程度。
将这个概念运用到当前的情境中,想要求 query
和 key
之间有多少是相似的,则需要计算 query
和 key
的点积。同时,为了避免 query
( Q ∈ R n × d m o d e l Q \in R^{n\times d_{model}} Q∈Rn×dmodel) 和 key
( K ∈ R m × d m o d e l K \in R^{m\times d_{model}} K∈Rm×dmodel) 本身的 “大小” 影响到相似度的计算,我们需要在点乘后除以 d m o d e l \sqrt{d_{model}} dmodel
Attention Score ( Q , K ) = Q K T d m o d e l \text{Attention Score}(Q, K)=\frac{QK^T}{\sqrt{d_{model}}} Attention Score(Q,K)=dmodelQKT
我们将该相似度的区间限制与 0 到 1 之间,并令其作用在 value
上。
Attention ( Q , K , V ) = softmax ( Q K T d m o d e l ) V \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_{model}}}\right)V Attention(Q,K,V)=softmax(dmodelQKT)V
代码如下:
import mindspore
from mindspore import nn
from mindspore import ops
from mindspore import Tensor
from mindspore import dtype as mstypeclass ScaledDotProductAttention(nn.Cell):def __init__(self, dropout_p=0.):super().__init__()self.softmax = nn.Softmax()self.dropout = nn.Dropout(1-dropout_p)self.sqrt = ops.Sqrt()def construct(self, query, key, value, attn_mask=None):"""scaled dot product attention"""embed_size = query.shape[-1]scaling_factor = self.sqrt(Tensor(embed_size, mstype.float32))attn = ops.matmul(query, key.swapaxes(-2, -1) / scaling_factor)if attn_mask is not None:attn = attn.masked_fill(attn_mask, -1e9)attn = self.softmax(attn)attn = self.dropout(attn)output = ops.matmul(attn, value)return (output, attn)
在处理数据时,我们为了统一长度,会使用占位符补齐了一些稍短的文本。
“Hello world!” --> <bos> hello world ! <eos> <pad> <pad>
这些 <pad> 占位符没有任何意义,不应该参与注意力分数计算中。为此我们在注意力中加入了 padding 掩码,即识别输入序列中的 <pad> 占位符,保证计算时这些位置对应的注意力分数为0。
def get_attn_pad_mask(seq_q, seq_k, pad_idx):"""注意力掩码:识别序列中的<pad>占位符Args:seq_q (Tensor): query序列,shape = [batch size, query len]seq_k (Tensor): key序列,shape = [batch size, key len]pad_idx (Tensor): key序列<pad>占位符对应的数字索引"""batch_size, len_q = seq_q.shapebatch_size, len_k = seq_k.shapepad_attn_mask = ops.equal(seq_k, pad_idx)pad_attn_mask = pad_attn_mask.expand_dims(1)pad_attn_mask = ops.broadcast_to(pad_attn_mask, (batch_size, len_q, len_k))return pad_attn_mask# 测试
q = k = Tensor([[1, 1, 0, 0]], mstype.float32)
pad_idx = 0
mask = get_attn_pad_mask(q, k, pad_idx)
print(mask)
print(q.shape, mask.shape)
自注意力机制(Self-Attention)
自注意力机制中,我们关注句子本身,查看每个单词对于周边单词的重要性。这样可以很好地理清句子中的逻辑关系,如代词指代。
举个例子,在 ’ The animal
didn’t cross the street because it
was too tired’这句话中,‘it’ 指代句中的 ‘The animal’,所以自注意力会赋予 ‘The’、‘animal’ 更高的注意力分值。
自注意力分数的计算还是遵循着上述的公式,只不过这里的 query
,key
和 value
都变成了句子本身点乘各自权重。
给定序列 X ∈ R n × d m o d e l X \in \mathbb{R}^{n \times d_{model}} X∈Rn×dmodel,序列长度为 n n n,维度为 d m o d e l d_{model} dmodel。在计算自注意力时, Q = W Q X , K = W K X , V = W V X Q = W^QX, K = W^KX, V = W^VX Q=WQX,K=WKX,V=WVX。
Attention ( Q , K , V ) = softmax ( Q K T d m o d e l ) V \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_{model}}}\right)V Attention(Q,K,V)=softmax(dmodelQKT)V
其中,序列中位置为 i i i 的词与位置为 j j j 的词之间的自注意力分数为:
Attention ( Q , K , V ) i , j = exp ( Q i K j T d m o d e l ) ∑ k = 1 n exp ( Q i K k T d m o d e l ) V j \text{Attention}(Q, K, V)_{i,j} = \frac{\text{exp}\left(\frac{Q_iK_j^T}{\sqrt{d_{model}}}\right)}{\sum_{k=1}^{n}\text{exp}\left(\frac{Q_iK_k^T}{\sqrt{d_{model}}}\right)}V_j Attention(Q,K,V)i,j=∑k=1nexp(dmodelQiKkT)exp(dmodelQiKjT)Vj
多头注意力(Multi-Head Attention)
多头注意力是注意力机制的扩展,它可以使模型通过不同的方式关注输入序列的不同部分,从而提升模型的训练效果。
不同于之前一次计算整体输入的注意力分数,多头注意力是多次计算,每次计算输入序列中某一部分的注意力分数,最后再将结果进行整合。
多头注意力通过对输入的embedding乘以不同的权重参数 W Q W^{Q} WQ、 W K W^{K} WK和 W V W^{V} WV,将其映射到多个小维度空间中,我们称之为“头”(head),每个头部会并行计算自己的自注意力分数。
head i = Attention ( Q W i Q , K W i K , V W i V ) = softmax ( Q i K i T d k ) V i \text{head}_i = \text{Attention}(QW^Q_i, KW^K_i, VW^V_i) = \text{softmax}\left(\frac{Q_iK_i^T}{\sqrt{d_{k}}}\right)V_i headi=Attention(QWiQ,KWiK,VWiV)=softmax(dkQiKiT)Vi
W i Q ∈ R d m o d e l × d k W^Q_i \in \mathbb{R}^{d_{model}\times d_{k}} WiQ∈Rdmodel×dk、 W i K ∈ R d m o d e l × d k W^K_i \in \mathbb{R}^{d_{model}\times d_{k}} WiK∈Rdmodel×dk和 W i V ∈ R d m o d e l × d v W^V_i \in \mathbb{R}^{d_{model}\times d_{v}} WiV∈Rdmodel×dv为可学习的权重参数。一般为了平衡计算成本,我们会取 d k = d v = d m o d e l / n h e a d d_k = d_v = d_{model} / n_{head} dk=dv=dmodel/nhead。
在获得多组自注意力分数后,我们将结果拼接到一起,得到多头注意力的最终输出。 W O W^O WO为可学习的权重参数,用于将拼接后的多头注意力输出映射回原来的维度。
MultiHead ( Q , K , V ) = Concat ( head 1 , . . . , head h ) W O \text{MultiHead}(Q, K, V)=\text{Concat}(\text{head}_1, ..., \text{head}_h)W^O MultiHead(Q,K,V)=Concat(head1,...,headh)WO
简单来说,在多头注意力中,每个头部可以’解读’输入内容的不同方面,比如:捕捉全局依赖关系、关注特定语境下的词元、识别词和词之间的语法关系等。
class MultiHeadAttention(nn.Cell):def __init__(self, d_model, d_k, n_heads, dropout_p=0.):super().__init__()self.n_heads = n_headsself.d_k = d_kself.W_Q = nn.Dense(d_model, d_k * n_heads)self.W_K = nn.Dense(d_model, d_k * n_heads)self.W_V = nn.Dense(d_model, d_k * n_heads)self.W_O = nn.Dense(n_heads * d_k, d_model)self.attention = ScaledDotProductAttention(dropout_p=dropout_p)def construct(self, query, key, value, attn_mask):"""query: [batch_size, len_q, d_model]key: [batch_size, len_k, d_model]value: [batch_size, len_k, d_model]attn_mask: [batch_size, seq_len, seq_len]"""batch_size = query.shape[0]q_s = self.W_Q(query).view(batch_size, -1, self.n_heads, self.d_k)k_s = self.W_K(key).view(batch_size, -1, self.n_heads, self.d_k)v_s = self.W_V(value).view(batch_size, -1, self.n_heads, self.d_k)q_s = q_s.transpose((0, 2, 1, 3))k_s = k_s.transpose((0, 2, 1, 3))v_s = v_s.transpose((0, 2, 1, 3))attn_mask = attn_mask.expand_dims(1)attn_mask = ops.tile(attn_mask, (1, self.n_heads, 1, 1))context, attn = self.attention(q_s, k_s, v_s, attn_mask)context = context.transpose((0, 2, 1, 3)).view((batch_size, -1, self.n_heads * self.d_k))output = self.W_O(context)return output, attn# 测试
dmodel, dk, nheads = 10, 2, 5
q = k = v = ops.ones((1, 2, 10), mstype.float32)
attn_mask = Tensor([False]).broadcast_to((1, 2, 2))
multi_head_attn = MultiHeadAttention(dmodel, dk, nheads)
output, attn = multi_head_attn(q, k, v, attn_mask)
print(output.shape, attn.shape)
Transformer结构
Transformer同样是encoder-decoder的结构,只不过这里的“encoder”和“decoder”是由无数个同样结构的encoder层和decoder层堆叠组成。
在进行机器翻译时,encoder解读源语句(被翻译的句子)的信息,并传输给decoder。decoder接收源语句信息后,结合当前输入(目前翻译的情况),预测下一个单词,直到生成完整的句子。
Transformer的具体结构如下图所示,在进入encoder或decoder前,源序列和目标序列需要经过一些“加工”。
- word embedding: 将序列转换为模型所能理解的词向量表示,其中包含了序列的内容信息。
- positional encoding:在内容信息的基础上添加位置信息。
位置编码(Positional Encoding)
Transformer模型不包含RNN,所以无法在模型中记录时序信息,这样会导致模型无法识别由顺序改变而产生的句子含义的改变,如“我爱我的小猫”和“我的小猫爱我”。
为了弥补这个缺陷,我们选择在输入数据中额外添加表示位置信息的位置编码。
位置编码 P E PE PE的形状与经过word embedding后的输出 X X X相同,对于索引为[pos, 2i]的元素,以及索引为[pos, 2i+1]的元素,位置编码的计算如下:
P E ( p o s , 2 i ) = sin ( p o s 1000 0 2 i / d model ) PE_{(pos,2i)} = \sin\Bigg(\frac{pos}{10000^{2i/d_{\text{model}}}}\Bigg) PE(pos,2i)=sin(100002i/dmodelpos)
P E ( p o s , 2 i + 1 ) = cos ( p o s 1000 0 2 i / d model ) PE_{(pos,2i+1)} = \cos\Bigg(\frac{pos}{10000^{2i/d_{\text{model}}}}\Bigg) PE(pos,2i+1)=cos(100002i/dmodelpos)
在下面的代码中,我们实现了位置编码,输入经过word embedding后的结果 X X X,输出添加位置信息后的结果 X + P E X + PE X+PE。
from mindspore import numpy as mnpclass PositionalEncoding(nn.Cell):"""位置编码"""def __init__(self, d_model, dropout_p=0.1, max_len=100):super().__init__()self.dropout = nn.Dropout(1 - dropout_p)self.pe = ops.Zeros()((max_len, d_model), mstype.float32)pos = mnp.arange(0, max_len, dtype=mstype.float32).view((-1, 1))angle = ops.pow(10000.0, mnp.arange(0, d_model, 2, dtype=mstype.float32)/d_model)self.pe[:, 0::2] = ops.sin(pos/angle)self.pe[:, 1::2] = ops.cos(pos/angle)def construct(self, x):batch_size = x.shape[0]pe = self.pe.expand_dims(0)pe = ops.broadcast_to(pe, (batch_size, -1, -1))x = x + pe[:, :x.shape[1], :]return self.dropout(x)# 测试
x = ops.Zeros()((1, 2, 4), mstype.float32)
pe = PositionalEncoding(4)
print(pe(x))
编码器(Encoder)
Transformer的Encoder负责处理输入的源序列,并将输入信息整合为一系列的上下文向量(context vector)输出。
每个encoder层中存在两个子层:多头自注意力(multi-head self-attention)和基于位置的前馈神经网络(position-wise feed-forward network)。
子层之间使用了残差连接(residual connection),并使用了层规范化(layer normalization)。二者统称为“Add & Norm”
基于位置的前馈神经网络 (Position-Wise Feed-Forward Network)
基于位置的前馈神经网络被用来对输入中的每个位置进行非线性变换。它由两个线性层组成,层与层之间需要经过ReLU激活函数。
F F N ( x ) = R e L U ( x W 1 + b 1 ) W 2 + b 2 \mathrm{FFN}(x) = \mathrm{ReLU}(xW_1 + b_1)W_2 + b_2 FFN(x)=ReLU(xW1+b1)W2+b2
相比固定的ReLU函数,基于位置的前馈神经网络可以处理更加复杂的关系,并且由于前馈网络是基于位置的,可以捕获到不同位置的信息,并为每个位置提供不同的转换。
class PoswiseFeedForward(nn.Cell):def __init__(self, d_ff, d_model, dropout_p=0.):super().__init__()self.linear1 = nn.Dense(d_model, d_ff)self.linear2 = nn.Dense(d_ff, d_model)self.dropout = nn.Dropout(1-dropout_p)self.relu = nn.ReLU()def construct(self, x):"""前馈神经网络x: [batch_size, seq_len, d_model]"""x = self.linear1(x)x = self.relu(x)x = self.dropout(x)output = self.linear2(x)return output# 测试
x = ops.ones((1, 2, 4), mstype.float32)
ffn = PoswiseFeedForward(16, 4)
print(ffn(x).shape)
Add & Norm
Add & Norm层本质上是残差连接后紧接了一个LayerNorm层。
Add&Norm ( x ) = LayerNorm ( x + Sublayer ( x ) ) \text{Add\&Norm}(x) = \text{LayerNorm}(x + \text{Sublayer}(x)) Add&Norm(x)=LayerNorm(x+Sublayer(x))
- Add:残差连接,帮助缓解网络退化问题,注意需要满足 x x x与 SubLayer ( x ) 的形状一致 \text{SubLayer}(x)的形状一致 SubLayer(x)的形状一致;
- Norm:Layer Norm,层归一化,帮助模型更快地进行收敛;
class AddNorm(nn.Cell):def __init__(self, d_model, dropout_p=0.):super().__init__()self.layer_norm = nn.LayerNorm((d_model, ), epsilon=1e-5)self.dropout = nn.Dropout(1-dropout_p)def construct(self, x, residual):return self.layer_norm(self.dropout(x) + residual)# 测试
x = ops.ones((1, 2, 4), mstype.float32)
residual = ops.ones((1, 2, 4), mstype.float32)
add_norm = AddNorm(4)
print(add_norm(x, residual).shape)
EncoderLayer
我们首先实现encoder中的一个层。
class EncoderLayer(nn.Cell):def __init__(self, d_model, n_heads, d_ff, dropout_p=0.):super().__init__()d_k = d_model // n_headsif d_k * n_heads != d_model:raise ValueError(f"The `d_model` {d_model} can not be divisible by `num_heads` {n_heads}.")self.enc_self_attn = MultiHeadAttention(d_model, d_k, n_heads, dropout_p)self.pos_ffn = PoswiseFeedForward(d_ff, d_model, dropout_p)self.add_norm1 = AddNorm(d_model, dropout_p)self.add_norm2 = AddNorm(d_model, dropout_p)def construct(self, enc_inputs, enc_self_attn_mask):"""enc_inputs: [batch_size, src_len, d_model]enc_self_attn_mask: [batch_size, src_len, src_len]"""residual = enc_inputsenc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)enc_outputs = self.add_norm1(enc_outputs, residual)residual = enc_outputsenc_outputs = self.pos_ffn(enc_outputs)enc_outputs = self.add_norm2(enc_outputs, residual)return enc_outputs, attn# 测试
x = ops.ones((1, 2, 8), mstype.float32)
mask = Tensor([False]).broadcast_to((1, 2, 2))
encoder_layer = EncoderLayer(8, 4, 16)
output, attn = encoder_layer(x, mask)
print(output.shape, attn.shape)
Encoder
将上面实现的encoder层堆叠n_layers
次,并添加wording embedding与positional encoding。
class Encoder(nn.Cell):def __init__(self, src_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.):super().__init__()self.src_emb = nn.Embedding(src_vocab_size, d_model)self.pos_emb = PositionalEncoding(d_model, dropout_p)self.layers = nn.CellList([EncoderLayer(d_model, n_heads, d_ff, dropout_p) for _ in range(n_layers)])self.scaling_factor = ops.Sqrt()(Tensor(d_model, mstype.float32))def construct(self, enc_inputs, src_pad_idx):"""enc_inputs : [batch_size, src_len]"""enc_outputs = self.src_emb(enc_inputs.astype(mstype.int32))enc_outputs = self.pos_emb(enc_outputs * self.scaling_factor)enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs, src_pad_idx)enc_self_attns = []for layer in self.layers:enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)enc_self_attns.append(enc_self_attn)return enc_outputs, enc_self_attns
解码器 (Decoder)
解码器将编码器输出的上下文序列转换为目标序列的预测结果 Y ^ \hat{Y} Y^,该输出将在模型训练中与真实目标输出 Y Y Y进行比较,计算损失。
不同于编码器,每个Decoder层中包含两层多头注意力机制,并在最后多出一个线性层,输出对目标序列的预测结果。
- 第一层:计算目标序列的注意力分数的掩码多头自注意力;
- 第二层:用于计算上下文序列与目标序列对应关系,其中Decoder掩码多头注意力的输出作为query,Encoder的输出(上下文序列)作为key和value;
带掩码的多头注意力
在处理目标序列的输入时,t时刻的模型只能“观察”直到t-1时刻的所有词元,后续的词语不应该一并输入Decoder中。
为了保证在t时刻,只有t-1个词元作为输入参与多头注意力分数的计算,我们需要在第一个多头注意力中额外增加一个时间掩码,使目标序列中的词随时间发展逐个被暴露出来。
该注意力掩码可通过三角矩阵实现,对角线以上的词元表示为不参与注意力计算的词元,标记为1。
0 1 1 1 1 0 0 1 1 1 0 0 0 1 1 0 0 0 0 1 0 0 0 0 0 \begin{matrix} 0 & 1 & 1 & 1 & 1\\ 0 & 0 & 1 & 1 & 1\\ 0 & 0 & 0 & 1 & 1\\ 0 & 0 & 0 & 0 & 1\\ 0 & 0 & 0 & 0 & 0\\ \end{matrix} 0000010000110001110011110
该掩码一般被称作subsequent mask。
最后,将subsequent mask和padding mask合并为一个整体的掩码,确保模型既不会注意到t时刻以后的词元,也不会关注为<pad>的词元。
def get_attn_subsequent_mask(seq_q, seq_k):"""生成时间掩码,使decoder在第t时刻只能看到序列的前t-1个元素Args:seq_q (Tensor): query序列,shape = [batch size, len_q]seq_k (Tensor): key序列,shape = [batch size, len_k]"""batch_size, len_q = seq_q.shapebatch_size, len_k = seq_k.shapeones = ops.ones((batch_size, len_q, len_k), mindspore.float32)subsequent_mask = mnp.triu(ones, k=1)return subsequent_mask# 测试
q = k = ops.ones((1, 4), mstype.float32)
mask = get_attn_subsequent_mask(q, k)
print(mask)
Decoder Layer
首先实现Decoder中的一个层。
class DecoderLayer(nn.Cell):def __init__(self, d_model, n_heads, d_ff, dropout_p=0.):super().__init__()d_k = d_model // n_headsif d_k * n_heads != d_model:raise ValueError(f"The `d_model` {d_model} can not be divisible by `num_heads` {n_heads}.")self.dec_self_attn = MultiHeadAttention(d_model, d_k, n_heads, dropout_p)self.dec_enc_attn = MultiHeadAttention(d_model, d_k, n_heads, dropout_p)self.pos_ffn = PoswiseFeedForward(d_ff, d_model, dropout_p)self.add_norm1 = AddNorm(d_model, dropout_p)self.add_norm2 = AddNorm(d_model, dropout_p)self.add_norm3 = AddNorm(d_model, dropout_p)def construct(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):"""dec_inputs: [batch_size, trg_len, d_model]enc_outputs: [batch_size, src_len, d_model]dec_self_attn_mask: [batch_size, trg_len, trg_len]dec_enc_attn_mask: [batch_size, trg_len, src_len]"""residual = dec_inputsdec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)dec_outputs = self.add_norm1(dec_outputs, residual)residual = dec_outputsdec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)dec_outputs = self.add_norm2(dec_outputs, residual)residual = dec_outputsdec_outputs = self.pos_ffn(dec_outputs)dec_outputs = self.add_norm3(dec_outputs, residual)return dec_outputs, dec_self_attn, dec_enc_attn# 测试
x = y = ops.ones((1, 2, 4), mstype.float32)
mask1 = mask2 = Tensor([False]).broadcast_to((1, 2, 2))
decoder_layer = DecoderLayer(4, 1, 16)
output, attn1, attn2 = decoder_layer(x, y, mask1, mask2)
print(output.shape, attn1.shape, attn2.shape)
Decoder
将上面实现的DecoderLayer堆叠n_layer
次,添加word embedding与positional encoding,以及最后的线性层。
输出的dec_outputs
为对目标序列的预测。
class Decoder(nn.Cell):def __init__(self, trg_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.):super().__init__()self.trg_emb = nn.Embedding(trg_vocab_size, d_model)self.pos_emb = PositionalEncoding(d_model, dropout_p)self.layers = nn.CellList([DecoderLayer(d_model, n_heads, d_ff) for _ in range(n_layers)])self.projection = nn.Dense(d_model, trg_vocab_size)self.scaling_factor = ops.Sqrt()(Tensor(d_model, mstype.float32)) def construct(self, dec_inputs, enc_inputs, enc_outputs, src_pad_idx, trg_pad_idx):"""dec_inputs: [batch_size, trg_len]enc_inputs: [batch_size, src_len]enc_outputs: [batch_size, src_len, d_model]"""dec_outputs = self.trg_emb(dec_inputs.astype(mstype.int32))dec_outputs = self.pos_emb(dec_outputs * self.scaling_factor)dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs, trg_pad_idx)dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs, dec_inputs)dec_self_attn_mask = ops.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs, src_pad_idx)dec_self_attns, dec_enc_attns = [], []for layer in self.layers:dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)dec_self_attns.append(dec_self_attn)dec_enc_attns.append(dec_enc_attn)dec_outputs = self.projection(dec_outputs)return dec_outputs, dec_self_attns, dec_enc_attns
Transformer
将实现的Encoder与Decoder组合起来。
class Transformer(nn.Cell):def __init__(self, encoder, decoder):super().__init__()self.encoder = encoderself.decoder = decoderdef construct(self, enc_inputs, dec_inputs, src_pad_idx, trg_pad_idx):"""enc_inputs: [batch_size, src_len]dec_inputs: [batch_size, trg_len]"""enc_outputs, enc_self_attns = self.encoder(enc_inputs, src_pad_idx)dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs, src_pad_idx, trg_pad_idx)dec_logits = dec_outputs.view((-1, dec_outputs.shape[-1]))return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns
通过Transformer实现文本机器翻译
全流程
- 数据预处理: 将图像、文本等数据处理为可以计算的Tensor
- 模型构建: 使用框架API, 搭建模型
- 模型训练: 定义模型训练逻辑, 遍历训练集进行训练
- 模型评估: 使用训练好的模型, 在测试集评估效果
- 模型推理: 将训练好的模型部署, 输入新数据获得预测结果
数据准备
我们本次使用的数据集为Multi30K数据集,它是一个大规模的图像-文本数据集,包含30K+图片,每张图片对应两类不同的文本描述:
- 英语描述,及对应的德语翻译;
- 五个独立的、非翻译而来的英语和德语描述,描述中包含的细节并不相同;
因其收集的不同语言对于图片的描述相互独立,所以训练出的模型可以更好地适用于有噪声的多模态内容。
在本次文本翻译任务中,德语是源语言(source languag),英语是目标语言(target language)。
数据下载模块
使用download
进行数据下载,并将tar.gz
文件解压到指定文件夹。
from download import download
from pathlib import Path
from tqdm import tqdm
import osurls = {'train': 'http://www.quest.dcs.shef.ac.uk/wmt16_files_mmt/training.tar.gz','valid': 'http://www.quest.dcs.shef.ac.uk/wmt16_files_mmt/validation.tar.gz','test': 'http://www.quest.dcs.shef.ac.uk/wmt17_files_mmt/mmt_task1_test2016.tar.gz'
}cache_dir = Path.home() / '.mindspore_examples'train_path = download(urls['train'], os.path.join(cache_dir, 'train'), kind='tar.gz')
valid_path = download(urls['valid'], os.path.join(cache_dir, 'valid'), kind='tar.gz')
test_path = download(urls['test'], os.path.join(cache_dir, 'test'), kind='tar.gz')
数据预处理
在使用数据进行模型训练等操作时,我们需要对数据进行预处理,流程如下:
- 加载数据集;
- 构建词典;
- 创建数据迭代器;
数据加载器
加载数据集,并进行分词,即将句子拆解为单独的词元(token,可以为字符或者单词)。一般在机器翻译类任务中,我们习惯进行单词级词元化,即每个词元要么为一个单词,要么为一个标点符号。同一个单词,不论首字母是否大写,都应该对应同一个词元,故在分词前,我们需统一将单词转换为小写。
"Hello world!" --> ["hello", "world", "!"]
接下来,我们创建数据加载器Multi30K
。后期调用该类进行遍历时,每次返回当前源语言(德语)与目标语言(英语)文本描述的词元列表。
import reclass Multi30K():"""Multi30K数据集加载器加载Multi30K数据集并处理为一个Python迭代对象。"""def __init__(self, path):self.data = self._load(path)def _load(self, path):def tokenize(text):text = text.rstrip()return [tok.lower() for tok in re.findall(r'\w+|[^\w\s]', text)]members = {i.split('.')[-1]: i for i in os.listdir(path)}de_path = os.path.join(path, members['de'])en_path = os.path.join(path, members['en'])with open(de_path, 'r', encoding='utf-8') as de_file:de = de_file.readlines()[:-1]de = [tokenize(i) for i in de]with open(en_path, 'r', encoding='utf-8') as en_file:en = en_file.readlines()[:-1]en = [tokenize(i) for i in en]return list(zip(de, en))def __getitem__(self, idx):return self.data[idx]def __len__(self):return len(self.data)
train_dataset, valid_dataset, test_dataset = Multi30K(train_path), Multi30K(valid_path), Multi30K(test_path)
对解压和分词结果进行测试,打印测试数据集第一组英德语文本,可以看到每一个单词和标点符号已经被单独分离出来。
for de, en in test_dataset:print(f'de = {de}')print(f'en = {en}')break
词典
将每个词元映射到从0开始的数字索引中(为节约存储空间,可过滤掉词频低的词元),词元和数字索引所构成的集合叫做词典(vocabulary)。
以上述“Hello world!”为例,该序列组成的词典为:
{"<unk>": 0, "<pad>": 1, "<bos>": 2, "<eos>": 3, "hello": 4, "world": 5, "!": 6}
在构建词典中,我们使用了4个特殊词元。
<unk>
:未知词元(unknown),将出现次数少于一定频率的单词统一判定为未知词元;<bos>
:起始词元(begin of sentence),用来标注一个句子的开始;<eos>
:结束词元(end of sentence),用来标注一个句子的结束;<pad>
:填充词元(padding),当句子长度不够时将句子填充至统一长度;
通过Vocab
创建词典后,我们可以实现词元与数字索引之间的互相转换。我们可以通过调用enocde
函数,返回输入词元或者词元序列对应的数字索引或数字索引序列,反之亦然,我们同样可以通过调用decode
函数,返回输入数字索引或数字索引序列对应的词元或词元序列。
class Vocab:"""通过词频字典,构建词典"""special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']def __init__(self, word_count_dict, min_freq=1):self.word2idx = {}for idx, tok in enumerate(self.special_tokens):self.word2idx[tok] = idxfilted_dict = {w: cfor w, c in word_count_dict.items() if c >= min_freq}for w, _ in filted_dict.items():self.word2idx[w] = len(self.word2idx)self.idx2word = {idx: word for word, idx in self.word2idx.items()}self.bos_idx = self.word2idx['<bos>']self.eos_idx = self.word2idx['<eos>']self.pad_idx = self.word2idx['<pad>']self.unk_idx = self.word2idx['<unk>']def _word2idx(self, word):"""单词映射至数字索引"""if word not in self.word2idx:return self.unk_idxreturn self.word2idx[word]def _idx2word(self, idx):"""数字索引映射至单词"""if idx not in self.idx2word:raise ValueError('input index is not in vocabulary.')return self.idx2word[idx]def encode(self, word_or_list):"""将单个单词或单词数组映射至单个数字索引或数字索引数组"""if isinstance(word_or_list, list):return [self._word2idx(i) for i in word_or_list]return self._word2idx(word_or_list)def decode(self, idx_or_list):"""将单个数字索引或数字索引数组映射至单个单词或单词数组"""if isinstance(idx_or_list, list):return [self._idx2word(i) for i in idx_or_list]return self._idx2word(idx_or_list)def __len__(self):return len(self.word2idx)
通过自定义词频字典进行测试,我们可以看到词典已去除词频少于2的词元c,并加入了默认的四个特殊占位符,故词典整体长度为:4 - 1 + 4 = 7
word_count = {'a':20, 'b':10, 'c':1, 'd':2}vocab = Vocab(word_count, min_freq=2)
len(vocab)
使用collections
中的Counter
和OrderedDict
统计英/德语每个单词在整体文本中出现的频率。构建词频字典,然后再将词频字典转为词典。其中,收录所有源语言(德语)词元的词典为de_vocab
,收录所有目标语言(英语)词元的词典为en_vocab
。
在分配数字索引时有一个小技巧:常用的词元对应数值较小的索引,这样可以节约空间。
from collections import Counter, OrderedDictdef build_vocab(dataset):de_words, en_words = [], []for de, en in dataset:de_words.extend(de)en_words.extend(en)de_count_dict = OrderedDict(sorted(Counter(de_words).items(), key=lambda t: t[1], reverse=True))en_count_dict = OrderedDict(sorted(Counter(en_words).items(), key=lambda t: t[1], reverse=True))return Vocab(de_count_dict, min_freq=2), Vocab(en_count_dict, min_freq=2)
de_vocab, en_vocab = build_vocab(train_dataset)
print('Unique tokens in de vocabulary:', len(de_vocab))
数据迭代器
数据预处理的最后一步是创建数据迭代器。截至目前,我们已经通过数据加载器Multi30K
将源语言(德语)与目标语言(英语)的文本描述转换为词元序列,并构建了词元与数字索引一一对应的词典,接下来,需要将词元序列转换为数字索引序列。
还是以“Hello world!”为例,我们逐步演示数据迭代器中的操作
- 我们将表示开始和结束的特殊词元
<bos>
和<eos>
分别添加在每个词元序列的句首和句尾。
["hello", "world", "!"] --> ["<bos>", "hello", "world", "!", "<eos>"]
- 统一序列长度(超出长度的进行截断,未达到长度的通过填充
<pad>
进行补齐),同时记录序列的有效长度。此处假定统一的长度为7。
["<bos>", "hello", "world", "!", "<eos>"] --> ["<bos>", "hello", "world", "!", "<eos>", "<pad>", "<pad>"], valid length = 5
- 最后,对文本序列进行批处理。对于每个batch中的序列,通过调用词典中的
encode
为序列中的所有词元找到其对应的数字索引,将结果以Tensor
的形式返回。
["<bos>", "hello", "world", "!", "<eos>", "<pad>", "<pad>"] --> [2, 4, 5, 6, 3, 1, 1] --> tensor
import mindsporeclass Iterator():"""创建数据迭代器"""def __init__(self, dataset, de_vocab, en_vocab, batch_size, max_len=32, drop_reminder=False):self.dataset = datasetself.de_vocab = de_vocabself.en_vocab = en_vocabself.batch_size = batch_sizeself.max_len = max_lenself.drop_reminder = drop_reminderlength = len(self.dataset) // batch_sizeself.len = length if drop_reminder else length + 1 # 批量数量def __call__(self):def pad(idx_list, vocab, max_len):"""统一序列长度,并记录有效长度"""idx_pad_list, idx_len = [], []for i in idx_list:if len(i) > max_len - 2:idx_pad_list.append([vocab.bos_idx] + i[:max_len-2] + [vocab.eos_idx])idx_len.append(max_len)else:idx_pad_list.append([vocab.bos_idx] + i + [vocab.eos_idx] + [vocab.pad_idx] * (max_len - len(i) - 2))idx_len.append(len(i) + 2)return idx_pad_list, idx_lendef sort_by_length(src, trg):"""对德/英语的字段长度进行排序"""data = zip(src, trg)data = sorted(data, key=lambda t: len(t[0]), reverse=True)return zip(*list(data))def encode_and_pad(batch_data, max_len):"""将批量中的文本数据转换为数字索引,并统一每个序列的长度"""src_data, trg_data = zip(*batch_data)src_idx = [self.de_vocab.encode(i) for i in src_data]trg_idx = [self.en_vocab.encode(i) for i in trg_data]src_idx, trg_idx = sort_by_length(src_idx, trg_idx)src_idx_pad, src_len = pad(src_idx, de_vocab, max_len)trg_idx_pad, _ = pad(trg_idx, en_vocab, max_len)return src_idx_pad, src_len, trg_idx_padfor i in range(self.len):if i == self.len - 1 and not self.drop_reminder:batch_data = self.dataset[i * self.batch_size:]else:batch_data = self.dataset[i * self.batch_size: (i+1) * self.batch_size]src_idx, src_len, trg_idx = encode_and_pad(batch_data, self.max_len)yield mindspore.Tensor(src_idx, mindspore.int32), \mindspore.Tensor(src_len, mindspore.int32), \mindspore.Tensor(trg_idx, mindspore.int32)def __len__(self):return self.len
train_iterator = Iterator(train_dataset, de_vocab, en_vocab, batch_size=128, max_len=32, drop_reminder=True)
valid_iterator = Iterator(valid_dataset, de_vocab, en_vocab, batch_size=128, max_len=32, drop_reminder=False)
test_iterator = Iterator(test_dataset, de_vocab, en_vocab, batch_size=1, max_len=32, drop_reminder=False)
模型构建
定义超参数,实例化模型。
src_vocab_size = len(de_vocab)
trg_vocab_size = len(en_vocab)
src_pad_idx = de_vocab.pad_idx
trg_pad_idx = en_vocab.pad_idxd_model = 512
d_ff = 2048
n_layers = 6
n_heads = 8encoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.1)
decoder = Decoder(trg_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.1)
model = Transformer(encoder, decoder)
模型训练 & 模型评估
定义损失函数与优化器。
- 损失函数:定义如何计算模型输出(logits)与目标(targets)之间的误差,这里可以使用交叉熵损失(CrossEntropyLoss)
- 优化器:MindSpore将模型优化算法的实现称为优化器。优化器内部定义了模型的参数优化过程(即梯度如何更新至模型参数),所有优化逻辑都封装在优化器对象中。
loss_fn = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.0001)
模型训练逻辑
MindSpore在模型训练部分使用了函数式编程(FP)。
构造函数 → 函数变换 → 函数调用 \text{构造函数}\rightarrow \text{函数变换} \rightarrow \text{函数调用} 构造函数→函数变换→函数调用
- Network+loss function直接构造正向函数
- 函数变换,获得梯度计算(反向传播)函数
- 构造训练过程函数
- 调用函数进行训练
定义前向网络计算逻辑。
在训练过程中,表示句子结尾的<eos>占位符应是被模型预测出来,而不是作为模型的输入,所以在处理Decoder的输入时,我们需要移除目标序列最末的<eos>占位符。
trg = [ <bos> , x 1 , x 2 , . . . , x n , <eos> ] \text{trg} = [\text{<bos>}, x_1, x_2, ..., x_n, \text{<eos>}] trg=[<bos>,x1,x2,...,xn,<eos>]
trg[:-1] = [ <bos> , x 1 , x 2 , . . . , x n ] \text{trg[:-1]} = [\text{<bos>}, x_1, x_2, ..., x_n] trg[:-1]=[<bos>,x1,x2,...,xn]
其中, x i x_i xi代表目标序列中第i个表示实际内容的词元。
我们期望最终的输出包含表示句末的<eos>,不包含表示句首的<bos>,所以在计算损失时,需要同样去除的目标序列的句首<bos>占位符,再进行比较。
output = [ y 1 , y 2 , . . . , y n , <eos> ] \text{output} = [y_1, y_2, ..., y_n, \text{<eos>}] output=[y1,y2,...,yn,<eos>]
trg[1:] = [ x 1 , x 2 , . . . , x n , <bos> ] \text{trg[1:]} = [x_1, x_2, ..., x_n, \text{<bos>}] trg[1:]=[x1,x2,...,xn,<bos>]
其中, y i y_i yi表示预测的第i个实际内容词元。
def forward(enc_inputs, dec_inputs):"""前向网络enc_inputs: [batch_size, src_len]dec_inputs: [batch_size, trg_len]"""logits, _, _, _ = model(enc_inputs, dec_inputs[:, :-1], src_pad_idx, trg_pad_idx)targets = dec_inputs[:, 1:].view(-1)loss = loss_fn(logits, targets)return loss
定义梯度计算函数。
为了优化模型参数,需要求参数对loss的导数。我们调用mindspore.ops.value_and_grad
函数,来获得function的微分函数。
常用到的参数有三种:
- fn:待求导的函数;
- grad_position:指定求导输入位置的索引;
- weights:指定求导的参数;
由于使用Cell封装神经网络模型,模型参数为Cell的内部属性,此时我们不需要使用grad_position
指定对函数输入求导,因此将其配置为None。对模型参数求导时,我们使用weights参数,使用model.trainable_params()
方法从Cell中取出可以求导的参数。
grad_fn = ops.value_and_grad(forward, None, optimizer.parameters)
定义训练一个step的逻辑。
def train_step(enc_inputs, dec_inputs):loss, grads = grad_fn(enc_inputs, dec_inputs)optimizer(grads)return loss
定义整体训练逻辑。
在训练中,模型会以最小化损失为目标更新模型权重,故模型状态需设置为训练model.set_train(True)
。
def train(iterator, epoch=0):model.set_train(True)num_batches = len(iterator)total_loss = 0total_steps = 0with tqdm(total=num_batches) as t:t.set_description(f'Epoch: {epoch}')for src, src_len, trg in iterator():loss = train_step(src, trg)total_loss += loss.asnumpy()total_steps += 1curr_loss = total_loss / total_stepst.set_postfix({'loss': f'{curr_loss:.2f}'})t.update(1)return total_loss / total_steps
定义模型评估逻辑。
在评估中,仅需正向计算loss,无需更新模型参数,故模型状态需设置为训练model.set_train(False)
。
def evaluate(iterator):model.set_train(False)num_batches = len(iterator)total_loss = 0total_steps = 0with tqdm(total=num_batches) as t:for src, _, trg in iterator():loss = forward(src, trg)total_loss += loss.asnumpy()total_steps += 1curr_loss = total_loss / total_stepst.set_postfix({'loss': f'{curr_loss:.2f}'})t.update(1)return total_loss / total_steps
模型训练
数据集遍历迭代,一次完整的数据集遍历成为一个epoch。我们逐个epoch打印训练的损失值和评估精度,并通过save_checkpoint
保存评估精度最高的ckpt文件(transformer.ckpt)到home_path/.mindspore_examples/transformer.ckpt。
from mindspore import save_checkpointnum_epochs = 10
best_valid_loss = float('inf')
ckpt_file_name = os.path.join(cache_dir, 'transformer.ckpt')for i in range(num_epochs):train_loss = train(train_iterator, i)valid_loss = evaluate(valid_iterator)if valid_loss < best_valid_loss:best_valid_loss = valid_losssave_checkpoint(model, ckpt_file_name)
模型推理
首先,通过load_checkpoint
与load_param_into_net
将训练好的模型参数加载入新实例化的模型中。
from mindspore import load_checkpoint, load_param_into_netencoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.1)
decoder = Decoder(trg_vocab_size, d_model, n_heads, d_ff, n_layers, dropout_p=0.1)
new_model = Transformer(encoder, decoder)param_dict = load_checkpoint(ckpt_file_name)
load_param_into_net(new_model, param_dict)
推理过程中无需对模型参数进行更新,所以这里model.set_train(False)
。
我们输入一个德文语句,期望可以返回翻译好的英文语句。
首先通过Encoder提取德文序列中的特征信息,并将其传输至Decoder。
Decoder最开始的输入为起始占位符<bos>,每次会根据输入预测下一个出现的单词,并对输入进行更新,直到预测出终止占位符<eos>。
def inference(sentence, max_len=32):"""模型推理:输入一个德语句子,输出翻译后的英文句子enc_inputs: [batch_size(1), src_len]"""new_model.set_train(False)if isinstance(sentence, str):tokens = [tok.lower() for tok in re.findall(r'\w+|[^\w\s]', sentence.rstrip())]else:tokens = [token.lower() for token in sentence]if len(tokens) > max_len - 2:src_len = max_lentokens = ['<bos>'] + tokens[:max_len - 2] + ['<eos>']else:src_len = len(tokens) + 2tokens = ['<bos>'] + tokens + ['<eos>'] + ['<pad>'] * (max_len - src_len)indexes = de_vocab.encode(tokens)enc_inputs = Tensor(indexes, mstype.float32).expand_dims(0)enc_outputs, _ = new_model.encoder(enc_inputs, src_pad_idx)dec_inputs = Tensor([[en_vocab.bos_idx]], mstype.float32)max_len = enc_inputs.shape[1]for _ in range(max_len):dec_outputs, _, _ = new_model.decoder(dec_inputs, enc_inputs, enc_outputs, src_pad_idx, trg_pad_idx)dec_logits = dec_outputs.view((-1, dec_outputs.shape[-1]))dec_logits = dec_logits[-1, :]pred = dec_logits.argmax(axis=0).expand_dims(0).expand_dims(0)pred = pred.astype(mstype.float32)dec_inputs = ops.concat((dec_inputs, pred), axis=1)if int(pred.asnumpy()[0]) == en_vocab.eos_idx:breaktrg_indexes = [int(i) for i in dec_inputs.view(-1).asnumpy()]eos_idx = trg_indexes.index(en_vocab.eos_idx) if en_vocab.eos_idx in trg_indexes else -1trg_tokens = en_vocab.decode(trg_indexes[1:eos_idx])return trg_tokens
以测试数据集中的第一组语句为例,进行测试。
example_idx = 0src = test_dataset[example_idx][0]
trg = test_dataset[example_idx][1]
pred_trg = inference(src)print(f'src = {src}')
print(f'trg = {trg}')
print(f"predicted trg = {pred_trg}")
BLEU得分
双语替换评测得分(bilingual evaluation understudy,BLEU)为衡量文本翻译模型生成出来的语句好坏的一种算法,它的核心在于评估机器翻译的译文 pred \text{pred} pred 与人工翻译的参考译文 label \text{label} label 的相似度。通过对机器译文的片段与参考译文进行比较,计算出各个片段的的分数,并配以权重进行加和,基本规则为:
- 惩罚过短的预测,即如果机器翻译出来的译文相对于人工翻译的参考译文过于短小,则命中率越高,需要施加更多的惩罚;
- 对长段落匹配更高的权重,即如果出现长段落的完全命中,说明机器翻译的译文更贴近人工翻译的参考译文;
BLEU的公式如下:
e x p ( m i n ( 0 , 1 − l e n ( label ) l e n ( pred ) ) Π n = 1 k p n 1 / 2 n ) exp(min(0, 1-\frac{len(\text{label})}{len(\text{pred})})\Pi^k_{n=1}p_n^{1/2^n}) exp(min(0,1−len(pred)len(label))Πn=1kpn1/2n)
len(label)
:人工翻译的译文长度len(pred)
:机器翻译的译文长度p_n
:n-gram的精度
我们可以调用nltk
中的corpus_bleu
函数来计算BLEU,在此之前,需要手动下载nltk
。
pip install nltk
from nltk.translate.bleu_score import corpus_bleudef calculate_bleu(dataset, max_len=50):trgs = []pred_trgs = []for data in dataset[:10]:src = data[0]trg = data[1]pred_trg = inference(src, max_len)pred_trgs.append(pred_trg)trgs.append([trg])return corpus_bleu(trgs, pred_trgs)bleu_score = calculate_bleu(test_dataset)print(f'BLEU score = {bleu_score*100:.2f}')