Contents
- 1. Introduction
- 2. Initial Implementation
- 3. ParallelFileProcessor
1. Introduction
In data-cleaning scenarios, we often need to clean one .jsonl file to produce another .jsonl file. The most intuitive approach is a single pipeline that reads a line, cleans it, and writes it out, repeating until the input is exhausted (read → clean → write).
The corresponding code is:
```python
import jsonlines

def clean(json_line: dict) -> dict:
    pass

input_jsonl = ''
output_jsonl = ''

with jsonlines.open(output_jsonl, mode='w', flush=True) as w:
    with jsonlines.open(input_jsonl, mode='r') as r:
        for line in r:
            line = clean(line)
            w.write(line)
```
This single-process approach is very slow; switching to multiple processes significantly improves throughput.
Note that, in terms of time cost,

$$\text{Clean} \gg \text{Read} \approx \text{Write}$$

so it makes sense to dedicate one process to Read and let multiple processes handle Clean & Write.
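If you want to sanity-check this assumption on your own data before parallelizing, one quick way is to time the three stages separately on a small sample. The sketch below is illustrative only; sample_clean, profile_stages, and the temporary output path are hypothetical placeholders, not part of the original code:

```python
# Rough, hypothetical profiling sketch: time Read, Clean, and Write separately
# on the first n lines to confirm that cleaning dominates the total cost.
import time
import jsonlines

def sample_clean(json_line: dict) -> dict:
    # Placeholder cleaning step; substitute your real logic.
    return {k: str(v).strip() for k, v in json_line.items()}

def profile_stages(input_jsonl: str, n: int = 10_000) -> None:
    t0 = time.perf_counter()
    lines = []
    with jsonlines.open(input_jsonl, mode='r') as r:
        for i, line in enumerate(r):
            if i >= n:
                break
            lines.append(line)
    t_read = time.perf_counter() - t0

    t0 = time.perf_counter()
    cleaned = [sample_clean(line) for line in lines]
    t_clean = time.perf_counter() - t0

    t0 = time.perf_counter()
    with jsonlines.open('profile_sample_out.jsonl', mode='w') as w:
        for line in cleaned:
            w.write(line)
    t_write = time.perf_counter() - t0

    print(f"read: {t_read:.2f}s  clean: {t_clean:.2f}s  write: {t_write:.2f}s")
```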
2. Initial Implementation
```python
import os
import jsonlines
import multiprocessing as mp
from tqdm import tqdm


def read_process(input_jsonl, queue, parallel):
    with jsonlines.open(input_jsonl, mode='r') as reader:
        for line in reader:
            queue.put(line)
    # One None sentinel per writer process signals that the input is exhausted.
    for _ in range(parallel):
        queue.put(None)


def clean(json_line):
    pass  # Implement your cleaning logic here


def write_process(process_id, queue, fout, lock):
    with tqdm(desc=mp.current_process().name, position=process_id) as pbar:
        while True:
            line = queue.get()
            if line is None:
                break
            res = clean(line)
            # The lock serializes writes so that output lines do not interleave.
            with lock:
                fout.write(res)
            pbar.update(1)


if __name__ == "__main__":
    input_jsonl_path = ''
    output_jsonl_path = ''
    fout = jsonlines.open(output_jsonl_path, mode='w', flush=True)
    queue = mp.Queue(maxsize=10 * 1000 * 1000)
    lock = mp.Lock()
    parallel = os.cpu_count()
    reader_process = mp.Process(target=read_process, args=(input_jsonl_path, queue, parallel))
    writer_processes = [
        mp.Process(target=write_process, args=(i, queue, fout, lock))
        for i in range(parallel)
    ]
    reader_process.start()
    for process in writer_processes:
        process.start()
    reader_process.join()
    for process in writer_processes:
        process.join()
    fout.close()
```
In this code, reader_process reads input_jsonl_path line by line and puts each line onto the shared queue; the writer_processes pull lines from the queue in parallel, clean them, and write the results to fout. The reader enqueues one None sentinel per writer so that every writer knows when to stop.
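The actual cleaning logic depends entirely on the data, which is why clean is left as a stub above. Purely as a hypothetical example, a cleaning step that trims whitespace and drops empty fields could look like this:

```python
# Hypothetical cleaning step: strip whitespace from string values and drop
# fields that end up empty. Replace with your own logic.
def clean(json_line: dict) -> dict:
    cleaned = {}
    for key, value in json_line.items():
        if isinstance(value, str):
            value = value.strip()
        if value not in ('', None):
            cleaned[key] = value
    return cleaned
```

Whatever clean does, it should return a JSON-serializable object (typically a dict), since write_process passes its return value directly to the jsonlines writer.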
3. ParallelFileProcessor
To make the code more reusable, we wrap the above workflow in a class designed to handle the following scenarios:
- The input is not necessarily a jsonl file; it may be a file with some other suffix.
- The input may also be a directory; in that case, every file in the directory with the relevant suffix is read.
- The output is a single file, and its suffix need not match the input's.
```python
import jsonlines
import multiprocessing as mp
import os
from pathlib import Path
from tqdm import tqdm


class ParallelFileProcessor:
    # Maps file suffixes to the open function that should handle them.
    open_functions = {
        '.jsonl': jsonlines.open,
        '.txt': open,
    }

    def __init__(self, input_path, output_file, file_suffix=None):
        self.input_path = Path(input_path)
        self.output_file = Path(output_file)
        self.queue = mp.Queue(maxsize=10 * 1000 * 1000)
        self.lock = mp.Lock()
        self.parallel = os.cpu_count()
        self.file_suffix = file_suffix if file_suffix is not None else ''
        self.input_files = []
        if self.input_path.is_dir():
            # Collect every file in the directory that matches the suffix.
            for item in self.input_path.glob(f"*{self.file_suffix}"):
                if item.is_file():
                    self.input_files.append(item)
        else:
            assert self.input_path.is_file() and self.input_path.suffix == self.file_suffix, \
                "Input path must be a file with the specified suffix"
            self.input_files.append(self.input_path)

    def smart_open(self, file_path, *args, **kwargs):
        # Dispatch to the appropriate open function based on the file suffix,
        # dropping keyword arguments the chosen function does not accept.
        suffix = Path(file_path).suffix
        open_func = self.open_functions.get(suffix, open)
        if open_func == jsonlines.open and 'encoding' in kwargs:
            kwargs.pop('encoding')
        if open_func == open and 'flush' in kwargs:
            kwargs.pop('flush')
        return open_func(file_path, *args, **kwargs)

    def handle(self, line):
        # Subclasses override this with their processing logic.
        raise NotImplementedError

    def read_process(self):
        for input_file in self.input_files:
            with self.smart_open(input_file, mode='r', encoding='utf-8') as r:
                for line in r:
                    self.queue.put(line)
        # One None sentinel per writer process signals that the input is exhausted.
        for _ in range(self.parallel):
            self.queue.put(None)

    def write_process(self, process_id, fout):
        with tqdm(desc=mp.current_process().name, position=process_id) as pbar:
            while True:
                line = self.queue.get()
                if line is None:
                    break
                res = self.handle(line)
                with self.lock:
                    fout.write(res)
                pbar.update(1)

    def run(self):
        with self.smart_open(self.output_file, mode='w', encoding='utf-8', flush=True) as fout:
            reader_process = mp.Process(target=self.read_process)
            writer_processes = [
                mp.Process(target=self.write_process, args=(i, fout))
                for i in range(self.parallel)
            ]
            reader_process.start()
            for process in writer_processes:
                process.start()
            reader_process.join()
            for process in writer_processes:
                process.join()
```
To use it, you only need to override the handle method, as in the sketch below. You can further customize the class on this basis to fit your own needs.
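For instance, a minimal usage sketch (the JsonlCleaner name and the data paths are hypothetical) that reuses the whitespace-trimming idea from Section 2 might look like this:

```python
# Hypothetical subclass: override handle() with the actual cleaning logic,
# then call run() to process everything in parallel.
class JsonlCleaner(ParallelFileProcessor):
    def handle(self, line):
        # With .jsonl input, each line arrives as an already-parsed dict.
        return {k: str(v).strip() for k, v in line.items()}

if __name__ == "__main__":
    cleaner = JsonlCleaner(
        input_path='data/raw',           # directory of input files
        output_file='data/clean.jsonl',  # single output file
        file_suffix='.jsonl',
    )
    cleaner.run()
```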