1 EncoderDecoder
1.1 _init_
class EncoderDecoder(nn.Module):def __init__(self, vocab_size, embedding_size,hidden_size, num_layers, dropout, bidirectional):super(EncoderDecoder, self).__init__()self.vocab_size = vocab_size #词汇表大小self.embedding_size = embedding_size #词向量嵌入的维度大小## the embedding shared by encoder and decoderself.embedding = nn.Embedding(vocab_size, embedding_size,padding_idx=constants.PAD)#词向量嵌入层self.encoder = Encoder(embedding_size, hidden_size, num_layers,dropout, bidirectional, self.embedding)#编码器self.decoder = Decoder(embedding_size, hidden_size, num_layers,dropout, self.embedding)#解码器self.num_layers = num_layers
1.2 load_pretrained_embedding
从指定的路径加载预训练的词嵌入权重,并将这些权重复制到模型中的 embedding
层
def load_pretrained_embedding(path):if os.path.isfile(path):w = torch.load(path)#加载预训练的嵌入权重到变量 wself.embedding.weight.data.copy_(w)#将加载的权重 w 复制到模型的嵌入层
1.3 encoder_hn2decoder_h0
'''
转换编码器的输出隐藏状态
'''
def encoder_hn2decoder_h0(self, h):"""Input:编码器的输出隐藏状态h (num_layers * num_directions, batch, hidden_size): encoder output hn---Output: 解码器的初始隐藏状态h (num_layers, batch, hidden_size * num_directions): decoder input h0"""if self.encoder.num_directions == 2:num_layers, batch, hidden_size = h.size(0)//2, h.size(1), h.size(2)#根据输入 h 的形状计算 num_layers, batch 和 hidden_sizereturn h.view(num_layers, 2, batch, hidden_size)\.transpose(1, 2).contiguous()\.view(num_layers, batch, hidden_size * 2)'''使用 view 方法将 h 重塑为形状 (num_layers, 2, batch, hidden_size)。这里的 2 对应于双向RNN的两个方向使用 transpose 交换第2和第3维使用 contiguous 确保张量在内存中是连续的使用 view 方法再次重塑张量,将两个方向的隐藏状态连接在一起,形成形状 (num_layers, batch, hidden_size * 2) 的张量'''else:return h
pytorch笔记:contiguous &tensor 存储知识_pytorch中的tensor存储是列主布局还是行主布局_UQI-LIUWJ的博客-CSDN博客
1.4 forward
def forward(self, src, lengths, trg):"""Input:src (src_seq_len, batch): source tensor 源序列lengths (1, batch): source sequence lengths 源序列的长度trg (trg_seq_len, batch): target tensor, the `seq_len` in trg is notnecessarily the same as that in src 目标序列需要注意的是,目标序列的长度并不一定与源序列的长度相同---Output:output (trg_seq_len, batch, hidden_size)"""encoder_hn, H = self.encoder(src, lengths)#将源序列src和其长度lengths传递给编码器decoder_h0 = self.encoder_hn2decoder_h0(encoder_hn)#将编码器的输出隐藏状态encoder_hn转换为适合解码器的初始隐藏状态decoder_h0。## for target we feed the range [BOS:EOS-1] into decoderoutput, decoder_hn = self.decoder(trg[:-1], decoder_h0, H)return output
2 Encoder
2.1 init
class Encoder(nn.Module):def __init__(self, input_size, hidden_size, num_layers, dropout,bidirectional, embedding):"""embedding (vocab_size, input_size): pretrained embedding"""super(Encoder, self).__init__()self.num_directions = 2 if bidirectional else 1#根据 bidirectional 参数决定方向数量assert hidden_size % self.num_directions == 0self.hidden_size = hidden_size // self.num_directionsself.num_layers = num_layersself.embedding = embeddingself.rnn = nn.GRU(input_size, self.hidden_size,num_layers=num_layers,bidirectional=bidirectional,dropout=dropout)
2.2 forward
'''
数据在编码器中的传播方式,并且考虑了序列的真实长度以处理填充
'''
def forward(self, input, lengths, h0=None):"""Input:input (seq_len, batch): padded sequence tensorlengths (1, batch): sequence lengthsh0 (num_layers*num_directions, batch, hidden_size): initial hidden state---Output:hn (num_layers*num_directions, batch, hidden_size):the hidden state of each layeroutput (seq_len, batch, hidden_size*num_directions): output tensor"""# (seq_len, batch) => (seq_len, batch, input_size)embed = self.embedding(input)#将输入序列索引转换为嵌入表示#input(seq_len,batch)->embed(seq_len,batch,self.embedding_size)lengths = lengths.data.view(-1).tolist()if lengths is not None:embed = pack_padded_sequence(embed, lengths)#使用pack_padded_sequence对填充的序列进行打包,以便RNN可以跳过填充项output, hn = self.rnn(embed, h0)#将嵌入的序列传递给GRU RNNif lengths is not None:output = pad_packed_sequence(output)[0]#使用pad_packed_sequence对输出序列进行解包,得到RNN的完整输出return hn, output
pytorch 笔记:PAD_PACKED_SEQUENCE 和PACK_PADDED_SEQUENCE-CSDN博客
pytorch笔记:PackedSequence对象送入RNN-CSDN博客
3 Decoder
3.1 init
def __init__(self, input_size, hidden_size, num_layers, dropout, embedding):super(Decoder, self).__init__()self.embedding = embeddingself.rnn = StackingGRUCell(input_size, hidden_size, num_layers,dropout)self.attention = GlobalAttention(hidden_size)self.dropout = nn.Dropout(dropout)self.num_layers = num_layers
3.2 forward
'''
seq2seq的解码过程,使用了可选的注意力机制
'''
def forward(self, input, h, H, use_attention=True):"""Input:input (seq_len, batch): padded sequence tensorh (num_layers, batch, hidden_size): input hidden stateH (seq_len, batch, hidden_size): the context used in attention mechanismwhich is the output of encoderuse_attention: If True then we use attention---Output:output (seq_len, batch, hidden_size)h (num_layers, batch, hidden_size): output hidden state,h may serve as input hidden state for the next iteration,especially when we feed the word one by one (i.e., seq_len=1)such as in translation"""assert input.dim() == 2, "The input should be of (seq_len, batch)"# (seq_len, batch) => (seq_len, batch, input_size)embed = self.embedding(input)#将输入序列转换为嵌入向量output = []# split along the sequence length dimensionfor e in embed.split(1):#split(1)每次沿着seq_len方法分割一行#即每个e的维度是(1,batch,input_size)e = e.squeeze(0) # (1, batch, input_size) => (batch, input_size)o, h = self.rnn(e, h)#用RNN处理嵌入向量,并得到输出o和新的隐藏状态h#这边的RNN是StackingGRUCell,也即我认为可能是seq_len为1的GRU#o:(batch, hidden_size)#h:(num_layers,batch, hidden_size)if use_attention:o = self.attention(o, H.transpose(0, 1))#如果use_attention为True,将使用注意力机制处理RNN的输出o = self.dropout(o)#为了正则化和防止过拟合,应用 dropoutoutput.append(o)output = torch.stack(output)#将所有的输出叠加为一个张量return output, h#(seq_len, batch, hidden_size)
4 StackingGRUCell
个人感觉就是
class StackingGRUCell(nn.Module):"""Multi-layer CRU Cell"""def __init__(self, input_size, hidden_size, num_layers, dropout):super(StackingGRUCell, self).__init__()self.num_layers = num_layersself.grus = nn.ModuleList()self.dropout = nn.Dropout(dropout)self.grus.append(nn.GRUCell(input_size, hidden_size))for i in range(1, num_layers):self.grus.append(nn.GRUCell(hidden_size, hidden_size))
def forward(self, input, h0):"""Input:input (batch, input_size): input tensorh0 (num_layers, batch, hidden_size): initial hidden state---Output:output (batch, hidden_size): the final layer output tensorhn (num_layers, batch, hidden_size): the hidden state of each layer"""hn = []output = inputfor i, gru in enumerate(self.grus):hn_i = gru(output, h0[i])#在每一次循环中,输入output会经过一个GRU单元并更新隐藏状态hn.append(hn_i)if i != self.num_layers - 1:output = self.dropout(hn_i)else:output = hn_i#如果不是最后一层,输出会经过一个dropout层。hn = torch.stack(hn)#将hn列表转变为一个张量return output, hn
5 GlobalAttention
'''
对于给定的查询向量q,查找上下文矩阵H中哪些向量与其最相关,并使用这些相关性的加权和来生成一个新的上下文向量
'''
class GlobalAttention(nn.Module):"""$$a = \sigma((W_1 q)H)$$$$c = \tanh(W_2 [a H, q])$$"""def __init__(self, hidden_size):super(GlobalAttention, self).__init__()self.L1 = nn.Linear(hidden_size, hidden_size, bias=False)self.L2 = nn.Linear(2*hidden_size, hidden_size, bias=False)self.softmax = nn.Softmax(dim=1)self.tanh = nn.Tanh()def forward(self, q, H):"""Input:q (batch, hidden_size): queryH (batch, seq_len, hidden_size): context---Output:c (batch, hidden_size)"""# (batch, hidden_size) => (batch, hidden_size, 1)q1 = self.L1(q).unsqueeze(2)#使用线性变换L1对查询向量q进行变换,然后增加一个维度以进行后续的批量矩阵乘法# (batch, seq_len)a = torch.bmm(H, q1).squeeze(2)#计算查询向量与上下文矩阵H中的每一个向量的点积。#这将生成一个形状为(batch, seq_len)的张量,表示查询向量与每个上下文向量的相似度a = self.softmax(a)#经过softmax,得到注意力权重# (batch, seq_len) => (batch, 1, seq_len)a = a.unsqueeze(1)#增加一个维度以进行后续的批量矩阵乘法# (batch, hidden_size)c = torch.bmm(a, H).squeeze(1)#使用注意力权重与上下文矩阵H进行加权求和,得到上下文向量c# (batch, hidden_size * 2)c = torch.cat([c, q], 1)#将上下文向量与查询向量连接在一起return self.tanh(self.L2(c))#使用线性变换L2对连接后的向量进行变换,并使用tanh激活函数