python 多进程与多线程配合拷贝文件目录

版本一:使用shutil进行拷贝

  1 # -*- coding: utf-8 -*-
  2 # @author: Tele
  3 # @Time  : 2019/04/02 下午 3:09
  4 # 待改进:
  5 # 1.拷贝逻辑使用原生的io
  6 # 2.针对大文件在进程内部实现多线程方式进行拷贝
  7 
  8 
  9 import time
 10 import re
 11 import os
 12 import shutil
 13 import multiprocessing
 14 
 15 
 16 # 遍历文件夹
 17 def walk_file(file):
 18     file_list = list()
 19     for root, dirs, files in os.walk(file):
 20         # 遍历文件
 21         for f in files:
 22             file_list.append(f)
 23     return file_list
 24 
 25 
 26 # 计算文件数量
 27 def get_file_count(dir):
 28     return len(walk_file(dir))
 29 
 30 
 31 def copy(src, target, queue):
 32     target_number = 1
 33     if os.path.isdir(src):
 34         target_number = get_file_count(src)
 35         shutil.copytree(src, target)
 36     else:
 37         shutil.copyfile(src, target)
 38     # 将拷贝完成的文件数量放入队列中
 39     queue.put(target_number)
 40 
 41 
 42 def copy_dir(src, desc):
 43     total_number = get_file_count(src)
 44     # 分隔符检测
 45     src = check_separator(src)
 46     desc = check_separator(desc)
 47     # print("src:",src)
 48     # print("desc:",desc)
 49 
 50     file_dir_list = [src + "/" + i for i in os.listdir(src)]
 51     if os.path.exists(desc):
 52         shutil.rmtree(desc)
 53     pool = multiprocessing.Pool(3)
 54 
 55     # 创建队列
 56     queue = multiprocessing.Manager().Queue()
 57 
 58     # 一个文件/目录开启一个进程去拷贝
 59     for f_name in file_dir_list:
 60         target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
 61         # print(target)
 62         # 创建target目录
 63         parent_path = os.path.split(target)[0]
 64         if not os.path.exists(parent_path):
 65             os.makedirs(parent_path)
 66         pool.apply_async(copy, args=(f_name, target, queue,))
 67 
 68     start = time.time()
 69     pool.close()
 70     #    pool.join()
 71     count = 0
 72     while True:
 73         count += queue.get()
 74         # 格式化输出时两个%输出一个%,不换行,每次定位到行首,实现覆盖
 75         print("\r拷贝进度为 %.2f %%" % (count * 100 / total_number), end="")
 76         if count >= total_number:
 77             break
 78     end = time.time()
 79     print()
 80     print("耗时-----", (end - start), "s")
 81 
 82 
 83 # 查找指定字符出现的全部索引位置
 84 def index_list(c, s):
 85     return [i.start() for i in re.finditer(c, s)]
 86 
 87 
 88 # 检测目录结尾是否有 "/"
 89 def check_separator(path):
 90     if path.rindex("/") == len(path) - 1:
 91         return path[0:path.rindex("/")]
 92     return path
 93 
 94 
 95 def main():
 96     copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")
 97 
 98 
 99 if __name__ == '__main__':
100     main()

