论文地址:https://arxiv.org/abs/2105.07148
代码地址:https://github.com/liuwei1206/LEBERT
模型创新
- LEBRT采用句子中的词语对(论文中称为Char-Word Pair)的特征作为输入
- 作者设计Lexicon adapter,在BERT的中间某一层注入词汇特征
-
左图是在BERT之后的架构上面引入词汇信息
-
右图是在BERT底层时引入词汇信息
Char-Word Pair
首先,对于给定的中文句子
s c = { c 1 , c 2 , . . . , c n } c i 代表句子中的第 i 个字符 s_c = \{c_1,c_2,...,c_n\}\quad c_i代表句子中的第i个字符 sc={c1,c2,...,cn}ci代表句子中的第i个字符
利用词典D匹配出句子中包含的潜在词汇(这里设定最多匹配出相关性最强的三个,不足三个则用PAD填充)。然后,每个字符和包含该字符的词汇组成词汇对,表示为
s c w = { ( c 1 , w s 1 ) , ( c 2 , w s 2 ) , . . . , ( c n , w s n ) } w s i 表示包含 c i 词汇组成的集合 s_{cw} = \{(c_1,ws_1),(c_2,ws_2),...,(c_n,ws_n)\}\quad ws_i表示包含c_i词汇组成的集合 scw={(c1,ws1),(c2,ws2),...,(cn,wsn)}wsi表示包含ci词汇组成的集合
此时就构成了Char-Words Pair Sequence
Lexicon adapter
将输入数据构建成Char-Words Pair Sequence后,句子中的每个位置包含了字符特征和词汇特征,为了把词汇特征注入到BERT中,作者设计了Lexicon adapter
Char-Words Pair Sequence中的第i个位置char-words pair表示为 ( h i c , x i w s ) (h_i^c,x_i^{ws}) (hic,xiws)
- h i c h_i^c hic:第i个位置的字符特征,该特征是BERT的某个Transformer层的输出
- x i w s = { x i 1 w , x i 2 w , . . . , x i m w } x_i^{ws} = \{x_{i1}^w,x_{i2}^w,...,x_{im}^w\} xiws={xi1w,xi2w,...,ximw} :第i个位置字符对应m个词汇的词向量
对词向量使用非线性变换,以至于和字符向量进行维度对齐
v i j w = W 2 ( t a n h ( W 1 x i j w + b 1 ) ) + b 2 j = 1 , . . . , m v_{ij}^w = W_2(tanh(W_1x_{ij}^w + b_1)) + b_2\quad\quad j=1,...,m vijw=W2(tanh(W1xijw+b1))+b2j=1,...,m
- d c d_c dc:字符特征维度
- d w d_w dw:词向量维度
- W 1 ∈ R d c ∗ d w W_1\in R ^{d_c*d_w} W1∈Rdc∗dw
- W 2 ∈ R d c ∗ d c W_2\in R ^{d_c*d_c} W2∈Rdc∗dc
- v i j w ∈ R d c v_{ij}^w \in R^{d_c} vijw∈Rdc
此时,对于Char-Words Pair Sequence中的第i个位置,进行维度变换后的词向量集合为
V i = ( v i 1 w , . . . , v i m w ) ∈ R m ∗ d c V_i = (v_{i1}^w,...,v_{im}^w)\in R^{m*d_c} Vi=(vi1w,...,vimw)∈Rm∗dc
此时,使用注意力机制对 V i V_i Vi中的m个词向量进行融合
a i = s o f t m a x ( h i c W a t t n V i T ) a_i = softmax(h_i^cW_{attn}V_i^T) ai=softmax(hicWattnViT)
- h i c h_i^c hic:为query向量
- V i V_i Vi:为value
- a i a_i ai:使用双线性变换矩阵计算相似度得分得到
之后,再利用相似度得分对 V i V_i Vi进行加权求和得到融合后词特征:
z i w = ∑ j = 1 m a i j v i j w z_i^w = \sum_{j=1}^m a_{ij}v_{ij}^w ziw=j=1∑maijvijw
最后,再把字符特征和融合后的词特征相加得到:
h ^ i = h i c + z i w \hat h_i^ = h_i^c + z_i^w h^i=hic+ziw
Lexicon Enhanced BERT
对于给定的中文句子 s c = { c 1 , c 2 , . . . , c n } s_c = \{c_1,c_2,...,c_n\} sc={c1,c2,...,cn},将其构建成character-words pair sequence形式
s c w = { ( c 1 , w s 1 ) , ( c 2 , w s 2 ) , . . . , ( c n , w s n ) } s_{cw} = \{(c_1,ws_1),(c_2,ws_2),...,(c_n,ws_n)\} scw={(c1,ws1),(c2,ws2),...,(cn,wsn)}
将 { c 1 , c 2 , . . . , c n } \{c_1,c_2,...,c_n\} {c1,c2,...,cn}输入到BERT的Input Embedder当中,得到输出 E = { e 1 , e 2 , . . . , e n } E = \{e_1,e_2,...,e_n\} E={e1,e2,...,en},之后将E($H^0 = E $)输入到BERT的Transformer encoder中,每个Transformer encoder表示为如下形式:
$$
G &= Layernormalization(H^{l-1 } + Multiheadattention(H^{l-1}))\
H &= Layernormalization(G + FFN(G))
$$
之后,通过Lexicon Adapter把词汇信息注入到第k层和第k+1层的Transformer层之间
第k层Transformer层的输出为 H k = { h 1 k , h 2 k , . . . , h n k } H^k = \{h_1^k,h_2^k,...,h_n^k\} Hk={h1k,h2k,...,hnk}。将其中的每一个Char-Words Pair( h i k , x i w s h_i^k,x_i^{ws} hik,xiws)利用Lexicon Adapter进行转化得到:
h ^ k = L A ( h i k , x i w s ) \hat h^k = LA(h_i^k,x_i^{ws}) h^k=LA(hik,xiws)
代码讲解
词向量处理
加载词向量(load_word_embedding)
input
- word_embed_path:词向量的路径,这里选取腾讯的tencent-ailab-embedding-zh-d200-v0.2.0-s.txt
- 词的个数为2000000,向量维度为200
- max_scan_num:最多加载多少个词向量
output
- word_embed_dict:每个词对应的词向量 200000 * 200
- word_list:词集合,2000000
- word_embed_dim:词的维度,200
@classmethoddef build_trie_tree(cls, word_list, save_path):"""# todo 是否不将单字加入字典树中构建字典树:return:"""logger.info('building trie tree')trie_tree = Trie()for word in word_list:trie_tree.insert(word)write_pickle(trie_tree, save_path)return trie_treedef load_word_embedding(cls, word_embed_path, max_scan_num):"""todo 存在许多单字的,考虑是否去掉"""logger.info('loading word embedding from pretrain')word_embed_dict = dict()word_list = list()with open(word_embed_path, 'r', encoding='utf8') as f:for idx, line in tqdm(enumerate(f)):# 只扫描前max_scan_num个词向量if idx > max_scan_num:breakitems = line.strip().split()if idx == 0:assert len(items) == 2num_embed, word_embed_dim = itemsnum_embed, word_embed_dim = int(num_embed), int(word_embed_dim)else:assert len(items) == word_embed_dim + 1word = items[0]embedding = np.empty([1, word_embed_dim])embedding[:] = items[1:]word_embed_dict[word] = embeddingword_list.append(word)logger.info('word_embed_dim:{}'.format(word_embed_dim))logger.info('size of word_embed_dict:{}'.format(len(word_embed_dict)))logger.info('size of word_list:{}'.format(len(word_list)))return word_embed_dict, word_list, word_embed_dim
构建字典树(build_trie_tree)
input
- word_list:word_list:词集合,2000000
- save_path:字典树的保存路径(方便复用)
output
- trie_tree:字典树
@classmethoddef build_trie_tree(cls, word_list, save_path):"""# todo 是否不将单字加入字典树中"""logger.info('building trie tree')trie_tree = Trie()for word in word_list:trie_tree.insert(word)write_pickle(trie_tree, save_path)return trie_treedef write_pickle(x, path):with open(path, 'wb') as f:pickle.dump(x, f)
找到数据集中的所有单词(get_words_from_corpus)
input
- files:训练、验证、测试的文件路径
- save_file:文件保存路径
- trie_tree:字典树
output
- all_matched_words:找到了所有跟我们训练、验证、测试数据有关的所有词
@classmethoddef get_words_from_corpus(cls, files, save_file, trie_tree):"""找出文件中所有匹配的单词"""logger.info('getting words from corpus')all_matched_words = set()for file in files:with open(file, 'r', encoding='utf8') as f:lines = f.readlines()for idx in trange(len(lines)):line = lines[idx].strip()data = json.loads(line)text = data['text']matched_words = cls.get_words_from_text(text, trie_tree)_ = [all_matched_words.add(word) for word in matched_words]all_matched_words = list(all_matched_words)all_matched_words = sorted(all_matched_words)write_lines(all_matched_words, save_file)return all_matched_words@classmethoddef get_words_from_text(cls, text, trie_tree):"""找出text中所有的单词"""length = len(text)matched_words_set = set() # 存储匹配到的单词for idx in range(length):sub_text = text[idx:idx + trie_tree.max_depth]words = trie_tree.enumerateMatch(sub_text)_ = [matched_words_set.add(word) for word in words]matched_words_set = list(matched_words_set)matched_words_set = sorted(matched_words_set)return matched_words_setdef write_lines(lines, path, encoding='utf8'):with open(path, 'w', encoding=encoding) as f:for line in lines:f.writelines('{}\n'.format(line))
初始化模型的词向量(init_model_word_embedding)
input
- corpus_words:所有跟我们训练、验证、测试数据有关的所有词
- word_embed_dict:每个词对应的词向量 200000 * 200
- save_embed_path:词向量的保存路径
- save_word_vocab_path:词表的保存保存
output
- model_word_embedding:模型的嵌入向量 --> 20857 * 200
- word_vocab:模型的词表 --> 20857
- embed_dim:嵌入维度 --> 200
def init_model_word_embedding(self, corpus_words, word_embed_dict, save_embed_path, save_word_vocab_path):logger.info('initializing model word embedding')# 构建单词和id的映射word_vocab = Vocabulary(corpus_words, vocab_type='word')# embed_dim = len(word_embed_dict.items()[1].size)embed_dim = next(iter(word_embed_dict.values())).sizescale = np.sqrt(3.0 / embed_dim)model_word_embedding = np.empty([word_vocab.size, embed_dim])matched = 0not_matched = 0for idx, word in enumerate(word_vocab.idx2token):if word in word_embed_dict:model_word_embedding[idx, :] = word_embed_dict[word]matched += 1else:model_word_embedding[idx, :] = np.random.uniform(-scale, scale, [1, embed_dim])not_matched += 1logger.info('num of match:{}, num of not_match:{}'.format(matched, not_matched))write_pickle(model_word_embedding, save_embed_path)write_pickle(word_vocab, save_word_vocab_path)return model_word_embedding, word_vocab, embed_dim
数据加载格式
每个汉字对应的单词列表(get_char2words)
input:
- text:文本
output:
- char_index2words:文本中每个汉字所对应的词
def get_char2words(self, text):"""获取每个汉字,对应的单词列表"""text_len = len(text)char_index2words = [[] for _ in range(text_len)]for idx in range(text_len):sub_sent = text[idx:idx + self.trie_tree.max_depth] # speed using max depthwords = self.trie_tree.enumerateMatch(sub_sent) # 找到以text[idx]开头的所有单词for word in words:start_pos = idxend_pos = idx + len(word)for i in range(start_pos, end_pos):char_index2words[i].append(word)return char_index2words
数据加载格式(collate)
output
- 特征输入为:
'text': text,
'input_ids': input_ids,
'attention_mask': input_mask,
'token_type_ids': token_type_ids,
'word_ids': word_ids,
'word_mask': word_mask,
'label_ids': label_ids
def get_input_data(self, file):lines = load_lines(file)features = []cls_token_id = self.tokenizer.cls_token_idsep_token_id = self.tokenizer.sep_token_idpad_token_id = self.tokenizer.pad_token_ido_label_id = self.label_vocab.convert_token_to_id('O')pad_label_id = self.label_vocab.convert_token_to_id('[PAD]')for line in tqdm(lines):data = json.loads(line)text = data['text']labels = data['label']char_index2words = self.get_char2words(text)# 在开头与结尾分别添加[CLS]与[SEP]input_ids = [cls_token_id] + self.tokenizer.convert_tokens_to_ids(text) + [sep_token_id]label_ids = [o_label_id] + self.label_vocab.convert_tokens_to_ids(labels) + [o_label_id]word_ids_list = []word_pad_id = self.word_vocab.convert_token_to_id('[PAD]')for words in char_index2words:words = words[:self.max_word_num]word_ids = self.word_vocab.convert_tokens_to_ids(words)word_pad_num = self.max_word_num - len(words)word_ids = word_ids + [word_pad_id] * word_pad_numword_ids_list.append(word_ids)# 开头和结尾进行paddingword_ids_list = [[word_pad_id]*self.max_word_num] + word_ids_list + [[word_pad_id]*self.max_word_num]if len(input_ids) > self.max_seq_len:input_ids = input_ids[: self.max_seq_len]label_ids = label_ids[: self.max_seq_len]word_ids_list = word_ids_list[: self.max_seq_len]input_mask = [1] * len(input_ids)token_type_ids = [0] * len(input_ids)assert len(input_ids) == len(label_ids) == len(word_ids_list)# paddingpadding_length = self.max_seq_len - len(input_ids)input_ids += [pad_token_id] * padding_lengthinput_mask += [0] * padding_lengthtoken_type_ids += [0] * padding_lengthlabel_ids += [pad_label_id] * padding_lengthword_ids_list += [[word_pad_id]*self.max_word_num] * padding_lengthtext = ''.join(text)input_ids = torch.LongTensor(input_ids)label_ids = torch.LongTensor(label_ids)input_mask = torch.LongTensor(input_mask)token_type_ids = torch.LongTensor(token_type_ids)word_ids = torch.LongTensor(word_ids_list)word_mask = (word_ids != word_pad_id).long()feature = {'text': text, 'input_ids': input_ids, 'attention_mask': input_mask, 'token_type_ids': token_type_ids,'word_ids': word_ids, 'word_mask': word_mask, 'label_ids': label_ids}features.append(feature)return features
模型运行
模型初步加载
- config.word_vocab_size:20857
- config.word_embed_dim:200
- LEBertModel:对其中的BertEncoder模块进行改造,后续会详细介绍
class LEBertSoftmaxForNer(BertPreTrainedModel):def __init__(self, config):super(LEBertSoftmaxForNer, self).__init__(config)self.word_embeddings = nn.Embedding(config.word_vocab_size, config.word_embed_dim)self.num_labels = config.num_labelsself.bert = LEBertModel(config)self.dropout = nn.Dropout(config.hidden_dropout_prob)self.classifier = nn.Linear(config.hidden_size, config.num_labels)self.loss_type = config.loss_typeself.init_weights()def forward(self, input_ids, attention_mask, token_type_ids, word_ids, word_mask, ignore_index, labels=None):word_embeddings = self.word_embeddings(word_ids)outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids,word_embeddings=word_embeddings, word_mask=word_mask)sequence_output = outputs[0]sequence_output = self.dropout(sequence_output)logits = self.classifier(sequence_output)outputs = (logits,) + outputs[2:] # add hidden states and attention if they are hereif labels is not None:assert self.loss_type in ['lsr', 'focal', 'ce']if self.loss_type == 'lsr':loss_fct = LabelSmoothingCrossEntropy(ignore_index=ignore_index)elif self.loss_type == 'focal':loss_fct = FocalLoss(ignore_index=ignore_index)else:loss_fct = CrossEntropyLoss(ignore_index=ignore_index)# Only keep active parts of the lossif attention_mask is not None:active_loss = attention_mask.contiguous().view(-1) == 1active_logits = logits.contiguous().view(-1, self.num_labels)[active_loss]active_labels = labels.contiguous().view(-1)[active_loss]loss = loss_fct(active_logits, active_labels)else:loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))outputs = (loss,) + outputsreturn outputs # (loss), scores, (hidden_states), (attentions)
model.word_embeddings.weight.data.copy_(torch.from_numpy(processor.word_embedding))
- 把词向量的word_embedding赋给LEBertSoftmaxForNer
简要概述
class LEBertModel(BertPreTrainedModel):def __init__(self, config):self.encoder = BertEncoder(config)class BertEncoder(nn.Module):def __init__(self, config):super().__init__()self.config = configself.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])self.word_embedding_adapter = WordEmbeddingAdapter(config)def forward():# 在第i层之后,进行融合if i == self.config.add_layer:hidden_states = self.word_embedding_adapter(hidden_states, word_embeddings, word_mask)
核心代码
class WordEmbeddingAdapter(nn.Module):def __init__(self, config):super(WordEmbeddingAdapter, self).__init__()self.dropout = nn.Dropout(config.hidden_dropout_prob)self.tanh = nn.Tanh()self.linear1 = nn.Linear(config.word_embed_dim, config.hidden_size)self.linear2 = nn.Linear(config.hidden_size, config.hidden_size)attn_W = torch.zeros(config.hidden_size, config.hidden_size)self.attn_W = nn.Parameter(attn_W)self.attn_W.data.normal_(mean=0.0, std=config.initializer_range)self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)def forward(self, layer_output, word_embeddings, word_mask):""":param layer_output:bert layer的输出,[b_size, len_input, d_model]:param word_embeddings:每个汉字对应的词向量集合,[b_size, len_input, num_word, d_word]:param word_mask:每个汉字对应的词向量集合的attention mask, [b_size, len_input, num_word]"""# transform# 将词向量,与字符向量进行维度对齐word_outputs = self.linear1(word_embeddings)word_outputs = self.tanh(word_outputs)word_outputs = self.linear2(word_outputs)word_outputs = self.dropout(word_outputs) # word_outputs:[b_size, len_input, num_word, d_model]# 计算每个字符向量,与其对应的所有词向量的注意力权重,然后加权求和。采用双线性映射计算注意力权重# layer_output = layer_output.unsqueeze(2) # layer_output:[b_size, len_input, 1, d_model]socres = torch.matmul(layer_output.unsqueeze(2), self.attn_W) # [b_size, len_input, 1, d_model]socres = torch.matmul(socres, torch.transpose(word_outputs, 2, 3)) # [b_size, len_input, 1, num_word]socres = socres.squeeze(2) # [b_size, len_input, num_word]socres.masked_fill_(word_mask, -1e9) # 将pad的注意力设为很小的数socres = F.softmax(socres, dim=-1) # [b_size, len_input, num_word]attn = socres.unsqueeze(-1) # [b_size, len_input, num_word, 1]weighted_word_embedding = torch.sum(word_outputs * attn, dim=2) # [N, L, D] # 加权求和,得到每个汉字对应的词向量集合的表示layer_output = layer_output + weighted_word_embeddinglayer_output = self.dropout(layer_output)layer_output = self.layer_norm(layer_output)return layer_output
# transform# 将词向量,与字符向量进行维度对齐word_outputs = self.linear1(word_embeddings)word_outputs = self.tanh(word_outputs)word_outputs = self.linear2(word_outputs)word_outputs = self.dropout(word_outputs) # word_outputs:[b_size, len_input, num_word, d_model]
# 计算每个字符向量,与其对应的所有词向量的注意力权重,然后加权求和。采用双线性映射计算注意力权重# layer_output = layer_output.unsqueeze(2) # layer_output:[b_size, len_input, 1, d_model]socres = torch.matmul(layer_output.unsqueeze(2), self.attn_W) # [b_size, len_input, 1, d_model]socres = torch.matmul(socres, torch.transpose(word_outputs, 2, 3)) # [b_size, len_input, 1, num_word]socres = socres.squeeze(2) # [b_size, len_input, num_word]socres.masked_fill_(word_mask, -1e9) # 将pad的注意力设为很小的数socres = F.softmax(socres, dim=-1) # [b_size, len_input, num_word]attn = socres.unsqueeze(-1) # [b_size, len_input, num_word, 1]
weighted_word_embedding = torch.sum(word_outputs * attn, dim=2)
layer_output = layer_output + weighted_word_embedding
参照资料
论文解说:https://zhuanlan.zhihu.com/p/374720213
复现代码:https://github.com/yangjianxin1/LEBERT-NER-Chinese