Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

    • 1. 异步编程基础
      • 1.1 `asyncio`和`await`
      • 1.2 `aiofiles`
    • 2. 异步文件批量写入示例
      • 2.1 代码结构
      • 2.2 代码实现
      • 2.3 代码解释
        • 2.3.1 `BatchWriter`类
        • 2.3.2 `main`函数
    • 3. 其他示例代码
      • 3.1 简单的异步文件写入
      • 3.2 异步文件读取
      • 3.3 使用`asyncio.Lock`保护共享资源
      • 3.4 使用`asyncio.wait_for`设置超时
    • 4. 总结

在现代编程中,异步编程已经成为提高程序性能的重要手段之一。特别是在处理I/O密集型任务时,异步编程可以显著提高程序的效率。Python的asyncio库和第三方库aiofiles为我们提供了强大的工具,使得异步文件操作变得简单而高效。

本文将通过一个具体的示例,介绍如何使用asyncioaiofiles进行异步文件批量写入,并详细解释其中的关键技术点。

1. 异步编程基础

1.1 asyncioawait

asyncio是Python的标准库,用于编写异步代码。async关键字用于定义一个异步函数,而await关键字用于等待一个协程(coroutine)完成。

import asyncioasync def hello_world():print("Hello")await asyncio.sleep(1)print("World")asyncio.run(hello_world())

1.2 aiofiles

aiofiles是一个第三方库,提供了异步文件操作的功能。通过aiofiles.open可以异步打开文件,并通过await f.write进行异步写入。

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

2. 异步文件批量写入示例

2.1 代码结构

我们将实现一个BatchWriter类,用于批量写入数据到文件中。该类的主要功能包括:

  • 将数据添加到缓冲区。
  • 当缓冲区达到一定大小时,将缓冲区中的数据批量写入文件。
  • 使用asyncio.Lock保护共享资源,避免竞态条件。
  • 使用asyncio.wait_for设置写入操作的超时时间。

2.2 代码实现

import asyncio
import json
import time
from collections import deque
import aiofilesclass BatchWriter:def __init__(self, filename: str, batch_size: int = 100):self.filename = filenameself.batch_size = batch_sizeself.buffer = deque()self.lock = asyncio.Lock()async def add(self, data: dict):print(f"Adding data: {data}")async with self.lock:print("Lock acquired in add")self.buffer.append(data)print(f"Buffer size after adding: {len(self.buffer)}")should_flush = len(self.buffer) >= self.batch_sizeprint("Releasing lock in add")if should_flush:print("Buffer size exceeded batch_size, calling flush")await self.flush()async def flush(self):print("Starting flush")buffer_to_write = []async with self.lock:if not self.buffer:print("Buffer is empty, exiting flush")return# 将缓冲区内容复制到临时列表并清空缓冲区buffer_to_write = list(self.buffer)self.buffer.clear()print(f"Flushing buffer of size: {len(buffer_to_write)}")# 在锁外执行文件写入if buffer_to_write:try:print(f"Writing {len(buffer_to_write)} items to file")await asyncio.wait_for(self._write_to_file(buffer_to_write), timeout=10)print("Finished writing to file")except asyncio.TimeoutError:print("Timeout while writing to file")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))except Exception as e:print(f"Error writing to file: {e}")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))print("Flush complete")async def _write_to_file(self, buffer_to_write):async with aiofiles.open(self.filename, 'a') as f:for data in buffer_to_write:await f.write(json.dumps(data, ensure_ascii=False) + '\n')async def main():start_time = time.time()writer = BatchWriter(filename="output.jsonl", batch_size=5)async def generate_data():for i in range(10):data = {"id": i, "value": f"data_{i}"}print(f"Generating data: {data}")await writer.add(data)await asyncio.sleep(0.1)print("Starting data generation")await generate_data()print("Finished data generation")print("Calling final flush")await writer.flush()  # 最终刷新缓冲区end_time = time.time()print(f"Total time: {end_time - start_time} seconds")if __name__ == "__main__":asyncio.run(main())

2.3 代码解释

2.3.1 BatchWriter
  • __init__方法:初始化文件名、批量大小、缓冲区和锁。
  • add方法:将数据添加到缓冲区,并在缓冲区达到批量大小时调用flush方法。
  • flush方法:将缓冲区中的数据批量写入文件。如果写入失败(如超时),将数据重新放回缓冲区。
  • _write_to_file方法:异步写入数据到文件。
