Transformer:GPT背后的"造脑工程"全解析(含手搓过程)
Transformer 是人工智能领域的革命性架构,通过自注意力机制让模型像人类一样"全局理解"上下文关系。它摒弃传统循环结构,采用并行计算实现高效训练,配合位置编码破解序列的时空密码,在机器翻译、文本生成等任务中实现质的飞跃。GPT、BERT等顶尖模型均基于Transformer,其多头注意力设计如同给AI装上"多核大脑",可同时捕捉词语间的语法、语义、指代等多维关系,成为通向通用人工智能的重要基石。
一、从"人工智障"到"智能涌现":Transformer的降维打击
震撼对比实验:
使用相同训练数据(维基百科+图书语料)
- RNN模型:“巴黎是法国的首都,位于__” → “塞纳河畔”(正确率68%)
- Transformer:“巴黎是法国的首都,位于__” → “北部法兰西岛大区”(正确率92%)
传统模型三大痛点:
- 梯度消失:长距离依赖难以捕捉(如"虽然…但是…"结构)
- 计算低效:无法并行处理序列数据
- 记忆瓶颈:固定长度上下文窗口
二、位置编码
transfomer的其他结构均在之前文章有过涉及,这里着重讲一下位置编码。
由于Transformer模型不使⽤循环神经⽹络,因此⽆法从序列中学习到位置信息。为了解决这个问题,需要为输⼊序列添加位置编码,将每个词的位置信息加⼊词向量中。
通过位置编码加入每一个token的位置信息
图中的类似于太极图的那个符号其实是“正弦”符号。正弦位置编码使⽤不同频率的正弦和余弦函数对每个位置进⾏编码。编码后,每个位置都会得到⼀个固定的位置编码,与词向量拼接或相加后,可以作为模型的输⼊。
正弦位置编码具有平滑性和保留相对位置信息等优点,因此在原始的Transformer论⽂中被采⽤。当然,也有其他位置编码⽅法,如可学习的位置编码,它将位置信息作为模型参数进⾏学习。
三、分部手搓Transformer核心组件
这个逐步拆解的过程是从中⼼到两边、从左到右进⾏的。也就是从中⼼组件到外围延展,从编码器到解码器延展,然后把它们组合成Transformer类。
以下是代码的关键组件。
(1)多头⾃注意⼒:通过ScaledDotProductAttention
类实现缩放点积注意⼒机制,然后通过MultiHeadAttention
类实现多头⾃注意⼒机制。
(2)逐位置前馈⽹络:通过PoswiseFeedForwardNet
类实现逐位置前馈⽹络。
(3)正弦位置编码表:通过get_sin_code_table
函数⽣成正弦位置编码表。
(4)填充掩码:通过get_attn_pad_mask
函数为填充令牌⽣成注意⼒掩码,避免注意⼒机制关注⽆⽤的信息。
(5)编码器层:通过EncoderLayer
类定义编码器的单层。
(6)编码器:通过Encoder
类定义Transformer
完整的编码器部分。
(7)后续掩码:通过get_attn_subsequent_mask
函数为后续令牌(当前位置后⾯的信息)⽣成注意⼒掩码,避免解码器中的注意⼒机制“偷窥”未来的⽬标数据。
(8)解码器层:通过DecoderLayer
类定义解码器的单层。
(9)解码器:通过Decoder
类定义Transformer
完整的解码器部分。
(10)Transformer
类:此类将编码器和解码器整合为完整的Transformer
模型。
3.1 多头自注意力(包含残差连接和归一化)
多头自注意力的结构如下:

这⾥我们有两个⼦组件:ScaledDotProductAttention
(缩放点积注意⼒)类和MultiHeadAttention
(多头⾃注意⼒)类。它们在Transformer架构中负责实现⾃注意⼒机制。
其中,ScaledDotProductAttention
类是构成MultiHeadAttention
类的组件元素,也就是说,在多头⾃注意⼒中的每⼀个头,都使⽤缩放点积注意⼒来实现。
import numpy as np # 导入 numpy 库
import torch # 导入 torch 库
import torch.nn as nn # 导入 torch.nn 库
d_k = 64 # K(=Q) 维度
d_v = 64 # V 维度
# 定义缩放点积注意力类
class ScaledDotProductAttention(nn.Module):def __init__(self):super(ScaledDotProductAttention, self).__init__() def forward(self, Q, K, V, attn_mask):#------------------------- 维度信息 -------------------------------- # Q K V [batch_size, n_heads, len_q/k/v, dim_q=k/v] (dim_q=dim_k)# attn_mask [batch_size, n_heads, len_q, len_k]#----------------------------------------------------------------# 计算注意力分数(原始权重)[batch_size,n_heads,len_q,len_k]scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) #------------------------- 维度信息 -------------------------------- # scores [batch_size, n_heads, len_q, len_k]#----------------------------------------------------------------- # 使用注意力掩码,将 attn_mask 中值为 1 的位置的权重替换为极小值#------------------------- 维度信息 -------------------------------- # attn_mask [batch_size, n_heads, len_q, len_k], 形状和 scores 相同#----------------------------------------------------------------- scores.masked_fill_(attn_mask, -1e9) # 对注意力分数进行 softmax 归一化weights = nn.Softmax(dim=-1)(scores) #------------------------- 维度信息 -------------------------------- # weights [batch_size, n_heads, len_q, len_k], 形状和 scores 相同#----------------------------------------------------------------- # 计算上下文向量(也就是注意力的输出), 是上下文信息的紧凑表示context = torch.matmul(weights, V) #------------------------- 维度信息 -------------------------------- # context [batch_size, n_heads, len_q, dim_v]#----------------------------------------------------------------- return context, weights # 返回上下文向量和注意力分数
整个过程配合代码如下图所示:

