#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_compress.py
@time: 2023/5/28 14:58
@desc: rarfile 使用需要安装 rarfile 和 unrar 并且将 unrar.exe 复制到venv/Scripts目录下
"""
import os
import gzip
from zipfile import ZipFile
import shutil
import rarfile
# 这俩工具包在下边
from sdk.utils.util_folder import FolderProcess
from sdk.utils.util_file import FileProcess


class ZipProcess(object):
    """Compress and decompress archives (.zip / .rar / .gz).

    NOTE: rar support needs the ``rarfile`` package plus the ``unrar``
    binary (see the module docstring).
    """

    def __init__(self):
        self.folder = FolderProcess()
        self.file = FileProcess()
        # Archive extensions that unzip()/_check_zip_files() can handle.
        self.format = [".zip", ".rar", ".gz"]

    def zip(self, zip_name: str, filefolder: str = None, kind: str = "zip"):
        """Create an archive from a file or folder.

        :param zip_name: target archive path *without* extension
        :param filefolder: file or folder to archive
        :param kind: format understood by shutil ("zip", "tar", "gztar", ...)
        :return: None
        """
        shutil.make_archive(zip_name, kind, filefolder)

    def _check_zip_files(self, save_folder):
        """Scan an extracted folder and recursively unpack nested archives.

        :param save_folder: folder that was just extracted into
        :return: None
        """
        for args in self.folder.get_all_files(save_folder):
            tail = self.file.get_file_tail(args["file"])
            if tail in self.format:
                self.unzip(
                    args["file"],
                    os.sep.join(self.folder.split_path(args["file"])[:-1]))

    def unzip(self, zip_file: str, save_path: str = "./"):
        """Extract an archive into ``save_path/<archive-stem>/``, delete the
        archive, then recurse into the result for nested archives.

        Known issue (from the original author): non-ASCII member names may
        come out garbled (legacy zip entries are decoded as cp437).

        :param zip_file: archive to extract
        :param save_path: destination folder; created if missing
        :return: None
        :raises ValueError: when the extension is not supported
        """
        file_split = self.folder.split_path(zip_file)
        save_folder = self.folder.merge_path(
            [save_path, file_split[-1].split(".")[0]])
        self.folder.create_folder(save_folder)
        file_name = file_split[-1]
        lower_name = zip_file.lower()
        if lower_name.endswith(".zip"):
            with ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(save_folder)
        elif lower_name.endswith(".rar"):
            with rarfile.RarFile(zip_file) as rar_file:
                rar_file.extractall(save_folder)
        elif lower_name.endswith(".gz"):
            # BUGFIX: strip the ".gz" suffix from the output name. Keeping it
            # meant _check_zip_files() treated the *decompressed* output as
            # another gzip archive and crashed re-opening it (BadGzipFile).
            out_name = file_name[:-3] or file_name
            with gzip.open(zip_file, 'rb') as gz_file, \
                    open(self.folder.merge_path([save_folder, out_name]),
                         'wb') as output_file:
                # Stream in chunks instead of read() so large archives do
                # not need to fit in memory at once.
                shutil.copyfileobj(gz_file, output_file)
        else:
            raise ValueError("不支持的格式:{}".format(zip_file))
        # Delete the archive that was just extracted.
        self.folder.remove(zip_file)
        # Recurse into the extracted content for nested archives.
        self._check_zip_files(save_folder)
util_folder.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_folder.py
@time: 2023/5/28 13:56
@desc:
"""
import os
import traceback
import shutil
from sdk.base.base_temp import Base


class FolderProcess(Base):
    """Folder utilities: create folders, merge/split paths, delete,
    recursive file listing."""

    def __init__(self):
        super(FolderProcess, self).__init__()

    def create_folder(self, path):
        """Create a folder (including parents); no error if it exists.

        :param path: folder path to create
        :return: None
        """
        os.makedirs(path, exist_ok=True)

    def merge_path(self, path_lis):
        """Join path components with the OS separator.

        :param path_lis: list of path components
        :return: joined path, or None when the list is empty/None
        """
        if path_lis:
            return os.path.sep.join(path_lis)

    def split_path(self, path: str, spliter: str = None):
        """Split a path (or URL) into its components.

        :param path: file-system path or http(s) URL
        :param spliter: explicit separator; auto-detected when None
        :return: list of components
        """
        if spliter:
            return path.split(spliter)
        # BUGFIX: the original condition was
        #   `not startswith("http://") or not startswith("https://")`
        # which is always true, so the URL branch was unreachable and
        # URLs were mangled through os.path.normpath.
        if path.startswith(("http://", "https://")):
            return os.path.normpath(path).split("/")
        return os.path.normpath(path).split(os.sep)

    def remove(self, file: str = None, folder: str = None):
        """Delete a file and/or a folder tree; errors are printed, not
        raised (best-effort cleanup, preserved from the original).

        :param file: file path to delete
        :param folder: folder path to delete recursively
        :return: None
        """
        try:
            if folder:
                shutil.rmtree(folder)
            if file:
                os.remove(file)
        except Exception as e:
            print(e, e.__traceback__.tb_lineno)

    def get_all_files(self, path: str, ext: list = None):
        """Yield every file under an absolute path, optionally filtered by
        extension. A relative path yields nothing (preserved behavior).

        :param path: absolute folder path
        :param ext: optional extension whitelist, e.g. [".txt", ".json"]
        :return: generator of {"name": file name, "file": absolute path}
        """
        try:
            if os.path.exists(path) and os.path.isabs(path):
                # Renamed the walk variable so it no longer shadows the
                # `path` parameter.
                for root, _dirs, file_lis in os.walk(path):
                    for name in file_lis:
                        if ext and os.path.splitext(name)[-1] not in ext:
                            continue
                        yield {
                            "name": name,
                            "file": os.path.join(root, name),
                        }
        except BaseException:
            print(traceback.format_exc())
