按照阿光的项目
做出了学习笔记,pytorch深度学习实战项目100例
基于LSTM实现春联上联对下联
基于LSTM(长短期记忆网络)实现春联上联对下联是一种有趣且具有挑战性的任务,它涉及到自然语言处理(NLP)中的序列到序列(seq2seq)模型。LSTM是处理序列数据的理想选择,因为它能够记住长期的依赖信息,这对于生成符合语境和文化习俗的春联下联至关重要。
数据
https://github.com/wb14123/couplet-dataset
感谢大佬的分享的对联数据集
对数据集的处理
def data_generator(data):# 计算每个对联长度的权重data_probability = [float(len(x)) for wordcount, [x, y] in data.items()] # [每个字数key对应对联list中上联数据的个数]data_probability = np.array(data_probability) / sum(data_probability) # 标准化至[0,1],这是每个字数的权重# 随机选择字数,然后随机选择字数对应的上联样本,生成batchfor idx in range(15):# 随机选字数id,概率为上面计算的字数权重idx = idx + 1size = min(batch_size, len(data[idx][0])) # batch_size=64,len(data[idx][0])随机选择的字数key对应的上联个数# 从上联列表下标list中随机选出大小为size的listidxs = np.random.choice(len(data[idx][0]), size=size)# 返回选出的上联X与下联y, 将原本1-d array维度扩展为(row,col,1)yield data[idx][0][idxs], np.expand_dims(data[idx][1][idxs], axis=2)# 加载文本数据
def load_data(input_path, output_path):# 数据读取与切分def read_data(file_path):txt = codecs.open(file_path, encoding='utf-8').readlines()txt = [line.strip().split(' ') for line in txt] # 每行按空格切分txt = [line for line in txt if len(line) < 16] # 过滤掉字数超过maxlen的对联return txt# 产生数据字典def generate_count_dict(result_dict, x, y):for i, idx in enumerate(x):j = len(idx)if j not in result_dict:result_dict[j] = [[], []] # [样本数据list,类别标记list]result_dict[j][0].append(idx)result_dict[j][1].append(y[i])return result_dict# 将字典数据转为numpydef to_numpy_array(dict):for count, [x, y] in dict.items():dict[count][0] = np.array(x)dict[count][1] = np.array(y)return dictx = read_data(input_path)y = read_data(output_path)# 获取词表vocabulary = x + y# 构造字符级别的特征string = ''for words in vocabulary:for word in words:string += word# 所有的词汇表vocabulary = set(string)word2idx = {word: i for i, word in enumerate(vocabulary)}idx2word = {i: word for i, word in enumerate(vocabulary)}# 训练数据中所有词的个数vocab_size = len(word2idx.keys()) # 词汇表大小# 将x和y转为数值x = [[word2idx[word] for word in sent] for sent in x]y = [[word2idx[word] for word in sent] for sent in y]train_dict = {}train_dict = generate_count_dict(train_dict, x, y)train_dict = to_numpy_array(train_dict)return train_dict, vocab_size, idx2word, word2idx
基本想法:
这种场景是典型的 Encoder-Decoder 框架应用问题。
在这个框架中:
- Encoder 负责读取输入序列(上联)并将其转换成一个固定长度的内部表示形式,通常是最后一个时间步的隐藏状态。这个内部表示被视为输入序列的“上下文”或“意义”,包含了生成输出序列所需的所有信息。
- Decoder 接收这个内部表示并开始生成输出序列(下联),一步一步地生成,直到产生序列结束标记或达到特定长度。
构建模型
模型架构:使用seq2seq模型,该模型一般包括一个编码器(encoder)和一个解码器(decoder),两者都可以是LSTM网络。编码器负责处理上联,而解码器则生成下联。
嵌入层:通常在模型的第一层使用嵌入层,将每个字符或词转换为固定大小的向量,这有助于模型更好地理解语言中的语义信息。
# 定义网络结构
class LSTM(nn.Module):def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):super(LSTM, self).__init__()self.hidden_dim = hidden_dimself.embeddings = nn.Embedding(vocab_size + 1, embedding_dim)self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers)self.linear = nn.Linear(hidden_dim, vocab_size)def forward(self, x):time_step, batch_size = x.size() # 124, 16embeds = self.embeddings(x)output, (h_n, c_n) = self.lstm(embeds)output = self.linear(output.reshape(time_step * batch_size, -1))# 要返回所有时间点的数据,每个时间点对应一个字,也就是vocab_size维度的向量return output
训练模型
# 加载数据
train_dict, vocab_size, idx2word, word2idx = load_data(input_path, output_path)# 模型训练
model = LSTM(vocab_size=vocab_size, hidden_dim=hidden_dim,embedding_dim=embedding_dim, num_layers=num_layers)Configimizer = optim.Adam(model.parameters(), lr=lr) # 优化器
criterion = nn.CrossEntropyLoss() # 多分类损失函数model.to(device)
loss_meter = meter.AverageValueMeter()best_loss = 999 # 保存loss
best_model = None # 保存对应最好准确率的模型参数for epoch in range(epochs):model.train() # 开启训练模式loss_meter.reset()for x, y in data_generator(train_dict):x = torch.from_numpy(x).long().transpose(1, 0).contiguous()x = x.to(device)y = torch.from_numpy(y).long().transpose(1, 0).contiguous()y = y.to(device)Configimizer.zero_grad()# 形成预测结果output_ = model(x)# 计算损失loss = criterion(output_, y.long().view(-1))loss.backward()Configimizer.step()loss_meter.add(loss.item())# 打印信息print("【EPOCH: 】%s" % str(epoch + 1))print("训练损失为%s" % (str(loss_meter.mean)))# 保存模型及相关信息if loss_meter.mean < best_loss:best_loss = loss_meter.meanbest_model = model.state_dict()# 在训练结束保存最优的模型参数if epoch == epochs - 1:# 保存模型torch.save(best_model, './best_model.pkl')
测试
import codecsimport numpy as np
import torch
from torch import nn
from torch import optim
from torchnet import meter# 模型输入参数,需要自己根据需要调整
input_path = 'C:\\Users\\kaai\\AppData\\Local\\Temp\\BNZ.65e95f542f0fca6f\\train\\in.txt'
output_path = 'C:\\Users\\kaai\\AppData\\Local\\Temp\\BNZ.65e95f542f0fca6f\\train\\out.txt'
num_layers = 1 # LSTM层数
hidden_dim = 100 # LSTM中的隐层大小
epochs = 50 # 迭代次数
batch_size = 128 # 每个批次样本大小
embedding_dim = 15 # 每个字形成的嵌入向量大小
lr = 0.01 # 学习率
device = 'cpu' # 设备# 用于生成训练数据
def data_generator(data):# 计算每个对联长度的权重data_probability = [float(len(x)) for wordcount, [x, y] in data.items()] # [每个字数key对应对联list中上联数据的个数]data_probability = np.array(data_probability) / sum(data_probability) # 标准化至[0,1],这是每个字数的权重# 随机选择字数,然后随机选择字数对应的上联样本,生成batchfor idx in range(15):# 随机选字数id,概率为上面计算的字数权重idx = idx + 1size = min(batch_size, len(data[idx][0])) # batch_size=64,len(data[idx][0])随机选择的字数key对应的上联个数# 从上联列表下标list中随机选出大小为size的listidxs = np.random.choice(len(data[idx][0]), size=size)# 返回选出的上联X与下联y, 将原本1-d array维度扩展为(row,col,1)yield data[idx][0][idxs], np.expand_dims(data[idx][1][idxs], axis=2)# 加载文本数据
def load_data(input_path, output_path):# 数据读取与切分def read_data(file_path):txt = codecs.open(file_path, encoding='utf-8').readlines()txt = [line.strip().split(' ') for line in txt] # 每行按空格切分txt = [line for line in txt if len(line) < 16] # 过滤掉字数超过maxlen的对联return txt# 产生数据字典def generate_count_dict(result_dict, x, y):for i, idx in enumerate(x):j = len(idx)if j not in result_dict:result_dict[j] = [[], []] # [样本数据list,类别标记list]result_dict[j][0].append(idx)result_dict[j][1].append(y[i])return result_dict# 将字典数据转为numpydef to_numpy_array(dict):for count, [x, y] in dict.items():dict[count][0] = np.array(x)dict[count][1] = np.array(y)return dictx = read_data(input_path)y = read_data(output_path)# 获取词表vocabulary = x + y# 构造字符级别的特征string = ''for words in vocabulary:for word in words:string += word# 所有的词汇表vocabulary = set(string)word2idx = {word: i for i, word in enumerate(vocabulary)}idx2word = {i: word for i, word in enumerate(vocabulary)}# 训练数据中所有词的个数vocab_size = len(word2idx.keys()) # 词汇表大小# 将x和y转为数值x = [[word2idx[word] for word in sent] for sent in x]y = [[word2idx[word] for word in sent] for sent in y]train_dict = {}train_dict = generate_count_dict(train_dict, x, y)train_dict = to_numpy_array(train_dict)return train_dict, vocab_size, idx2word, word2idx# 定义网络结构
class LSTM(nn.Module):def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):super(LSTM, self).__init__()self.hidden_dim = hidden_dimself.embeddings = nn.Embedding(vocab_size + 1, embedding_dim)self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers)self.linear = nn.Linear(hidden_dim, vocab_size)def forward(self, x):time_step, batch_size = x.size() # 124, 16embeds = self.embeddings(x)output, (h_n, c_n) = self.lstm(embeds)output = self.linear(output.reshape(time_step * batch_size, -1))# 要返回所有时间点的数据,每个时间点对应一个字,也就是vocab_size维度的向量return outputdef couplet_match(s):# 将字符串转为数值x = [word2idx[word] for word in s]# 将数值向量转为tensorx = torch.from_numpy(np.array(x).reshape(-1, 1))# 加载模型model_path = './best_model.pkl'model = LSTM(vocab_size=vocab_size, hidden_dim=hidden_dim,embedding_dim=embedding_dim, num_layers=num_layers)model.load_state_dict(torch.load(model_path, 'cpu'))y = model(x)y = y.argmax(axis=1)r = ''.join([idx2word[idx.item()] for idx in y])print('上联:%s,下联:%s' % (s, r))
# 加载数据
train_dict, vocab_size, idx2word, word2idx = load_data(input_path, output_path)
# 测试
sentence = '恭喜发财'
couplet_match(sentence)