说明
平时偶尔需要进行重复性的对文件进行重命名、格式转化等。假设以文件复制功能作为目标,设计一个小程序使用多线程对文件进行批量复制。(其实以后主要目标是针对Realsense的raw文件进行批量的转化,并借助多线程加速)
代码
import threading
import os
import time# 多线程类
# 更改__init__和run供自己使用
class FileProcess(threading.Thread):process_nums: int = 0 #外部查看进度使用all_nums: int = 0 #记录总数量source_path = ""target_path = ""files = []def __init__(self, source_path: str, files: list, target_path: str = None):super().__init__()self.all_nums = len(files)self.files = filesself.source_path = source_pathself.target_path = target_pathdef run(self) -> None:import shutilwhile self.process_nums < self.get_all_nums():if self.target_path is None:passtime.sleep(0.1)else:shutil.copyfile(os.path.join(self.source_path, self.files[self.process_nums]),os.path.join(self.target_path, self.files[self.process_nums]),)self.process_nums += 1def get_process_nums(self):return self.process_numsdef get_all_nums(self):return self.all_numsif __name__ == "__main__":source_path = "D:\\SharedFolders\\DataSets\\office_room_traj0_loop\\rgb"target_path = ""# 获取需要批量处理的目录中的文件source_file = os.listdir(source_path)# 根据线程数量,计算每个线程负责处理文件数量thread_nums = 3nums_step = int(len(source_file) / 3)# 初始化线程thread_list = []process_list_nums = []for i in range(thread_nums):if i == 0:thread_list.append(FileProcess(source_path=source_path, files=source_file[:nums_step]))process_list_nums.append(len(source_file[:nums_step]))elif i == thread_nums - 1:thread_list.append(FileProcess(source_path=source_path, files=source_file[nums_step * i :]))process_list_nums.append(len(source_file[nums_step * i :]))else:thread_list.append(FileProcess(source_path=source_path,files=source_file[nums_step * i : nums_step * i + nums_step],))process_list_nums.append(len(source_file[nums_step * i : nums_step * i + nums_step]))# 启动线程for i in range(thread_nums):thread_list[i].start()print("[status]:start")print("\n" * thread_nums, end="")while True:all_end = 0process_nums = []time.sleep(0.5)for i in range(thread_nums):if thread_list[i].is_alive():process_nums.append(thread_list[i].get_process_nums())all_end += 1else:process_nums.append(process_list_nums[i])# 进度显示部分print("\033[{}F".format(thread_nums), end="") # 光标移动到上面n行开头部位print("\033[0J", end="") # 从光标位置开始删除内容,一直到结尾for i in range(thread_nums):print("[status]:thread {}:{}/{} {:.2f}%".format(i + 1,process_nums[i],process_list_nums[i],process_nums[i] * 100 / process_list_nums[i],))# 所有线程结束则退出if all_end == 0:breakprint("[status]:success")