前向传播(forward):
def forward(self, Q, K, V, attn_mask):
Q
: 查询向量 (query),形状是[batch_size, n_heads, len_q, dim_q]
。K
: 键向量 (key),形状是[batch_size, n_heads, len_k, dim_k]
。V
: 值向量 (value),形状是[batch_size, n_heads, len_v, dim_v]
。attn_mask
: 注意力掩码 (mask),形状是[batch_size, n_heads, len_q, len_k]
。用于在计算注意力时屏蔽某些位置(例如在解码器中,避免未来位置被看到)。
应用掩码:
scores.masked_fill_(attn_mask, -1e9)
- 掩码应用:
attn_mask
具有和scores
相同的形状([batch_size, n_heads, len_q, len_k]
)。将attn_mask
中为1
的位置替换为一个非常小的值-1e9
,这些小值在后续的softmax
操作中会被“屏蔽”,即变为 0,避免这些位置的注意力权重被关注。
即1是需要忽略的部分,0是不需要忽略的部分。
attn_mask生成方式通常取决于以下几个因素:
- 解码器中的未来掩码(用于防止信息泄漏)
在 Transformer 的解码器中,我们需要确保模型只能看到当前时刻及之前的词,而不能看到未来时刻的词。例如,当前时刻的第 t
个位置的查询 Q[t]
应该只依赖于前 t
个位置的键 K
和对应的值 V
。这样做的目的是防止在训练时未来信息泄漏。
例如,如果序列长度是 4
,attn_mask
应该是一个上三角矩阵,表示模型不能看到未来时刻的内容。具体来说,attn_mask
会是一个形状为 [batch_size, n_heads, len_q, len_k]
的矩阵,其中 attn_mask[i, j, p, q] = 1
表示第 i
个样本、第 j
个头、第 p
个查询位置和第 q
个键位置之间的注意力需要被屏蔽。
举个例子,假设 attn_mask
为:
[1, 0, 0, 0] # 第 0 个位置只能看到自己
[1, 1, 0, 0] # 第 1 个位置可以看到自己和第 0 个位置
[1, 1, 1, 0] # 第 2 个位置可以看到自己和前两个位置
[1, 1, 1, 1] # 第 3 个位置可以看到自己和前三个位置
这样,对于 attn_mask
中的每个位置为 1
的部分,scores
中对应的位置会被屏蔽(设为极小的值 -1e9
),从而避免模型在生成时刻 t
的预测时“看到”未来的信息。
- 填充(Padding)掩码(用于忽略填充位置)
在处理变长输入序列时,序列中的某些位置可能是填充符(<PAD>
),这些填充符并不包含实际的信息,因此我们希望忽略它们对注意力计算的影响。为了避免填充符影响模型的注意力计算,我们会将填充符对应的位置的 attn_mask
设置为 1
(表示屏蔽这些位置)。
假设输入序列是:
[1, 2, 3, 0, 0] # 1, 2, 3 是实际内容,0 是填充符
对应的 attn_mask
可以是:
[0, 0, 0, 1, 1] # 填充符位置被标记为 1,表示要屏蔽
这样,attn_mask
中为 1
的位置就会在计算注意力时被屏蔽,确保填充符不会影响计算。
- 其他任务相关掩码
有时,attn_mask
也可以根据特定任务的需求自定义。例如,某些任务可能要求在计算注意力时忽略特定的区域,或者仅在特定的部分计算注意力。这种情况通常是通过任务外部的逻辑生成掩码。
下⾯定义多头⾃注意⼒另⼀个⼦组件,多头⾃注意⼒类(这⾥同时包含残差连接和层归⼀化操作)
# 定义多头自注意力类
d_embedding = 512 # Embedding 的维度
n_heads = 8 # Multi-Head Attention 中头的个数
batch_size = 3 # 每一批的数据大小
class MultiHeadAttention(nn.Module):def __init__(self):super(MultiHeadAttention, self).__init__()self.W_Q = nn.Linear(d_embedding, d_k * n_heads) # Q的线性变换层self.W_K = nn.Linear(d_embedding, d_k * n_heads) # K的线性变换层self.W_V = nn.Linear(d_embedding, d_v * n_heads) # V的线性变换层self.linear = nn.Linear(n_heads * d_v, d_embedding)self.layer_norm = nn.LayerNorm(d_embedding)def forward(self, Q, K, V, attn_mask): #------------------------- 维度信息 -------------------------------- # Q K V [batch_size, len_q/k/v, embedding_dim] #----------------------------------------------------------------- residual, batch_size = Q, Q.size(0) # 保留残差连接# 将输入进行线性变换和重塑,以便后续处理q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)#------------------------- 维度信息 -------------------------------- # q_s k_s v_s: [batch_size, n_heads, len_q/k/v, d_q=k/v]#----------------------------------------------------------------- # 将注意力掩码复制到多头 attn_mask: [batch_size, n_heads, len_q, len_k]attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)#------------------------- 维度信息 -------------------------------- # attn_mask [batch_size, n_heads, len_q, len_k]#----------------------------------------------------------------- # 使用缩放点积注意力计算上下文和注意力权重context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)#------------------------- 维度信息 -------------------------------- # context [batch_size, n_heads, len_q, dim_v]# weights [batch_size, n_heads, len_q, len_k]#----------------------------------------------------------------- # 通过调整维度将多个头的上下文向量连接在一起context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) #------------------------- 维度信息 -------------------------------- # context [batch_size, len_q, n_heads * dim_v]#----------------------------------------------------------------- # 用一个线性层把连接后的多头自注意力结果转换,原始地嵌入维度output = self.linear(context) #------------------------- 维度信息 -------------------------------- # output [batch_size, len_q, embedding_dim]#----------------------------------------------------------------- # 与输入 (Q) 进行残差链接,并进行层归一化后输出output = self.layer_norm(output + residual)#------------------------- 维度信息 -------------------------------- # output [batch_size, len_q, embedding_dim]#----------------------------------------------------------------- return output, weights # 返回层归一化的输出和注意力权重
将输⼊进⾏线性变换和重塑,就是为了形成多个头
3.2 逐位置前馈网络(包含残差连接和层归一化)
前馈神经⽹络(Feed-Forward Network)我们都了解,是⼀个包含全连接层的神经络。这种⽹络在计算过程中是按照从输⼊到输出的⽅向进⾏前馈传播的。
但是这个“Position- wise”如何理解?
在这⾥,“Poswise”或“Position-wise”是指这个前馈神经⽹络独⽴地作⽤在输⼊序列的每个位置(即token)上,也就是对⾃注意⼒机制处理后的结果上的各个位置进⾏独⽴处理,⽽不是把⾃注意⼒结果展平之后,以⼀个⼤的⼀维张量的形式整体输⼊前馈⽹络。这意味着对于序列中的每个位置,我们都在该位置应⽤相同的神经⽹络,做相同的处理,并且不会受到其他位置的影响。因此,逐位置操作保持了输⼊序列的原始顺序
所以⽆论是多头⾃注意⼒组件,还是前馈神经⽹络组件,都严格地保证“队形”,不打乱、不整合、不循环,⽽这种对序列位置信息的完整保持和并⾏处理,正是Transformer的核⼼思路。
# 定义逐位置前馈网络类
class PoswiseFeedForwardNet(nn.Module):def __init__(self, d_ff=2048):super(PoswiseFeedForwardNet, self).__init__()# 定义一维卷积层 1,用于将输入映射到更高维度self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)# 定义一维卷积层 2,用于将输入映射回原始维度self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)# 定义层归一化self.layer_norm = nn.LayerNorm(d_embedding)def forward(self, inputs): #------------------------- 维度信息 -------------------------------- # inputs [batch_size, len_q, embedding_dim]#---------------------------------------------------------------- residual = inputs # 保留残差连接 # 在卷积层 1 后使用 ReLU 激活函数 output = nn.ReLU()(self.conv1(inputs.transpose(1, 2))) #------------------------- 维度信息 -------------------------------- # output [batch_size, d_ff, len_q]#----------------------------------------------------------------# 使用卷积层 2 进行降维 output = self.conv2(output).transpose(1, 2) #------------------------- 维度信息 -------------------------------- # output [batch_size, len_q, embedding_dim]#----------------------------------------------------------------# 与输入进行残差链接,并进行层归一化output = self.layer_norm(output + residual) #------------------------- 维度信息 -------------------------------- # output [batch_size, len_q, embedding_dim]#----------------------------------------------------------------return output # 返回加入残差连接后层归一化的结果
PoswiseFeedForwardNet类实现了逐位置前馈⽹络,⽤于处理Transformer中⾃注意⼒机制的输出。其中包含两个⼀维卷积层,它们⼀个负责将输⼊映射到更⾼维度,⼀个再把它映射回原始维度。在两个卷积层之间,使⽤了ReLU
函数。
在这⾥,⽤⼀维卷积层代替了论⽂中的全连接层(线性层)来实现前馈神经⽹络。其原因是全连接层不共享权重,⽽⼀维卷积层在各个位置上共享权重,所以能够减少⽹络参数的数量。
⼀维卷积层的⼯作原理是将卷积核(也称为过滤器或特征映射)沿输⼊序列的⼀个维度滑动(如下图所示),并在每个位置进⾏点积操作。在这种情况下,我们使⽤⼤⼩为1的卷积核。这样,卷积操作实际上只会在输⼊序列的⼀个位置进⾏计算,因此它能够独⽴地处理输⼊序列中的每个位置。