2.3.2 main函数
  • generate_data协程:生成数据并调用add方法将数据添加到缓冲区。
  • main函数:启动数据生成,并在数据生成完成后调用flush方法进行最终的缓冲区刷新。

3. 其他示例代码

3.1 简单的异步文件写入

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

3.2 异步文件读取

import asyncio
import aiofilesasync def read_from_file(filename):async with aiofiles.open(filename, 'r') as f:content = await f.read()return contentasync def main():content = await read_from_file('example.txt')print(f"File content: {content}")asyncio.run(main())

3.3 使用asyncio.Lock保护共享资源

import asyncio
import aiofilesclass FileWriter:def __init__(self, filename):self.filename = filenameself.lock = asyncio.Lock()async def write(self, content):async with self.lock:async with aiofiles.open(self.filename, 'a') as f:await f.write(content + '\n')async def main():writer = FileWriter('example.txt')async def write_data(data):await writer.write(data)await asyncio.gather(write_data('Data 1'),write_data('Data 2'),write_data('Data 3'))print("All data written to file")asyncio.run(main())

3.4 使用asyncio.wait_for设置超时

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():try:await asyncio.wait_for(write_to_file('example.txt', 'Hello, World!'), timeout=1)print("Data written to file")except asyncio.TimeoutError:print("Operation timed out")asyncio.run(main())

4. 总结

通过本文的介绍和示例代码,我们了解了如何使用asyncioaiofiles进行异步文件操作。异步编程可以显著提高I/O密集型任务的效率,特别是在处理大量并发任务时。使用asyncio.Lock可以保护共享资源,避免竞态条件。使用asyncio.wait_for可以设置操作的超时时间,防止无限期等待。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57545.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

婚纱相册必须去摄影店吗?其实自己会拍照就能实现,性价比更高

一直以来,婚纱照都是新人们婚礼筹备中不可或缺的部分。然而,高昂的摄影店价格让不少新人望而却步。其实,只要掌握一些拍照技巧,自己在家就能制作出独一无二的婚纱相册,不仅性价比超高,还能留下更多珍贵的回…

Android 中的串口开发

一:背景 本文着重讲安卓下的串口。 由于开源的Android在各种智能设备上的使用越来越多,如车载系统等。在我们的认识中,Android OS的物理接口一般只有usb host接口和耳机接口,但其实安卓支持各种各样的工业接口,如HDM…

条码检测系统——基于MATLAB的一维条码识别

摘 要:条码技术是如今应用最广泛的识别和输入技术之一,由于其包含的信息量大,识别错误率低而在各个方面得到很大的重视。它发展迅速并被广泛应用于于工业、商业、图书出版、医疗卫生等各行各业。由我国目前发展现状来看,条码的正…

攻坚金融关键业务系统,OceanBase亮相2024金融科技大会

10月15-16日,第六届中新数字金融应用博览会与2024金融科技大会(简称“金博会”)在苏州工业园区联合举办。此次大会融合了国家级重要金融科技资源——“中国金融科技大会”,围绕“赋能金融高质量发展,金融科技创新前行”…

【C++指南】运算符重载详解

引言 C 提供了运算符重载这一特性,允许程序员为自定义类型(如类和结构体)定义运算符的行为。 通过运算符重载,可以使自定义类型对象像内置类型一样使用运算符,从而提高代码的可读性和易用性。 本文将详细介绍 C 中运算…

【状态机DP】力扣2786. 访问数组中的位置使分数最大

给你一个下标从 0 开始的整数数组 nums 和一个正整数 x 。 你 一开始 在数组的位置 0 处&#xff0c;你可以按照下述规则访问数组中的其他位置&#xff1a; 如果你当前在位置 i &#xff0c;那么你可以移动到满足 i < j 的 任意 位置 j 。 对于你访问的位置 i &#xff0c…

若依微服务15 - RuoYi-Vue3 实现前端独立运行

正文开始&#xff1a; RuoYi-Vue3 使用 Vue3 Element Plus Vite 技术栈。 GitHub 开源地址&#xff1a;https://github.com/yangzongzhuan/RuoYi-Vue3 本文介绍使用若依提供的在线后端接口&#xff0c;仅启动前端项目并进行界面开发&#xff0c;而无需启动后端服务。 一、克隆…

AI视听新体验!浙大阿里提出视频到音乐生成模型MuVi:可解决语义对齐和节奏同步问题

