高效加载大文件(pandas+dask)

一、仅用pd加载大文件(iterator、chunksize)

要使用Pandas进行高效加载超大文件,我们通常会利用其内置的分块(chunk)处理功能。不过,请注意,Pandas本身并不支持多线程读取文件;它更倾向于单线程中进行块处理。尽管如此,对于优化加载超大文本文件这一场景,可以通过以下方式实现提速:

  1. 预先知道或估计每个数据块的行数或大小。
  2. 利用pandas.read_csv等方法的chunksize参数来迭代读取数据。
  3. 使用多进程而非多线程来并行处理每一块数据(如果确实需要并发执行),因为Python中GIL(全局解释器锁)限制了同一时刻只能有一个线程执行Python字节码。

下面是一个示例代码。请注意,在此示例中没有直接采用多进程来读取文件分片。相反,我们首先以流式模式逐步载入小数据块,并在必要时可应用某种形式的并行处理框架(如Dask)针对这些已经被逐步载入内存中DataFrame对象进行后续操作。

import pandas as pd
class EfficientTextLoader:"""采用pandas高效加载超大文本文件"""def __init__(self, filepath, chunksize=10000):"""初始化:param filepath: 文件路径:param chunksize: 每次迭代加载的行数,默认设定为10000条记录/行注意:根据你系统和硬件配置调整chunksize大小以获得最佳性能。较小值减少内存消耗但增加I/O频率;较大值则反之。"""self.filepath = filepath# Pandas 从版本0.24 开始支持 TextFileReader 属性 'chunksize'.# 当使用 read_csv 等函数与 'iterator=True' 结合时,# 设置 'chunksize' 可返回 TextFileReader 对象供迭代.self.chunk_size = chunksizedef load(self):"""按照指定chunks逐渐地、有效地装载整个文档"""reader = pd.read_csv(self.filepath,sep='\t',  # 假设是制表符分隔的TXT 文档;根据需求而定 iterator=True,header=None,chunksize=self.chunk_size)  # 逐行加载chunks = []try:while True:chunks.append(next(reader))  ## 这里可以添加额外代码对当前Chunk进行预处理或转换 ##except StopIteration:  print("Iteration is stopped.")data_concatenated=pd.concat(chunks,axis=0)   return data_concatenated

上述代码做出了几点修改与优化:

  • 1. 使用 pd.read_csv() 的 iterator=True 和 chunkSize= 参数创建一个可循环遍历所有区段(chunk) 的阅读器 (reader),从而允许手动控制流水线(pipeline) 中各部分内容何时被具体导入到RAM之中。

  • 2. 利用 Python 的异常控制结构完成对全部区段(chunk) 数据依次导入直至结束——当无更多内容可供导出(StopIteration)时跳出循环,并将所有已经在列表(chunks) 中累积起来单元结果合成(concatenate)为最终 DataFrame 对象给予输出。

备注: 而关于真正意义上“利用Panda和* 多线性 *” —— 实际情况复杂许多且易受 Python GIL 影响;通常建议替代方案比如 Dask 来达成类似需求。

普通的pandas加载多个文件,pandas加载文件不支持通配符,因此需要glob包来辅助

import pandas as pd
import glob
# 获取所有匹配路径下CSV格式文档列表 
files = glob.glob('path/to/your/files/data_*.csv')
# 循环遍历每个文档, 读取后添加至列表中 
dfs = [pd.read_csv(file) for file in files]
# 将这些 DataFrame 合并成一个 DataFrame (注意: 确保各DataFrame结构相同)
df_combined = pd.concat(dfs)

pandas 在加载3个2G大小文件时,耗时约为 80s

​​​​​​​

二、Dask + pd 加载处理大文件

使用Dask优化原有基于Pandas的代码以提高处理超大文件的能力是一个很好的选择。Dask是Python中一个流行且强大的并行计算库,它可以无缝扩展Numpy、Pandas等数据处理操作到分布式计算环境中。

下面展示如何用Dask重新实现之前讨论过加载超大文本文件功能:

