问题描述: 笔者在文本分类场景中使用了roberta+pool+dense的三分类模型。采用预训练模型做项目的时候经常苦于数据太少,模型泛化性差,因此收集了1300W数据。在我尝试暴力出奇迹的时候,遇到了部分问题,在此记录一下。
一. 数据预处理时间长
尽管数据预处理步骤比较简单,一般也就清洗、分词、token2id等操作(我一般把token2id放在预处理阶段做),但是由于数据量比较大时,也很耗时间。
1.分词提速
jieba分词开启并行很简单,一行代码开启
jieba.enable_parallel(100)
但是这里注意,jieba并行的过程是一个 jieba.cut()这一动作中的,如果输入的是多个句子,则jieba会并行处理多个句子。
# 错误jieba.cut('我爱北京天安门。')jieba.cut('北京是中国首都。')#正确示范jieba.cut('我爱北京天安门。\n北京是中国首都。')
因此如果需要采用并行,需要先包装多行数据,再拆分出来。具体参考
temp_lines = texts[:100]_res = ' '.join((jieba.cut('\n'.join(temp_lines))))split_words = [_.lstrip(' ').rstrip(' ') for _ in _res.split('\n')]res.extend(split_words)
2. 函数并行
当然,使用预训练模型不需要对中文进行分词,因此此处耗时主要在数据清洗,正则处理等操作上,所以方法1已经不起作用啦。过程就是先读取文件到内存,然后分多个进程预处理,最终获取所有进程处理结果,写入文件。
import multiprocessingimport mathdef func(id, texts): print('enter %d' % (id)) res = [] for i, text in enumerate(texts): if i % 10000 == 0 and i != 0: print(i, '/', id) value=tokenizer.encode(first=text, max_len=seq_len)[0] res.append(value) print('leave %d' % (id)) return id, res def quick_func(texts, func): pool = multiprocessing.Pool(processes=CPUS) results = [] for i in range(CPUS): imin = i*math.ceil(len(texts)/CPUS) imax = min((i+1)*math.ceil(len(texts)/CPUS), len(texts)) results.append(pool.apply_async(func, (i, texts[imin:imax],))) pool.close() pool.join() print("Sub-process(es) done.") res = [] for _ in results: res.append(_.get()) res = sorted(res, key=lambda x: x[0]) texts = [] for _ in res: texts.extend(_[1]) return texts
注意! func内部出错,子进程直接结束,不raise错误的。如果要调试,最好还是加上traceback查看问题
在func中完全可以使用jieba的cut,不需要开启jieba并行。在处理数据500万左右的时候,各个进程已经结束,但是该函数迟迟不返回值,直到我htop查看内存,发现内存快速增长到150G之后不在增长,程序也卡住啦,发现可能是 raw句子和处理后的结果占用内存过高,因此这个方法也不行了。
3. 文件并行
查看资料的时候发现,有老哥处理几亿行文件使用linux bash来预处理,速度明显提升,我也想尝试,但是最终由于功力不足,失败。折中办法,先将文件拆分成多个文件,python并行处理多个文件并输出结果到多个文件,然后合并多个文件到单个文件。(注意,这里会丢失各行顺序,如果去重需要在外部处理,仅仅是读取写入文件单线程也还是挺快的)
并行读取同一个文件或者并行写入同一个文件是危险的,可能会写入或者读取混乱(错误)
import multiprocessingCPUS = 40IN_DATA_SPLIT_PREFIX = 'data-split-tmp-in'OUT_DATA_SPLIT_PREFIX = 'data-split-tmp-out'seq_len = 256 # 文本最大长度vocab_path = '../../basedata/chinese_wwm_ext_L-12_H-768_A-12/vocab.txt'tokenizer = LimitBertTokenizer(vocab_path) # text2 *4 < text1char_tool = CharTools()def _clean_tokenize(infile, outfile, filters=[]): fin = open(infile, 'r') fout = open(outfile, 'w') for i, line in enumerate(fin): if i % 10000 == 0 and i != 0: print(i, ' / ', infile) items = line.strip().split('\t') ###########------------######### if len(items) != 6: continue object_id, _, operator_id, title, content, action = items type = 'model' if operator_id == '0' else 'human' if type in filters: continue if action not in action2label.keys(): continue label = action2label[action] title = char_tool.clean(title) content = char_tool.clean(content) title = title[:seq_len] content = content[:seq_len] wordids, segmentids = tokenizer.encode( first=content, second=title, max_len=seq_len) fout.write(json.dumps( {'type': type, 'label': label, 'wordids': wordids, 'segmentids': segmentids})+'\n') ###########------------######### fin.close() fout.close()def parallel(_func, infile, outfile, filters=[]): os.system('split -n l/%d %s %s' % (CPUS, infile, IN_DATA_SPLIT_PREFIX)) print("split files done") pool = multiprocessing.Pool(processes=CPUS) for small_data_file_in in [_ for _ in os.listdir('.') if _.startswith(IN_DATA_SPLIT_PREFIX)]: small_data_file_out = small_data_file_in.replace( IN_DATA_SPLIT_PREFIX, OUT_DATA_SPLIT_PREFIX) pool.apply_async(_func, args=( small_data_file_in, small_data_file_out, filters,)) pool.close() pool.join() print("Sub-process(es) done.") os.system('cat %s* > %s' % (OUT_DATA_SPLIT_PREFIX, outfile)) os.system('rm %s*' % (IN_DATA_SPLIT_PREFIX)) os.system('rm %s*' % (OUT_DATA_SPLIT_PREFIX)) print("done.")
二. numpy加载后占用内存太大
之前由于机器内存够用+数据量不算太大,在训练过程中我都是加载前文处理的json文件为numpy数据然后使用model.fit()进行训练的,代码如下
def load_from_json(filename): labels = [] wordids = [] segmentids = [] with open(filename, 'r') as f: for i, line in enumerate(f): if i % 100000 == 0 and i != 0: print('载入数据:%d ' % i) item = json.loads(line.strip()) labels.append(item['label']) wordids.append(item['wordids']) wordids = np.array(wordids) segmentids = np.zeros((len(labels), seq_len), int) labels = tf.keras.utils.to_categorical(labels) [train_wordids, val_wordids, train_segmentids, val_segmentids, train_label3s, val_label3s] = train_test_split(wordids, segmentids, label3s, test_size=0.01, stratify=labels,random_state=0) return [[train_wordids, train_segmentids], [train_label3s], [val_wordids, val_segmentids], [val_label3s]]train_X,train_y,val_X,val_y=load_from_json()model.fit(train_X, train_Y, validation_data=(val_X, val_Y),)
直到,我遇到了千万数据集,首先读取后占用机器超过150G内存,另外python提示单个变量占用超过10%内存,就此程序卡住,因此不得不更换方法。
- tf.data: 官方推荐的方法,但是我感觉使用json或者re都不是很方便,加上tf.function写起来不是很方便,放弃。
- data.generator:一般generator很常见,但是很多人使用的时候都是把数据完全读进内存,然后在generator中实现shuffle和输出batch的功能(就没有generator的作用啦),这里由于数据量太大,明显是不能读取所有数据进内存的。为了保持shuffle的功能,这里还是顺序读取文件,但是维持一个buffer, 在buffer中对数据进行shuffle。
class DataGenerator(): # 读取 generator def __init__(self, json_file, batch_size=2, min_buffer_size=200000, max_buffer_size=300000, shuffle=True): self.f = open(json_file, 'r') self.batch_size = batch_size file_len = int(os.popen('wc -l %s' % json_file).read().split()[0]) self.len = math.ceil(file_len / batch_size) self.buffer_lines = [] self.max_buffer_size = max_buffer_size self.min_buffer_size = min_buffer_size self.shuffle = shuffle self.check_load() def __len__(self): return self.len def _read_line(self): """获取一行数据""" line = self.f.readline() if not line: self.f.seek(0) line = self.f.readline() return line def check_load(self): """保证buffer中的数据量满足要求""" if len(self.buffer_lines) > self.min_buffer_size: return else: while len(self.buffer_lines) <= self.max_buffer_size: self.buffer_lines.append(self._read_line()) if self.shuffle: random.shuffle(self.buffer_lines) def _handle(self, lines): pass def __iter__(self): while True: self.check_load() lines, self.buffer_lines = self.buffer_lines[:self.batch_size], self.buffer_lines[self.batch_size:] yield self._handle(lines)class MyDataGenerator(DataGenerator): def _handle(self, lines): word_ids, segment_ids, labels = [], [], [] for line in lines: item = json.loads(line.strip()) labels.append(item['label']) word_ids.append(item['wordids']) segment_ids.append(item['segmentids']) word_ids = np.array(word_ids) segment_ids = np.array(segment_ids) labels = tf.keras.utils.to_categorical(labels, num_classes=3) return [word_ids, segment_ids], labelstrain_data = MyDataGenerator(params.tain_file, batch_size=params.finetune_batch_size*gpus, min_buffer_size=100000, max_buffer_size=300000)val_data = MyDataGenerator(params.val_file, batch_size=params.finetune_batch_size*gpus, min_buffer_size=0, max_buffer_size=100000, shuffle=False)model.fit(iter(train_data), validation_data=iter(val_data), steps_per_epoch=len(train_data), validation_steps=len(val_data))
具体实现了一个DataGenerator父类,其必须包含数据条目(可返回batch个数),因为model.fit中需要指定其迭代次数。为了保证generaotr持续有输出,在读取文件到末尾的时候,自动返回文件头。另外由于是在buffer中shuffle,其不能保证文件中的各行只输出一次(但是能保证一个epoch最多max_buffer_size个重复的),需要依据数据条目酌情设置,这里应该优化,在达到文件末尾后等全量buffer清空后在seed到文件头。另外,实现了子类,具体实现lines到numpy的操作。其实也可以把数据预处理和token2id放在这里,但是每个epoch都要处理一次,有点浪费时间,因此习惯把所有预处理和toekn2id放到train前的预处理脚本中。
三. 模型训练速度慢,需要多卡训练
在tf2之后并行更简单了,代码如下:
import tensorflow as tffrom keras_bert import load_trained_model_from_checkpointdef create_model(bert_train=False): bert = load_trained_model_from_checkpoint( config_path, checkpoint_path, training=False, trainable=bert_train, seq_len=SEQ_LEN,) inputs = bert.inputs[:2] dense = bert.get_layer('Encoder-12-FeedForward-Norm').output dense = tf.keras.layers.Lambda(lambda x: x[:, 1:, :])(dense) dense1 = tf.keras.layers.GlobalMaxPool1D()(dense) dense2 = tf.keras.layers.GlobalAveragePooling1D()(dense) dense = tf.keras.layers.Concatenate()([dense1, dense2]) dense = tf.keras.layers.Dense(params.dnn_units, activation='relu')(dense) dense = tf.keras.layers.Dropout(rate=params.dropout)(dense) output = tf.keras.layers.Dense( units=3, activation='softmax', name='3cls')(dense) model = tf.keras.models.Model(inputs, output) return modelos.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"gpus = tf.config.experimental.list_physical_devices(device_type='GPU')for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True)gpus = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))strategy = tf.distribute.MirroredStrategy()with strategy.scope(): model = create_model(bert_train=False) scheduler = tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.5, patience=int(params.fit_opt_patience), min_delta=1e-7) loss = LossGenerate(params.model_loss) metrics = ['accuracy'] optimizer = tf.keras.optimizers.Adam(params.fit_lr) csvlogger = tf.keras.callbacks.CSVLogger(os.path.join( params.model_dir, 'log.tsv'), append=True, separator='\t') earlystop = tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=params.fit_patience) checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(params.model_dir, 'stage1.weight.h5'), save_weights_only=True, save_best_only=True) model.compile(loss=loss, metrics=metrics, optimizer=optimizer)
只需要在strategy.scope()下定义模型就行,很简单。但是我也遇到一个问题: 在预测时,在strategy.scope()加载存储的模型文件报错:
from keras_bert import get_custom_objectscustom_objects = get_custom_objects()with strategy.scope(): model = tf.keras.models.load_model( model_path, custom_objects=custom_objects)# 报错
具体错误google很久也没有结果,最终发现在strategy.scope下载入权重文件是可以的(可能是哪里实现兼容性不强吧),代码:
with strategy.scope(): model = create_model(bert_train=False) model.load_weights(os.path.join(params.model_dir, 'stage1.weight.h5'))
实验结果
最终,在6卡v100并行下, 1000万长度384的分类模型训练好啦。stage1为固定bert训练结果, 01-0.4238为所有参数train的结果。发现了:1000W数据,max-len设置为384, RoBERTa-wwm-ext 模型训练需要接近25小时。其实还是蛮快的.... 另外: 大力出奇迹的模型效果还可以!!!
为了凑够1万字,放一下上文用到的LossGenerate函数
def LossGenerate(name='ce', *args, **kwargs): NAMES = ('ce', 'focal', 'dmi') kwargs = locals()['kwargs'] assert (name in NAMES), ' loss not defined!!!' if name == 'ce': return tf.keras.losses.CategoricalCrossentropy() if name == 'focal': gamma = kwargs.get('gamma', 2.) alpha = kwargs.get('alpha', 0.25) def categorical_focal_loss_fixed(y_true, y_pred): y_pred /= K.sum(y_pred, axis=-1, keepdims=True) epsilon = K.epsilon() y_pred = K.clip(y_pred, epsilon, 1. - epsilon) cross_entropy = -y_true * K.log(y_pred) loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy return K.mean(loss, axis=1) return categorical_focal_loss_fixed if name == 'dmi': def dmi_loss(y_true, y_pred): y_true = tf.transpose(y_true, perm=[1, 0]) mat = tf.matmul(y_true, y_pred) loss = -1.0 * tf.math.log(tf.math.abs(tf.linalg.det(mat)) + 0.001) return loss return dmi_loss