在PoswiseFeedForwardNet类中,⾸先通过使⽤conv1的多个卷积核将输⼊序列映射到更⾼的维度(程序中是2048维,这是⼀个可调节的超参数),并应⽤ReLU函数。
接着,conv2将映射后的序列降维到原始维度。这个过程在输⼊序列的每个位置上都是独⽴完成的,因为⼀维卷积层会在每个位置进⾏逐点操作。所以,逐位置前馈神经⽹络能够在每个位置上分别应⽤相同的运算,从⽽捕捉输⼊序列中各个位置的信息。
逐位置前馈神经网络有下⾯⼏个作⽤。
(1)增强模型的表达能⼒。FFN为模型提供了更强⼤的表达能⼒,使其能够捕捉输⼊序列中更复杂的模式。通过逐位置前馈神经⽹络和⾃注意⼒机制的组合,Transformer可以学习到不同位置之间的⻓距离依赖关系。
(2)信息融合。==FFN可以将⾃注意⼒机制输出的信息进⾏融合。==每个位置上的信息在经过FFN后,都会得到⼀个新表示。这个新表示可以看作原始信息在经过⼀定程度的⾮线性变换之后的结果。
(3)层间传递。在Transformer中,逐位置前馈神经⽹络将在每个编码器和解码器层中使⽤。
这样可以确保每⼀层的输出都经过了FFN的处理,从⽽在多层次上捕捉到序列中的特征。多头⾃注意⼒层和逐位置前馈神经⽹络层是编码器层结构中的两个主要组件,不过,在开始构建编码器层之前,还要再定义两个辅助性的组件。第⼀个是位置编码表,第⼆个是⽣成填充注意⼒掩码的函数。
3.3 正弦编码表
Transformer模型的并⾏结构导致它不是按位置顺序来处理序列的,但是在处理序列尤其是注意⼒计算的过程中,仍需要位置信息来帮助捕捉序列中的顺序关系。为了解决这个问题,需要向输⼊序列中添加位置编码。
Tansformer的原始论⽂中使⽤的是正弦位置编码。它的计算公式如下:
P E ( p o s , 2 i ) = sin ( p o s 1000 0 2 i / d ) PE(\mathrm{pos},2i)=\sin\left(\frac{\mathrm{pos}}{10000^{2i/d}}\right) PE(pos,2i)=sin(100002i/dpos)
P E ( p o s , 2 i + 1 ) = cos ( p o s 1000 0 2 i l d ) PE(\mathrm{pos},2i+1)=\cos\left(\frac{\mathrm{pos}}{10000^{2ild}}\right) PE(pos,2i+1)=cos(100002ildpos)
这种位置编码⽅式具有周期性和连续性的特点,可以让模型学会捕捉位置之间的相对关系和全
局关系。这个公式可以⽤于计算位置嵌⼊向量中每个维度的⻆度值。
■ pos:单词/标记在句⼦中的位置,从0到seq_len-1。
■ d:单词/标记嵌⼊向量的维度embedding_dim。
■ i:嵌⼊向量中的每个维度,从0到 d 2 − 1 \frac{d}{2}-1 2d−1
# 生成正弦位置编码表的函数,用于在 Transformer 中引入位置信息
def get_sin_enc_table(n_position, embedding_dim):#------------------------- 维度信息 --------------------------------# n_position: 输入序列的最大长度# embedding_dim: 词嵌入向量的维度#----------------------------------------------------------------- # 根据位置和维度信息,初始化正弦位置编码表sinusoid_table = np.zeros((n_position, embedding_dim)) # 遍历所有位置和维度,计算角度值for pos_i in range(n_position):for hid_j in range(embedding_dim):angle = pos_i / np.power(10000, 2 * (hid_j // 2) / embedding_dim)sinusoid_table[pos_i, hid_j] = angle # 计算正弦和余弦值sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2]) # dim 2i 偶数维sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2]) # dim 2i+1 奇数维 #------------------------- 维度信息 --------------------------------# sinusoid_table 的维度是 [n_position, embedding_dim]#---------------------------------------------------------------- return torch.FloatTensor(sinusoid_table) # 返回正弦位置编码表
事实上,使⽤1、2、3、4等⾃然数序列作为位置编码确实可以为序列中的不同位置提供区分性。然⽽,这种⽅法可能在某些⽅⾯不如正弦和余弦函数⽣成的位置嵌⼊向量有效。
当我们使⽤⾃然数序列作为位置编码时,这些编码是线性的。这意味着相邻位置之间的差异在整个序列中保持恒定。然⽽,在许多任务中,不同位置之间的关系可能更复杂,可能需要⼀种能够捕捉到这种复杂关系的编码⽅法。
正弦和余弦函数⽣成的位置嵌⼊向量具有周期性和正交性,因此可以产⽣在各个尺度上都有区分性的位置嵌⼊。这使得模型可以更容易地学习到序列中不同位置之间的关系,特别是在捕捉⻓距离依赖关系时可能表现得更好。
所以,虽然使⽤⾃然数序列(1、2、3、4等)作为位置编码可以做⼀定的区分,但正弦和余弦函数⽣成的位置嵌⼊向量在捕捉序列中更复杂的位置关系⽅⾯更具优势。
3.4 填充掩码
在NLP任务中,输⼊序列的⻓度通常是不固定的。为了能够同时处理多个序列,我们需要将这些序列填充到相同的⻓度,将不等⻓的序列补充到等⻓,这样才能将它们整合成同⼀个批次进⾏训练。
通常使⽤⼀个特殊的标记(如,编码后这个token的值通常是0)来表示填充部分。
然⽽,这些填充符号并没有实际的含义,所以我们希望模型在计算注意⼒时忽略它们。因此,在编码器的输⼊部分,我们使⽤了填充位的注意⼒掩码机制(如下⻚图所示)。这个掩码机制的作⽤是在注意⼒计算的时候把⽆⽤的信息屏蔽,防⽌模型在计算注意⼒权重时关注到填充位。