util_file.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_file.py
@time: 2023/5/27 21:25
@desc:
"""
import os
import shutil
import traceback
import chardet
from sdk.base.base_temp import Base
from sdk.utils.util_json import JsonProcess


class FileProcess(Base):
    """File utilities: line counting, encoding/size probing, and
    line-oriented read/write/split/merge helpers."""

    def __init__(self):
        super(FileProcess, self).__init__()
        self.json = JsonProcess()

    def get_file_lines(self, file: str, status: int = 1):
        """Count the lines in a file.

        :param file: file path
        :param status: 1 -> small file, 0 -> large file
        :return: line count (0 for an empty file)
        """
        if status == 1:
            # BUGFIX: the handle was opened inline and never closed.
            with open(file, 'rb') as f:
                return sum(1 for _ in f)
        # BUGFIX: `count` was unbound when the file was empty
        # (UnboundLocalError); initialise it explicitly.
        count = 0
        with open(file, 'rb') as f:
            for count, _ in enumerate(f, 1):
                pass
        return count

    def rename_file(self, old: str, new: str):
        """Rename a file; silently skipped if `old` is missing or `new`
        already exists. Errors are printed, not raised.

        :param old: current path
        :param new: target path
        :return: None
        """
        try:
            if os.path.isfile(old) and not os.path.exists(new):
                os.renames(old, new)
        except BaseException:
            print(traceback.format_exc())

    def get_file_encode(self, file: str, size=1024 * 1024) -> str:
        """Detect a file's encoding from its first `size` bytes.

        :param file: file path
        :param size: number of bytes to sample (default 1 MiB)
        :return: encoding name reported by chardet (may be None)
        """
        with open(file, "rb") as fp:
            head = fp.read(size)
        return chardet.detect(head)["encoding"]

    def get_file_size(self, file: str, unit: str = "MB") -> str:
        """Human-readable file size.

        :param file: file path
        :param unit: "KB", "MB" or "GB"
        :return: e.g. "1.53 MB"
        :raises ValueError: for an unsupported unit (the original silently
            returned None, hiding caller typos)
        """
        factors = {"KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}
        if unit not in factors:
            raise ValueError("unsupported unit: {}".format(unit))
        file_size = os.path.getsize(file)
        return str(round(file_size / float(factors[unit]), 2)) + " " + unit

    def get_file_tail(self, file: str):
        """Return the file extension, including the dot (e.g. ".txt").

        :param file: file path
        :return: extension string ("" when there is none)
        """
        return os.path.splitext(file)[-1]

    def read_yield(self, file: str, headers: list = None,
                   encoding: str = "utf-8", spliter: str = "\t",
                   sheets: list = None, mode="r") -> dict:
        """Read a delimited text file line by line.

        When `headers` is None the first line is consumed as the header row;
        otherwise reading starts at line one.

        :param file: file path
        :param headers: optional pre-supplied column names
        :param encoding: text encoding
        :param spliter: column separator
        :param sheets: unused here (kept for Base interface compatibility)
        :param mode: open() mode
        :return: generator of {"headers": ..., "num": 1-based, "line": [...]}
        """
        with open(file, mode=mode, encoding=encoding) as fp:
            if not headers:
                headers = fp.readline().strip().split(spliter)
            for num, data in enumerate(fp):
                line = data.strip("\n").split(spliter)
                yield {
                    "headers": headers,
                    "num": num + 1,
                    "line": line,
                }

    def read_json_file(self, file: str, encoding: str = "utf-8") -> dict:
        """Read and parse a JSON file.

        :param file: file path
        :param encoding: text encoding
        :return: parsed object
        """
        with open(file, "r", encoding=encoding) as fp:
            return self.json.loads(fp.read())

    def save(self, file: str, data: dict, mode: str = "w",
             encoding: str = "utf-8", spliter: str = "\t",
             indent: int = None, ensure_ascii: bool = False):
        """Save a record set produced by read_yield/split_file.

        .txt with headers -> delimited text; .json without headers -> JSON.

        :param file: output path (extension selects the format)
        :param data: {"headers": [...], "line": [...]}
        :param mode: open() mode
        :param encoding: text encoding
        :param spliter: column separator for .txt output
        :param indent: unused here (kept for Base interface compatibility)
        :param ensure_ascii: unused here (kept for Base interface compatibility)
        :return: None
        """
        with open(file, mode=mode, encoding=encoding) as fp:
            tail = self.get_file_tail(file)
            if data.get("headers") is not None:
                if tail == ".txt":
                    fp.write("{}\n".format(spliter.join(data["headers"])))
                    for line in data.get("line"):
                        fp.write("{}\n".format(spliter.join(line)))
            elif tail == ".json":
                # The original had identical dict/non-dict branches here;
                # both serialised via json.dumps, so they are collapsed.
                fp.write(self.json.dumps(data["line"]))

    def split_file(self, file: str, spliter_nums: int = 1000,
                   headers: str = None, encoding: str = "utf-8",
                   spliter="\t") -> dict:
        """Split a delimited file into chunks of `spliter_nums` lines.

        :param file: file path
        :param spliter_nums: lines per chunk
        :param headers: optional pre-supplied column names
        :param encoding: text encoding
        :param spliter: column separator
        :return: generator of {"headers": ..., "line": [rows]}
        """
        lis = []
        with open(file, "r", encoding=encoding) as fp:
            if not headers:
                headers = fp.readline().strip().split(spliter)
            for raw in fp:
                lis.append(raw.strip().split(spliter))
                if len(lis) == spliter_nums:
                    yield {
                        "headers": headers,
                        "line": lis,
                    }
                    # BUGFIX: was lis.clear(), which emptied the list object
                    # just yielded to the consumer; rebind instead.
                    lis = []
            if len(lis) > 0:
                yield {
                    "headers": headers,
                    "line": lis,
                }

    def merge_file(self, file1, file2, headers=None,
                   encoding="utf-8", mode="r"):
        """Append the rows of file1 onto file2 (tab-separated output).

        :param file1: source file to merge from
        :param file2: destination file (opened in append mode)
        :param headers: optional pre-supplied column names for file1
        :param encoding: text encoding
        :param mode: open() mode for reading file1
        :return: None
        """
        with open(file2, "a", encoding=encoding) as fp:
            for args in self.read_yield(file1, headers=headers,
                                        encoding=encoding, mode=mode):
                fp.write("{}\n".format("\t".join(args["line"])))

    def move_file(self, old_file, new_file):
        """Copy a file to a new location.

        NOTE(review): despite the name this *copies* (shutil.copy) and
        leaves the source in place — preserved as-is since callers may
        rely on it.

        :param old_file: source path
        :param new_file: destination path
        :return: None
        """
        shutil.copy(old_file, new_file)
base_temp.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: base_temp.py
@time: 2023/5/27 21:07
@desc:
"""class Base(object):""""""def read_yield(self, file: str, headers: list = None,encoding: str = "utf-8", spliter: str = "\t", sheets: list = None) -> dict:"""按行返回:param file::param headers::param encoding::return:"""def save(self, file: str, data: dict, mode: str = "w", encoding: str = "utf-8",spliter: str = "\t", indent: int = None, ensure_ascii: bool = False) -> str:"""保存结果:param file::param data:{'headers': ['', '', ''], 'data': [{"line":1,"data":[]},{"line":2,"data":[]}]} /{'headers': ['', '', ''], 'data': [{"line":1,"data":[{},{}]},{"line":2,"data":["{}","{}"]}]}:param mode::param encoding::return:"""def remove(self, file: str = None, folder: str = None):"""删除文件、文件夹:param file::param folder::return:"""
util_json.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_json.py
@time: 2023/5/27 22:41
@desc:
"""
import json


class JsonProcess():
    """JSON serialisation / deserialisation helpers."""

    def loads(self, data: str) -> dict:
        """Parse a JSON string into Python objects.

        strict=False tolerates control characters inside string literals.

        :param data: JSON text
        :return: parsed object (usually a dict)
        :raises json.JSONDecodeError: on malformed input
        """
        return json.loads(data, strict=False)

    def dumps(self, data: dict, indent: int = 4,
              ensure_ascii: bool = False) -> str:
        """Serialise a Python object to pretty-printed JSON text.

        BUGFIX: the annotation was `indent: None = 4` — the *type* None
        with a default of 4; corrected to `int`.

        :param data: object to serialise
        :param indent: pretty-print indent width
        :param ensure_ascii: escape non-ASCII characters when True
        :return: JSON string
        """
        return json.dumps(data, indent=indent, ensure_ascii=ensure_ascii)