说明
从应用的角度,对实体识别的全流程进行进一步的明确。从全流程的角度上看,需要对数据做一些规范,并允许在不同的阶段插进来进行修改和迭代。
内容
1 原始数据
假设这个阶段,通过较为简单的方式对数据做了标记
在初始阶段,我们获得原始数据。然后,可能通过tf idf之类简单的方法获得一些实体列表。
x = '小王喊小明吃饭,小王大声喊'
y = '小王,小明'
考虑到数据应该是以文档形式给到的,所以可以约定如下,必须具有4个字段:
无论是x还是y,必须遵守最基本的utf-8 + 半角 + 两端无空格的规范。
字段 | 解释 |
---|---|
doc_id | 文档id,一篇文档包含n个句子 |
ss_id | 句子排序id, sentence sort id, 整型,标题是0, 正文从1开始编号 |
md5 | md5 hash id ,用于标记内容 |
x | 句子文本内容,必须以强分隔符结尾(如果是标题类的,手动补上一个中文句号)。强制分隔符包含中文句号、中英文问号、中英文叹号、换行符 |
y | 实体列表,使用英文逗号连接 |
强分割函数
import re
def split_sentences_with_punctuation(text):# 定义句子分隔符punctuation = r'([。?!?!\n])'# 根据句子分隔符进行分割,并保留分隔符parts = re.split(punctuation, text)# 将分隔符与句子重新组合sentences = []for i in range(0, len(parts), 2):sentence = parts[i].strip()if i + 1 < len(parts):sentence += parts[i + 1]sentences.append(sentence)return sentences# 示例文本
text = "我喜欢编程。你呢?这是一个很有趣的项目!\n我也喜欢读书。"# 分割句子,并保留分隔符
sentences = split_sentences_with_punctuation(text)
print(sentences)
['我喜欢编程。', '你呢?', '这是一个很有趣的项目!', '\n', '我也喜欢读书。', '']# 确保数据总是可以有一个强分割符import redef ensure_period(sentence= None):"""如果句子不是以句号、问号、感叹号或者感叹问号结尾,则在结尾添加一个句号。参数:sentence (str): 待检查的句子。返回:str: 添加句号后的句子。"""# 使用正则表达式匹配句子末尾的标点符号if not re.search(r'[。?!?!]$', sentence):sentence += '。'return sentence# 示例用法
sentence1 = "这是一个例句"
sentence2 = "这是一个例句!"
sentence3 = "这是一个例句?"
sentence4 = "这是一个例句."
sentence5 = "这是一个例句?"
sentence6 = "这是一个例句!"
sentence7 = "这是一个例句!?"
sentence8 = "这是一个例句?!"print(ensure_period(sentence1)) # 输出: "这是一个例句."
print(ensure_period(sentence2)) # 输出: "这是一个例句!"
print(ensure_period(sentence3)) # 输出: "这是一个例句?"
print(ensure_period(sentence4)) # 输出: "这是一个例句."
print(ensure_period(sentence5)) # 输出: "这是一个例句?"
print(ensure_period(sentence6)) # 输出: "这是一个例句!"
print(ensure_period(sentence7)) # 输出: "这是一个例句!?"
print(ensure_period(sentence8)) # 输出: "这是一个例句?!"
2 处理流程
基本的规范是utf8字符集和半角字符集。
# 标准处理函数
import redef extract_utf8_chars(input_string = None):# 定义一个正则表达式,用于匹配所有的UTF-8字符utf8_pattern = re.compile(r'[\u0000-\U0010FFFF]')# 使用findall方法找到所有匹配的字符utf8_chars = utf8_pattern.findall(input_string)return ''.join(utf8_chars)def toDBC(some_char):tem_str_ord = ord(some_char)res = None if tem_str_ord >65280 and tem_str_ord < 65375:res =tem_str_ord - 65248# 12288全角空格,160  空格if tem_str_ord in [12288,160]:res = 32res_var_ord = res or tem_str_ordreturn chr(res_var_ord)
def tranform_half_widh(some_str = None):res_list = []return ''.join([toDBC(x) for x in some_str])
使用pydantic进行规范转化
# 强分割断句,确保末尾的强分隔符。
from typing import List, Optional
from pydantic import BaseModel,FieldValidationInfo, field_validatorclass Item(BaseModel):doc_id:strcontent:str# 验证器:确保 content 以强分隔符结尾@field_validator('content',mode='before')def ensure_utf8_and_halfwidth(cls, v):v = tranform_half_widh(extract_utf8_chars(v)) # 转换为半角字符return v# 给到一个document,将之分割为句子
class DocumentSplit(BaseModel):input_data_listofdict: List[Item] = [{'doc_id':'1', 'content':'这是第一篇文章。'}, {'doc_id':'2', 'content':'这是第二篇文章。'}]
先进行初始化
ds = DocumentSplit(input_data_listofdict = [{'doc_id':'1','content':'这是第一篇文章。这是第一篇文章'}, {'doc_id':'2', 'content':'这是第二篇文章。这是第二篇文章'}])
开始构造句子
from Basefuncs import * tem_df = pd.DataFrame([x.dict() for x in ds.input_data_listofdict])
tem_df['sentences'] = tem_df['content'].apply(split_sentences_with_punctuation)# 构造句子
def make_sentences(some_dict = None):doc_id = some_dict['doc_id']sentences = some_dict['sentences']res_list = []for i, v in enumerate(sentences):tem_dict = {}tem_dict['doc_id'] = doc_idtem_dict['s_ord'] = i+1tem_dict['sentence'] = ensure_period(v)res_list.append(tem_dict)return res_list_s = cols2s(tem_df, cols = ['doc_id', 'sentences'], cols_key_mapping=['doc_id', 'sentences'])
_s1 = _s.apply(make_sentences)
tem_df1 = pd.DataFrame( flatten_list(_s1.to_list() ))doc_id s_ord sentence
0 1 1 这是第一篇文章。
1 1 2 这是第一篇文章。
2 2 1 这是第二篇文章。
3 2 2 这是第二篇文章。
以上处理完成了将原始数据转为标准格式的数据,每个句子将作为一个独立的个体。对于训练数据而言,可以无视句子在文章中的顺序;而作为预测来说,结果可以根据原文的顺序拼接起来。
3 创建环境
租用一个显卡主机,启动jupyter,然后将对应的模型考过去。
- 1 拷贝老的模型,对无标签数据进行初级打标
- 2 拷贝新的模型,根据数据进行下一步训练。
4 训练
4.1 数据预打标(prelabel_data.ipynb)
这个过程是通过各种方法收集到的实体打标数据,通常是打标尚具有缺陷的数据。总体上可以认为 80%正确的标签数据。
获取原始未达标数据,这部分数据应该已经存在于数据库中 clickhouse
from Basefuncs import *
import pandas as pd
import requests as req # 获取全部数据
host = 'xxx'
port = 19000
database = 'my_database'
user = 'xxx'
password = 'xxx'
name = 'tem'
chc = CHClient(host = host, port = port , database = database, user = user, password = password, name = name )
the_sql = 'show tables'
chc._exe_sql(the_sql)# 直接获取全部
query_data = chc.get_table('news_wz_retrain_ner')
# 整合数据
df = pd.DataFrame(query_data, columns = ['mid','title','content','task_id','task_rand'])
mid title content task_id task_rand
0 00007bfd99a62722a2ebc0bedef3c398 河南济源示范区:依托企业招才引智博聚兴产 7月14日,河南博士后科技服务团济源行活动启动,来自全省高等院校、科研院所的21名博士后、专... 42719 208
1 000193b97b50083acb76de6128094916 Omicron有何新变化?将如何影响市场? 辉瑞和德国拜恩泰科(BioNTech)公司也发布声明称“三剂疫苗可以将抗体水平提升25倍”;... 85212 746
数据规范化处理
对应的处理函数,确保了基本字符集以及断句,分隔符的规范。
import redef extract_utf8_chars(input_string = None):# 定义一个正则表达式,用于匹配所有的UTF-8字符utf8_pattern = re.compile(r'[\u0000-\U0010FFFF]')# 使用findall方法找到所有匹配的字符utf8_chars = utf8_pattern.findall(input_string)return ''.join(utf8_chars)def toDBC(some_char):tem_str_ord = ord(some_char)res = None if tem_str_ord >65280 and tem_str_ord < 65375:res =tem_str_ord - 65248# 12288全角空格,160  空格if tem_str_ord in [12288,160]:res = 32res_var_ord = res or tem_str_ordreturn chr(res_var_ord)
def tranform_half_widh(some_str = None):res_list = []return ''.join([toDBC(x) for x in some_str])# 强分割
import re
def split_sentences_with_punctuation(text):# 定义句子分隔符punctuation = r'([。?!?!\n])'# 根据句子分隔符进行分割,并保留分隔符parts = re.split(punctuation, text)# 将分隔符与句子重新组合sentences = []for i in range(0, len(parts), 2):sentence = parts[i].strip()if i + 1 < len(parts):sentence += parts[i + 1]sentences.append(sentence)return sentencesimport redef ensure_period(sentence= None):"""如果句子不是以句号、问号、感叹号或者感叹问号结尾,则在结尾添加一个句号。参数:sentence (str): 待检查的句子。返回:str: 添加句号后的句子。"""# 使用正则表达式匹配句子末尾的标点符号if not re.search(r'[。?!?!]$', sentence):sentence += '。'return sentence# 强分割断句,确保末尾的强分隔符。
from typing import List, Optional
from pydantic import BaseModel,FieldValidationInfo, field_validatorclass Item(BaseModel):doc_id:strcontent:str# 验证器:确保 content 以强分隔符结尾@field_validator('content',mode='before')def ensure_utf8_and_halfwidth(cls, v):v = tranform_half_widh(extract_utf8_chars(v)) # 转换为半角字符return v# 给到一个document,将之分割为句子
class DocumentSplit(BaseModel):input_data_listofdict: List[Item] = [{'doc_id':'1', 'content':'这是第一篇文章。'}, {'doc_id':'2', 'content':'这是第二篇文章。'}]ds = DocumentSplit(input_data_listofdict = [{'doc_id':'1','content':'这是第一篇文章。这是第一篇文章'}, {'doc_id':'2', 'content':'这是第二篇文章。这是第二篇文章'}])
标题类数据准备
对于一篇文档来说,可能存在一个类似标题的数据。这类数据是高度概括的,从文字风格上,可能与正文不同;从作用上也不同,标题,也可能是摘要,目前是对文档的信息进行高度提炼。
所以在实体识别任务中,这些是需要区别对待的。(对于title数据,只要保证强分隔符即可)
通过DocumentSplit, 数据进行字符集的检查和转换。pydantic的处理效率接近pandas的apply方法,是可以量产的。
title_s = cols2s(df, cols=['mid','title'], cols_key_mapping=['doc_id', 'content'])
ds = DocumentSplit(input_data_listofdict = title_s.to_list())
the_data = pd.DataFrame([x.dict() for x in ds.input_data_listofdict])
doc_id content
0 00007bfd99a62722a2ebc0bedef3c398 河南济源示范区:依托企业招才引智博聚兴产
1 000193b97b50083acb76de6128094916 Omicron有何新变化?将如何影响市场?
句子序号和分隔符规范化。每篇文档,0的序号预留给标题。
the_data.columns = ['doc_id', 'sentence']
the_data['s_ord'] = 0
the_data['sentence'] = the_data['sentence'].apply(ensure_period)
part1_df = the_data.copy()
part1_df就是标题部分的预处理结果。
正文类数据准备
对于content数据,需要转为短句列表。在识别实体时,一方面实体一定不会包含强分隔符,所以业务上可分;另一方面,服务在批量处理时是通过GPU并行计算的,那么限定每个向量的长度是必须的,这是技术上的要求。
将句子按强分隔符分割
def make_sentences(some_dict = None):doc_id = some_dict['doc_id']sentences = some_dict['sentences']res_list = []for i, v in enumerate(sentences):tem_dict = {}tem_dict['doc_id'] = doc_idtem_dict['s_ord'] = i+1tem_dict['sentence'] = ensure_period(v)res_list.append(tem_dict)return res_listcontent_s = cols2s(df, cols=['mid','content'], cols_key_mapping=['doc_id', 'content'])
ds = DocumentSplit(input_data_listofdict = content_s.to_list())the_data1 = pd.DataFrame([x.dict() for x in ds.input_data_listofdict])
the_data1['sentences'] = the_data1['content'].apply(split_sentences_with_punctuation)_s = cols2s(the_data1, cols = ['doc_id', 'sentences'], cols_key_mapping=['doc_id', 'sentences'])
_s1 = _s.apply(make_sentences)
the_data2 = pd.DataFrame( flatten_list(_s1.to_list() ))
part2_df = the_data2.copy()
数据合并
ucs编号:通过将接口服务再封装为对象操作
article_df = pd.concat([part1_df,part2_df], ignore_index=True)
from typing import List, Optional
from pydantic import BaseModelimport requests as req
class UCS(BaseModel):gfgo_lite_server: str = 'http://172.17.0.1:24090/'def get_brick_name(self, some_id = None):some_dict = {}some_dict['rec_id'] = some_idurl = self.gfgo_lite_server + 'get_brick_name/'res = req.post(url, json = some_dict).json()return res def get_brick_name_s(self, some_id_list = None):some_dict = {}some_dict['rec_id_list'] = some_id_listurl = self.gfgo_lite_server + 'get_brick_name_s/'res = req.post(url, json = some_dict).json()return res ucs = UCS()
article_df['pid'] = list(range(len(article_df)))
pid_list2 = slice_list_by_batch2(list(article_df['pid']),10000)import tqdm
res_list = []
for some_pid_list in tqdm.tqdm(pid_list2):block_name_list = ucs.get_brick_name_s(some_pid_list)res_list.append(block_name_list)
article_df['brick'] = flatten_list(res_list)doc_id sentence s_ord pid brick
0 00007bfd99a62722a2ebc0bedef3c398 河南济源示范区:依托企业招才引智博聚兴产。 0 0 0.0.0.0
1 000193b97b50083acb76de6128094916 Omicron有何新变化?将如何影响市场? 0 1 0.0.0.0
分块存储并处理(打标)
brick_list = sorted(list(article_df['brick'].unique()))
!mkdir left
!mkdir right
worker.py
import sys# brick
arg1 = sys.argv[1]print('arg1', arg1)port_list = [10000,10001,10002]import random
from Basefuncs import *
the_port = random.choice(port_list)the_brick = arg1
the_data = from_pickle(the_brick, './left/')the_data['sentence_clean'] = the_data['sentence'].apply(lambda x: x[:198])
batch_list1 = cols2s(the_data, cols=['pid', 'sentence_clean'], cols_key_mapping= ['rec_id', 'data'])
batch_list2 = slice_list_by_batch2(batch_list1.to_list(), 500)import requests as reqresp_df_list = []
for some_batch in batch_list2:the_url = 'http://172.17.0.1:%s/parse_ent/' % the_portpara_dict = {}para_dict['task_for'] = 'test.test.ent'para_dict['data_listofdict'] = some_batchresp = req.post(the_url,json = para_dict).json()resp_df = pd.DataFrame(resp['data'])resp_df_list.append(resp_df)import pandas as pd
mdf = pd.concat(resp_df_list, ignore_index=True)mdf['pid'] = mdf['rec_id']
mdf2 = pd.merge(the_data, mdf[['pid', 'ORG']], how='left', on ='pid')
to_pickle(mdf2, the_brick, './right/')
player.py
import sys# brick
arg1 = sys.argv[1]print('arg1', arg1)the_mod_num = int(arg1)from Basefuncs import * left_file = list_file_names_without_extension('./left/')
right_file = list_file_names_without_extension('./right/')gap_files = list(left_file - right_file)
gap_files1 = [x for x in gap_files if int(x[-1]) % 3 == the_mod_num]import os
for some_brick in gap_files1:os.system('python3 worker.py %s' % some_brick)
执行player,对3个服务均匀发起请求
python3 player.py 0 &
python3 player.py 1 &
python3 player.py 2 &
收集结果并存库
left_file = list_file_names_without_extension('./left/')
right_file = list_file_names_without_extension('./right/')
gap_files = list(left_file - right_file)
如果gap_files为空列表,那么表示处理完毕
读取所有文件合并
right_file_list = right_file
right_df_list = []
for some_right in right_file_list:tem_df = from_pickle(some_right, './right/')right_df_list.append(tem_df)
right_df = pd.concat(right_df_list, ignore_index=True)
right_sel = (right_df['ORG'] != ',') &(right_df['ORG'].notnull())
right_df1 = right_df[right_sel]
建表
# 获取全部数据
host = 'xxx'
port = 19000
database = 'xxx'
user = 'xxx'
password = 'xxx'
name = 'xxx'
chc = CHClient(host = host, port = port , database = database, user = user, password = password, name = name )
the_sql = 'show tables'
chc._exe_sql(the_sql)# chc.del_table('train_ner_news_title_content_org_20240529')
create_table_sql = '''
CREATE TABLE train_ner_news_title_content_org_20240529
(doc_id String,s_ord Int,pid Int,brick String,sentence_clean String,ORG String
)
ENGINE = MergeTree
partition by (brick)
PRIMARY KEY (pid)
'''
chc._exe_sql(create_table_sql)for slice_tuple in slice_list_by_batch1(0, len(right_df1), 100000):print(slice_tuple)_tem_df = right_df1.iloc[slice_tuple[0]:slice_tuple[1]]chc.insert_df2table(table_name = 'train_ner_news_title_content_org_20240529' , some_df = _tem_df[['doc_id','s_ord', 'pid','brick','sentence_clean', 'ORG']], pid_name = 'pid', cols =['doc_id','s_ord', 'pid','brick','sentence_clean', 'ORG'])
到这里算是完成了规范化的一部分,太长了,还得继续开新的文章写。