for循环数据量太大_中文文本分类roberta大力出奇迹之数据量大的问题

问题描述: 笔者在文本分类场景中使用了roberta+pool+dense的三分类模型。采用预训练模型做项目的时候经常苦于数据太少,模型泛化性差,因此收集了1300W数据。在我尝试暴力出奇迹的时候,遇到了部分问题,在此记录一下。

一. 数据预处理时间长

尽管数据预处理步骤比较简单,一般也就清洗、分词、token2id等操作(我一般把token2id放在预处理阶段做),但是由于数据量比较大时,也很耗时间。

1.分词提速

jieba分词开启并行很简单,一行代码开启

jieba.enable_parallel(100)

但是这里注意,jieba并行的过程是一个 jieba.cut()这一动作中的,如果输入的是多个句子,则jieba会并行处理多个句子。

# 错误jieba.cut('我爱北京天安门。')jieba.cut('北京是中国首都。')#正确示范jieba.cut('我爱北京天安门。\n北京是中国首都。')

因此如果需要采用并行,需要先包装多行数据,再拆分出来。具体参考

temp_lines = texts[:100]_res = ' '.join((jieba.cut('\n'.join(temp_lines))))split_words = [_.lstrip(' ').rstrip(' ') for _ in _res.split('\n')]res.extend(split_words)
2. 函数并行

当然,使用预训练模型不需要对中文进行分词,因此此处耗时主要在数据清洗,正则处理等操作上,所以方法1已经不起作用啦。过程就是先读取文件到内存,然后分多个进程预处理,最终获取所有进程处理结果,写入文件。

import multiprocessingimport mathdef func(id, texts):    print('enter %d' % (id))    res = []    for i, text in enumerate(texts):        if i % 10000 == 0 and i != 0:            print(i, '/', id)        value=tokenizer.encode(first=text, max_len=seq_len)[0]        res.append(value)    print('leave %d' % (id))    return id, res    def quick_func(texts, func):    pool = multiprocessing.Pool(processes=CPUS)    results = []    for i in range(CPUS):        imin = i*math.ceil(len(texts)/CPUS)        imax = min((i+1)*math.ceil(len(texts)/CPUS), len(texts))        results.append(pool.apply_async(func, (i, texts[imin:imax],)))    pool.close()    pool.join()    print("Sub-process(es) done.")        res = []    for _ in results:        res.append(_.get())    res = sorted(res, key=lambda x: x[0])    texts = []    for _ in res:        texts.extend(_[1])    return texts

注意! func内部出错,子进程直接结束,不raise错误的。如果要调试,最好还是加上traceback查看问题

在func中完全可以使用jieba的cut,不需要开启jieba并行。在处理数据500万左右的时候,各个进程已经结束,但是该函数迟迟不返回值,直到我htop查看内存,发现内存快速增长到150G之后不在增长,程序也卡住啦,发现可能是 raw句子和处理后的结果占用内存过高,因此这个方法也不行了。

3. 文件并行

查看资料的时候发现,有老哥处理几亿行文件使用linux bash来预处理,速度明显提升,我也想尝试,但是最终由于功力不足,失败。折中办法,先将文件拆分成多个文件,python并行处理多个文件并输出结果到多个文件,然后合并多个文件到单个文件。(注意,这里会丢失各行顺序,如果去重需要在外部处理,仅仅是读取写入文件单线程也还是挺快的)

并行读取同一个文件或者并行写入同一个文件是危险的,可能会写入或者读取混乱(错误)