import dask.dataframe as ddclass DaskTextLoader:def __init__(self, filepath, blocksize=1024 * 1024 * 128): # 默认块大小为128MB"""使用 Dask 初始化加载器。:param filepath: 要读取的文件路径。:param blocksize: 单个块(block)读入内存时占用字节大小,默认值设定为128MB。根据你系统和硬件配置调整blocksize大小以获得最佳性能,较小值将导致更多、但管理起来较易控制(内存使用上)单元任务;较大则减少任务数量但每个任务更耗时及可能引发更高内存消耗压力。注意:该参数仅针对文本数据有效,如CSV或JSON格式。如果输入其他格式(比如Parquet)DASK将自动管理最佳块划分策略而忽略此设置项。"""self.filepath = filepathself.blocksize = blocksizedef load(self):# 加载txt/csv/json... 文件并返回dask DataFrame对象.df = dd.read_csv(self.filepath, blocksize=self.blocksize)## 这里可以添加任何必要预处理步骤 ##return df

这段代码通过dd.read_csv()函数来读取文本类型数据,并允许通过blocksize参数来控制加载到内存中每个块(chunk) 的大小。这对于处理非常庞大的文件特别有用因为它允许在不完全加载整个文件到RAM情况下进行分片并行操作。

一旦得到了Dask DataFrame 对象 (df) 后即可利用类似 Pandas 的API进行各种复杂操作与运算—例如过滤(filtering), 分组(grouping), 汇总(aggregating), 转换(transformations)—只需记住结果通常也呈现异步形态;故而在需要具体结果前须调用.compute()方法触发真正执行所有累积待办事务序列链条:

result_df = df.compute()   # 触发实际执行获取Pandas DataFrame结果对象result_df = df.head(5)      # 触发实际执行获取Pandas DataFrame结果对象,只获取5条

请注意,尽管 .compute() 返回标准 Pandas DataFrame 对象包括其所含全部数据项——针对极端庞大规模集合可能会再度碰撞 内存在限制问题; 因此,在设计解决方案结构时应当谨慎试图一次性完全求解而考虑是否逐部递进或者仍然保持部分工作流程在 DASK 执行框架上面智能地选段完成具体细则需求点。

三、自定义单机/多机多线程 dask + pd 加载预处理大文件

要在单机环境中对Dask进行多进程数的控制,你可以使用dask.distributed模块创建一个本地集群,并控制其工作进程数量。通过这种方式,你能够显式地设定并发执行任务的工作线程或进程数目。

以下是如何修改上述代码来加入单机多进程控制的示例:

from dask.distributed import Client, LocalCluster
import dask.dataframe as ddclass DaskTextLoaderWithMultiprocessing:def __init__(self, filepath, blocksize=1024 * 1024 * 128, n_workers=4):"""使用 Dask 初始化加载器并设置多处理。:param filepath: 要读取的文件路径。:param blocksize: 单个块(block)读入内存时占用字节大小,默认值设定为128MB。根据系统和硬件配置调整blocksize大小以获得最佳性能,较小值将导致更高I/O频率但容易管理(内存使用上);较大则减少任务数量但每个任务更耗时及可能引发更高内存消耗压力。注意:该参数仅针对文本数据有效,如CSV或JSON格式。如果输入其他格式(比如Parquet)DASK将自动管理最佳块划分策略而忽略此设置项。:param n_workers: 并行工作线程/进程数,默认为4.增加此数字可并行执行更多操作,但也会增加系统资源消耗。"""self.filepath = filepathself.blocksize = blocksize# 创建本地DASK集群  cluster = LocalCluster(n_workers=n_workers)self.client = Client(cluster)def load(self):# 加载txt/csv/json... 文件并返回dask DataFrame对象.df = dd.read_csv(self.filepath, header=None,    # 是否使用头sep='\t', # csv 分隔符blocksize=self.blocksize)## 这里可以添加任何必要预处理步骤 ##return df def close_cluster(self):# 关闭client和cluster self.client.close()

在这段代码中,我们首先创建了一个LocalCluster实例,并通过参数n_workers=n_worksers,指明了我们想要在集群中启用的工作者(Worker)数量即实际运行计算操作所使用到核心/线程序列总量。紧接着利用该cluster构造出 a Client, 其扮演着用户与集群之间交互接口角色方便提交相关计算任务请求等功能使命完结后续各类数据操作需求点。

请注意,在完成所有需要做的计算之后调用.close()方法关闭客户端(Clients)与服务端(Cluster),释放相关资源非常重要;特别情况下如果忘记手动关闭可能会导致程序未正常结束情形下挂起保持运行状态占据宝贵资源直至外部干预才得以解决问题。

