1 语言模型步骤
简单概述:根据输入内容,继续输出后面的句子。
1.1 根据需求拆分任务
- (1)先对模型输入一段文字,令模型输出之后的一个文字。
- (2)将模型预测出来的文字当成输入,再放到模型里,使模型预测出下一个文字,这样循环下去,以使RNN完成一句话的输出。
1.2 根据任务设计功能模块
- (1)模型能够记住前面文字的语义;
- (2)能够根据前面的语义和一个输入文字,输出下一个文字。
1.3 根据功能模块设计实现方案
RNN模型的接口可以输出两个结果:预测值和当前状态
- 在实现时,将输入的序列样本拆开,使用循环的方式,将字符逐个输入模型。模型会对每次的输入预测出两个结果,一个是预测字符,另一个是当前的序列状态。
- 在训练场景下,将硕测字骑用于计算损失,列状动用于传入下一次循环计算。
- 在测试场景下,用循环的方式将输入序列中的文字一个个地传入到模型中,得到最后一个时刻的当前状态,将该状态和输入序列中的最后一个文字转入模型,生成下一个文字的预测结果。同时,按照要求生成的文字条件,重复地将新生成的文字和当前状态输入模型,来预测下一个文字。
2 语言模型的代码实现
2.1 准备样本数据
样本内容:
在尘世的纷扰中,只要心头悬挂着远方的灯光,我们就会坚持不懈地走,理想为我们灌注了精神的蕴藉。所以,生活再平凡、再普通、再琐碎,我们都要坚持一种信念,默守一种精神,为自己积淀站立的信心,前行的气力。
2.1.1定义基本工具函数---make_Language_model.py(第1部分)
首先引入头文件,然后定义相关函数:get_ch_lable()从文件中获取文本,get_ch._able_v0将文本数组转方向量,具体代码如下:
import numpy as np
import torch
import torch.nn.functional as F
import time
import random
from collections import Counter# 1.1 定义基本的工具函数
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')def elapsed(sec): # 计算时间函数if sec < 60:return str(sec) + "sec"elif sec<(60*60):return str(sec/60) + "min"else:return str(sec/(60*60)) + "hour"training_file = 'csv_list/wordstest.txt' # 定义样本文件#中文字
def get_ch_label(txt_file): # 提取样本中的汉字labels = ""with open(txt_file,'rb') as f :for label in f:labels = labels + label.decode("gb2312",errors = 'ignore')return labels#中文多文件
def readalltxt(txt_files): # 处理中文labels = []for txt_file in txt_files:target = get_ch_label(txt_file)labels.append(target)return labels# 将汉子转化成向量,支持文件和内存对象里的汉字转换
def get_ch_label_v(txt_file,word_num_map,txt_label = None):words_size = len(word_num_map)to_num = lambda word:word_num_map.get(word,words_size)if txt_file != None:txt_label = get_ch_label(txt_file)# 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69)labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换return labels_vector
2.1.2 样本预处理---make_Language_model.py(第2部分)
样本预处理这个一套指读取整个样本,将其放入training_data里,获取全部的字表words,并生成样本向量wordlabel与向量有对应关系的word_num_map,代码如下:
# 1.2 样本预处理
training_data = get_ch_label(training_file)
print("加载训练模型中")
print("该样本长度:",len(training_data))
counter = Counter(training_data)
words = sorted(counter)
words_size = len(words)
word_num_map = dict(zip(words,range(words_size)))
print("字表大小:",words_size)
wordlabel = get_ch_label_v(training_file,word_num_map)
# 加载训练模型中
# 该样本长度: 75
# 字表大小: 41(去重)
上述结果表示样本文件里一共有75个文字,其中掉重复的文字之后,还有41个。这41个文字将作为字表词典,建立文字与索引值的对应关系。
在训练模型时,每个文字都会被转化成数字形式的索引值输入模型。模型的输出是这41个文字的概率,即把每个文字当成一类。
2.2 代码实现:构建循环神经网络模型---make_Language_model.py(第3部分)
使用GRU构建RNN模型,令RNN模型只接收一个序列的喻入字符,并预测出下一个序列的字符。
在该模型里,所需要完成的步骤如下:
- 将输入的字索引转为词嵌入;
- 将词嵌入结果输入GRU层;
- 对GRU结果做全连接处理,得到维度为69的预测结果,这个预测结果代表每个文字的概率。
2.2.1 代码实现
# 1.3 构建循环神经网络(RNN)模型
class GRURNN(torch.nn.Module):def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers):super(GRURNN, self).__init__()self.num_layers = num_layersself.hidden_dim = hidden_dimself.embed = torch.nn.Embedding(word_size,embed_dim)# 定义一个多层的双向层:# 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim# 序列状态:形状为[层数×2,批次,维度hidden_dim]self.gru = torch.nn.GRU(input_size=embed_dim,hidden_size=hidden_dim,num_layers=num_layers,bidirectional=True)self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果def forward(self,features,hidden):embeded = self.embed(features.view(1,-1))output,hidden = self.gru(embeded.view(1,1,-1),hidden)# output = self.attention(output)output = self.fc(output.view(1,-1))return output,hiddendef init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE)return init_hidden
2.3 代码实现:实例化,训练模型--make_Language_model.py(第3部分)
# 1.4 实例化模型类,并训练模型
EMBEDDING_DIM = 10 # 定义词嵌入维度
HIDDEN_DIM = 20 # 定义隐藏层维度
NUM_LAYERS = 1 # 定义层数
# 实例化模型
model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS)
model = model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(),lr=0.005)# 定义测试函数
def evaluate(model,prime_str,predict_len,temperature=0.8):hidden = model.init_zero_state().to(DEVICE)predicted = ""# 处理输入语义for p in range(len(prime_str) -1):_,hidden = model(prime_str[p],hidden)predicted = predicted + words[predict_len]inp = prime_str[-1] # 获得输入字符predicted = predicted + words[inp]#按照指定长度输出预测字符for p in range(predict_len):output,hidden = model(inp,hidden) # 将输入字符和状态传入模型# 从多项式中分布采样# 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错# 同时,使用多项式分布的方式进行采样,生成预测结果output_dist = output.data.view(-1).div(temperature).exp()inp = torch.multinomial(output_dist,1)[0] # 获取采样结果predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中return predicted# 定义参数并训练
training_iters = 5000
display_step = 1000
n_input = 4
step = 0
offset = random.randint(0,n_input+1)
end_offset = n_input + 1while step < training_iters: # 按照迭代次数训练模型start_time = time.time() # 计算起始时间#随机取一个位置偏移if offset > (len(training_data)-end_offset):offset = random.randint(0,n_input+1)# 制作输入样本inwords = wordlabel[offset:offset+n_input]inwords = np.reshape(np.array(inwords),[n_input,-1,1])# 制作标签样本out_onehot = wordlabel[offset+1:offset+n_input+1]hidden = model.init_zero_state() # RNN的状态清零optimizer.zero_grad()loss = 0.0inputs = torch.LongTensor(inwords).to(DEVICE)targets = torch.LongTensor(out_onehot).to(DEVICE)for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测outputs,hidden = model(inputs[c],hidden)loss = loss + F.cross_entropy(outputs,targets[c].view(1))loss = loss / n_inputloss.backward()optimizer.step()# 输出日志with torch.set_grad_enabled(False):if (step+1)%display_step == 0 :print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min')print(f'step {step + 1}|Loss {loss.item():.2f}\n\n')with torch.no_grad():print(evaluate(model,inputs,32),'\n')print(50*'=')step = step +1# 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。offset = offset + (n_input+1)
print("Finished!")
2.4 代码实现:运行模型生成句子--make_Language_model.py(第4部分)
# 1.5 运行模型生成句子
while True:prompt = "输入几个文字:"sentence = input(prompt)inputword = sentence.strip()try:inputword = get_ch_label_v(None,word_num_map,inputword)keys = np.reshape(np.array(inputword),[len(inputword),-1,1])# get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值,# 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错model.eval()with torch.no_grad():sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32)print(sentence)except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词print("还没学会")
3 代码总览--make_Language_model.py
import numpy as np
import torch
import torch.nn.functional as F
import time
import random
from collections import Counter# 1.1 定义基本的工具函数
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')def elapsed(sec): # 计算时间函数if sec < 60:return str(sec) + "sec"elif sec<(60*60):return str(sec/60) + "min"else:return str(sec/(60*60)) + "hour"training_file = 'csv_list/wordstest.txt' # 定义样本文件#中文字
def get_ch_label(txt_file): # 提取样本中的汉字labels = ""with open(txt_file,'rb') as f :for label in f:labels = labels + label.decode("gb2312",errors = 'ignore')return labels#中文多文件
def readalltxt(txt_files): # 处理中文labels = []for txt_file in txt_files:target = get_ch_label(txt_file)labels.append(target)return labels# 将汉子转化成向量,支持文件和内存对象里的汉字转换
def get_ch_label_v(txt_file,word_num_map,txt_label = None):words_size = len(word_num_map)to_num = lambda word:word_num_map.get(word,words_size)if txt_file != None:txt_label = get_ch_label(txt_file)# 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69)labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换return labels_vector# 1.2 样本预处理
training_data = get_ch_label(training_file)
print("加载训练模型中")
print("该样本长度:",len(training_data))
counter = Counter(training_data)
words = sorted(counter)
words_size = len(words)
word_num_map = dict(zip(words,range(words_size)))
print("字表大小:",words_size)
wordlabel = get_ch_label_v(training_file,word_num_map)
# 加载训练模型中
# 该样本长度: 75
# 字表大小: 41(去重)# 1.3 构建循环神经网络(RNN)模型
class GRURNN(torch.nn.Module):def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers):super(GRURNN, self).__init__()self.num_layers = num_layersself.hidden_dim = hidden_dimself.embed = torch.nn.Embedding(word_size,embed_dim)# 定义一个多层的双向层:# 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim# 序列状态:形状为[层数×2,批次,维度hidden_dim]self.gru = torch.nn.GRU(input_size=embed_dim,hidden_size=hidden_dim,num_layers=num_layers,bidirectional=True)self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果def forward(self,features,hidden):embeded = self.embed(features.view(1,-1))output,hidden = self.gru(embeded.view(1,1,-1),hidden)# output = self.attention(output)output = self.fc(output.view(1,-1))return output,hiddendef init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE)return init_hidden# 1.4 实例化模型类,并训练模型
EMBEDDING_DIM = 10 # 定义词嵌入维度
HIDDEN_DIM = 20 # 定义隐藏层维度
NUM_LAYERS = 1 # 定义层数
# 实例化模型
model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS)
model = model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(),lr=0.005)# 定义测试函数
def evaluate(model,prime_str,predict_len,temperature=0.8):hidden = model.init_zero_state().to(DEVICE)predicted = ""# 处理输入语义for p in range(len(prime_str) -1):_,hidden = model(prime_str[p],hidden)predicted = predicted + words[predict_len]inp = prime_str[-1] # 获得输入字符predicted = predicted + words[inp]#按照指定长度输出预测字符for p in range(predict_len):output,hidden = model(inp,hidden) # 将输入字符和状态传入模型# 从多项式中分布采样# 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错# 同时,使用多项式分布的方式进行采样,生成预测结果output_dist = output.data.view(-1).div(temperature).exp()inp = torch.multinomial(output_dist,1)[0] # 获取采样结果predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中return predicted# 定义参数并训练
training_iters = 5000
display_step = 1000
n_input = 4
step = 0
offset = random.randint(0,n_input+1)
end_offset = n_input + 1while step < training_iters: # 按照迭代次数训练模型start_time = time.time() # 计算起始时间#随机取一个位置偏移if offset > (len(training_data)-end_offset):offset = random.randint(0,n_input+1)# 制作输入样本inwords = wordlabel[offset:offset+n_input]inwords = np.reshape(np.array(inwords),[n_input,-1,1])# 制作标签样本out_onehot = wordlabel[offset+1:offset+n_input+1]hidden = model.init_zero_state() # RNN的状态清零optimizer.zero_grad()loss = 0.0inputs = torch.LongTensor(inwords).to(DEVICE)targets = torch.LongTensor(out_onehot).to(DEVICE)for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测outputs,hidden = model(inputs[c],hidden)loss = loss + F.cross_entropy(outputs,targets[c].view(1))loss = loss / n_inputloss.backward()optimizer.step()# 输出日志with torch.set_grad_enabled(False):if (step+1)%display_step == 0 :print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min')print(f'step {step + 1}|Loss {loss.item():.2f}\n\n')with torch.no_grad():print(evaluate(model,inputs,32),'\n')print(50*'=')step = step +1# 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。offset = offset + (n_input+1)
print("Finished!")# 1.5 运行模型生成句子
while True:prompt = "输入几个文字:"sentence = input(prompt)inputword = sentence.strip()try:inputword = get_ch_label_v(None,word_num_map,inputword)keys = np.reshape(np.array(inputword),[len(inputword),-1,1])# get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值,# 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错model.eval()with torch.no_grad():sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32)print(sentence)except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词print("还没学会")
模型结果:训练的并不好,但是还能用