# 定义填充注意力掩码函数
def get_attn_pad_mask(seq_q, seq_k):#------------------------- 维度信息 --------------------------------# seq_q 的维度是 [batch_size, len_q]# seq_k 的维度是 [batch_size, len_k]#-----------------------------------------------------------------batch_size, len_q = seq_q.size()batch_size, len_k = seq_k.size()# 生成布尔类型张量pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # <PAD>token 的编码值为 0#------------------------- 维度信息 --------------------------------# pad_attn_mask 的维度是 [batch_size,1,len_k]#-----------------------------------------------------------------# 变形为与注意力分数相同形状的张量 pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k)#------------------------- 维度信息 --------------------------------# pad_attn_mask 的维度是 [batch_size,len_q,len_k]#-----------------------------------------------------------------return pad_attn_mask
我们为填充的⽂本序列创建⼀个与其形状相同的⼆维矩阵,称为填充掩码矩阵。填充掩码矩阵的⽬的是在注意⼒计算中屏蔽填充位置的影响。屏蔽流程如下。
(1)根据输⼊⽂本序列创建⼀个与其形状相同的⼆维矩阵。对于原始⽂本中的每个单词,矩阵中对应位置填充0;对于填充的符号,矩阵中对应位置填充1。
(2)为了将填充部分的权重降⾄接近负⽆穷,我们可以先将填充掩码矩阵中的1替换为⼀个⾮常⼤的负数(例如-1e9),再将处理后的填充掩码矩阵与注意⼒分数矩阵进⾏元素相加。这样,有意义的token加了0,值保持不变,⽽填充部分加了⽆穷⼩值,在注意⼒分数矩阵中的权重就会变得⾮常⼩。
(3)对注意⼒分数矩阵应⽤softmax函数进⾏归⼀化。由于填充部分的权重接近负⽆穷,softmax函数会使其归⼀化后的权重接近于0。这样,模型在计算注意⼒时就能够忽略填充部分的信息,专注于序列中实际包含的有效内容。
3.5 编码器层
有了多头⾃注意⼒和逐位置前馈⽹络这两个主要组件,以及正弦位置编码表和填充掩码这两个辅助函数后,现在我们终于可以搭建编码器层这个核⼼组件了。
# 定义编码器层类
class EncoderLayer(nn.Module):def __init__(self):super(EncoderLayer, self).__init__() self.enc_self_attn = MultiHeadAttention() # 多头自注意力层 self.pos_ffn = PoswiseFeedForwardNet() # 位置前馈神经网络层def forward(self, enc_inputs, enc_self_attn_mask):#------------------------- 维度信息 --------------------------------# enc_inputs 的维度是 [batch_size, seq_len, embedding_dim]# enc_self_attn_mask 的维度是 [batch_size, seq_len, seq_len]#-----------------------------------------------------------------# 将相同的 Q,K,V 输入多头自注意力层 , 返回的 attn_weights 增加了头数 enc_outputs, attn_weights = self.enc_self_attn(enc_inputs, enc_inputs,enc_inputs, enc_self_attn_mask)#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, seq_len, embedding_dim] # attn_weights 的维度是 [batch_size, n_heads, seq_len, seq_len] # 将多头自注意力 outputs 输入位置前馈神经网络层enc_outputs = self.pos_ffn(enc_outputs) # 维度与 enc_inputs 相同#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, seq_len, embedding_dim] #-----------------------------------------------------------------return enc_outputs, attn_weights # 返回编码器输出和每层编码器注意力权重
这个类将多头自注意力层和位置前馈神经网络层组合在一起,并完成前向传播的计算。
EncoderLayer
类将 多头自注意力机制 和 位置前馈神经网络 结合,完成了 Transformer 编码器层的基本结构。具体过程是:
- 输入经过多头自注意力层,计算查询与键的注意力权重,并生成上下文向量。
- 将上下文向量输入到位置前馈神经网络中,得到最终的编码器输出。
- 返回编码器输出和每层的注意力权重。
这种结构是 Transformer 编码器的核心部分,支持在输入序列中捕捉远距离依赖并进行非线性变换。

