NLP(15)-序列标注任务

前言

仅记录学习过程,有问题欢迎讨论

什么时候应该使用Pooling层:

  • 如果针对每个字做标注,无需;若是针对整句话做分类,则需要pooling

NER(数据标注):B/M/E (A/O/P) --左/中/右 边界(地址/机构/人名) O–无关字

CRF -转移矩阵: 如果一个字已经是B_location 那么它大概率是E_location 或者 M_location 和其他的BEM 基本无关

  • shape为 label * label

发射矩阵:shape为 sen_len * tag_size ;相当于输出每个字对应tag的概率矩阵

采用bert:

PERSON类实体,准确率:0.480000, 召回率: 0.208092, F1: 0.290318
LOCATION类实体,准确率:0.624113, 召回率: 0.458333, F1: 0.528524
TIME类实体,准确率:0.739130, 召回率: 0.625767, F1: 0.677736
ORGANIZATION类实体,准确率:0.500000, 召回率: 0.282353, F1: 0.360898
Macro-F1: 0.464369
Micro-F1 0.492606

采用LSTM:

PERSON类实体,准确率:0.432000, 召回率: 0.312139, F1: 0.362411
LOCATION类实体,准确率:0.512987, 召回率: 0.411458, F1: 0.456642
TIME类实体,准确率:0.721804, 召回率: 0.588957, F1: 0.648644
ORGANIZATION类实体,准确率:0.450000, 召回率: 0.423529, F1: 0.436359
Macro-F1: 0.476014
Micro-F1 0.479633
`

代码

实现一个NER代码,划分每一句话的B/M/E/O

config.py

"""
配置参数信息
"""
Config = {"model_path": "./output/","model_name": "model.pt","schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json","train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt","valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt","vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt","model_type": "bert",# 数据标注中计算loss"use_crf": True,# 文本向量大小"char_dim": 20,# 文本长度"max_len": 50,# 词向量大小"hidden_size": 64,# 训练 轮数"epoch_size": 15,# 批量大小"batch_size": 25,# 训练集大小"simple_size": 300,# 学习率"lr": 0.001,# dropout"dropout": 0.5,# 优化器"optimizer": "adam",# 卷积核"kernel_size": 3,# 最大池 or 平均池"pooling_style": "max",# 模型层数"num_layers": 2,"bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",# 输出层大小"output_size": 9,# 随机数种子"seed": 987
}

load.py j加载数据文件

"""
数据加载
"""
"""
数据加载
"""
import os
import numpy as np
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer# 获取字表集
def load_vocab(path):vocab = {}with open(path, 'r', encoding='utf-8') as f:for index, line in enumerate(f):word = line.strip()# 0留给padding位置,所以从1开始vocab[word] = index + 1vocab['unk'] = len(vocab) + 1return vocabclass DataGenerator:def __init__(self, data_path, config):self.data_path = data_pathself.config = configself.schema = self.load_schema(config["schema_path"])if self.config["model_type"] == "bert":self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])self.vocab = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.vocab)# 中文的语句listself.sentence_list = []self.data = self.load_data()def __len__(self):return len(self.data)def __getitem__(self, idx):return self.data[idx]def load_schema(self, path):with open(path, encoding="utf8") as f:return json.load(f)def load_data(self):dataset_x = []dataset_y = []with open(self.data_path, 'r', encoding='utf-8') as f:# 每句话segments = f.read().split("\n\n")# 每句话字符 如: 你 0for segment in segments:sentences = []labels = []for line in segment.split("\n"):if line.strip() == "":continuechar, label = line.split()sentences.append(char)labels.append(self.schema[label])self.sentence_list.append(' '.join(sentences))input_id = self.sentence_to_index(sentences)# labels 也需要padding相同长度labels = self.padding(labels)# 标签和文本组成一个样本dataset_x.append(input_id)dataset_y.append(labels)data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))return data# 文本预处理# 转化为向量def sentence_to_index(self, text):input_ids = []vocab = self.vocabfor char in text:input_ids.append(vocab.get(char, vocab['unk']))# 填充or裁剪input_ids = self.padding(input_ids)return input_ids# 数据预处理 裁剪or填充def padding(self, input_ids):length = self.config["max_len"]if len(input_ids) >= length:return input_ids[:length]else:padded_input_ids = input_ids + [0] * (length - len(input_ids))return padded_input_ids# 用torch自带的DataLoader类封装数据
def load_data_batch(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)# DataLoader 类封装数据 dg除了data 还包含其他信息(后面需要使用)dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)return dlif __name__ == '__main__':from config import Configdg = DataGenerator(Config["train_data_path"], Config)print(len(dg))print(dg[0])

main.py 主方法

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator# [DEBUG, INFO, WARNING, ERROR, CRITICAL]logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)"""
模型训练主程序
"""
# 通过设置随机种子来复现上一次的结果(避免随机性)
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)def main(config):# 保存模型的目录if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])# 加载数据dataset = load_data_batch(config["train_data_path"], config)# 加载模型model = TorchModel(config)# 是否使用gpuif torch.cuda.is_available():logger.info("gpu可以使用,迁移模型至gpu")model.cuda()# 选择优化器optim = choose_optimizer(config, model)# 加载效果测试类evaluator = Evaluator(config, model, logger)for epoch in range(config["epoch_size"]):epoch += 1logger.info("epoch %d begin" % epoch)epoch_loss = []# 训练模型model.train()for index, batch_data in enumerate(dataset):if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]# x, y = dataiter# 反向传播optim.zero_grad()x, y = batch_data     # 输入变化时这里需要修改,比如多输入,多输出的情况# 计算梯度loss = model(x, y)# 梯度更新loss.backward()# 优化器更新模型optim.step()# 记录损失epoch_loss.append(loss.item())logger.info("epoch average loss: %f" % np.mean(epoch_loss))# 测试模型效果acc = evaluator.eval(epoch)# 可以用model_type model_path epoch 三个参数来保存模型# model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))# torch.save(model.state_dict(), model_path)  # 保存模型权重returnif __name__ == "__main__":main(Config)# for model in ["cnn"]:#     Config["model_type"] = model#     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])# 对比所有模型# 中间日志可以关掉,避免输出过多信息# 超参数的网格搜索# for model in ["gated_cnn"]:#     Config["model_type"] = model#     for lr in [1e-3, 1e-4]:#         Config["learning_rate"] = lr#         for hidden_size in [128]:#             Config["hidden_size"] = hidden_size#             for batch_size in [64, 128]:#                 Config["batch_size"] = batch_size#                 for pooling_style in ["avg"]:#                     Config["pooling_style"] = pooling_style# 可以把输出放入文件中 便于查看#                     print("最后一轮准确率:", main(Config), "当前配置:", Config)

evaluate.py 评估模型文件

"""
模型效果测试
"""
import re
from collections import defaultdictimport numpy as np
import torch
from loader import load_data_batchclass Evaluator:def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = logger# 选择验证集合self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)# self.stats_dict = {"correct": 0, "wrong": 0}  # 用于存储测试结果def eval(self, epoch):self.logger.info("开始测试第%d轮模型效果:" % epoch)# 测试模式self.model.eval()self.logger.info("开始测试第%d轮模型效果:" % epoch)self.stats_dict = {"LOCATION": defaultdict(int),"TIME": defaultdict(int),"PERSON": defaultdict(int),"ORGANIZATION": defaultdict(int)}for index, batch_data in enumerate(self.dataset):# 取batch_size 句话sentences = self.dataset.dataset.sentence_list[index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]input_id, labels = batch_datawith torch.no_grad():pred_results = self.model(input_id)  # 不输入labels,使用模型当前参数进行预测self.write_stats(labels, pred_results, sentences)self.show_stats()returndef write_stats(self, labels, pred_results, sentences):assert len(labels) == len(pred_results) == len(sentences)if not self.config['use_crf']:pred_results = torch.argmax(pred_results, dim=-1)for true_label, pred_label, sentence in zip(labels, pred_results, sentences):if not self.config["use_crf"]:pred_label = pred_label.cpu().detach().tolist()true_label = true_label.cpu().detach().tolist()true_entities = self.decode(sentence, true_label)pred_entities = self.decode(sentence, pred_label)# 正确率 = 识别出的正确实体数 / 识别出的实体数# 召回率 = 识别出的正确实体数 / 样本的实体数for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])self.stats_dict[key]["样本实体数"] += len(true_entities[key])self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])returndef show_stats(self):F1_scores = []for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:# 正确率 = 识别出的正确实体数 / 识别出的实体数# 召回率 = 识别出的正确实体数 / 样本的实体数precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])F1 = (2 * precision * recall) / (precision + recall + 1e-5)F1_scores.append(F1)self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))self.logger.info("Macro-F1: %f" % np.mean(F1_scores))correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])micro_precision = correct_pred / (total_pred + 1e-5)micro_recall = correct_pred / (true_enti + 1e-5)micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)self.logger.info("Micro-F1 %f" % micro_f1)self.logger.info("--------------------")return# 相当于截取对应的句子def decode(self, sentence, labels):labels = "".join([str(x) for x in labels[:len(sentence)]])results = defaultdict(list)for location in re.finditer("(04+)", labels):s, e = location.span()results["LOCATION"].append(sentence[s:e])for location in re.finditer("(15+)", labels):s, e = location.span()results["ORGANIZATION"].append(sentence[s:e])for location in re.finditer("(26+)", labels):s, e = location.span()results["PERSON"].append(sentence[s:e])for location in re.finditer("(37+)", labels):s, e = location.span()results["TIME"].append(sentence[s:e])return results

model.py

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF"""
建立网络模型结构
"""class TorchModel(nn.Module):def __init__(self, config):super(TorchModel, self).__init__()hidden_size = config["hidden_size"]vocab_size = config["vocab_size"] + 1output_size = config["output_size"]self.model_type = config["model_type"]num_layers = config["num_layers"]# self.use_bert = config["use_bert"]self.use_crf = config["use_crf"]self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)if self.model_type == 'rnn':self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,batch_first=True)elif self.model_type == 'lstm':# 双向lstm,输出的是 hidden_size * 2(num_layers 要写2)self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)hidden_size = hidden_size * 2elif self.model_type == 'bert':self.encoder = BertModel.from_pretrained(config["bert_model_path"])# 需要使用预训练模型的hidden_sizehidden_size = self.encoder.config.hidden_sizeelif self.model_type == 'cnn':self.encoder = CNN(config)elif self.model_type == "gated_cnn":self.encoder = GatedCNN(config)elif self.model_type == "bert_lstm":self.encoder = BertLSTM(config)# 需要使用预训练模型的hidden_sizehidden_size = self.encoder.config.hidden_sizeself.classify = nn.Linear(hidden_size, output_size)self.pooling_style = config["pooling_style"]self.crf_layer = CRF(output_size, batch_first=True)self.loss = nn.functional.cross_entropy  # loss采用交叉熵损失def forward(self, x, y=None):if self.model_type == 'bert':# 输入x为[batch_size, seq_len]# bert返回的结果是 (sequence_output, pooler_output)# sequence_output:batch_size, max_len, hidden_size# pooler_output:batch_size, hidden_sizex = self.encoder(x)[0]else:x = self.emb(x)x = self.encoder(x)# 判断x是否是tupleif isinstance(x, tuple):x = x[0]# # 池化层# if self.pooling_style == "max":#     # shape[1]代表列数,shape是行和列数构成的元组#     self.pooling_style = nn.MaxPool1d(x.shape[1])# elif self.pooling_style == "avg":#     self.pooling_style = nn.AvgPool1d(x.shape[1])# x = self.pooling_style(x.transpose(1, 2)).squeeze()y_pred = self.classify(x)if y is not None:# 是否使用crf:if self.use_crf:mask = y.gt(-1)return - self.crf_layer(y_pred, y, mask, reduction="mean")else:# (number, class_num), (number)return self.loss(y_pred.view(-1, y.shape[-1]), y.view(-1))else:if self.use_crf:return self.crf_layer.decode(y_pred)else:return y_pred# 优化器的选择
def choose_optimizer(config, model):optimizer = config["optimizer"]learning_rate = config["lr"]if optimizer == "adam":return Adam(model.parameters(), lr=learning_rate)elif optimizer == "sgd":return SGD(model.parameters(), lr=learning_rate)# 定义CNN模型
class CNN(nn.Module):def __init__(self, config):super(CNN, self).__init__()hidden_size = config["hidden_size"]kernel_size = config["kernel_size"]pad = int((kernel_size - 1) / 2)self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)def forward(self, x):  # x : (batch_size, max_len, embeding_size)return self.cnn(x.transpose(1, 2)).transpose(1, 2)# 定义GatedCNN模型
class GatedCNN(nn.Module):def __init__(self, config):super(GatedCNN, self).__init__()self.cnn = CNN(config)self.gate = CNN(config)# 定义前向传播函数 比普通cnn多了一次sigmoid 然后互相卷积def forward(self, x):a = self.cnn(x)b = self.gate(x)b = torch.sigmoid(b)return torch.mul(a, b)# 定义BERT-LSTM模型
class BertLSTM(nn.Module):def __init__(self, config):super(BertLSTM, self).__init__()self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)def forward(self, x):x = self.bert(x)[0]x, _ = self.rnn(x)return x# if __name__ == "__main__":
#     from config import Config
#
#     Config["output_size"] = 2
#     Config["vocab_size"] = 20
#     Config["max_length"] = 5
#     Config["model_type"] = "bert"
#     Config["use_bert"] = True
#     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
#     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
#     # sequence_output, pooler_output = model(x)
#     # print(x[1], type(x[2]), len(x[2]))
#
#     model = TorchModel(Config)
#     label = torch.LongTensor([0,1])
#     print(model(x, label))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/12361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript中带日期的操作

当我们把日期转换为Number类型的时候,就会变成时间戳(毫秒) const future new Date(2037, 10, 19, 15, 23); console.log(Number(future)); // console.log(future); //与上行代码等效● 所以我们就可以利用时间戳去做点东西,例…

GPT-4o:融合文本、音频和图像的全方位人机交互体验

引言: GPT-4o(“o”代表“omni”)的问世标志着人机交互领域的一次重要突破。它不仅接受文本、音频和图像的任意组合作为输入,还能生成文本、音频和图像输出的任意组合。这一全新的模型不仅在响应速度上达到了惊人的水平&#xff0…

Halcon 初步了解

1.Halcon 概述 Halcon是德国MVTec公司开发的一套完善的机器视觉算法包,也是一款功能强大的视觉处理软件,为工业自动化领域提供了全面的解决方案。它拥有应用广泛的机器视觉集成开发环境,提供了一套丰富的图像处理和机器视觉算法,…

qt cmake加入程序exe图标

可以看到qt自动编译出来的图标是默认的,如下图所示 我想要更改成自定义的图标,比如下方的样子 下边是操作步骤: 图标选择与转化成ico 通过这个网站将正常图片转化成ico:https://www.bitbug.net/创建rc文件 将ico复制到cmakelis…

短视频拍摄+直播间搭建视觉艺术实战课:手把手场景演绎 从0-1短视频-8节课

抖音短视频和直播间你是否遇到这些问题? 短视频是用手机拍还是相机拍?画面怎么拍都没有质感 短视频产量低,拍的素材可用率低 看到别人用手机就能把短视频拍好自己却无从下手 明明已经打了好几盏灯了,但是画面还是比较暗 直播软件参数不会设置,电脑…

纯电动汽车的发展趋势简述

纯电车简介 纯电动汽车是使用电池驱动电动马达而不是传统的内燃机的汽车。它们通常使用电池组储存能量,然后通过电动马达转化为动力来驱动车辆。相比于传统的燃油车,纯电动汽车具有零排放、低噪音、低维护成本等优点,因此在环保和能源效率方…

数据新探:用Python挖掘互联网的隐藏宝藏

Hello,我是你们的阿佑,今天给大家上的菜是——数据存储!听起来枯燥无味?错了!阿佑将带你重新认识数据存储的艺术。就像为珍贵的艺术品寻找完美的展览馆,为你的数据选择合适的存储方式同样重要! …

版本控制:软件开发的基石(一文读懂版本控制)

未经允许,禁止转载! 在现代软件开发中,版本控制是不可或缺的工具。它帮助开发者跟踪和管理代码的变化,协作完成项目,并确保代码的完整性和安全性。本文将基于Git官网的视频“什么是版本控制”来深入探讨版本控制的基本…

Linux实验 vi编辑器的使用与磁盘管理

实验目的: 掌握vi编辑器的启动、保存和退出;掌握vi编辑器的三种工作模式的转换及输入模式下的操作;了解Linux文件系统类型、虚拟文件系统和存储设备的名称;掌握磁盘文件系统的挂载和卸载;掌握常用磁盘操作命令&#x…

C. Sort Zero

题目描述 思路分析: 记住他是要保证这个数列是不降的,也就是说如果某一个位置上的数变成了0,那么这个数前面的部分一定全都是0了,我们用map数组得到每一个数出现的最晚的位置,先从后向前遍历一遍,找出从哪开始出现了递减,然后标记下来结束即可,看看前面的每一个数都要用st标记是…

微信小程序有哪些优势

哈喽,大家好呀,淼淼又来和大家见面啦,如今在移动互联网飞速发展的时代背景下,微信小程序正在逐步成为人们生活中不可或缺的一部分。微信小程序有独特的优势和创新特点,为开发者和用户带来了前所未有的便利和体验。这一…

【Python快速上手(十九)】

目录 Python快速上手(十九)Python3 正则表达式1. 导入re模块2. 基本匹配3. 搜索4. 替换5. 匹配组6. 修饰符7. 特殊字符8. 贪婪与非贪婪匹配9. 自定义字符集10. 转义字符11.正则表达式实例 Python快速上手(十九) Python3 正则表达…

关于nvm管理node版本的一些问题

背景: 基于开发项目的迭代不能做到全部更新,有的项目是vue2.0 有的项目是vue3.0, 那么我们开发的时候就需要对node 进行更新,进而产生因为版本不同导致的错误:由此我们需要一款管理 切换node版本的东西,那就…

JavaScript 算法题目思考

1. 二叉搜索树是什么 二叉搜索树一种特殊的二叉树数据结构,又称二叉查找树或二叉排序树,是一种特殊的二叉树数据结构。 在二叉搜索树中,左子树上的所有节点的值都小于根节点的值,右子树上的所有节点的值都大于根节点的值&#xff…

C语言 8 函数递归

目录 1. 递归是什么? 2.递归的限制条件 3. 递归举例1 4. 递归举例2 5.迭代 6. 递归举例3 拓展学习: 1. 递归是什么? 递归是学习C语⾔函数绕不开的⼀个话题,那什么是递归呢? 递归其实是⼀种解决问题的⽅法&#xff0c…

0508GoodsContent的Maven项目

0508GoodsContent的Maven项目包-CSDN博客 数据库字段 页面需求

【代码随想录算法训练营第37期 第七天 | LeetCode454.四数相加II、383. 赎金信、15. 三数之和、18. 四数之和】

代码随想录算法训练营第37期 第七天 | LeetCode454.四数相加II、383. 赎金信、15. 三数之和、18. 四数之和 一、454.四数相加II 解题代码C&#xff1a; class Solution { public:int fourSumCount(vector<int>& nums1, vector<int>& nums2, vector<in…

ssl证书申请配置要怎么弄?

SSL证书是一种公钥证书&#xff0c;用于保护网站的数据传输过程&#xff0c;确保网站数据的安全性。在网站上使用SSL证书可以有效地防止黑客攻击、窃取用户信息等安全问题。下面将详细介绍SSL证书的申请和配置。 一、SSL证书的申请 1. 选择证书品牌和类型 目前市场上有很多S…

手撸XXL-JOB(四)——远程调用定时任务

Java Socket网络编程 网络编程是Java编程中的重要组成部分&#xff0c;包括服务端和客户端两部分内容。Socket是Java网络编程的基本组件之一&#xff0c;用于在应用程序之间提供双向通信&#xff0c;Socket提供了一种标准的接口&#xff0c;允许应用程序通过网络发送和接收数据…

【递归、回溯和剪枝】综合训练<二>

1.组合总和 组合总和 解法一&#xff1a; class Solution { public:vector<vector<int>> ret;vector<int> path;int aim;vector<vector<int>> combinationSum(vector<int>& nums, int target) {aim target;dfs(nums, 0, 0);return …