一种简易的多进程文件读写器

目录

  • 1. 前言
  • 2. 初步实现
  • 3. ParallelFileProcessor

1. 前言

在数据清洗场景下,我们可能需要对一个 .jsonl 文件清洗以得到另一个 .jsonl 文件。一种直观的做法就是逐行读取,逐行清洗,然后逐行写入,这一流程的示意图如下:

相应的代码为:

import jsonlinesdef clean(json_line: dict) -> dict:passinput_jsonl = ''
output_jsonl = ''with jsonlines.open(output_jsonl, mode='w', flush=True) as w:with jsonlines.open(input_jsonl, mode='r') as r:for line in r:line = clean(line)w.write(line)

这种单进程的处理方法非常耗时,若改用多进程,则能显著提高效率。

注意到在时间消耗方面,有

Clean ≫ Read ≈ Write \text{Clean}\gg\text{Read}\approx\text{Write} CleanReadWrite

因此可以考虑让一个进程去 Read \text{Read} Read,多个进程去 Clean & Write \text{Clean}\,\&\text{Write} Clean&Write

2. 初步实现

import os
import jsonlines
import multiprocessing as mp
from tqdm import tqdmdef read_process(input_jsonl, queue, parallel):with jsonlines.open(input_jsonl, mode='r') as reader:for line in reader:queue.put(line)for _ in range(parallel):queue.put(None)def clean(json_line):pass  # Implement your cleaning logic heredef write_process(process_id, queue, fout, lock):with tqdm(desc=mp.current_process().name, position=process_id) as pbar:while True:line = queue.get()if line is None:breakres = clean(line)with lock:fout.write(res)pbar.update(1)if __name__ == "__main__":input_jsonl_path = ''output_jsonl_path = ''fout = jsonlines.open(output_jsonl_path, mode='w', flush=True)queue = mp.Queue(maxsize=10 * 1000 * 1000)lock = mp.Lock()parallel = os.cpu_count()reader_process = mp.Process(target=read_process, args=(input_jsonl_path, queue, parallel))writer_processes = [mp.Process(target=write_process, args=(i, queue, fout, lock))for i in range(parallel)]reader_process.start()for process in writer_processes:process.start()reader_process.join()for process in writer_processes:process.join()fout.close()

在这段代码中,reader_process 用于从 input_jsonl_path 中逐步读取每一行并放到通信队列中。writer_processes 用于从通信队列中并行地读取数据然后进行清洗并写入到 fout

3. ParallelFileProcessor

为了提高代码的可复用性,我们将上述过程封装成了一个类,假设它被设计来适应以下场景:

  • 输入不一定是 jsonl 文件,还可能是其他后缀的文件。
  • 输入也可能是一个目录,我们假定输入为目录时,该目录下的所有相关后缀的文件都要被读取。
  • 输出是一个文件,且其后缀和输入的后缀不必一致。
import jsonlines
import multiprocessing as mp
import os
from pathlib import Path
from tqdm import tqdmclass ParallelFileProcessor:open_functions = {'.jsonl': jsonlines.open,'.txt': open,}def __init__(self, input_path, output_file, file_suffix=None):self.input_path = Path(input_path)self.output_file = Path(output_file)self.queue = mp.Queue(maxsize=10 * 1000 * 1000)self.lock = mp.Lock()self.parallel = os.cpu_count()self.file_suffix = file_suffix if file_suffix is not None else ''self.input_files = []if self.input_path.is_dir():for item in self.input_path.glob(f"*{self.file_suffix}"):if item.is_file():self.input_files.append(item)else:assert self.input_path.is_file() and self.input_path.suffix == self.file_suffix, "Input path must be a file with the specified suffix"self.input_files.append(self.input_path)def smart_open(self, file_path, *args, **kwargs):suffix = Path(file_path).suffixopen_func = self.open_functions.get(suffix, open)if open_func == jsonlines.open and 'encoding' in kwargs:kwargs.pop('encoding')if open_func == open and 'flush' in kwargs:kwargs.pop('flush')return open_func(file_path, *args, **kwargs)def handle(self, line):raise NotImplementedErrordef read_process(self):for input_file in self.input_files:with self.smart_open(input_file, mode='r', encoding='utf-8') as r:for line in r:self.queue.put(line)for _ in range(self.parallel):self.queue.put(None)def write_process(self, process_id, fout):with tqdm(desc=mp.current_process().name, position=process_id) as pbar:while True:line = self.queue.get()if line is None:breakres = self.handle(line)with self.lock:fout.write(res)pbar.update(1)def run(self):with self.smart_open(self.output_file, mode='w', encoding='utf-8', flush=True) as fout:reader_process = mp.Process(target=self.read_process)writer_processes = [mp.Process(target=self.write_process, args=(i, fout)) for i in range(self.parallel)]reader_process.start()for process in writer_processes:process.start()reader_process.join()for process in writer_processes:process.join()

