Python压缩、解压文件

#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_compress.py
@time: 2023/5/28 14:58
@desc: rarfile 使用需要安装 rarfile 和 unrar 并且将 unrar.exe 复制到venv/Scrpits目录下
"""
import os
import gzip
from zipfile import ZipFile
import shutil
import rarfile
# 这俩工具包在下边
from sdk.utils.util_folder import FolderProcess
from sdk.utils.util_file import FileProcessclass ZipProcess(object):"""压缩,解压文件"""def __init__(self):""""""self.folder = FolderProcess()self.file = FileProcess()self.format = [".zip", ".rar", ".gz"]def zip(self, zip_name: str, filefolder: str = None, kind: str = "zip"):"""压缩:param zip_name::param filefolder: 支持file/folder:param kind: zip,tar,gztar等:return:"""shutil.make_archive(zip_name, kind, filefolder)def _check_zip_files(self,save_folder):""":param save_folder::return:"""for args in self.folder.get_all_files(save_folder):tail = self.file.get_file_tail(args["file"])if tail in self.format:self.unzip(args["file"], os.sep.join(self.folder.split_path(args["file"])[:-1]))def unzip(self, zip_file: str, save_path: str = "./"):"""解压 原路径结构 中文会出现乱码(原因未知):param zip_file::param save_path:可以不存在:return:"""file_split = self.folder.split_path(zip_file)save_folder = self.folder.merge_path([save_path, file_split[-1].split(".")[0]])self.folder.create_folder(save_folder)file_name = file_split[-1]if zip_file.lower().endswith(".zip"):with ZipFile(zip_file, 'r') as zip_ref:zip_ref.extractall(save_folder)elif zip_file.lower().endswith(".rar"):with rarfile.RarFile(zip_file) as rar_file:rar_file.extractall(save_folder)elif zip_file.lower().endswith(".gz"):with gzip.open(zip_file, 'rb') as gz_file, \open(self.folder.merge_path([save_folder, file_name]), 'wb') as output_file:output_file.write(gz_file.read())else:raise ValueError("不支持的格式:{}".format(zip_file))# 删除已经解压的压缩文件self.folder.remove(zip_file)# 遍历已经解压的压缩包内容,检查嵌套压缩文件继续解压self._check_zip_files(save_folder)

util_folder .py

#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_folder.py
@time: 2023/5/28 13:56
@desc:
"""
import os
import traceback
import shutil
from sdk.base.base_temp import Baseclass FolderProcess(Base):""""""def __init__(self):super(FolderProcess, self).__init__()def create_folder(self, path):"""创建文件夹:param _path::return:"""os.makedirs(path, exist_ok=True)def merge_path(self, path_lis):"""合并路径:param path_lis::return:"""if path_lis:return os.path.sep.join(path_lis)def split_path(self, path: str, spliter: str = None):"""拆分路径"""if not spliter:if not path.startswith("http://") or not path.startswith("https://"):return os.path.normpath(path).split(os.sep)else:return os.path.normpath(path).split("/")else:return path.split(spliter)def remove(self, file: str = None, folder: str = None):"""删除文件、文件夹:param file::param folder::return:"""try:if folder:shutil.rmtree(folder)if file:os.remove(file)except Exception as e:print(e, e.__traceback__.tb_lineno)def get_all_files(self, path: str, ext: list = None):"""获取文件夹下所有文件绝对路径:param path::param ext: 后缀列表[".txt",".json",...]:return:"""try:if os.path.exists(path) and os.path.isabs(path):for path, dir_lis, file_lis in os.walk(path):if len(file_lis) > 0:for name in file_lis:if ext:if os.path.splitext(name)[-1] in ext:yield {"name": name,"file": os.path.join(path, name),}else:yield {"name": name,"file": os.path.join(path, name),}except BaseException:print(traceback.format_exc())

