前言
仅记录学习过程,有问题欢迎讨论
什么时候应该使用Pooling层:
- 如果针对每个字做标注,无需;若是针对整句话做分类,则需要pooling
NER(序列标注)的标签体系:B/M/E 分别表示实体的左边界/中间/右边界,后缀 A/O/P 表示实体类型(地址/机构/人名);O 表示与实体无关的字
CRF - 转移矩阵: 描述相邻标签之间的转移得分。例如一个字的标签已经是 B_location,那么下一个字的标签大概率是 M_location 或者 E_location,和其他实体类型的 B/M/E 基本无关
- shape为 label * label
发射矩阵:shape为 sen_len * tag_size ;相当于输出每个字对应tag的概率矩阵
采用bert:
PERSON类实体,准确率:0.480000, 召回率: 0.208092, F1: 0.290318
LOCATION类实体,准确率:0.624113, 召回率: 0.458333, F1: 0.528524
TIME类实体,准确率:0.739130, 召回率: 0.625767, F1: 0.677736
ORGANIZATION类实体,准确率:0.500000, 召回率: 0.282353, F1: 0.360898
Macro-F1: 0.464369
Micro-F1 0.492606
采用LSTM:
PERSON类实体,准确率:0.432000, 召回率: 0.312139, F1: 0.362411
LOCATION类实体,准确率:0.512987, 召回率: 0.411458, F1: 0.456642
TIME类实体,准确率:0.721804, 召回率: 0.588957, F1: 0.648644
ORGANIZATION类实体,准确率:0.450000, 召回率: 0.423529, F1: 0.436359
Macro-F1: 0.476014
Micro-F1 0.479633
`
代码
实现一个NER代码,划分每一句话的B/M/E/O
config.py
"""
配置参数信息
"""
# Every entry sits on its own line so that an inline comment can never
# swallow the keys that follow it.
Config = {
    "model_path": "./output/",
    "model_name": "model.pt",
    "schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json",
    "train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt",
    "valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt",
    "vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt",
    "model_type": "bert",
    # Compute the sequence-labelling loss with a CRF layer
    "use_crf": True,
    # Character vector size
    "char_dim": 20,
    # Text length (sequences are padded / truncated to this)
    "max_len": 50,
    # Word vector size
    "hidden_size": 64,
    # Number of training epochs
    "epoch_size": 15,
    # Batch size
    "batch_size": 25,
    # Training set size
    "simple_size": 300,
    # Learning rate
    "lr": 0.001,
    # Dropout
    "dropout": 0.5,
    # Optimizer
    "optimizer": "adam",
    # Convolution kernel size
    "kernel_size": 3,
    # Max pooling or average pooling
    "pooling_style": "max",
    # Number of model layers
    "num_layers": 2,
    "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
    # Output layer size
    "output_size": 9,
    # Random seed
    "seed": 987,
}
loader.py 加载数据文件
"""
数据加载
"""
"""
数据加载
"""
import os
import numpy as np
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer# 获取字表集
def load_vocab(path):
    """Build a token -> index mapping from a vocabulary file.

    Each line of the file is one token. Indices start at 1 because 0 is
    reserved for padding; an extra ``'unk'`` entry is appended for
    out-of-vocabulary characters.

    Args:
        path: Path of the UTF-8 vocabulary file.

    Returns:
        dict mapping token string to integer index.
    """
    vocab = {}
    with open(path, 'r', encoding='utf-8') as f:
        for index, line in enumerate(f):
            word = line.strip()
            # 0 is kept for the padding position, so start from 1
            vocab[word] = index + 1
    vocab['unk'] = len(vocab) + 1
    return vocab


class DataGenerator:
    """Load a character-level sequence-labelling file into tensors.

    The data file holds one ``<char> <label>`` pair per line, with sentences
    separated by a blank line. Every sentence becomes a pair of fixed-length
    id lists (input ids, label ids) stored in a ``TensorDataset``.

    Side effects: writes ``config["vocab_size"]`` and fills
    ``self.sentence_list`` with the space-joined characters of each sentence,
    in the same order as the samples.
    """

    # Padding value for label sequences. It must not collide with a real
    # label id, so that padded positions can be masked out (``y.gt(-1)``).
    LABEL_PAD = -1

    def __init__(self, data_path, config):
        self.data_path = data_path
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        # Only BERT-based models need the pretrained tokenizer.
        self.tokenizer = None
        if "bert" in self.config["model_type"]:
            self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        # Raw sentences, aligned with the samples in self.data
        self.sentence_list = []
        self.data = self.load_data()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    def load_schema(self, path):
        """Return the label schema parsed from the JSON file at ``path``."""
        with open(path, encoding="utf8") as f:
            return json.load(f)

    def load_data(self):
        """Parse ``self.data_path`` and return a ``TensorDataset`` of (ids, labels)."""
        dataset_x = []
        dataset_y = []
        with open(self.data_path, 'r', encoding='utf-8') as f:
            # One segment per sentence
            segments = f.read().split("\n\n")
        for segment in segments:
            sentences = []
            labels = []
            # One "<char> <label>" pair per line
            for line in segment.split("\n"):
                if line.strip() == "":
                    continue
                char, label = line.split()
                sentences.append(char)
                labels.append(self.schema[label])
            if not sentences:
                # A blank segment (e.g. trailing newlines) would yield an
                # all-padding sample, which a masked CRF cannot score.
                continue
            self.sentence_list.append(' '.join(sentences))
            input_id = self.sentence_to_index(sentences)
            # Labels are padded to the same length, but with LABEL_PAD so
            # padding is never mistaken for a real tag.
            labels = self.padding(labels, self.LABEL_PAD)
            dataset_x.append(input_id)
            dataset_y.append(labels)
        data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))
        return data

    def sentence_to_index(self, text):
        """Convert a sequence of characters into a padded list of ids.

        When a BERT tokenizer was loaded, its vocabulary is used so the ids
        match the pretrained embedding table; otherwise the character
        vocabulary from ``config["vocab_path"]`` is used.
        """
        tokenizer = getattr(self, "tokenizer", None)
        if tokenizer is not None:
            # One id per character keeps ids aligned with the labels.
            input_ids = tokenizer.convert_tokens_to_ids(list(text))
        else:
            unk = self.vocab['unk']
            input_ids = [self.vocab.get(char, unk) for char in text]
        # Pad or truncate
        return self.padding(input_ids)

    def padding(self, input_ids, pad_token=0):
        """Truncate or right-pad ``input_ids`` to ``config["max_len"]``.

        Args:
            input_ids: list of ids.
            pad_token: value used to fill the missing positions.

        Returns:
            A new list of exactly ``max_len`` items.
        """
        length = self.config["max_len"]
        # A non-positive multiplier yields [], so long inputs are only cut.
        return input_ids[:length] + [pad_token] * (length - len(input_ids))


# Wrap the data with torch's built-in DataLoader class
def load_data_batch(data_path, config, shuffle=True):
    """Build a ``DataLoader`` over the data file at ``data_path``.

    The loader wraps the ``DataGenerator`` itself (not only its tensors), so
    callers can still reach the generator through ``loader.dataset``.

    Args:
        data_path: Path of the labelled data file.
        config: Configuration dict; ``config["batch_size"]`` sets the batch size.
        shuffle: Whether the loader shuffles the samples.

    Returns:
        torch.utils.data.DataLoader
    """
    generator = DataGenerator(data_path, config)
    return DataLoader(generator, batch_size=config["batch_size"], shuffle=shuffle)


if __name__ == '__main__':
    from config import Config

    dg = DataGenerator(Config["train_data_path"], Config)
    print(len(dg))
    print(dg[0])
main.py 主方法
import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator

# Available levels: [DEBUG, INFO, WARNING, ERROR, CRITICAL]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Main program for model training

# Fix the random seeds so that a previous run can be reproduced
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    """Train the model described by ``config`` and evaluate it after every epoch.

    Args:
        config: Configuration dict. Reads ``model_path``, ``train_data_path``
            and ``epoch_size`` here; the helpers it calls read further keys.

    Returns:
        None
    """
    # Directory in which models are saved; also creates missing parents
    os.makedirs(config["model_path"], exist_ok=True)
    # Load the training data
    dataset = load_data_batch(config["train_data_path"], config)
    # Build the model
    model = TorchModel(config)
    # Move the model to the GPU when one is available
    use_gpu = torch.cuda.is_available()
    if use_gpu:
        logger.info("gpu可以使用,迁移模型至gpu")
        model.cuda()
    # Choose the optimizer
    optim = choose_optimizer(config, model)
    # Helper that evaluates the model
    evaluator = Evaluator(config, model, logger)
    for epoch in range(1, config["epoch_size"] + 1):
        logger.info("epoch %d begin" % epoch)
        epoch_loss = []
        # Training mode
        model.train()
        for batch_data in dataset:
            if use_gpu:
                batch_data = [d.cuda() for d in batch_data]
            # Clear the gradients left over from the previous step
            optim.zero_grad()
            # Adjust the unpacking here if the inputs/outputs change
            x, y = batch_data
            # Forward pass: the model returns the loss when labels are given
            loss = model(x, y)
            # Backward pass: compute the gradients
            loss.backward()
            # Update the model parameters
            optim.step()
            # Record the loss
            epoch_loss.append(loss.item())
        logger.info("epoch average loss: %f" % np.mean(epoch_loss))
        # Evaluate the model after this epoch
        evaluator.eval(epoch)
        # The model could be saved using model_type, model_path and epoch:
        # model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
        # torch.save(model.state_dict(), model_path)  # save the model weights
    return


if __name__ == "__main__":
    main(Config)

    # for model in ["cnn"]:
    #     Config["model_type"] = model
    #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])

    # Compare all models.
    # The intermediate logging can be switched off to avoid too much output.
    # Grid search over the hyper-parameters
    # (NOTE: Config uses the key "lr", not "learning_rate"):
    # for model in ["gated_cnn"]:
    #     Config["model_type"] = model
    #     for lr in [1e-3, 1e-4]:
    #         Config["learning_rate"] = lr
    #         for hidden_size in [128]:
    #             Config["hidden_size"] = hidden_size
    #             for batch_size in [64, 128]:
    #                 Config["batch_size"] = batch_size
    #                 for pooling_style in ["avg"]:
    #                     Config["pooling_style"] = pooling_style
    #                     # The output can be written to a file for easier reading
    #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)
evaluate.py 评估模型文件
"""
模型效果测试
"""
import re
from collections import defaultdictimport numpy as np
import torch
from loader import load_data_batch


class Evaluator:
    """Entity-level evaluation (precision / recall / F1) on the validation set.

    Entities are extracted from a label sequence by regular expressions over
    the label ids written as one digit per character; an entity counts as
    correct when the same text is extracted from the gold labels.
    """

    ENTITY_TYPES = ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]

    # (entity type, pattern over the digit string of label ids)
    ENTITY_PATTERNS = [
        ("LOCATION", re.compile("(04+)")),
        ("ORGANIZATION", re.compile("(15+)")),
        ("PERSON", re.compile("(26+)")),
        ("TIME", re.compile("(37+)")),
    ]

    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        # Validation set; not shuffled so batches line up with sentence_list
        self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)

    def eval(self, epoch):
        """Evaluate the model on the validation set and log the statistics."""
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        # Evaluation mode
        self.model.eval()
        self.stats_dict = {"LOCATION": defaultdict(int),
                           "TIME": defaultdict(int),
                           "PERSON": defaultdict(int),
                           "ORGANIZATION": defaultdict(int)}
        batch_size = self.config["batch_size"]
        for index, batch_data in enumerate(self.dataset):
            # The raw sentences that belong to this batch
            sentences = self.dataset.dataset.sentence_list[index * batch_size: (index + 1) * batch_size]
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data
            with torch.no_grad():
                # No labels are passed: predict with the current parameters
                pred_results = self.model(input_id)
            self.write_stats(labels, pred_results, sentences)
        self.show_stats()
        return

    def write_stats(self, labels, pred_results, sentences):
        """Accumulate the per-type entity counts for one batch."""
        assert len(labels) == len(pred_results) == len(sentences)
        use_crf = self.config["use_crf"]
        if not use_crf:
            # Without the CRF the model returns scores; take the best tag
            pred_results = torch.argmax(pred_results, dim=-1)
        for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
            if not use_crf:
                pred_label = pred_label.cpu().detach().tolist()
            true_label = true_label.cpu().detach().tolist()
            true_entities = self.decode(sentence, true_label)
            pred_entities = self.decode(sentence, pred_label)
            # precision = correctly recognised entities / recognised entities
            # recall    = correctly recognised entities / entities in the sample
            for key in self.ENTITY_TYPES:
                stats = self.stats_dict[key]
                stats["正确识别"] += sum(1 for ent in pred_entities[key] if ent in true_entities[key])
                stats["样本实体数"] += len(true_entities[key])
                stats["识别出实体数"] += len(pred_entities[key])
        return

    def show_stats(self):
        """Log per-type precision/recall/F1 plus the macro and micro F1."""
        F1_scores = []
        for key in self.ENTITY_TYPES:
            stats = self.stats_dict[key]
            # 1e-5 avoids a division by zero when nothing was found
            precision = stats["正确识别"] / (1e-5 + stats["识别出实体数"])
            recall = stats["正确识别"] / (1e-5 + stats["样本实体数"])
            F1 = (2 * precision * recall) / (precision + recall + 1e-5)
            F1_scores.append(F1)
            self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
        self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
        correct_pred = sum(self.stats_dict[key]["正确识别"] for key in self.ENTITY_TYPES)
        total_pred = sum(self.stats_dict[key]["识别出实体数"] for key in self.ENTITY_TYPES)
        true_enti = sum(self.stats_dict[key]["样本实体数"] for key in self.ENTITY_TYPES)
        micro_precision = correct_pred / (total_pred + 1e-5)
        micro_recall = correct_pred / (true_enti + 1e-5)
        micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
        self.logger.info("Micro-F1 %f" % micro_f1)
        self.logger.info("--------------------")
        return

    def decode(self, sentence, labels):
        """Extract the entity strings that ``labels`` marks in ``sentence``.

        Args:
            sentence: The sentence text; the data loader stores it with the
                characters joined by spaces.
            labels: Sequence of integer label ids, possibly padded.

        Returns:
            defaultdict(list) mapping entity type to the extracted strings.
        """
        # The loader joins characters with spaces; drop them so that string
        # offsets line up with label offsets and len() is the true length.
        sentence = sentence.replace(" ", "")
        # Any id that is not a single digit (e.g. a -1 padding value) becomes
        # a placeholder so that it cannot shift the offsets.
        labels = "".join(str(x) if 0 <= x <= 9 else "-" for x in labels[:len(sentence)])
        results = defaultdict(list)
        for entity_type, pattern in self.ENTITY_PATTERNS:
            for match in pattern.finditer(labels):
                s, e = match.span()
                results[entity_type].append(sentence[s:e])
        return results
model.py
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF

# Network model definitions


class TorchModel(nn.Module):
    """Sequence-labelling model: encoder -> per-token linear layer -> optional CRF.

    ``config["model_type"]`` selects the encoder (rnn, lstm, bert, cnn,
    gated_cnn or bert_lstm). ``forward`` returns the loss when labels are
    given and the predictions otherwise.
    """

    # Label value that marks padded positions; excluded from the loss
    LABEL_PAD = -1

    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        self.model_type = config["model_type"]
        num_layers = config["num_layers"]
        self.use_crf = config["use_crf"]
        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if self.model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                  batch_first=True)
        elif self.model_type == 'lstm':
            # Bidirectional LSTM: the output size is hidden_size * 2
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True,
                                   batch_first=True)
            hidden_size = hidden_size * 2
        elif self.model_type == 'bert':
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # Use the hidden size of the pretrained model
            hidden_size = self.encoder.config.hidden_size
        elif self.model_type == 'cnn':
            self.encoder = CNN(config)
        elif self.model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif self.model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # The pretrained config lives on the wrapped BERT module
            hidden_size = self.encoder.bert.config.hidden_size
        else:
            raise ValueError("unsupported model_type: %r" % (self.model_type,))
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.crf_layer = CRF(output_size, batch_first=True)
        # Cross-entropy loss, used when the CRF is disabled
        self.loss = nn.functional.cross_entropy

    def forward(self, x, y=None):
        """Return the loss when ``y`` is given, otherwise the predictions.

        Args:
            x: Token ids; id 0 is treated as padding.
            y: Optional label ids aligned with ``x``; positions equal to
                ``LABEL_PAD`` are ignored by the loss.

        Returns:
            With ``y``: a scalar loss. Without ``y``: the decoded tag lists
            when the CRF is enabled, else the per-token class scores.
        """
        if self.model_type == 'bert':
            # BERT returns (sequence_output, pooler_output); keep the
            # per-token output and stop it from attending to padding.
            x = self.encoder(x, attention_mask=x.ne(0).long())[0]
        elif self.model_type == 'bert_lstm':
            # BertLSTM embeds the ids itself, so it takes the raw ids
            x = self.encoder(x)
        else:
            x = self.emb(x)
            x = self.encoder(x)
            # Recurrent encoders return (output, state)
            if isinstance(x, tuple):
                x = x[0]
        # No pooling here: every token needs its own prediction
        y_pred = self.classify(x)
        if y is not None:
            if self.use_crf:
                mask = y.gt(-1)
                return - self.crf_layer(y_pred, y, mask, reduction="mean")
            # (number, class_num), (number)
            return self.loss(y_pred.reshape(-1, y_pred.shape[-1]), y.reshape(-1),
                             ignore_index=self.LABEL_PAD)
        if self.use_crf:
            return self.crf_layer.decode(y_pred)
        return y_pred


# Optimizer selection
def choose_optimizer(config, model):
    """Create the optimizer named by ``config["optimizer"]`` for ``model``.

    Args:
        config: Configuration dict; reads ``optimizer`` ("adam" or "sgd")
            and ``lr``.
        model: Module whose parameters are optimized.

    Returns:
        A torch optimizer instance.

    Raises:
        ValueError: If the optimizer name is not supported.
    """
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    if optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)
    # Fail here instead of returning None and crashing later in training
    raise ValueError("unsupported optimizer: %r" % (optimizer,))


# CNN model definition
class CNN(nn.Module):
    """1-D convolution over the sequence axis that keeps the sequence length."""

    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        # "same" padding keeps one output position per input token for even
        # kernel sizes too; for odd sizes it equals (kernel_size - 1) / 2.
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding="same")

    def forward(self, x):  # x : (batch_size, max_len, embedding_size)
        # Conv1d convolves over the last axis, so swap the sequence and
        # channel axes before the convolution and swap them back afterwards.
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


# GatedCNN model definition
class GatedCNN(nn.Module):
    """Gated convolution: one CNN output scaled by the sigmoid of another."""

    def __init__(self, config):
        super().__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    def forward(self, x):
        # Compared with the plain CNN there is one extra sigmoid branch; the
        # two branches are then multiplied element-wise.
        features = self.cnn(x)
        gate = torch.sigmoid(self.gate(x))
        return torch.mul(features, gate)


# BERT-LSTM model definition
class BertLSTM(nn.Module):
    """Pretrained BERT followed by a single LSTM layer over its token outputs."""

    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        # Expose the pretrained config so callers can read
        # ``encoder.config.hidden_size`` just as they do for a bare BertModel.
        self.config = self.bert.config
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        # x holds token ids; id 0 is padding and is masked out of attention.
        x = self.bert(x, attention_mask=x.ne(0).long())[0]
        x, _ = self.rnn(x)
        return x


# if __name__ == "__main__":
# from config import Config
#
# Config["output_size"] = 2
# Config["vocab_size"] = 20
# Config["max_length"] = 5
# Config["model_type"] = "bert"
# Config["use_bert"] = True
# # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
# x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
# # sequence_output, pooler_output = model(x)
# # print(x[1], type(x[2]), len(x[2]))
#
# model = TorchModel(Config)
# label = torch.LongTensor([0,1])
# print(model(x, label))