MuVi旨在解决视频到音乐生成(V2M)中的语义对齐和节奏同步问题。 MuVi通过专门设计的视觉适配器分析视频内容,以提取上下文 和时间相关的特征,这些特征用于生成与视频的情感、主题及其节奏和节拍相匹配的音乐。MuVi在音频质量和时间同步方面表现优于现有基线方法,并展示了其在风…

C++:模板的特化与分离编译

之前我们在介绍模板的时候仅仅是简单的介绍了模板的用法&#xff0c;本篇文章我们来详细的介绍下模板中比较重要的几个点。 一&#xff0c;非类型模板参数 我们之前的c中&#xff0c;会将经常使用的而又确保在我们程序的运行过程中值不会改变的值进行#define&#xff1a; #d…

初入编程之路,启航代码海

#1024程序员节|征文# 前言 今天又是1024程序员节了&#xff0c;第一次听说这个节日是在我在23年刚刚上大一的时候听学长他们说的&#xff0c;如今已经是24年了&#xff0c;虽然只学习了一年的编程但我已经了解到了这条路上的不易。希望能够在这条路上面一路坚持下去&#xff0…

力扣_斐波那契数列

本题目本质和爬楼梯是一样的&#xff0c;主要运用的是递归来解题。 class Solution:my_dict {}def fib(self, n: int) -> int:if self.my_dict.get(n) is not None: # 先判断有没有计算过这个值return self.my_dict.get(n)tempResult 0if n > 2:tempResult self.fib…

075_基于springboot的万里学院摄影社团管理系统

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

MySql中使用findInSet和collection实践

FIND_IN_SET 需求如下&#xff1a;有张用户表&#xff0c;表里有个字段叫school&#xff0c;意为这个用户上过哪些学校&#xff0c;数据库里存的就是字符串类型&#xff0c;存的值类似"2,5,12"&#xff0c;要求就是查询出上过id为2的学校有哪些用户 解决方法&#x…

【JAVA毕设】基于JAVA的酒店管理系统

一、项目介绍 本系统前端框架采用了比较流行的渐进式JavaScript框架Vue.js。使用Vue-Router实现动态路由&#xff0c;Ajax实现前后端通信&#xff0c;Element-plus组件库使页面快速成型。后端部分&#xff1a;采用SpringBoot作为开发框架&#xff0c;同时集成MyBatis、Redis、…

qt生成uuid,转成int。ai回答亲测可以

// 生成一个随机的UUID QUuid uuid QUuid::createUuid(); // 将UUID转换为字符串 QString uuidStr uuid.toString(QUuid::WithoutBraces);// 计算MD5哈希值 QByteArray hash QCryptographicHash::hash(uuidStr.toUtf8(), QCryptographicHash::Md5);// 提取前8个字节并转换为…

云曦10月13日awd复现

一、防御 1、改用户密码 passwd <user> 2、改数据库密码 进入数据库 mysql -uroot -proot 改密码 update mysql.user set passwordpassword(新密码) where userroot; 查看用户信息密码 select host,user,password from mysql.user; 改配置文件&#xff0c;将密码改为自己…

电脑技巧:Rufus——最佳USB启动盘制作工具指南

目录 一、功能强大&#xff0c;兼容性广泛 二、界面友好&#xff0c;操作简便 三、快速高效&#xff0c;高度可定制 四、安全可靠&#xff0c;社区活跃 在日常的电脑使用中&#xff0c;无论是为了安装操作系统、修复系统故障还是进行其他需要可引导媒体的任务&#xff0c;拥…

使用 Python结合随机User-Agent与代理池进行网络请求

1. 引言 在爬虫开发过程中&#xff0c;为了模拟真实的用户行为&#xff0c;避免被目标网站识别并封锁&#xff0c;通常需要使用随机的User-Agent以及代理IP来发送网络请求。本文将介绍如何通过Python实现这一功能&#xff0c;包括设置随机User-Agent、读取代理列表&#xff0c…

web网页

HTML代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>wyy</title><!-- 引…

VSCODE c++不能自动补全的问题

最近安装了vscode&#xff0c;配置了C/C扩展&#xff0c;也按照网上说的配置了头文件路径 我发现有部分头文件是没办法解析的&#xff0c;只要包含这些头文件中的一个或者多个&#xff0c;就没有代码高亮和代码自动补全了&#xff0c;确定路径配置是没问题的&#xff0c;因为鼠…