使用时只需要重写 handle 方法即可。

用户可根据自己的需求在此基础上进一步定制化这个类。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/702719.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【wails】(6):使用wails做桌面应用开发,使用gin+go-chatglm.cpp进行本地模型运行,在windows上运行成功

1,整体架构说明 主要使用,参考的开源项目是: https://github.com/wailsapp/wails 前端项目: https://github.com/Chanzhaoyu/chatgpt-web 运行模型: https://github.com/Weaxs/go-chatglm.cpp 参考代码: h…

深度神经网络中的计算和内存带宽

深度神经网络中的计算和内存带宽 文章目录 深度神经网络中的计算和内存带宽来源原理介绍分析1:线性层分析2:卷积层分析3:循环层总结 来源 相关知识来源于这里。 原理介绍 Memory bandwidth and data re-use in deep neural network computat…

五.AV Foundation 视频播放 - 标题和字幕

引言 本篇博客主要介绍使用AV Foundation加载视频资源的时候,如何获取视频标题,获取字幕并让其显示到播放界面。 设置标题 资源标题的元数据内容,我们需要从资源的commonMetadata中获取,在加载AVPlayerItem的时候我们已经指定了…

Sentinel微服务流量治理组件实战上

目录 分布式系统遇到的问题 解决方案 Sentinel 是什么? Sentinel 工作原理 Sentinel 功能和设计理念 流量控制 熔断降级 Sentinel工作主流程 Sentinel快速开始 Sentinel资源保护的方式 基于API实现 SentinelResource注解实现 Spring Cloud Alibaba整合…

介绍 PIL+IPython.display+mtcnn for 音视频读取、标注

1. nn.NLLLoss是如何计算误差的? nn.NLLLoss是负对数似然损失函数,用于多分类问题中。它的计算方式如下:首先,对于每个样本,我们需要将其预测结果通过softmax函数转换为概率分布。softmax函数可以将一个向量映射为一个概率分布&…

第四节:Vben Admin登录对接后端getUserInfo接口

系列文章目录 第一节:Vben Admin介绍和初次运行 第二节:Vben Admin 登录逻辑梳理和对接后端准备 第三节:Vben Admin登录对接后端login接口 第四节:Vben Admin登录对接后端getUserInfo接口 文章目录 系列文章目录前言一、回顾Vben…

RK3568平台 阻塞IO和非阻塞IO

一.IO 模型的分类 IO 模型根据实现的功能可以划分为为阻塞 IO、非阻塞 IO、信号驱动IO,IO多路复用和异步 IO。根据等待 IO 的执行结果进行划分,前四个 IO 模型又被称为同步IO. 同步IO与异步IO: 以现实生活去餐馆吃饭为例,根据菜…

Linux——缓冲区封装系统文件操作

📘北尘_:个人主页 🌎个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上,不忘来时的初心 文章目录 一、FILE二、封装系统接口实现文件操作1、text.c2、mystdio.c3、mystdio.h 一、FILE 因为IO相…