这样做仍然有些小问题,对于大文件可以在进程内部采用多线程的方式,可以看到使用shutil进行拷贝时我们没有办法实现字节切割,于是有了下面的版本二

 版本二:

  1 # -*- coding: utf-8 -*-
  2 # @author: Tele
  3 # @Time  : 2019/04/02 下午 3:09
  4 # 使用多进程拷贝文件夹,对于大文件进程内部又使用了多线程进行拷贝
  5 # 使用进程池实现多进程时,使用的消息队列要使用multiprocessing.Manager().Queue()创建
  6 
  7 import time
  8 import re
  9 import os
 10 import shutil
 11 import multiprocessing
 12 import math
 13 from concurrent.futures import ThreadPoolExecutor, wait
 14 
 15 # 设置单个文件的最大值:209715200 200M
 16 MAX_SINGLE_FILE_SIZE = 209715200
 17 mutex = multiprocessing.Lock()
 18 executor = ThreadPoolExecutor(max_workers=3)
 19 
 20 
 21 # 遍历文件夹
 22 def walk_file(file):
 23     file_list = list()
 24     for root, dirs, files in os.walk(file):
 25         # 遍历文件
 26         for f in files:
 27             file_list.append(f)
 28 
 29         # 空文件夹处理
 30         for d in dirs:
 31             if len(os.listdir(os.path.join(root, d))) == 0:
 32                 file_list.append(d)
 33     return file_list
 34 
 35 
 36 # 计算文件数量
 37 def get_file_count(dir):
 38     return len(walk_file(dir))
 39 
 40 
 41 def copy(src, target, queue):
 42     target_number = 1
 43     buffer = 1024
 44     # 文件夹
 45     if os.path.isdir(src):
 46         target_number = get_file_count(src)
 47         for root, dirs, files in os.walk(src):
 48             # 遍历文件
 49             for f in files:
 50                 drive = os.path.splitdrive(target)[0]
 51                 target = drive + os.path.splitdrive(os.path.join(root, f))[1]
 52                 copy_single_file(buffer, os.path.join(root, f), target)
 53             # 空文件夹
 54             for d in dirs:
 55                 drive = os.path.splitdrive(target)[0]
 56                 target = drive + os.path.splitdrive(os.path.join(root, d))[1]
 57                 # 检查文件的层级目录
 58                 if not os.path.exists(target):
 59                     os.makedirs(target)
 60     else:
 61         copy_single_file(buffer, src, target)
 62     # 将拷贝完成的文件数量放入队列中
 63     queue.put(target_number)
 64 
 65 
 66 # 拷贝单文件
 67 def copy_single_file(buffer, src, target):
 68     file_size = os.path.getsize(src)
 69     rs = open(src, "rb")
 70 
 71     # 检查文件的层级目录
 72     parent_path = os.path.split(target)[0]
 73     if not os.path.exists(parent_path):
 74         os.makedirs(parent_path)
 75 
 76     ws = open(target, "wb")
 77     # 小文件直接读写
 78     if file_size <= MAX_SINGLE_FILE_SIZE:
 79         while True:
 80             content = rs.read(buffer)
 81             ws.write(content)
 82             if len(content) == 0:
 83                 break
 84         ws.flush()
 85     else:
 86         # 设置每个线程拷贝的字节数 50M
 87         PER_THREAD_SIZE = 52428800
 88         # 构造参数并执行
 89         task_list = list()
 90         for i in range(math.ceil(file_size / PER_THREAD_SIZE)):
 91             byte_size = PER_THREAD_SIZE
 92             # 最后一个线程拷贝的字节数应该是取模
 93             if i == math.ceil(file_size / PER_THREAD_SIZE) - 1:
 94                 byte_size = file_size % PER_THREAD_SIZE
 95             start = i * PER_THREAD_SIZE + i
 96             t = executor.submit(copy_file_thread, start, byte_size, rs, ws)
 97             task_list.append(t)
 98         wait(task_list)
 99     if rs:
100         rs.close()
101     if ws:
102         ws.close()
103 
104 
105 # 多线程拷贝
106 def copy_file_thread(start, byte_size, rs, ws):
107     mutex.acquire()
108     buffer = 1024
109     count = 0
110     rs.seek(start)
111     ws.seek(start)
112     while True:
113         if count + buffer <= byte_size:
114             content = rs.read(buffer)
115             count += len(content)
116             write(content, ws)
117         else:
118             content = rs.read(byte_size % buffer)
119             count += len(content)
120             write(content, ws)
121             break
122     # global total_count
123     # total_count += byte_size
124     # print("\r拷贝进度为%.2f %%" % (total_count * 100 / file_size), end="")
125     mutex.release()
126 
127 
128 def write(content, ws):
129     ws.write(content)
130     ws.flush()
131 
132 
133 def copy_dir(src, desc):
134     # 获得待拷贝的文件总数(含空文件夹)
135     total_number = get_file_count(src)
136     # 分隔符检测
137     src = check_separator(src)
138     desc = check_separator(desc)
139     # print("src:",src)
140     # print("desc:",desc)
141 
142     file_dir_list = [src + "/" + i for i in os.listdir(src)]
143     if os.path.exists(desc):
144         shutil.rmtree(desc)
145 
146     # 进程池
147     pool = multiprocessing.Pool(3)
148 
149     # 创建队列
150     queue = multiprocessing.Manager().Queue()
151 
152     # 一个文件/目录开启一个进程去拷贝
153     for f_name in file_dir_list:
154         target = os.path.splitdrive(desc)[0] + "/" + os.path.splitdrive(f_name)[1]
155         # target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
156         # print(target)
157         # 创建target目录
158         parent_path = os.path.split(target)[0]
159         if not os.path.exists(parent_path):
160             os.makedirs(parent_path)
161         pool.apply_async(copy, args=(f_name, target, queue))
162 
163     start = time.time()
164     pool.close()
165     # pool.join()
166     count = 0
167     while True:
168         count += queue.get()
169         # 格式化输出时两个%输出一个%,不换行,每次定位到行首,实现覆盖
170         print("\r当前进度为 %.2f %%" % (count * 100 / total_number), end="")
171         if count >= total_number:
172             break
173 
174     executor.shutdown()
175     end = time.time()
176     print()
177     print("耗时-----", (end - start), "s")
178 
179 
180 # 查找指定字符出现的全部索引位置
181 def index_list(c, s):
182     return [i.start() for i in re.finditer(c, s)]
183 
184 
185 # 检测目录结尾是否有 "/"
186 def check_separator(path):
187     if path.rindex("/") == len(path) - 1:
188         return path[0:path.rindex("/")]
189     return path
190 
191 
192 def main():
193     copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")
194 
195 
196 if __name__ == '__main__':
197     main()

 