import multiprocessingCPUS = 40IN_DATA_SPLIT_PREFIX = 'data-split-tmp-in'OUT_DATA_SPLIT_PREFIX = 'data-split-tmp-out'seq_len = 256  # 文本最大长度vocab_path = '../../basedata/chinese_wwm_ext_L-12_H-768_A-12/vocab.txt'tokenizer = LimitBertTokenizer(vocab_path)  # text2 *4 < text1char_tool = CharTools()def _clean_tokenize(infile, outfile, filters=[]):    fin = open(infile, 'r')    fout = open(outfile, 'w')    for i, line in enumerate(fin):        if i % 10000 == 0 and i != 0:            print(i, ' / ', infile)        items = line.strip().split('\t')        ###########------------#########                if len(items) != 6:            continue        object_id, _, operator_id, title, content, action = items        type = 'model' if operator_id == '0' else 'human'        if type in filters:            continue        if action not in action2label.keys():            continue        label = action2label[action]        title = char_tool.clean(title)        content = char_tool.clean(content)        title = title[:seq_len]        content = content[:seq_len]        wordids, segmentids = tokenizer.encode(            first=content, second=title, max_len=seq_len)        fout.write(json.dumps(            {'type': type, 'label': label, 'wordids': wordids, 'segmentids': segmentids})+'\n')        ###########------------#########    fin.close()    fout.close()def parallel(_func, infile, outfile, filters=[]):    os.system('split -n l/%d %s %s' %              (CPUS, infile, IN_DATA_SPLIT_PREFIX))    print("split files done")    pool = multiprocessing.Pool(processes=CPUS)    for small_data_file_in in [_ for _ in os.listdir('.') if _.startswith(IN_DATA_SPLIT_PREFIX)]:        small_data_file_out = small_data_file_in.replace(            IN_DATA_SPLIT_PREFIX, OUT_DATA_SPLIT_PREFIX)        pool.apply_async(_func, args=(            small_data_file_in, small_data_file_out, filters,))    pool.close()    pool.join()    print("Sub-process(es) done.")    os.system('cat %s* > %s' % (OUT_DATA_SPLIT_PREFIX, outfile))    os.system('rm %s*' % (IN_DATA_SPLIT_PREFIX))    os.system('rm %s*' % (OUT_DATA_SPLIT_PREFIX))    print("done.")

二. numpy加载后占用内存太大

之前由于机器内存够用+数据量不算太大,在训练过程中我都是加载前文处理的json文件为numpy数据然后使用model.fit()进行训练的,代码如下

def load_from_json(filename):    labels = []    wordids = []    segmentids = []    with open(filename, 'r') as f:        for i, line in enumerate(f):            if i % 100000 == 0 and i != 0:                print('载入数据:%d ' % i)            item = json.loads(line.strip())            labels.append(item['label'])            wordids.append(item['wordids'])    wordids = np.array(wordids)    segmentids = np.zeros((len(labels), seq_len), int)    labels = tf.keras.utils.to_categorical(labels)    [train_wordids, val_wordids, train_segmentids, val_segmentids,     train_label3s, val_label3s] = train_test_split(wordids, segmentids, label3s, test_size=0.01, stratify=labels,random_state=0)    return [[train_wordids, train_segmentids],            [train_label3s],            [val_wordids, val_segmentids],            [val_label3s]]train_X,train_y,val_X,val_y=load_from_json()model.fit(train_X, train_Y,          validation_data=(val_X, val_Y),)

直到,我遇到了千万数据集,首先读取后占用机器超过150G内存,另外python提示单个变量占用超过10%内存,就此程序卡住,因此不得不更换方法。

  • tf.data: 官方推荐的方法,但是我感觉使用json或者re都不是很方便,加上tf.function写起来不是很方便,放弃。
  • data.generator:一般generator很常见,但是很多人使用的时候都是把数据完全读进内存,然后在generator中实现shuffle和输出batch的功能(就没有generator的作用啦),这里由于数据量太大,明显是不能读取所有数据进内存的。为了保持shuffle的功能,这里还是顺序读取文件,但是维持一个buffer, 在buffer中对数据进行shuffle。
class DataGenerator():    # 读取 generator    def __init__(self, json_file, batch_size=2, min_buffer_size=200000, max_buffer_size=300000, shuffle=True):        self.f = open(json_file, 'r')        self.batch_size = batch_size        file_len = int(os.popen('wc -l %s' % json_file).read().split()[0])        self.len = math.ceil(file_len / batch_size)        self.buffer_lines = []        self.max_buffer_size = max_buffer_size        self.min_buffer_size = min_buffer_size        self.shuffle = shuffle        self.check_load()    def __len__(self):        return self.len    def _read_line(self):        """获取一行数据"""        line = self.f.readline()        if not line:            self.f.seek(0)            line = self.f.readline()        return line    def check_load(self):        """保证buffer中的数据量满足要求"""        if len(self.buffer_lines) > self.min_buffer_size:            return        else:            while len(self.buffer_lines) <= self.max_buffer_size:                self.buffer_lines.append(self._read_line())                            if self.shuffle:                random.shuffle(self.buffer_lines)    def _handle(self, lines):        pass    def __iter__(self):        while True:            self.check_load()            lines, self.buffer_lines = self.buffer_lines[:self.batch_size], self.buffer_lines[self.batch_size:]            yield self._handle(lines)class MyDataGenerator(DataGenerator):    def _handle(self, lines):        word_ids, segment_ids, labels = [], [], []        for line in lines:            item = json.loads(line.strip())            labels.append(item['label'])            word_ids.append(item['wordids'])            segment_ids.append(item['segmentids'])        word_ids = np.array(word_ids)        segment_ids = np.array(segment_ids)        labels = tf.keras.utils.to_categorical(labels, num_classes=3)        return [word_ids, segment_ids], labelstrain_data = MyDataGenerator(params.tain_file, batch_size=params.finetune_batch_size*gpus,                             min_buffer_size=100000, max_buffer_size=300000)val_data = MyDataGenerator(params.val_file, batch_size=params.finetune_batch_size*gpus,                           min_buffer_size=0, max_buffer_size=100000, shuffle=False)model.fit(iter(train_data),          validation_data=iter(val_data),          steps_per_epoch=len(train_data),          validation_steps=len(val_data))