util_file.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_file.py
@time: 2023/5/27 21:25
@desc:
"""
import os
import shutil
import traceback
import chardet
from sdk.base.base_temp import Base
from sdk.utils.util_json import JsonProcessclass FileProcess(Base):"""文件处理类"""def __init__(self):super(FileProcess, self).__init__()self.json = JsonProcess()def get_file_lines(self, file: str, status: int = 1):"""获取文件总行数:param file::param status:0:大文件、1小文件:return:"""if status == 1:return sum(1 for _ in open(file, 'rb'))else:with open(file, 'rb') as f:for count, _ in enumerate(f, 1):passreturn countdef rename_file(self, old: str, new: str):"""重命名文件:param old::param new::return:"""try:if os.path.isfile(old) and not os.path.exists(new):os.renames(old, new)except BaseException:print(traceback.format_exc())def get_file_encode(self, file: str, size=1024 * 1024) -> str:"""获取文件编码:param file::param size::return:"""with open(file, "rb")as fp:fp_bit = fp.read(size)return chardet.detect(fp_bit)["encoding"]def get_file_size(self, file: str, unit: str = "MB") -> str:"""获取文件大小:param file::param unit::return:"""file_size = os.path.getsize(file)if unit == "KB":return str(round(file_size / float(1024), 2)) + " " + unitelif unit == "MB":return str(round(file_size / float(1024 * 1024), 2)) + " " + unitdef get_file_tail(self, file: str):"""获取文件后缀:param file::return:"""return os.path.splitext(file)[-1]def read_yield(self, file: str, headers: list = None,encoding: str = "utf-8", spliter: str = "\t", sheets: list = None, mode="r") -> dict:"""按行读文件:param file::param headers::param encoding::param spliter::param sheets::return:"""with open(file, mode=mode, encoding=encoding)as fp:# 传headers 从第一行开始处理,不传headers默认第一行为headersif not headers:headers = fp.readline().strip().split(spliter)for num, data in enumerate(fp):line = data.strip("\n").split(spliter)yield {"headers": headers,"num": num + 1,"line": line}def read_json_file(self, file: str, encoding: str = "utf-8") -> dict:"""读取json文件:param file::param encoding::return:"""with open(file, "r", encoding=encoding)as fp:return self.json.loads(fp.read())def save(self, file: str, data: dict, mode: str = "w", encoding: str = "utf-8",spliter: str = "\t", indent: int = None, ensure_ascii: bool = False):"""保存文件:param file::param data::param mode::param encoding::param spliter::param indent::param ensure_ascii::return:"""with open(file, mode=mode, encoding=encoding)as fp:tail = self.get_file_tail(file)if data.get("headers") is not None:if tail == ".txt":fp.write("{}\n".format(spliter.join(data["headers"])))for line in data.get("line"):fp.write("{}\n".format(spliter.join(line)))else:if tail == ".json":if isinstance(data["line"], dict):fp.write(self.json.dumps(data["line"]))else:fp.write(self.json.dumps(data["line"]))def split_file(self, file: str, spliter_nums: int = 1000,headers: str = None, encoding: str = "utf-8", spliter="\t") -> dict:"""按行 拆分文件:param file::param spliter_nums::param headers::param encoding::return:"""lis = []with open(file, "r", encoding=encoding)as fp:if not headers:headers = fp.readline().strip().split(spliter)for i in fp:line = i.strip().split(spliter)lis.append(line)if len(lis) == spliter_nums:yield {"headers": headers,"line": lis,}lis.clear()if len(lis) > 0:yield {"headers": headers,"line": lis,}def merge_file(self, file1, file2, headers=None,encoding="utf-8", mode="r"):"""合并文件:param file1:待合并文件:param file2:合并后新文件:param headers::param encoding::param mode::return:"""with open(file2, "a", encoding=encoding)as fp:for args in self.read_yield(file1, headers=headers, encoding=encoding, mode=mode):line = args["line"]fp.write("{}\n".format("\t".join(line)))def move_file(self, old_file, new_file):""":param old_file::param new_file::return:"""shutil.copy(old_file, new_file)

base_temp.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: base_temp.py
@time: 2023/5/27 21:07
@desc:
"""class Base(object):""""""def read_yield(self, file: str, headers: list = None,encoding: str = "utf-8", spliter: str = "\t", sheets: list = None) -> dict:"""按行返回:param file::param headers::param encoding::return:"""def save(self, file: str, data: dict, mode: str = "w", encoding: str = "utf-8",spliter: str = "\t", indent: int = None, ensure_ascii: bool = False) -> str:"""保存结果:param file::param data:{'headers': ['', '', ''], 'data': [{"line":1,"data":[]},{"line":2,"data":[]}]} /{'headers': ['', '', ''], 'data': [{"line":1,"data":[{},{}]},{"line":2,"data":["{}","{}"]}]}:param mode::param encoding::return:"""def remove(self, file: str = None, folder: str = None):"""删除文件、文件夹:param file::param folder::return:"""