举个完整的例子来执行该代码

首先,假设已经有了一个CSV文件example.csv,该文件内容大致如下:

name,age,city

Alice,34,Berlin

Bob,23,London

Charlie,45,New York

现在目标是使用上面定义的Dask处理类来读取这个CSV文件,并计算年龄列(age)的平均值。 
代码示例如下:

# 使用定义好的加载器来读取数据,然后多线程处理数据。
loader = DaskTextLoaderWithMultiprocessing('example.csv', n_workers=2)   ## 假设您希望用2个工作进程运行df_dask = loader.load()  ## 调用load方法得到dask DataFrame对象 average_age_computed_future=df_dask.age.mean()   ## 计算年龄平均值操作 (延迟执行) average_age_result=average_age_computed_future.compute()  ### 触发实际执行并获取结果(阻塞直到完成)
print(f"The average age is: {average_age_result}")loader.close_cluster() ### 记住关闭集群释放资源!

以上代码段展示了从头至尾创建一个可以控制单机多进程数 的 DASK Text Loader, 然后利用它去异步地读取一个CSV格式文本数据、计算某特定数值列(此处为“age”年龄字段)内所有元素平均值,并最终输出该统计结果。

 使用 dask 加载多个文件

使用 Dask 加载多个文件假设你有一系列以相同模式命名(例如data_*.csv)的CSV文件想要加载:import dask.dataframe as dd# 用通配符 '*' 加载匹配到的所有 CSV 文件
df = dd.read_csv('path/to/your/files/data_*.csv')

下面是dask + pd 加载3个2g的文件,耗时约37s,n_workers指定越多,文件大小和文件数量越大,差距拉的就越大 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/727718.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

千云GPS平台 -在k8s上部署Mysql

构建xtrabackup docker build -t registry.cn-zhangjiakou.aliyuncs.com/qy566/xtrabackup:8.0.34 --rm .部署进度 你可以通过运行以下命令查看启动进度: kubectl get pods -l app=mysql -n mysql-db --watch 测试mysql 发送客户端请求写入数据kubectl run mysql-client --…

解决cs不能生成Linux木马的问题

要解决的问题:众所周知,msf上面的shell或者是其他的shell想反弹给cs默认情况下是只支持windows的,因为cs的监听模块默认没有linux的,但是有些主机就是用linux搭建的,这可怎么办呢。就要用到一个插件CrossC2。 下载插件…

实操keepalived(高可用)+Nginx(四层代理+七层代理),实现高可用、负载均衡以及动静分离

一 vrrp技术 VRRP 相关术语 VRRP能够在不改变组网的情况下,将多台路由器虚拟成一个虚拟路由器,i通过配置虚拟路由器的IP地址为默认网关,实现网关的备份。 协议版本: VRRPv2 (常用) 和VRRPv3:0 VRRPv2仅适用于IPv4网络,VRRPv3适用…

[项目设计] 从零实现的高并发内存池(五)

🌈 博客个人主页:Chris在Coding 🎥 本文所属专栏:[高并发内存池] ❤️ 前置学习专栏:[Linux学习] ⏰ 我们仍在旅途 ​ 目录 8 使用定长内存池脱离new 9. 释放对象时不传大小 10.性能优化 10.1…

[LeetCode][239]【学习日记】滑动窗口最大值——O(n)单调队列

题目 239. 滑动窗口最大值 难度:困难相关标签相关企业提示 给你一个整数数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回滑动窗口中的最大值。 示例 1…

雷赛控制卡的扩展IO连接

雷赛控制卡的扩展IO点无法控制问题处理 现象 因设备的上IO点较多,所以使用了多个雷赛32点位的IO扩展卡。上位机程序在控制输出IO时发现主模块IO和第一个扩展IO的输出可以控制。但第二个IO扩展卡和第三个IO扩展卡的输出控制不了。经排查出发现轴卡在初始化时只连接了…

校园小情书微信小程序,社区小程序前后端开源,校园表白墙交友小程序

功能 表白墙卖舍友步数旅行步数排行榜情侣脸漫画脸个人主页私信站内消息今日话题评论点赞收藏 效果图

2024年腾讯云学生服务器优惠活动「云+校园」政策解读

2024年腾讯云学生服务器优惠活动「云校园」,学生服务器优惠价格:轻量应用服务器2核2G学生价30元3个月、58元6个月、112元一年,轻量应用服务器4核8G配置191.1元3个月、352.8元6个月、646.8元一年,CVM云服务器2核4G配置842.4元一年&…