具体实现了一个DataGenerator父类,其必须包含数据条目(可返回batch个数),因为model.fit中需要指定其迭代次数。为了保证generaotr持续有输出,在读取文件到末尾的时候,自动返回文件头。另外由于是在buffer中shuffle,其不能保证文件中的各行只输出一次(但是能保证一个epoch最多max_buffer_size个重复的),需要依据数据条目酌情设置,这里应该优化,在达到文件末尾后等全量buffer清空后在seed到文件头。另外,实现了子类,具体实现lines到numpy的操作。其实也可以把数据预处理和token2id放在这里,但是每个epoch都要处理一次,有点浪费时间,因此习惯把所有预处理和toekn2id放到train前的预处理脚本中。

三. 模型训练速度慢,需要多卡训练

在tf2之后并行更简单了,代码如下:

import tensorflow as tffrom keras_bert import load_trained_model_from_checkpointdef create_model(bert_train=False):    bert = load_trained_model_from_checkpoint(        config_path, checkpoint_path,        training=False,        trainable=bert_train,        seq_len=SEQ_LEN,)    inputs = bert.inputs[:2]    dense = bert.get_layer('Encoder-12-FeedForward-Norm').output    dense = tf.keras.layers.Lambda(lambda x: x[:, 1:, :])(dense)    dense1 = tf.keras.layers.GlobalMaxPool1D()(dense)    dense2 = tf.keras.layers.GlobalAveragePooling1D()(dense)    dense = tf.keras.layers.Concatenate()([dense1, dense2])    dense = tf.keras.layers.Dense(params.dnn_units, activation='relu')(dense)    dense = tf.keras.layers.Dropout(rate=params.dropout)(dense)    output = tf.keras.layers.Dense(        units=3, activation='softmax', name='3cls')(dense)    model = tf.keras.models.Model(inputs,  output)    return modelos.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"gpus = tf.config.experimental.list_physical_devices(device_type='GPU')for gpu in gpus:    tf.config.experimental.set_memory_growth(gpu, True)gpus = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))strategy = tf.distribute.MirroredStrategy()with strategy.scope():    model = create_model(bert_train=False)    scheduler = tf.keras.callbacks.ReduceLROnPlateau(        monitor='val_loss', factor=0.5, patience=int(params.fit_opt_patience), min_delta=1e-7)    loss = LossGenerate(params.model_loss)    metrics = ['accuracy']    optimizer = tf.keras.optimizers.Adam(params.fit_lr)    csvlogger = tf.keras.callbacks.CSVLogger(os.path.join(        params.model_dir, 'log.tsv'), append=True, separator='\t')    earlystop = tf.keras.callbacks.EarlyStopping(        monitor='val_loss', patience=params.fit_patience)    checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(params.model_dir, 'stage1.weight.h5'),                                                    save_weights_only=True, save_best_only=True)    model.compile(loss=loss, metrics=metrics,                  optimizer=optimizer)

只需要在strategy.scope()下定义模型就行,很简单。但是我也遇到一个问题: 在预测时,在strategy.scope()加载存储的模型文件报错:

from keras_bert import get_custom_objectscustom_objects = get_custom_objects()with strategy.scope():	model = tf.keras.models.load_model(            model_path, custom_objects=custom_objects)# 报错

具体错误google很久也没有结果,最终发现在strategy.scope下载入权重文件是可以的(可能是哪里实现兼容性不强吧),代码:

with strategy.scope():	model = create_model(bert_train=False)	model.load_weights(os.path.join(params.model_dir, 'stage1.weight.h5'))

实验结果

最终,在6卡v100并行下, 1000万长度384的分类模型训练好啦。stage1为固定bert训练结果, 01-0.4238为所有参数train的结果。843ba4a0f417d079c7f0647244728e2f.png发现了:1000W数据,max-len设置为384, RoBERTa-wwm-ext 模型训练需要接近25小时。其实还是蛮快的.... 另外: 大力出奇迹的模型效果还可以!!!