3.6 编码器
# 定义编码器类
n_layers = 6 # 设置 Encoder 的层数
class Encoder(nn.Module):def __init__(self, corpus):super(Encoder, self).__init__() self.src_emb = nn.Embedding(len(corpus.src_vocab), d_embedding) # 词嵌入层self.pos_emb = nn.Embedding.from_pretrained( \get_sin_enc_table(corpus.src_len+1, d_embedding), freeze=True) # 位置嵌入层self.layers = nn.ModuleList(EncoderLayer() for _ in range(n_layers))# 编码器层数def forward(self, enc_inputs): #------------------------- 维度信息 --------------------------------# enc_inputs 的维度是 [batch_size, source_len]#-----------------------------------------------------------------# 创建一个从 1 到 source_len 的位置索引序列pos_indices = torch.arange(1, enc_inputs.size(1) + 1).unsqueeze(0).to(enc_inputs)#------------------------- 维度信息 --------------------------------# pos_indices 的维度是 [1, source_len]#----------------------------------------------------------------- # 对输入进行词嵌入和位置嵌入相加 [batch_size, source_len,embedding_dim]enc_outputs = self.src_emb(enc_inputs) + self.pos_emb(pos_indices)#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, seq_len, embedding_dim]#-----------------------------------------------------------------# 生成自注意力掩码enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) #------------------------- 维度信息 --------------------------------# enc_self_attn_mask 的维度是 [batch_size, len_q, len_k] #----------------------------------------------------------------- enc_self_attn_weights = [] # 初始化 enc_self_attn_weights# 通过编码器层 [batch_size, seq_len, embedding_dim]for layer in self.layers: enc_outputs, enc_self_attn_weight = layer(enc_outputs, enc_self_attn_mask)enc_self_attn_weights.append(enc_self_attn_weight)#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, seq_len, embedding_dim] 维度与 enc_inputs 相同# enc_self_attn_weights 是一个列表,每个元素的维度是 [batch_size, n_heads, seq_len, seq_len] #-----------------------------------------------------------------return enc_outputs, enc_self_attn_weights # 返回编码器输出和编码器注意力权重
这个编码器类实现了Transformer模型中的编码器部分,包括词嵌⼊、位置嵌⼊和多个编码器层。通过这个编码器,可以处理输⼊序列,并从中提取深层次的特征表示。这些特征表示可以直接应⽤于后续的任务,如序列到序列的⽣成任务(如机器翻译)或者分类任务(如情感分析)等。
BERT模型就只包含Transformer模型中的编码器部分,因此它很适合为各种NLP下游任务提供有⽤的特征表示。
编码器的定义⾄此结束,下⾯我们进⼊解码器组件。不过,在开始构建解码器层之前,也有⼀个⼩组件需要说明,它就是⽣成后续注意⼒掩码的函数。
3.7 后续掩码
在⾃然语⾔处理中,尤其是Seq2Seq任务中,我们需要为解码器提供正确的输⼊,对于已经⽣成的部分,我们要让解码器看到序列是否正确,然后⽤正确的信息(Ground Truth)来预测下⼀个词。但是与此同时,为了确保模型不会提前获取未来的信息,我们⼜需要在注意⼒计算中遮住当前位置后⾯的信息(Subsequent Positions)。
所以,对序列中的第⼀个位置,我们需要遮住后⾯所有的词;⽽对后⾯的词,需要遮住的词会逐渐减少(如下图所示)。⽐如把“咖哥 喜欢 ⼩冰”这句话输⼊解码器,当对“咖哥”计算注意⼒时,解码器不可以看到“喜欢”“⼩冰”这两个词。当对“喜欢”计算注意⼒时,解码器可以看到“咖哥”,不能看到“⼩冰”,因为它正是需要根据“咖哥”和“喜欢”这个上下⽂,来猜测咖哥喜欢谁。当对最后⼀个词"⼩冰"计算注意⼒的时候,前两个词就不是秘密了。