转载于:https://www.cnblogs.com/tele-share/p/10656811.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/264325.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

水晶报表加载失败的解决方法

问题描述:利用水晶报表开发的程序在运行了一段时间后,加载水晶报表失败.问题解决1.这个问题可能是由于windows临时文件夹下面的水晶报表文件没有及时清除的原因.打开临时文件夹,删除后就可以正常运行.2. (1).ReportDocumen实例必须为类成员 private ReportDocument prtp ne…

重定向

servlet的两种重定向方法的区别及应用一 问题&#xff1a; 在servlet/JSP编程学习中,发现有两种方法可以实现服务端输出重定向&#xff0c;一种是通过forward方法&#xff08;例如JSP中 的<jsp:forward page”OtherPage.jsp”/>&#xff09;&#xff0c;另一种则是通过运…

移动端事件

1. click事件 单击事件&#xff0c;类似于PC端的click&#xff0c;但在移动端中&#xff0c;连续click的触发有200ms ~ 300ms的延迟 2. touch类事件 触摸事件&#xff0c;有touchstart touchmove touchend touchcancel 四种之分 touchstart&#xff1a;手指触摸到屏幕会触发 to…

LINQ to SharePoint 试用感受, 欢迎讨论~

目前项目需要比较复杂的查询功能&#xff0c; 如果直接写CAML的话以后维护非常麻烦&#xff0c; 然后暂时用LINQ to SharePoint替代之&#xff5e; URL&#xff1a; http://linqtosharepoint.codeplex.com/ 最大的优点是我不用写CAML了&#xff0c; 非常容易实现一些简单的查询…

android应用退出后广播无效,关闭应用程序后,保持广播接收器运行

我认为接受的答案不是实际答案。我将解释问题所在。我认为您是在Huawie&#xff0c;Oppo&#xff0c;Vivo&#xff0c;Xiomi&#xff0c;asus .......或某些设备上测试您的应用程序。使用这些设备&#xff0c;如果我们关闭应用程序&#xff0c;它们还将关闭我们的广播接收器。因…

数据库SQL,技巧篇

三、技巧1、11&#xff0c;12的使用&#xff0c;在SQL语句组合时用的较多“where 11” 是表示选择全部 “where 12”全部不选&#xff0c;如&#xff1a;if strWhere ! beginset strSQL select count(*) as Total from [ tblName ] where strWhere endelse beginset st…

走过小公司的坑之入职一周

第一天工作内容&#xff0c;电脑系统安装&#xff0c;软件环境部署&#xff0c;出现硬盘坏道问题及病毒 第二天工作内容&#xff0c;web基础模型搭建&#xff0c;解决软件版本兼容问题 第三天工作内容&#xff0c;承接项目&#xff0c; 项目背景&#xff1a;web进销存系统&…

ps4移植android游戏,把PS4游戏《Apex英雄》《只狼》搬到安卓手机上玩,这招够简单!...

近期&#xff0c;索尼抢先其他平台&#xff0c;在PlayStaion官方网站发布新信息&#xff0c;使各大主流射击游戏平均流失率达到57%的大逃杀游戏《Apex英雄》&#xff0c;本赛季会加入两名新角色&#xff0c;一位是刚更新的辛烷&#xff0c;另一位会在本赛季结束前推出。根据此前…

指挥家和他的理解