为了凑够1万字,放一下上文用到的LossGenerate函数

def LossGenerate(name='ce', *args, **kwargs):    NAMES = ('ce', 'focal', 'dmi')    kwargs = locals()['kwargs']    assert (name in NAMES), ' loss not defined!!!'    if name == 'ce':        return tf.keras.losses.CategoricalCrossentropy()    if name == 'focal':        gamma = kwargs.get('gamma', 2.)        alpha = kwargs.get('alpha', 0.25)        def categorical_focal_loss_fixed(y_true, y_pred):            y_pred /= K.sum(y_pred, axis=-1, keepdims=True)            epsilon = K.epsilon()            y_pred = K.clip(y_pred, epsilon, 1. - epsilon)            cross_entropy = -y_true * K.log(y_pred)            loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy            return K.mean(loss, axis=1)        return categorical_focal_loss_fixed    if name == 'dmi':        def dmi_loss(y_true, y_pred):            y_true = tf.transpose(y_true, perm=[1, 0])            mat = tf.matmul(y_true, y_pred)            loss = -1.0 * tf.math.log(tf.math.abs(tf.linalg.det(mat)) + 0.001)            return loss        return dmi_loss

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/428229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

future.cancel不能关闭线程_彻底弄懂线程池-newFixedThreadPool实现线程池