util_json.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC
@file: util_json.py
@time: 2023/5/27 22:41
@desc:
"""
import jsonclass JsonProcess():"""json 序列化 反序列化"""def loads(self, data: str) -> dict:"""str - dict:param data::return:"""return json.loads(data, strict=False)def dumps(self, data: dict, indent: None = 4,ensure_ascii: bool = False) -> str:"""dict-str:param data::param indent::param ensure_ascii::return:"""return json.dumps(data, indent=indent, ensure_ascii=ensure_ascii)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/179585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之Linux 进程管理 7》(11)

《Linux操作系统原理分析之Linux 进程管理 7》(11) 4 Linux 进程管理4.7 IPC 信号量机制4.7.1 信号量与信号量集合1.信号量2.信号量集合3.信号量集合的集中 4.7.2 信号量集合的创建和检索4.7.3 信号量 PV 操作4.7.4 信号量操作等待队列1.信号量操作等待队…

整数反转 Golang leecode_7

拿到手第一反应还是暴力,直接从低位到高位把数一个个取出来,然后乘以每一位的权重,构成一个新的反转后的整数 res 返回,代码如下 package mainimport ("fmt""math" )func reverse(x int) int {if x > -10…

Java 基础学习(三)循环流程控制与数组

1 循环流程控制 1.1 循环流程控制概述 1.1.1 什么是循环流程控制 当一个业务过程需要多次重复执行一个程序单元时,可以使用循环流程控制实现。 Java中包含3种循环结构: 1.2 for循环 1.2.1 for循环基础语法 for循环是最常用的循环流程控制&#xff…

redis运维(二十二)redis 的扩展应用 lua(四)

一 最佳实践 ① 铺垫 最佳实践:1、把redis操作所需的key通过KEYS进行参数传递2、其它的lua脚本所需的参数通过ARGV进行传递. redis lua脚本原理 Redis Lua脚本的执行原理 ② 删除指定的脚本缓存 ③ redis集群模式下使用lua脚本注意事项 1、常见报错现象 C…

SpringBoot 项目中获取 Request 的四种方法

SpringBoot 项目中获取 Request 的四种方法 方法1、Controller中加参数来获取request 注意:只能在Controller中加入request参数。 一般,我们在Controller中加参数获取HttpServletRequest,如下所示: RestController RequestMap…

『 Linux 』进程优先级

文章目录 什么是优先级Linux下的进程优先级PRI与NI使用top查看进程以及对进程的优先级的修改 进程优先级的其他概念竞争性与独立性并发与并行 什么是优先级 优先级,顾名思义,即在同一环境下不同单位对同一个资源的享有顺序; 一般优先级高的单位将优先占有该资源; 在进程当中进…

Python中如何用栈实现队列

目录 一、引言 二、使用两个栈实现队列 三、性能分析 四、应用场景 五、代码示例 六、优缺点总结 一、引言 队列(Queue)和栈(Stack)是计算机科学中常用的数据结构。队列是一种特殊的线性表,只允许在表的前端进行…

Leetcode 380. O(1) 时间插入、删除和获取随机元素

文章目录 题目代码(11.28 首刷看解析) 题目 Leetcode 380. O(1) 时间插入、删除和获取随机元素 代码(11.28 首刷看解析) 1.length:表示的是数组的长度 数组 2.length():表示的是字符串的长度 字符串 3.size():表示的是集合中有多…

万宾科技可燃气体监测仪科技作用全览

燃气管网在运行过程中经常会遇到燃气管道泄漏的问题,燃气泄漏甚至会引起爆炸,从而威胁人民的生命和财产安全,因此对燃气管网进行定期巡检是十分必要的工作。但是传统的人工巡检已不能满足城市的需要,除了选择增加巡检人员之外&…

#Js篇:前端的设计模式有哪些

常见的前端设计模式 单例模式 保证一个类仅有一个实例,并提供一个访问他的全局访问点;vuex常用于管理全局状态、配置信息等。 工厂模式 一个用于创建对象的接口,让子类决定实例化哪个类; 创建复杂对象或对象的组合。 策略模…

Web UI自动化测试框架

WebUI automation testing framework based on Selenium and unittest. 基于 selenium 和 unittest 的 Web UI自动化测试框架。 特点 提供更加简单API编写自动化测试。提供脚手架,快速生成自动化测试项目。自动生成HTML测试报告生成。自带断言方法,断言…

笔记:Pika Labs 3D 动画生成工具

Pika Labs 一款3D 动画生成工具 本文地址:https://blog.csdn.net/qq_28550263/article/details/134657306 目 录 1. 简介2. 准备2.1 安装 discord2.2 加入 Discord 频道 3. Pika 使用指南2.1 快速开始2.2 从图像到视频2.3 Pika Bot按钮2.4 提示(Prompt&a…

Spark Streaming的基本数据流

先来介绍一下按照动静对数据的区分 静态数据 ​ 静态数据(Static Data)指的是在一段时间内不会或很少发生变化的数据。这种类型的数据通常是固定的,并且不会随着时间的推移而更新或仅偶尔更新。静态数据的典型例子包括配置文件、参考表、历…

面试题:说一下MyBatis动态代理原理?

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1.MyBatis简介2.使用步骤2.1、引入依赖2.2、配置文件2.3、接口定义2.4、加载执行 3.原理解析 1.MyBatis简介 MyBatis是一个ORM工具,封装了JDBC的操作&a…

Redis 主从架构,Redis 分区,Redis哈希槽的概念,为什么要做Redis分区

文章目录 Redis 主从架构redis replication 的核心机制redis 主从复制的核心原理过程原理Redis集群的主从复制模型是怎样的?生产环境中的 redis 是怎么部署的?机器是什么配置?你往内存里写的是什么数据?说说Redis哈希槽的概念&…

前端入门(四)Ajax、Promise异步、Axios通信、vue-router路由

文章目录 AjaxAjax特点 Promise 异步编程(缺)Promise基本使用状态 - PromiseState结果 - PromiseResult Axios基本使用 Vue路由 - vue-router单页面Web应用(single page web application,SPA)vue-router基本使用路由使…

Rabbitmq发送邮件并消费邮件

📑前言 本文主要是【Rabbitmq】——Rabbitmq发送邮件并消费邮件的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 &#x1…

jvm的相关知识点

Java Virtual Machine(JVM)是Java程序的运行环境,是Java技术的核心和关键之一。JVM负责执行Java字节码,并提供了一种平台无关性的执行环境,使得Java程序可以在不同的硬件和操作系统上运行。 下面是关于JVM的一些重要知…

spring应用在afterPropertiesSet方法中获取ApplicationContext

在afterPropertiesSet方法中获取ApplicationContext是可以的。Spring容器在初始化bean后,会自动调用afterPropertiesSet方法。在这个方法中,您可以获取到ApplicationContext对象。 以下是一个示例代码: import org.springframework.context…

【数学】旋转矩阵

参考链接 OpenGL from OpenGL.GL import * from OpenGL.GLUT import * from math import * import numpy as np def draw_axes():glClear(GL_COLOR_BUFFER_BIT)# 绘制坐标轴glColor3f(1.0, 1.0, 1.0) # 设置坐标轴颜色为白色glBegin(GL_LINES)glVertex2f(-1.0, 0.0) # x 轴g…