最近翻出了电脑硬盘中的MP3&#xff0c;无意间找到了上大学音乐选修课时为考试准备的一些古典音乐的MP3&#xff0c;其中有一个文件夹中的文件都是Beethoven的 音乐。遂考到MP3中找些时间慢慢听。逐渐&#xff0c;发现自己开始喜欢上了古典交响乐&#xff0c;尤其是Beethoven的…

手把手带你使用JS-SDK自定义微信分享效果

https://www.cnblogs.com/backtozero/p/7064247.html转载于:https://www.cnblogs.com/diyunpeng/p/10659452.html

linux fork函数浅析

#include <sys/types.h> #include <unistd.h> /* 功能&#xff1a;复制进程 參数&#xff1a;无 返回值&#xff1a; 成功&#xff1a; 父进程&#xff1a;返回子进程id 子进程&#xff1a;返回0 失败&#xff1a; 返回&#xff0d;1 */ pid_t fork(void); 由fork…

android图片保存形式,Android应用开发之Android ScrollView截图和图片保存到相册的方式...

本文将带你了解Android应用开发之Android ScrollView截图和图片保存到相册的方式&#xff0c;希望本文对大家学Android有所帮助。1.1首先来看你一种截取屏幕&#xff0c;这种代码有缺陷&#xff0c;只能截取一次Java代码 getWindow().getDecorView().setDrawingCacheEnabled…

如何在IE让用户自动下载ActiveX控件?

IE中要嵌一个ActiveX OCX&#xff0c;如何让用户在打开这个IE时自动安装并注册&#xff1f; 假如该页面完全load后并没发现需要的ocx&#xff0c;如何提示给用户? 1.做好ACTIVEX控件&#xff0c;写inf文件。打包成CAB。 2.用signcode给cab数字签名&#xff08;数字签名可到中…

ffmpeg学习笔记-native原生绘制

上次已将ffmpeg的动态库编译出来了&#xff0c;并且使用了ffmpeg的转码功能&#xff0c;成功将mp4格式视频转化为yuv视频&#xff0c;这篇文章基于上次测试的demo&#xff0c;使用surfaceview显示解码完成的像素数据 布局设置和权限添加 布局 <FrameLayout xmlns:android&qu…

CoreData一些基本概念

Core Data涉及到的几个主要的概念可以对应数据库来理解&#xff1a;NSManagedObjectContext&#xff08;托管对象上下文&#xff09;&#xff1a;数据库NSEntityDescription&#xff08;实体描述&#xff09;&#xff1a;表NSFetchRequest&#xff08;请求&#xff09;&#xf…

BDC技术(一个例子)

BDC技术 BDC&#xff08;Batch Data Conversion&#xff09;&#xff1a;在SAP系统里&#xff0c;由于某种原因&#xff0c;可能需要重复输入数据&#xff0c;&#xff08;数据不同&#xff0c;但是操作是相同的&#xff0c;典型的情形就是切换系统的时候&#xff0c;旧系统的数…

华为android怎样隐藏软件,华为怎么打开隐藏应用功能

隐藏应用是没有密码的&#xff0c;隐藏应用的方法&#xff1a;在主桌面两指分开&#xff0c;进入隐藏应用界面&#xff0c;点击(添加)&#xff0c;然后点击要隐藏的应用&#xff0c;再点击确定即可。应用锁有密码&#xff0c;是机主设定的&#xff0c;如果设置了指纹访问应用&a…

事务的传播性和隔离级别

事务的传播性&#xff1a;1、PROPOGATION_REQUIRES --需要在一个事务中执行2、PROPOGATION_SUPPOTS --不需要在一个事务中执行&#xff0c;如果有事务&#xff0c;也可以执行3 PROPOGATION_NOT_SUPPORTED --不支持在一个事务中执行&#xff0c;如果在…

Oracle建立表空间和用户

Oracle建立表空间和用户 建立表空间和用户的步骤&#xff1a; 用户 建立&#xff1a;create user username identified by "password"; 授权&#xff1a;grant create session to username;grant create table to username;grant create tablespace…

VC系统扫雷游戏外挂源代码程序下载(转帖

VC系统扫雷游戏外挂源代码程序下载&#xff08;转帖&#xff09;2008-03-04 10:25经过了多次测试写出了历史上第一个有点意义的MFC程序。效果差强人意。^_^ CODE:// CrackWinmineDlg.cpp : implementation file// #include "stdafx.h"#include "CrackWinmine.h&…