public class ExecutorServiceTest {public static void main(String[] args) throws IOException, InterruptedException {// 创建一个固定大小的线程池ExecutorService service Executors.newFixedThreadPool(3);for (int i 0; i < 10; i) {System.out.println("创…

Spring实战(十三)Spring事务

1、什么是事务&#xff08;Transaction&#xff09;&#xff1f; 事务是指逻辑上的一组操作&#xff0c;要么全部成功&#xff0c;要么全部失败。 事务是指将一系列数据操作捆绑成为一个整体进行统一管理。如果某一事务执行成功&#xff0c;则该事务中进行的所有数据更改均会提…

解密SVM系列(二):SVM的理论基础(转载)

解密SVM系列&#xff08;二&#xff09;&#xff1a;SVM的理论基础 原文博主讲解地太好了 收藏下 解密SVM系列&#xff08;三&#xff09;&#xff1a;SMO算法原理与实战求解 支持向量机通俗导论&#xff08;理解SVM的三层境界&#xff09; 上节我们探讨了关于拉格朗日乘子…

cout输出数组_让程序从1开始一直执行++操作,10秒钟能输出最大的数是多少

问题描述如果写一段代码&#xff0c;让程序从 1 开始一直执行 操作&#xff0c;在规定的 10s 钟内&#xff0c;你能输出的最大数是多少&#xff1f;并将它打印到屏幕上。乍一看&#xff0c;你会觉得它是一道算法题&#xff0c;再细想&#xff1a;不对&#xff01;这可能是一道…

微信公众号管理

微信公众号图文编辑 在新建图文时&#xff0c;如果有想插入的视频&#xff0c;可以采取获取视频链接再导入的方法&#xff0c;这样会更高效美观。 摘要和多个图文信息的重叠 新建图文信息&#xff0c;在设置封面旁边有图文摘要&#xff0c;图文摘要会在发送出去的界面旁边有显…

页面模板

找了半天的公众号模板&#xff0c;有点坑&#xff0c;微信公众号更新太快了&#xff0c;几个月不看都找不到使用功能了。 页面模板位置 页面模板在现在的话题专辑&#xff0c;如图: 在页面排版中可以实现我一直想要的菜单整理化功能 可以将文章整理后&#xff0c;发布在菜单…

SQL语句输出

select ,print均可以做输出 但如果想用print同时输出字符串和数字时&#xff0c;就需要遇到转换函数convert: declare allstudents int e.g.print’毕业人数为’convert(char,allstudents) 在执行时可能会遇到结果中显示了你的输出信息&#xff0c;而在消息中却没有&#xff0…

.NET面试题解析(04)-类型、方法与继承

转自:http://www.cnblogs.com/anding/p/5248973.html 常见面试题目: 1. 所有类型都继承System.Object吗&#xff1f; 2. 解释virtual、sealed、override和abstract的区别 3. 接口和类有什么异同&#xff1f; 4. 抽象类和接口有什么区别&#xff1f;使用时有什么需要注意的吗&a…

初学python的format之美

初学python的format之美 *区别于C语言的输出语句的是python的输出用的是"print"而不是“printf”。 1.简单的字符串和变量一起输出 python中的替代使用的是“{}” nameinput("请输入一个人的名字") countryinput("请输入一个国家的名字") prin…

20.链式队列

运行截图: 完整代码: 1 #include <stdio.h>2 #include <stdlib.h>3 4 #define datatype int5 6 typedef struct queue7 {8 datatype data;9 struct queue *pNext; 10 }Queue,*PQueue; 11 12 //入队 从尾部入,从头部出 13 PQueue enq(PQueue phead, dataty…

1到n阶乘算法的改进

1到n阶乘算法的改进 之前用到过好几次了&#xff0c;但总是很长时间不用就会忘了&#xff0c;所以这次直接把它扔进来了。 之前总是喜欢用双层循环&#xff0c;其实一个单层循环足以&#xff0c;下面将用Python和C两种语言进行展示 C&#xff1a; #include<iostream> …

lua 调用文件中的函数调用_深入Lua:调用相关的指令

前言这一节我们来深入解析与调用相关的指令&#xff0c;这些指令是&#xff1a;OP_CALL 调用OP_TAILCALL 尾调用OP_VARARG 可变参数OP_RETURN 返回解析这些指令的过程中&#xff0c;最重要的是时刻跟踪栈的变化情况。简单调用OP_CALL 的语法是&#xff1a;R(A), ... ,R(AC-2) :…

算法基础——列表查找

whats the 算法 算法&#xff08;Algorithm&#xff09;是指解题方案的准确而完整的描述&#xff0c;是一系列解决问题的清晰指令&#xff0c;算法代表着用系统的方法描述解决问题的策略机制。也就是说&#xff0c;能够对一定规范的输入&#xff0c;在有限时间内获得所要求的输…

Python画板画图之美

Python画板画图之美 *turtle.done() #可让画板窗口停止*1.绘制同切圆 pensize为画笔宽度 circle(n),n为半径大小&#xff0c;两者单位均为像素 import turtle turtle.pensize(2) #画笔宽度&#xff0c;单位为像素 turtle.circle(10) #圆半径&…

python request库_【Python爬虫】Request库入门

什么是爬虫&#xff1f; 网络爬虫&#xff08;又被称为网页蜘蛛&#xff0c;网络机器人&#xff0c;在FOAF社区中间&#xff0c;更经常的称为网页追逐者&#xff09;&#xff0c;是一种按照一定的规则&#xff0c;自动地抓取万维网信息的程序或者脚本。另外一些不常使用的名字还…

world文档粘贴图片进去看不到

在大学做实验报告的时候经常要插入一些截图&#xff0c;往往会遇到直接复制粘贴图片会看不到的情况&#xff0c;会很烦&#xff0c;我自己也找了好多方法&#xff0c;下面是我最喜欢的简便方法的一种。 解决方法如下: 可以点击鼠标右键选择段落&#xff0c;将行距修改为1.5倍即…

c盘users的用户名怎么改_怎么修改iPhone备份文件夹路径 iPhone C盘路径修改教程【详解】...

iPhone备份文件夹路径怎么修改_iPhone C盘备份路径修改教程 我们都知道iPhone默认的备份是在C盘&#xff0c;不过现在各种视频、照片体积那么大&#xff0c;小小的C盘只怕是负荷不了了&#xff0c;那么有什么办法去修改备份路径呢&#xff0c;下面小编就为大家介绍一下。需要注…

javaweb开发的准备工作——配置篇

1.配置 a. jdk配置&#xff08;用于web开发编程&#xff0c;此处不需要配置path&#xff0c;只需配置环境变量即可&#xff09; b. tomcat配置 打开Tomcat&#xff08;打开bin目录下的startup.bat文件&#xff09; 检验是否打开成功(两个网址都可以&#xff0c;出现汤姆猫即为…

阅读引擎开源项目调研总结

农历腊月初二&#xff0c;也是冬至后的第四个九天&#xff0c;俗称“四九”。冬至这一天开始数九&#xff0c;这就是人们所说的“提冬数九”。数上9天是一九&#xff0c;再数9天是二九……数到“九九”就算“九”尽了&#xff0c;“九尽杨花开”&#xff0c;那时天就暖了。《九…

ftp 上传文件夹_命令行连接FTP服务器

Windows下&#xff1a;打开命令行窗口&#xff0c;输入 ftp&#xff0c;进入ftp命令模式&#xff1a;输入 open ip地址 端口&#xff0c;进入ftp服务器&#xff0c;如open 172.16.3.77 2121。如下图&#xff1a;输入Windows下的用户名&#xff0c;然后输入密码&#xff08;注意…