Reference:
https://aistudio.baidu.com/projectdetail/7723195
import math

import paddle
import paddle.nn as nn
import paddle.nn.functional as F


class MaskMultiHeadAttention(nn.Layer):
    def __init__(self, hidden_size, num_heads):
        super(MaskMultiHeadAttention, self).__init__()
        assert hidden_size % num_heads == 0, "Hidden size must be divisible by the number of heads."
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_size = hidden_size // num_heads
        # Query and Value projections (this variant uses no separate Key projection)
        self.query_linear = nn.Linear(hidden_size, hidden_size)
        self.value_linear = nn.Linear(hidden_size, hidden_size)
        # Output projection
        self.output_linear = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, state=None):
        batch_size = x.shape[0]
        # Compute Query and Value for all heads in parallel: [batch, heads, seq_len, head_size]
        query = self.query_linear(x).reshape([batch_size, -1, self.num_heads, self.head_size]).transpose([0, 2, 1, 3])
        value = self.value_linear(x).reshape([batch_size, -1, self.num_heads, self.head_size]).transpose([0, 2, 1, 3])
        # Attention scores: scaled |query|, normalized by a running (cumulative) sum over time
        scores = (F.relu(query) + F.relu(-query)) / (self.head_size ** 0.5 + 1e-12)
        if state is None:
            state = 0
        state = paddle.cumsum(scores, -2) + 1e-16 + state
        scores = scores / state
        state = state[:, :, -1:]  # keep only the last time step as the carried state
        out = scores * value
        # Concatenate the heads and transform to get the final output
        out = out.transpose([0, 2, 1, 3]).reshape([batch_size, -1, self.hidden_size])
        out = self.output_linear(out)
        return out, state


class FeedForward(nn.Layer):
    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()
        self.ffn1 = nn.Linear(hidden_size, hidden_size // 2)
        self.ffn2 = nn.Linear(hidden_size // 2, hidden_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.ffn1(x)
        x = self.relu(x)
        x = self.ffn2(x)
        return x


class GPTDecoderLayer(nn.Layer):
    def __init__(self, hidden_size, num_heads):
        super(GPTDecoderLayer, self).__init__()
        self.self_attention = MaskMultiHeadAttention(hidden_size, num_heads)
        self.ffn = FeedForward(hidden_size)
        self.norm1 = nn.LayerNorm(hidden_size)

    def forward(self, x, state=None):
        x1, state = self.self_attention(x, state)  # self-attention with residual connection
        x = x1 + x
        x = self.norm1(x)
        x = self.ffn(x) + x  # feed-forward with residual connection
        x = self.norm1(x)
        return x, state


class PositionalEncoding(nn.Layer):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Position indices for the maximum sequence length: [max_len, 1]
        position = paddle.arange(max_len).unsqueeze(1).astype("float32")
        # Constant 'pe' matrix with the same size as the embedding matrix
        div_term = paddle.exp(paddle.arange(0, d_model, 2).astype("float32") * (-math.log(10000.0) / d_model))
        pe = paddle.zeros([max_len, d_model])
        pe[:, 0::2] = paddle.sin(position * div_term)
        pe[:, 1::2] = paddle.cos(position * div_term)
        # Stored as a plain (non-trainable) tensor attribute, shape [1, max_len, d_model]
        self.pe = pe.unsqueeze(0)

    def forward(self, x, seq_len=None):
        # x is of shape [batch_size, seq_len, d_model]
        if seq_len is None:
            seq_len = x.shape[1]
            return x + self.pe[:, :seq_len, :]
        else:
            # Single-step decoding: add only the encoding of position seq_len - 1
            return x + self.pe[:, seq_len - 1:seq_len, :]


class GPT(nn.Layer):
    def __init__(self, vocab_size, hidden_size, num_heads, num_layers):
        super(GPT, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.position_embedding = PositionalEncoding(hidden_size)
        self.decoder_layers = nn.LayerList([GPTDecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, state=None, seq_len=None):
        x = self.embedding(x)
        x = self.position_embedding(x, seq_len)
        if state is None:
            state = [None] * len(self.decoder_layers)
        i = 0
        for decoder_layer in self.decoder_layers:
            x, state[i] = decoder_layer(x, state[i])
            i += 1
        out = self.fc(x)
        return out, state


def check_mask_multi_head_attention():
    x = paddle.randn([4, 10, 20])
    mha = MaskMultiHeadAttention(20, 5)
    out = mha(x)


def check_positional_encoding():
    x = paddle.randn([4, 10, 20])
    pe = PositionalEncoding(20)
    out = pe(x)


def check_gpt_decoder_layer():
    x = paddle.randn([4, 10, 20])
    dl = GPTDecoderLayer(20, 5)
    out = dl(x)


def check_gpt():
    x = paddle.randint(4, 10, [4, 10])
    gpt = GPT(10, 20, 5, 2)
    out = gpt(x)


def check_all():
    check_mask_multi_head_attention()
    check_positional_encoding()
    check_gpt_decoder_layer()
    check_gpt()


if __name__ == '__main__':
    # Check all modules
    check_all()
This code implements the decoder part of a GPT (Generative Pre-trained Transformer) model.
It first defines a MaskMultiHeadAttention class, an implementation of the multi-head attention mechanism. In the forward method, the input x is projected by linear layers into Query and Value tensors (this variant has no separate Key projection), and the attention scores are then computed: the Query is passed through ReLU(q) + ReLU(-q), i.e. its absolute value, scaled, and normalized by a cumulative sum along the sequence dimension, which also serves as the recurrent state returned by the layer. Finally, the normalized scores are multiplied with the Value to produce the output.
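Because the normalization depends only on a running sum over earlier positions, the layer can also be fed one token at a time while carrying the returned state. A minimal sketch (assuming the MaskMultiHeadAttention class above; the sizes are illustrative) comparing a full-sequence pass with a token-by-token pass:

import paddle

mha = MaskMultiHeadAttention(20, 5)
x = paddle.randn([1, 6, 20])

full_out, _ = mha(x)  # process the whole sequence at once

state = None
step_outs = []
for t in range(x.shape[1]):
    out_t, state = mha(x[:, t:t + 1, :], state)  # one token at a time, reusing the state
    step_outs.append(out_t)
step_out = paddle.concat(step_outs, axis=1)

# Expected to be near zero (up to float32 rounding and the tiny 1e-16 term
# that is added once per call instead of once per sequence).
print(float(paddle.abs(full_out - step_out).max()))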
Next, a FeedForward class is defined; it is the feed-forward layer of the GPT model. In the forward method, the input x goes through a linear layer, a ReLU activation, and a second linear layer to produce the output; the first layer halves the hidden size and the second restores it.
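A minimal usage sketch (the hidden size of 20 is an illustrative choice) showing the inner bottleneck and that the block preserves the input shape, so it can sit inside a residual connection:

import paddle

ffn = FeedForward(20)
x = paddle.randn([4, 10, 20])
print(ffn.ffn1(x).shape)  # [4, 10, 10] -- inner bottleneck of hidden_size // 2
print(ffn(x).shape)       # [4, 10, 20] -- same shape as the input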
A GPTDecoderLayer class is then defined; it combines a MaskMultiHeadAttention layer and a FeedForward layer. In the forward method, the input x first goes through the attention layer, is added back to the input, and is normalized with LayerNorm. It then goes through the feed-forward layer, is again added to its input, and is normalized once more (the same LayerNorm instance is reused). The layer returns the result together with the attention state.
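A minimal sketch (dimensions are illustrative): one decoder layer preserves the input shape and additionally returns the per-head attention state for later reuse:

import paddle

layer = GPTDecoderLayer(20, 5)
x = paddle.randn([4, 10, 20])
y, state = layer(x)
print(y.shape)      # [4, 10, 20]
print(state.shape)  # [4, 5, 1, 4] -- one running-sum entry per head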
Next, a PositionalEncoding class is defined to generate positional encodings. The computation follows the paper "Attention Is All You Need", using sine and cosine functions of different frequencies. In the forward method, the positional encoding is added to the input x; when a seq_len argument is passed, only the encoding of that single position is added, which supports step-by-step decoding.
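A minimal sketch checking the precomputed table against the closed-form formulas PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)); the d_model, pos and i values are arbitrary illustrative choices:

import math
import paddle

d_model = 20
pe = PositionalEncoding(d_model).pe[0]  # shape [max_len, d_model]
pos, i = 3, 2
angle = pos / (10000 ** (2 * i / d_model))
print(float(pe[pos, 2 * i]), math.sin(angle))      # should agree up to float32 precision
print(float(pe[pos, 2 * i + 1]), math.cos(angle))  # should agree up to float32 precision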
Finally, a GPT class is defined; it is the implementation of the whole model. In the forward method, the input x goes through the embedding layer and the positional-encoding layer, is then processed by a stack of GPTDecoderLayer layers (collecting one state per layer), and a final linear layer produces the prediction logits over the vocabulary.
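A minimal sketch of how the returned states could be used for step-by-step greedy decoding with an (untrained) model; the model sizes, the prompt, and the number of generated tokens are illustrative assumptions, not values from the original project:

import paddle

gpt = GPT(vocab_size=10, hidden_size=20, num_heads=5, num_layers=2)

tokens = paddle.to_tensor([[1, 2, 3]])  # hypothetical prompt, shape [1, 3]
logits, state = gpt(tokens)             # full pass over the prompt primes the per-layer states
next_token = logits[:, -1, :].argmax(-1, keepdim=True)

for _ in range(5):                      # generate 5 more tokens, one at a time
    seq_len = tokens.shape[1] + 1       # 1-based position of the new token
    logits, state = gpt(next_token, state, seq_len=seq_len)
    tokens = paddle.concat([tokens, next_token], axis=1)
    next_token = logits[:, -1, :].argmax(-1, keepdim=True)

print(tokens.numpy())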
At the end of the code, a few functions are defined to sanity-check the individual modules, and they are all called under __main__.