人活着就是为了救赎自己;为了经历世间的美好;为了在挫折中成长变得更坚强
—— 24.12.10
一、定义模型
1.嵌入层
nn.Embedding:将离散值转化为向量
# embedding层,vocab:词表,要多少个数据(向量数量); dim:向量维度; idx:控制长度
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0) #embedding层
2.池化层
nn.AvgPool1d:平均值池化
# 平均值池化 sentence _length 句长
self.pool = nn.AvgPool1d(sentence_length) #池化层
3.线性层
nn.Linear:线性层
self.classify = nn.Linear(vector_dim, 1) #线性层
4.激活函数
torch.sigmoid:sigmoid归一化函数
self.activation = torch.sigmoid #sigmoid归一化函数
5.损失函数
nn.functional.mse_loss:均方差损失函数
self.loss = nn.functional.mse_loss #loss函数采用均方差损失
6.前向传播
forward
#当输入真实标签,返回loss值;无真实标签,返回预测值def forward(self, x, y=None):# 嵌入层x = self.embedding(x) #(batch_size, sen_len) -> (batch_size, sen_len, vector_dim)# 转置层x = x.transpose(1, 2) #(batch_size, sen_len, vector_dim) -> (batch_size, vector_dim, sen_len)# 平均池化层x = self.pool(x) #(batch_size, vector_dim, sen_len)->(batch_size, vector_dim, 1)# 去掉多余的维度 1x = x.squeeze() #(batch_size, vector_dim, 1) -> (batch_size, vector_dim)# 过线性层x = self.classify(x) #(batch_size, vector_dim) -> (batch_size, 1) 3*20 * 20*1 -> 3*1# sigmoid激活函数归一化y_pred = self.activation(x) #(batch_size, 1) -> (batch_size, 1)if y is not None:return self.loss(y_pred, y) #预测值和真实值计算损失else:return y_pred #输出预测结果
7.建立模型
根据定义的模型TorchModel类建立模型
#建立模型
def build_model(vocab, char_dim, sentence_length):model = TorchModel(char_dim, sentence_length, vocab)return model
二、 生成样本
1.建立词表
pad
pad是占位补齐的功能,声明要补位的元素
在自然语言处理(NLP)任务中,由于不同句子长度可能不同,通常需要对句子进行填充(Padding)以确保它们具有相同的长度。pad 参数在嵌入层中用于处理这些填充的标记。
在NLP任务中,尽量少做截断,多做补0
unk
在自然语言处理(NLP)任务中,词汇表(Vocabulary)通常是有限的,这意味着某些词汇可能不在词汇表中。为了处理这些未登录词(Out-of-Vocabulary,OOV),通常会引入一个特殊的标记<unk>(Unknown)。<unk>标记用于表示词汇表中未包含的词汇
#字符集随便挑了一些字,实际上还可以扩充
#为每个字生成一个标号
#{"a":1, "b":2, "c":3...}
#abc -> [1,2,3]
# 构建词表
def build_vocab():chars = "你我他defghijklmnopqrstuvwxyz" #字符集vocab = {"pad":0}for index, char in enumerate(chars):vocab[char] = index+1 #每个字对应一个序号vocab['unk'] = len(vocab) #26return vocab
2. 随机生成一个样本
random.choice():是 Python 标准库 random 模块中的一个函数。它的主要功能是从给定的序列(如列表、元组、字符串等)中随机选择一个元素。
set() 函数: set() 是 Python 内置的用于创建集合的数据类型转换函数。集合(set)是一种无序、不重复的数据结构,它里面的元素具有唯一性。例如:`set("abc")` 会将字符串 `"abc"` 转换为一个包含字符 `'a'`、`'b'`、`'c'` 的集合,即 `{'a', 'b', 'c'}`。
#随机生成一个样本
#从所有字中选取sentence_length个字
#反之为负样本
def build_sample(vocab, sentence_length):#随机从字表选取sentence_length个字,可能重复x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]#指定哪些字出现时为正样本if set("你我他") & set(x):y = 1#指定字都未出现,则为负样本else:y = 0x = [vocab.get(word, vocab['unk']) for word in x] #将字转换成序号,为了做embeddingreturn x, y
3.建立数据集
append(x):append()是 Python 中列表(list )类型的一个内置方法。它的主要作用是在列表的末尾添加一个新的元素。这个新元素可以是任何数据类型,如整数、字符串、列表、字典
LongTensor:LongTensor(即 torch.LongTensor )是一种数据类型,用于表示整数型的张量(Tensor)。
FloatTensor:FloatTensor(即 torch.FloatTensor)是一种常用的数据类型,用于表示浮点型的张量(Tensor)
张量:张量是 PyTorch 中的核心数据结构,类似于多维数组,可以用于存储和处理各种数据,如向量、矩阵和更高维的数据结构。 LongTensor 主要用于存储整数数据
#建立数据集
#输入需要的样本数量。需要多少生成多少
def build_dataset(sample_length, vocab, sentence_length):dataset_x = []dataset_y = []for i in range(sample_length):x, y = build_sample(vocab, sentence_length)dataset_x.append(x)dataset_y.append([y])return torch.LongTensor(dataset_x), torch.FloatTensor(dataset_y)
三、模型测试
model.eval():将模型设置为测试模式,改变模型中某些层的行为,以适应在模型评估和推理阶
段的需求。
with torch.no_grad():在其包裹的代码块内禁用梯度计算。减少梯度计算造成的资源开销
zip():用于将多个可迭代对象(如列表、元组、字符串等)中对应的元素打包成一个个元组,然后返回一个由这些元组组成的可迭代对象(通常是一个 zip 对象)。如果传入的可迭代对象的长度不一致,那么 zip()函数会以最短的可迭代对象的长度为准进行打包。
#测试代码
#用来测试每轮模型的准确率
def evaluate(model, vocab, sample_length):model.eval()x, y = build_dataset(200, vocab, sample_length) #建立200个用于测试的样本print("本次预测集中共有%d个正样本,%d个负样本"%(sum(y), 200 - sum(y)))correct, wrong = 0, 0with torch.no_grad():y_pred = model(x) #模型预测for y_p, y_t in zip(y_pred, y): #与真实标签进行对比if float(y_p) < 0.5 and int(y_t) == 0:correct += 1 #负样本判断正确elif float(y_p) >= 0.5 and int(y_t) == 1:correct += 1 #正样本判断正确else:wrong += 1print("正确预测个数:%d, 正确率:%f"%(correct, correct/(correct+wrong)))return correct/(correct+wrong)
四、模型训练
1.配置训练参数
#配置参数
epoch_num = 10 #训练轮数
batch_size = 20 #每次训练样本个数
train_sample = 500 #每轮训练总共训练的样本总数
char_dim = 20 #每个字的维度
sentence_length = 6 #样本文本长度
learning_rate = 0.005 #学习率
2.建立模型
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
3.选择优化器
Adam优化器
Adam(Adaptive Moment Estimation)是一种优化算法,主要用于在训练神经网络等机器学习模型时更新模型的参数。它结合了动量(Momentum)和自适应学习率(Adaptive Learning Rate)的思想能够在训练过程中根据梯度的一阶矩估计和二阶矩估计动态地调整每个参数的学习率,从而更高效地优化模型。
工作原理:首先计算模型参数关于损失函数的梯度(求导),
torch.optim.Adam():创建Adam优化器对象
# 选择优化器
optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
4.训练过程
① 计算损失 ② 计算梯度 ③ 更新权重 ④ 梯度归零
model.train():将模型设置为训练模式
append(x):append()是 Python 中列表(list )类型的一个内置方法。它的主要作用是在列表的末尾添加一个新的元素。这个新元素可以是任何数据类型,如整数、字符串、列表、字典
# 训练过程
for epoch in range(epoch_num):model.train()watch_loss = []for batch in range(int(train_sample / batch_size)):x, y = build_dataset(batch_size, vocab, sentence_length) #构造一组训练样本loss = model(x, y) #计算lossloss.backward() #计算梯度optim.step() #更新权重optim.zero_grad() #梯度归零watch_loss.append(loss.item())print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))acc = evaluate(model, vocab, sentence_length) #测试本轮模型结果log.append([acc, np.mean(watch_loss)])
5.保存模型
torch.save():PyTorch 中用于保存模型或张量(Tensor)对象的函数。它可以将模型的参数、整个模型结构(在某些情况下)或者任意的张量保存到磁盘文件中,以便后续的加载和使用,例如用于模型的持久化存储,方便在不同的环境或时间点重新加载模型进行评估、继续训练或者其他应用。
# 保存模型
torch.save(model.state_dict(), "model.pth")
6.保存词表
open():主要用于打开或创建文件,并返回一个文件对象,通过这个文件对象可以对文件进行读取、写入、追加等各种操作。
write():write()是文件对象(通过 open()函数打开文件后得到的对象)的一个方法。它主要用于将指定的字符串(在文本模式下)或字节串(在二进制模式下)写入到文件中
close():close()是 Python 文件对象的一个方法,用于关闭已经打开的文件。它可以释放与文件相关的系统资源,如文件描述符、缓冲区等。如果不关闭文件,可能会导致资源泄漏、文件损坏或者其他不可预测的错误。
# 保存词表
writer = open("vocab词表.json", "w", encoding="utf8")
writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
writer.close()
7.调用模型训练函数
if __name__ == "__main__":main()
五、使用训练好的模型做预测
1.预测函数
model.load_state_dict():model.load state_dict 是 PyTorch 中用于加载模型参数的一个重要方法,它应用于继承自torch.nn.Module的模型对象。
其主要作用是将预训练好的模型参数(通常以字典形式保存)加载到当前的模型结构中,使得当前模型能够复用这些已有的参数,进而可以基于这些参数进行后续的评估、微调(fine-tuning)或者继续训练等操作。
append():列表(list )类型的一个内置方法。它用于在列表的末尾添加-个新的元素。这个新元素可以是任何数据类型,包括但不限于整数、浮点数、字符串、列表、字典等
model.eval():将模型设置为测试模式,改变模型中某些层的行为,以适应在模型评估和推理阶
段的需求。
with torch.no_grad():在其包裹的代码块内禁用梯度计算。减少梯度计算造成的资源开销
enumerate():它的主要作用是将一个可迭代对象(如列表、元组、字符串等)组合为一个索引序列,同时返回索引和对应元素的元组,这个函数在需要同时获取元素及其索引的时候非常有用。
# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):char_dim = 20 # 每个字的维度sentence_length = 6 # 样本文本长度vocab = json.load(open(vocab_path, "r", encoding="utf8")) #加载字符表model = build_model(vocab, char_dim, sentence_length) #建立模型model.load_state_dict(torch.load(model_path, weights_only=True)) #加载训练好的权重x = []for input_string in input_strings:x.append([vocab[char] for char in input_string]) #将输入序列化model.eval() #测试模式with torch.no_grad(): #不计算梯度result = model.forward(torch.LongTensor(x)) #模型预测for i, input_string in enumerate(input_strings):print("输入:%s, 预测类别:%d, 概率值:%f" % (input_string, round(float(result[i])), result[i])) #打印结果
2.对给定字符串进行预测
调用predict预测函数
if __name__ == "__main__":test_strings = ["fnvfee", "wz你dfg", "rqwdeg", "n我kwww"]predict("model.pth", "vocab词表.json", test_strings)
完整代码
plt.plot():绘制曲线函数
plt.legend():在绘制的图表中添加图例。图例会显示之前通过 label 参数给每条曲线设置的标签及对应的曲线样式(比如颜色、线条类型等),使得查看图表的人能够清楚地区分不同曲线所代表的数据内容
plt.show():显示绘制好的图表的函数。
#coding:utf8import torch
import torch.nn as nn
import numpy as np
import random
import json
import matplotlib.pyplot as plt"""基于pytorch的网络编写
实现一个网络完成一个简单nlp任务
判断文本中是否有某些特定字符出现"""class TorchModel(nn.Module):def __init__(self, vector_dim, sentence_length, vocab):super(TorchModel, self).__init__()# embedding层,vocab:词表,要多少个数据(向量数量); dim:向量维度; idx:控制长度self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0) #embedding层# 平均值池化self.pool = nn.AvgPool1d(sentence_length) #池化层self.classify = nn.Linear(vector_dim, 1) #线性层self.activation = torch.sigmoid #sigmoid归一化函数self.loss = nn.functional.mse_loss #loss函数采用均方差损失#当输入真实标签,返回loss值;无真实标签,返回预测值def forward(self, x, y=None):# 嵌入层x = self.embedding(x) #(batch_size, sen_len) -> (batch_size, sen_len, vector_dim)# 转置层x = x.transpose(1, 2) #(batch_size, sen_len, vector_dim) -> (batch_size, vector_dim, sen_len)# 平均池化层x = self.pool(x) #(batch_size, vector_dim, sen_len)->(batch_size, vector_dim, 1)# 去掉多余的维度 1x = x.squeeze() #(batch_size, vector_dim, 1) -> (batch_size, vector_dim)# 过线性层x = self.classify(x) #(batch_size, vector_dim) -> (batch_size, 1) 3*20 * 20*1 -> 3*1# sigmoid激活函数归一化y_pred = self.activation(x) #(batch_size, 1) -> (batch_size, 1)if y is not None:return self.loss(y_pred, y) #预测值和真实值计算损失else:return y_pred #输出预测结果#字符集随便挑了一些字,实际上还可以扩充
#为每个字生成一个标号
#{"a":1, "b":2, "c":3...}
#abc -> [1,2,3]
# 构建词表
def build_vocab():chars = "你我他defghijklmnopqrstuvwxyz" #字符集vocab = {"pad":0}for index, char in enumerate(chars):vocab[char] = index+1 #每个字对应一个序号vocab['unk'] = len(vocab) #26return vocab#随机生成一个样本
#从所有字中选取sentence_length个字
#反之为负样本
def build_sample(vocab, sentence_length):#随机从字表选取sentence_length个字,可能重复x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]#指定哪些字出现时为正样本if set("你我他") & set(x):y = 1#指定字都未出现,则为负样本else:y = 0x = [vocab.get(word, vocab['unk']) for word in x] #将字转换成序号,为了做embeddingreturn x, y#建立数据集
#输入需要的样本数量。需要多少生成多少
def build_dataset(sample_length, vocab, sentence_length):dataset_x = []dataset_y = []for i in range(sample_length):x, y = build_sample(vocab, sentence_length)dataset_x.append(x)dataset_y.append([y])return torch.LongTensor(dataset_x), torch.FloatTensor(dataset_y)#建立模型
def build_model(vocab, char_dim, sentence_length):model = TorchModel(char_dim, sentence_length, vocab)return model#测试代码
#用来测试每轮模型的准确率
def evaluate(model, vocab, sample_length):model.eval()x, y = build_dataset(200, vocab, sample_length) #建立200个用于测试的样本print("本次预测集中共有%d个正样本,%d个负样本"%(sum(y), 200 - sum(y)))correct, wrong = 0, 0with torch.no_grad():y_pred = model(x) #模型预测for y_p, y_t in zip(y_pred, y): #与真实标签进行对比if float(y_p) < 0.5 and int(y_t) == 0:correct += 1 #负样本判断正确elif float(y_p) >= 0.5 and int(y_t) == 1:correct += 1 #正样本判断正确else:wrong += 1print("正确预测个数:%d, 正确率:%f"%(correct, correct/(correct+wrong)))return correct/(correct+wrong)def main():#配置参数epoch_num = 10 #训练轮数batch_size = 20 #每次训练样本个数train_sample = 500 #每轮训练总共训练的样本总数char_dim = 20 #每个字的维度sentence_length = 6 #样本文本长度learning_rate = 0.005 #学习率# 建立字表vocab = build_vocab()# 建立模型model = build_model(vocab, char_dim, sentence_length)# 选择优化器optim = torch.optim.Adam(model.parameters(), lr=learning_rate)log = []# 训练过程for epoch in range(epoch_num):model.train()watch_loss = []for batch in range(int(train_sample / batch_size)):x, y = build_dataset(batch_size, vocab, sentence_length) #构造一组训练样本loss = model(x, y) #计算lossloss.backward() #计算梯度optim.step() #更新权重optim.zero_grad() #梯度归零watch_loss.append(loss.item())print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))acc = evaluate(model, vocab, sentence_length) #测试本轮模型结果log.append([acc, np.mean(watch_loss)])# 画图plt.plot(range(len(log)), [l[0] for l in log], label="acc") #画acc曲线plt.plot(range(len(log)), [l[1] for l in log], label="loss") #画loss曲线plt.legend()plt.show()# 保存模型torch.save(model.state_dict(), "model.pth")# 保存词表writer = open("vocab词表.json", "w", encoding="utf8")writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))writer.close()return# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):char_dim = 20 # 每个字的维度sentence_length = 6 # 样本文本长度vocab = json.load(open(vocab_path, "r", encoding="utf8")) #加载字符表model = build_model(vocab, char_dim, sentence_length) #建立模型model.load_state_dict(torch.load(model_path, weights_only=True)) #加载训练好的权重x = []for input_string in input_strings:x.append([vocab[char] for char in input_string]) #将输入序列化model.eval() #测试模式with torch.no_grad(): #不计算梯度result = model.forward(torch.LongTensor(x)) #模型预测for i, input_string in enumerate(input_strings):print("输入:%s, 预测类别:%d, 概率值:%f" % (input_string, round(float(result[i])), result[i])) #打印结果if __name__ == "__main__":main()test_strings = ["fnvfee", "wz你dfg", "rqwdeg", "n我kwww"]predict("model.pth", "vocab词表.json", test_strings)