# 生成后续注意力掩码的函数,用于在多头自注意力计算中忽略未来信息
def get_attn_subsequent_mask(seq):#------------------------- 维度信息 --------------------------------# seq 的维度是 [batch_size, seq_len(Q)=seq_len(K)]#-----------------------------------------------------------------# 获取输入序列的形状attn_shape = [seq.size(0), seq.size(1), seq.size(1)] #------------------------- 维度信息 --------------------------------# attn_shape 是一个一维张量 [batch_size, seq_len(Q), seq_len(K)]#-----------------------------------------------------------------# 使用 numpy 创建一个上三角矩阵(triu = triangle upper)subsequent_mask = np.triu(np.ones(attn_shape), k=1)#------------------------- 维度信息 --------------------------------# subsequent_mask 的维度是 [batch_size, seq_len(Q), seq_len(K)]#-----------------------------------------------------------------# 将 numpy 数组转换为 PyTorch 张量,并将数据类型设置为 byte(布尔值)subsequent_mask = torch.from_numpy(subsequent_mask).byte()#------------------------- 维度信息 --------------------------------# 返回的 subsequent_mask 的维度是 [batch_size, seq_len(Q), seq_len(K)]#-----------------------------------------------------------------return subsequent_mask # 返回后续位置的注意力掩码
此段代码最终生成的是注意力掩码,根据上图第一行为例,因为咖哥只能看到自己来推测下面的词,所以先写出咖哥对整个句子的权重,在人为将看不到的地方取消关注(也就是替换成Zero weight),从上到下一行是一个时间的步长。
3.8 解码器层
# 定义解码器层类
class DecoderLayer(nn.Module):def __init__(self):super(DecoderLayer, self).__init__() self.dec_self_attn = MultiHeadAttention() # 多头自注意力层 self.dec_enc_attn = MultiHeadAttention() # 多头自注意力层,连接编码器和解码器 self.pos_ffn = PoswiseFeedForwardNet() # 位置前馈神经网络层def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):#------------------------- 维度信息 --------------------------------# dec_inputs 的维度是 [batch_size, target_len, embedding_dim]# enc_outputs 的维度是 [batch_size, source_len, embedding_dim]# dec_self_attn_mask 的维度是 [batch_size, target_len, target_len]# dec_enc_attn_mask 的维度是 [batch_size, target_len, source_len]#----------------------------------------------------------------- # 将相同的 Q,K,V 输入多头自注意力层dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attn 的维度是 [batch_size, n_heads, target_len, target_len]#----------------------------------------------------------------- # 将解码器输出和编码器输出输入多头自注意力层dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_enc_attn 的维度是 [batch_size, n_heads, target_len, source_len]#----------------------------------------------------------------- # 输入位置前馈神经网络层dec_outputs = self.pos_ffn(dec_outputs)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attn 的维度是 [batch_size, n_heads, target_len, target_len]# dec_enc_attn 的维度是 [batch_size, n_heads, target_len, source_len] #-----------------------------------------------------------------# 返回解码器层输出,每层的自注意力和解 - 编码器注意力权重return dec_outputs, dec_self_attn, dec_enc_attn
定义了一个标准的解码器层,通过三个主要步骤处理输入:
- 自注意力:通过多头自注意力机制理解目标语言的上下文。
- 编码器-解码器注意力:通过与编码器的输出进行交互,理解目标语言与源语言的关系。
- 前馈神经网络:对解码器输出进行进一步的转换和处理。
这些步骤共同作用,使得解码器能够生成目标语言的翻译或输出。

3.9 解码器