Typora结合PicGo + 使用Github搭建个人免费图床

文章目录 一、国内图床比较二、使用Github搭建图床三、PicGo整合Github图床1、下载并安装PicGo2、设置图床3、整合jsDelivr具体配置介绍 4、测试5、附录 四、Typora整合PicGo实现自动上传 每次写博客时,我都会习惯在Typora写好,然后再复制粘贴到对应的网…

基于springboot+vue的校园社团信息管理系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

自定义搭建管理系统

最近使用自己搭建的脚手架写了一个简易管理系统,使用webpackreactantd,搭建脚手架参考: 使用Webpack5搭建项目(react篇)_babel-preset-react-app-CSDN博客 搭建的思路: 1. 基建布局,使用antd的…

代码随想录算法训练营第二十五天 | 216.组合总和III,17.电话号码的字母组合 [回溯篇]

代码随想录算法训练营第二十五天 LeetCode 216.组合总和III题目描述思路参考代码总结 LeetCode 17.电话号码的字母组合题目描述思路参考代码 LeetCode 216.组合总和III 题目链接:216.组合总和III 文章讲解:代码随想录#216.组合总和III 视频讲解&#xff…

Java零基础 - 字符串连接运算符

哈喽,各位小伙伴们,你们好呀,我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。 我是一名后…

acwing算法学习笔记 ------ 双链表

1、定义 这里可以做一个投机取巧,我们不再像单链表去用head去存头和尾,直接让r[0] 1,l[1] 0; idx 2.进行初始化, 解释一下l[N] 和 r[N] l[N]:是表示指向左面下一个节点下标, r[N]:表示指向下一个节点的下标。大家不用担心i…

皓学IT:WEB03_MySQL

今日内容介绍 Mysql数据库 SQL语句 一、数据库 1.1. 数据库概述 什么是数据库 数据库就是存储数据的仓库,其本质是一个文件系统,数据按照特定的格式将数据存储起来,用户可以对数据库中的数据进行增加,修改,删除及…

南京观海微电子---AXI总线技术简介——ZYNQ PS和PL的互联技术

1.AXI总线介绍 AXI全称Advanced Extensible Interface,是Xilinx从6系列的FPGA开始引入的一个接口协议,主要描述了主设备和从设备之间的数据传输方式。AXI协议在Xilinx的ZYNQ系列芯片中继续使用,协议版本是AXI4。 ZYNQ为Xilinx推出的首款将高…

解决vulhub漏洞环境下载慢卡死问题即解决docker-valhub漏洞环境下载慢的问题

解决vulhub环境下载慢/卡 当前环境为:ubuntu20 1.在 cd /etc/docker/目录下创建或修改daemon.json文件 sudo touch daemon.json编辑daemon.json文件 sudo vim daemon.json2.填写阿里云镜像地址: { "registry-mirrors":["https://6kx…

11-pytorch-使用自己的数据集测试

b站小土堆pytorch教程学习笔记 import torch import torchvision from PIL import Image from torch import nnimg_path ../imgs/dog.png imageImage.open(img_path) print(image) # imageimage.convert(RGB)transformtorchvision.transforms.Compose([torchvision.transforms.…

运维SRE-15 自动化批量管理-ansible1

## 1.什么是自动化批量管理重复性工作与内容: 思考如何自动化完成. 部署环境,批量查看信息,批量检查:自动化 一般步骤:1.如何手动实现2.如何自动化管理工具,批量实现3.注意事项:想要自动化一定要先标准化(所有环境,软件,目录一致)…

复制策略深入探讨

在之前的博客中,我们讨论了复制最佳实践和不同类型的复制,例如批量、站点和存储桶。但是,随着所有这些不同类型的复制类型的出现,人们不得不想知道在哪里使用哪种复制策略?从现有 S3 兼容数据存储迁移数据时&#xff0…