几何工具的使用

Geometry - Creation 创建几何 CogCreateCircleTool:创建圆CogCreateEllipseTool:创建椭圆CogCreateLineBisectPointsTool:带有两个点的平行线CogCreateLineParallelTool:在某一点创建某条线的平行线CogCreateLinePerpendicularTool:在某一点创建某条线…

LT6813/ADBMS1818底层驱动---均衡控制

1、LT6813采用内部均衡的原理 2、平衡控制结构体 根据数据库中读取的控制值设置平衡。要为单元设置平衡,必须将相应的位写入配置寄存器中。LTC 驱动程序仅执行数据库中 BMS 写入的数据。 参数 ltc_stateLTC 状态机的状态pSpi接口指向 SPI 配置的指针pTxBuff &…

数据结构(八)——初识单链表

😀前言 单链表是数据结构中最基本的一种链表结构,它由一系列节点组成,每个节点包含数据和指向下一个节点的指针。单链表具有灵活性和动态性,可以根据需要插入、删除和查找元素,适用于各种场景和问题的解决。 在本篇文章…

day7-网络编程

1>基于UDP的网络聊天室 Ser.c #include <myhead.h> #define SER_IP "10.211.55.9" // 服务器IP #define SER_PORT 9999struct user {char usrName[20];struct sockaddr_in cin; }; int main(int argc, char const *argv[]) {// 1.创建用于监听的套接字int…

MyBatis-Flex学习总结

写在前面的话 MyBatis-Flex 是一个优雅的 MyBatis 增强框架&#xff0c;它非常轻量、同时拥有极高的性能与灵活性。我们可以轻松的使用 Mybaits-Flex 链接任何数据库&#xff0c;其内置的 QueryWrapper 帮助我们极大的减少了 SQL 编写的工作的同时&#xff0c;减少出错的可能性…

鸿蒙开发岗成春招最大黑马,“金三银四”应届生如何突围?

一年一度春招时间到&#xff0c;技术岗位已成为众多人才竞相追求的“职业高地”&#xff0c;也是未来职业发展的重要方向之一。鸿蒙人才在春招市场上成为“香饽饽”&#xff0c;与往年不同的是&#xff0c;许多应届生放弃考公执念向程序员进攻&#xff0c;这一现象背后蕴含着深…

腾讯云服务器和阿里云服务器价格测评_2024年费用大PK

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器61元一年&#xff0c;2核2G3M、2核4G、4核8G、4核16G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配…

day52(vueJS)json-server模拟数据

json-server介绍&#xff1a;&#xff1a;&#xff1a;JSON Server 是一个用于快速搭建 REST API 的工具&#xff0c;它可以帮助我们在开发过程中快速模拟 一个后端 API 服务器&#xff0c;方便前端开发人员进行接口调试和开发。使用 JSON Server&#xff0c;你可以通过创建一个…

第三百八十七回

文章目录 1. 概念介绍2. 使用方法3. 示例代码 我们在上一章回中介绍了DateRangePickerDialog Widget相关的内容,本章回中将介绍Radio Widget.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在这里说的Radio Widget是指单选按钮&#xff0c;没有选中时是圆形…

代码随想录算法训练营第二天|977、有序数组的平方

977. 有序数组的平方 已解答 简单 相关标签 相关企业 给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 示例 1&#xff1a; 输入&#xff1a;nums [-4,-1,0,3,10] 输出&#xff1a;[0,1,9,16,…

Linux设备模型(七) - Netlink

一&#xff0c;什么是netlink通信机制 Netlink套接字是用以实现用户进程与内核进程通信的一种特殊的进程间通信(IPC) ,也是网络应用程序与内核通信的最常用的接口。Netlink 是一种特殊的 socket&#xff0c;它是 Linux 所特有的。 Netlink 是一种在内核与用户应用间进行双向数…

我的创作周年纪念日

机缘 最初成为创作者的初心&#xff1a;整理自己的知识体系&#xff0c;普及前端知识 实战项目中的经验分享日常工作学习过程中的记录通过文章进行技术交流归纳和整理自己的知识体系 收获 创作的过程中收获&#xff1a; 获得了909粉丝的关注获得了很多正向的反馈&#xff0c…