解码器类的实现代码如下:
# 定义解码器类
n_layers = 6 # 设置 Decoder 的层数
class Decoder(nn.Module):def __init__(self, corpus):super(Decoder, self).__init__()self.tgt_emb = nn.Embedding(len(corpus.tgt_vocab), d_embedding) # 词嵌入层self.pos_emb = nn.Embedding.from_pretrained( \get_sin_enc_table(corpus.tgt_len+1, d_embedding), freeze=True) # 位置嵌入层 self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)]) # 叠加多层def forward(self, dec_inputs, enc_inputs, enc_outputs): #------------------------- 维度信息 --------------------------------# dec_inputs 的维度是 [batch_size, target_len]# enc_inputs 的维度是 [batch_size, source_len]# enc_outputs 的维度是 [batch_size, source_len, embedding_dim]#----------------------------------------------------------------- # 创建一个从 1 到 source_len 的位置索引序列pos_indices = torch.arange(1, dec_inputs.size(1) + 1).unsqueeze(0).to(dec_inputs)#------------------------- 维度信息 --------------------------------# pos_indices 的维度是 [1, target_len]#----------------------------------------------------------------- # 对输入进行词嵌入和位置嵌入相加dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(pos_indices)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]#----------------------------------------------------------------- # 生成解码器自注意力掩码和解码器 - 编码器注意力掩码dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # 填充位掩码dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs) # 后续位掩码dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask \+ dec_self_attn_subsequent_mask), 0) dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # 解码器 - 编码器掩码#------------------------- 维度信息 -------------------------------- # dec_self_attn_pad_mask 的维度是 [batch_size, target_len, target_len]# dec_self_attn_subsequent_mask 的维度是 [batch_size, target_len, target_len]# dec_self_attn_mask 的维度是 [batch_size, target_len, target_len]# dec_enc_attn_mask 的维度是 [batch_size, target_len, source_len]#----------------------------------------------------------------- dec_self_attns, dec_enc_attns = [], [] # 初始化 dec_self_attns, dec_enc_attns# 通过解码器层 [batch_size, seq_len, embedding_dim]for layer in self.layers:dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)dec_self_attns.append(dec_self_attn)dec_enc_attns.append(dec_enc_attn)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, target_len, target_len]# dec_enc_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, target_len, source_len]#----------------------------------------------------------------- # 返回解码器输出,解码器自注意力和解码器 - 编码器注意力权重 return dec_outputs, dec_self_attns, dec_enc_attns
1.词嵌入:输入目标语言的词索引,并结合位置编码来生成解码器的输入。
2.掩码计算:生成自注意力掩码和解码器-编码器掩码,确保模型不会使用未来信息或填充位置的信息。
3.多层解码器:通过多层解码器来处理输入,生成目标语言的最终表示。
4.返回结果:解码器的输出和每一层的注意力权重。
3.10 transfomer类
# 定义 Transformer 模型
class Transformer(nn.Module):def __init__(self, corpus):super(Transformer, self).__init__() self.encoder = Encoder(corpus) # 初始化编码器实例 self.decoder = Decoder(corpus) # 初始化解码器实例# 定义线性投影层,将解码器输出转换为目标词汇表大小的概率分布self.projection = nn.Linear(d_embedding, len(corpus.tgt_vocab), bias=False)def forward(self, enc_inputs, dec_inputs):#------------------------- 维度信息 --------------------------------# enc_inputs 的维度是 [batch_size, source_seq_len]# dec_inputs 的维度是 [batch_size, target_seq_len]#----------------------------------------------------------------- # 将输入传递给编码器,并获取编码器输出和自注意力权重 enc_outputs, enc_self_attns = self.encoder(enc_inputs)#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, source_len, embedding_dim]# enc_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, src_seq_len, src_seq_len] #----------------------------------------------------------------- # 将编码器输出、解码器输入和编码器输入传递给解码器# 获取解码器输出、解码器自注意力权重和编码器 - 解码器注意力权重 dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, tgt_seq_len, src_seq_len]# dec_enc_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, tgt_seq_len, src_seq_len] #----------------------------------------------------------------- # 将解码器输出传递给投影层,生成目标词汇表大小的概率分布dec_logits = self.projection(dec_outputs) #------------------------- 维度信息 --------------------------------# dec_logits 的维度是 [batch_size, tgt_seq_len, tgt_vocab_size]#-----------------------------------------------------------------# 返回逻辑值 ( 原始预测结果 ), 编码器自注意力权重,解码器自注意力权重,解 - 编码器注意力权重return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns
⾸先初始化编码器、解码器和投影层。在forward⽅法中,将源序列输⼊传递给编码器,获取编码器输出和⾃注意⼒权重。然后将编码器输出、解码器输⼊和编码器输⼊传递给解码器,获取解码器输出、解码器⾃注意⼒权重和编码器-解码器注意⼒权重。最后,将解码器输出传递给投影层,⽣成⽬标词汇表⼤⼩的概率分布。
这个概率分布将被⽤于计算损失和评估模型的性能。
四、举一个栗子跑跑
4.1 数据准备
先准备几个中英翻译例句
sentences = [['咖哥 喜欢 小冰', 'KaGe likes XiaoBing'],['我 爱 学习 人工智能', 'I love studying AI'],['深度学习 改变 世界', ' DL changed the world'],['自然语言处理 很 强大', 'NLP is powerful'],['神经网络 非常 复杂', 'Neural-networks are complex'] ]
然后,创建TranslationCorpus类,⽤于读⼊中英翻译语料,并⽣成字典和模型可以读取的数据批次。
from collections import Counter # 导入 Counter 类
# 定义 TranslationCorpus 类
class TranslationCorpus:def __init__(self, sentences):self.sentences = sentences# 计算源语言和目标语言的最大句子长度,并分别加 1 和 2 以容纳填充符和特殊符号self.src_len = max(len(sentence[0].split()) for sentence in sentences) + 1self.tgt_len = max(len(sentence[1].split()) for sentence in sentences) + 2# 创建源语言和目标语言的词汇表self.src_vocab, self.tgt_vocab = self.create_vocabularies()# 创建索引到单词的映射self.src_idx2word = {v: k for k, v in self.src_vocab.items()}self.tgt_idx2word = {v: k for k, v in self.tgt_vocab.items()}# 定义创建词汇表的函数def create_vocabularies(self):# 统计源语言和目标语言的单词频率src_counter = Counter(word for sentence in self.sentences for word in sentence[0].split())tgt_counter = Counter(word for sentence in self.sentences for word in sentence[1].split()) # 创建源语言和目标语言的词汇表,并为每个单词分配一个唯一的索引src_vocab = {'<pad>': 0, **{word: i+1 for i, word in enumerate(src_counter)}}tgt_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, **{word: i+3 for i, word in enumerate(tgt_counter)}} return src_vocab, tgt_vocab# 定义创建批次数据的函数def make_batch(self, batch_size, test_batch=False):input_batch, output_batch, target_batch = [], [], []# 随机选择句子索引sentence_indices = torch.randperm(len(self.sentences))[:batch_size]for index in sentence_indices:src_sentence, tgt_sentence = self.sentences[index]# 将源语言和目标语言的句子转换为索引序列src_seq = [self.src_vocab[word] for word in src_sentence.split()]tgt_seq = [self.tgt_vocab['<sos>']] + [self.tgt_vocab[word] \for word in tgt_sentence.split()] + [self.tgt_vocab['<eos>']] # 对源语言和目标语言的序列进行填充src_seq += [self.src_vocab['<pad>']] * (self.src_len - len(src_seq))tgt_seq += [self.tgt_vocab['<pad>']] * (self.tgt_len - len(tgt_seq)) # 将处理好的序列添加到批次中input_batch.append(src_seq)output_batch.append([self.tgt_vocab['<sos>']] + ([self.tgt_vocab['<pad>']] * \(self.tgt_len - 2)) if test_batch else tgt_seq[:-1])target_batch.append(tgt_seq[1:]) # 将批次转换为 LongTensor 类型input_batch = torch.LongTensor(input_batch)output_batch = torch.LongTensor(output_batch)target_batch = torch.LongTensor(target_batch) return input_batch, output_batch, target_batch
# 创建语料库类实例
corpus = TranslationCorpus(sentences)
4.2 训练Transfomer模型
import torch # 导入 torch
import torch.optim as optim # 导入优化器
model = Transformer(corpus) # 创建模型实例
criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.Adam(model.parameters(), lr=0.0001) # 优化器
epochs = 5 # 训练轮次
for epoch in range(epochs): # 训练 100 轮optimizer.zero_grad() # 梯度清零enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size) # 创建训练数据 outputs, _, _, _ = model(enc_inputs, dec_inputs) # 获取模型输出 loss = criterion(outputs.view(-1, len(corpus.tgt_vocab)), target_batch.view(-1)) # 计算损失if (epoch + 1) % 1 == 0: # 打印损失print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")loss.backward()# 反向传播 optimizer.step()# 更新参数
训练100轮之后,损失会减⼩到⼀个较⼩的值。
4.3 测试Transfomer模型
# 创建一个大小为 1 的批次,目标语言序列 dec_inputs 在测试阶段,仅包含句子开始符号 <sos>
enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size=1,test_batch=True)
print("编码器输入 :", enc_inputs) # 打印编码器输入
print("解码器输入 :", dec_inputs) # 打印解码器输入
print("目标数据 :", target_batch) # 打印目标数据
predict, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs) # 用模型进行翻译
predict = predict.view(-1, len(corpus.tgt_vocab)) # 将预测结果维度重塑
predict = predict.data.max(1, keepdim=True)[1] # 找到每个位置概率最大的词汇的索引
# 解码预测的输出,将所预测的目标句子中的索引转换为单词
translated_sentence = [corpus.tgt_idx2word[idx.item()] for idx in predict.squeeze()]
# 将输入的源语言句子中的索引转换为单词
input_sentence = ' '.join([corpus.src_idx2word[idx.item()] for idx in enc_inputs[0]])
print(input_sentence, '->', translated_sentence) # 打印原始句子和翻译后的句子
编码器输入 : tensor([[11, 12, 13, 0, 0]])解码器输入 : tensor([[1, 0, 0, 0, 0]])目标数据 : tensor([[14, 15, 16, 2, 0]])自然语言处理 很 强大 <pad> <pad> -> ['NLP', 'NLP', 'NLP', 'NLP', 'NLP']
这个Transformer能训练,能⽤。不过,其输出结果并不理想,模型只成功翻译了⼀个单词“NLP”,之后就不断重复这个词。
对于这样简单的数据集,在设计和选择模型时,应该优先考虑简单的模型,像Transformer这样⽐较复杂的模型并不⼀定效果更好。
这次测试效果不理想的真正原因和模型的简单或者复杂⽆关,主要是因为此处我们并没有利⽤解码器的⾃回归机制进⾏逐位置(即逐词、逐令牌、逐元素或逐时间步)的⽣成式输出。