Python Minio 工具类封装

        最近因为需要对大规模的文件进行存储,选了多种对象存储方案,最终选择了MinIO,为了方便python的调用,在minio第三方包的基础上进行进一步封装调用,该工具除了基础的功能外,还封装了多线程分片下载文件和上传文件的功能,切片设置不宜过大,因为会受限于机器的带宽,过大会导致带宽被占光影响机器性能。分享的代码仅供学习使用。

import os
import io
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from tqdm import tqdm
from minio.deleteobjects import DeleteObject
from concurrent.futures import as_completed, ThreadPoolExecutorclass Bucket(object):client = Nonepolicy = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}'def __new__(cls, *args, **kwargs):if not cls.client:cls.client = object.__new__(cls)return cls.clientdef __init__(self, service, access_key, secret_key, secure=False, section_size=10, t_max=3):'''实例化参数:param service: 服务器地址:param access_key: access_key:param secret_key: secret_key:param secure: secure:param section_size: 切片大小mb:param t_max: 线程池大小'''self.service = serviceself.client = Minio(service, access_key=access_key, secret_key=secret_key, secure=secure)self.size = section_size * 1024 * 1024self.processPool = ThreadPoolExecutor(max_workers=t_max)def exists_bucket(self, bucket_name):"""判断桶是否存在:param bucket_name: 桶名称:return:"""return self.client.bucket_exists(bucket_name=bucket_name)def create_bucket(self, bucket_name: str, is_policy: bool=True):"""创建桶 + 赋予策略:param bucket_name: 桶名:param is_policy: 策略:return:"""if self.exists_bucket(bucket_name=bucket_name):return Falseelse:self.client.make_bucket(bucket_name=bucket_name)if is_policy:policy = self.policy % (bucket_name, bucket_name)self.client.set_bucket_policy(bucket_name=bucket_name, policy=policy)return Truedef get_bucket_list(self):"""列出存储桶:return:"""buckets = self.client.list_buckets()bucket_list = []for bucket in buckets:bucket_list.append({"bucket_name": bucket.name, "create_time": bucket.creation_date})return bucket_listdef remove_bucket(self, bucket_name):"""删除桶:param bucket_name::return:"""try:self.client.remove_bucket(bucket_name=bucket_name)except S3Error as e:print("[error]:", e)return Falsereturn Truedef bucket_list_files(self, bucket_name, prefix):"""列出存储桶中所有对象:param bucket_name: 同名:param prefix: 前缀:return:"""try:files_list = self.client.list_objects(bucket_name=bucket_name, prefix=prefix, recursive=True)for obj in files_list:print(obj.bucket_name, obj.object_name.encode('utf-8'), obj.last_modified,obj.etag, obj.size, obj.content_type)except S3Error as e:print("[error]:", e)def bucket_policy(self, bucket_name):"""列出桶存储策略:param bucket_name::return:"""try:policy = self.client.get_bucket_policy(bucket_name)except S3Error as e:print("[error]:", e)return Nonereturn policydef download_file(self, bucket_name, file, file_path, stream=1024*32):"""从bucket 下载文件 + 写入指定文件:return:"""try:data = self.client.get_object(bucket_name, file)with open(file_path, "wb") as fp:for d in data.stream(stream):fp.write(d)except S3Error as e:print("[error]:", e)def fget_file(self, bucket_name, file, file_path):"""下载保存文件保存本地:param bucket_name::param file::param file_path::return:"""self.client.fget_object(bucket_name, file, file_path)def get_section_data(self, bucket_name, file_name, start, size):'''获取切片数据:param bucket_name::param file_name::param start::param size::return:'''data = {'start': start, 'data': None}try:obj = self.client.get_object(bucket_name=bucket_name, object_name=file_name, offset=start, length=size)data = {'start': start, 'data': obj}except Exception as e:print('=============', e)return datadef get_file_object(self, bucket_name, object_name):"""获取文件对象:param bucket_name::param file::return:"""pool_arr = []file_data = io.BytesIO()try:stat_obj = self.client.stat_object(bucket_name=bucket_name, object_name=object_name)total_length = stat_obj.sizesize = self.sizetotal_page = self.get_page_count(total_length, size)total = 0for chunck in range(1, total_page + 1):start = (chunck - 1) * sizeif chunck == total_page:size = total_length - totalthread_item = self.processPool.submit(self.get_section_data, bucket_name, object_name, start, size)pool_arr.append(thread_item)for key, thread_res in tqdm(enumerate(as_completed(pool_arr)), unit='MB', unit_scale=True,unit_divisor=1024 * 1024, ascii=True, total=len(pool_arr), ncols=50):try:_res = thread_res.result()file_data.seek(_res['start'])file_data.write(_res['data'].read())except Exception as e:print(e)except Exception as e:print(e)return file_data.getvalue()def get_object_list(self, bucket_name):objects = []try:objects = self.client.list_objects(bucket_name)except Exception as e:print(e)return objectsdef get_page_count(self, total, per_page):"""计算分页总数:param total: 记录总数:param per_page: 每页记录数:return: 分页总数"""page_count = total // per_pageif total % per_page != 0:page_count += 1return page_countdef copy_file(self, bucket_name, file, file_path):"""拷贝文件(最大支持5GB):param bucket_name::param file::param file_path::return:"""self.client.copy_object(bucket_name, file, file_path)def upload_file(self, bucket_name, file, file_path, content_type):"""上传文件 + 写入:param bucket_name: 桶名:param file: 文件名:param file_path: 本地文件路径:param content_type: 文件类型:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:print("Bucket '{}' is not exists".format(bucket_name))self.client.make_bucket(bucket_name)with open(file_path, "rb") as file_data:file_stat = os.stat(file_path)self.client.put_object(bucket_name, file, file_data, file_stat.st_size, content_type=content_type)except S3Error as e:print("[error]:", e)def upload_object(self, bucket_name, file, file_data, content_type='binary/octet-stream'):"""上传文件 + 写入:param bucket_name: 桶名:param file: 文件名:param file_data: bytes:param content_type: 文件类型 默认是appliction/octet-stream:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:print("Bucket '{}' is not exists".format(bucket_name))self.client.make_bucket(bucket_name)buffer = io.BytesIO(file_data)st_size = len(file_data)self.client.put_object(bucket_name, file, buffer, st_size, content_type=content_type)except S3Error as e:print("[error]:", e)def fput_file(self, bucket_name, file, file_path):"""上传文件:param bucket_name: 桶名:param file: 文件名:param file_path: 本地文件路径:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:self.client.make_bucket(bucket_name)else:print("Bucket '{}' already exists".format(bucket_name))self.client.fput_object(bucket_name, file, file_path)except S3Error as e:print("[error]:", e)def stat_object(self, bucket_name, file, log=True):"""获取文件元数据:param bucket_name::param file::return:"""res = Nonetry:data = self.client.stat_object(bucket_name, file)res = dataif log:print(data.bucket_name)print(data.object_name)print(data.last_modified)print(data.etag)print(data.size)print(data.metadata)print(data.content_type)except S3Error as e:if log:print("[error]:", e)return resdef remove_file(self, bucket_name, file):"""移除单个文件:return:"""self.client.remove_object(bucket_name, file)def remove_files(self, bucket_name, file_list):"""删除多个文件:return:"""delete_object_list = [DeleteObject(file) for file in file_list]for del_err in self.client.remove_objects(bucket_name, delete_object_list):print("del_err", del_err)def presigned_get_file(self, bucket_name, file, days=7):"""生成一个http GET操作 签证URL:return:"""return self.client.presigned_get_object(bucket_name, file, expires=timedelta(days=days))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/15090.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DeepDriving | CUDA编程-03:线程层级

本文来源公众号“DeepDriving”,仅用于学术分享,侵权删,干货满满。 原文链接:CUDA编程-03:线程层级 DeepDriving | CUDA编程-01: 搭建CUDA编程环境-CSDN博客 DeepDriving | CUDA编程-02: 初识CUDA编程-C…

Linux之共享内存mmap用法实例(六十三)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…

外卖霸王餐返利外卖会员卡小程序开发

外卖霸王餐返利外卖会员卡小程序开发 "社交电商赋能下的外卖返利小程序"是专为商家与用户双赢而设计的创新平台。 以下是其开发方案的详细步骤: 一、需求梳理:首先,我们需要明确小程序的核心功能和特色。包括设定活动类型、返利…

Python学习(3) 函数

定义 定义一个函数的格式: def 函数名(参数):执行代码如果没有参数,则称为无参函数。 定义时小括号中写的是形参(形式参数),调用时写的是实参(实际参数)。 调用 调用格式: def…

【Docker】Linux 系统(CentOS 7)安装 Docker

文章目录 对 VMware 软件的建议官方说明文档Docker安装卸载旧版本docker设置仓库开始安装 docker 引擎最新版 Docker 安装指定版本 Docker 安装(特殊需求使用) 启动 Docker查看 Docker 版本查看 Docker 镜像设置 Docker 开机自启动 验证开机启动是否生效…

自定义原生小程序顶部及获取胶囊信息

需求:我需要将某个文字或者按钮放置在小程序顶部位置 思路:根据获取到的顶部信息来定义我需要放的这个元素样式 * 这里我是定义某个指定页面 json:给指定页面的json中添加自定义设置 "navigationStyle": "custom" JS&am…

新时代AI浪潮下,程序员和产品经理如何入局AIGC领域?

当下,AI浪潮席卷全球,AIGC大模型技术已经成为当今技术领域的一个重要趋势,对于产品经理来说,掌握这项技术不仅能够增强他们的职业技能,还能在竞争激烈的职场中脱颖而出。 为什么呢? 把握AI时代的机遇 AI技…

StringMVC

目录 一,MVC定义 二,SpringMVC的基本使用 2.1建立连接 - RequestMapping("/...") ​编辑 2.2请求 1.传递单个参数 2.传递多个参数 3.传递对象 4.参数重命名 5.传递数组 6. 传递集合 7.传递JSON数据 8. 获取url中数据 9. 传递文…

怎么通过OpenAI API调用其多模态大模型(GPT-4o)

现在只要有额度,大家都可以调用OpenAI的多模态大模型了,例如GPT-4o和GPT-4 Turbo,我一年多前总结过一些OpenAI API的用法,发现现在稍微更新了一下。主要参考了这里:https://platform.openai.com/docs/guides/vision 其…

python数据类型之元组、集合和字典

目录 0.三者主要作用 1.元组 元组特点 创建元组 元组解包 可变和不可变元素元组 2.集合 集合特点 创建集合 集合元素要求 集合方法 访问与修改 子集和超集 相等性判断 集合运算 不可变集合 3.字典 字典特点 字典创建和常见操作 字典内置方法 pprin模块 0.…

k8s——Pod详解

一、Pod基础概念 1.1 Pod定义 Pod是kubernetes中最小的资源管理组件,Pod也是最小化运行容器化应用的资源对象。一个Pod代表着集群中运行的一个进程。kubernetes中其他大多数组件都是围绕着Pod来进行支撑和扩展Pod功能的,例如,用于管理Pod运行…

缪尔赛思又来到了你的面前(哈希)

定义一棵根节点为 1 1 1, n ( 2 ≤ n ≤ 1 0 3 ) n(2≤n≤10^3) n(2≤n≤103) 个节点的树的哈希值为: H ∑ i 1 n X i Y f a ( i ) m o d 998244353 H∑^n_{i1}X^iY^{fa(i)}\ mod\ 998244353 Hi1∑n​XiYfa(i) mod 998244353 f a ( i ) fa(i) fa(i)…

断网之后的页面,Autox.js是点击还是上下滑动比较好?

在处理断网之后的页面,选择点击还是上下滑动作为刷新操作,取决于应用的设计和用户界面。通常,这两种操作都可以作为刷新页面的方式,但它们各自有不同的适用场景: 点击刷新 - 适用场景:如果应用提供了一个明…

Java进阶学习笔记7——权限修饰符

什么是权限修饰符? 就是用来限制类中的成员(成员变量、成员方法、构造器、代码块....)能够被访问的范围。 protected使用的比较少,但是程序员还是要阅读代码,看官方文档是怎么写的,都会接触到protected修饰…

C#串口通信-串口相关参数介绍

串口通讯(Serial Communication),是指外设和计算机间,通过数据信号线、地线等,按位进行传输数据的一种双向通讯方式。 串口是一种接口标准,它规定了接口的电气标准,没有规定接口插件电缆以及使用的通信协议&#xff0c…

ssh 配置 authorized_keys 后无法免密登录

查看日志: tail -f /var/log/auth.log May 25 15:55:13 121 sudo: pam_unix(sudo:session): session opened for user root by root(uid0) May 25 15:55:13 121 sshd[550561]: Received signal 15; terminating. May 25 15:55:13 121 sshd[922866]: Server liste…

性能测试场景的设计方法

引用:根据2008年Aberdeen Group的研究报告,对于Web网站,1秒的页面加载延迟相当于少了11%的PV(page view),相当于降低了16%的顾客满意度。如果从金钱的角度计算,就意味着:如果一个网站…

「探讨」:什么是网络审计?好用的网络审计系统推荐【图文详解】

网络是企业运营、政府管理、个人生活不可或缺的基础设施。 然而网络安全问题却日益凸显,数据泄露、网络攻击、欺诈行为等风险日益严重。 一、网络审计的定义 网络审计,又称信息技术审计或电子审计,是指审计人员运用专业技能和工具&#xff…

fdk-aac将aac格式转为pcm数据

int sampleRate 44100; // 采样率int sampleSizeInBits 16; // 采样位数,通常是16int channels 2; // 通道数,单声道为1,立体声为2FILE *m_fd NULL;FILE *m_fd2 NULL;HANDLE_AACDECODER decoder aacDecoder_Open(TT_MP4_ADTS, 1);if (!…

实战之快速完成 ChatGLM3-6B 在 GPU-8G的 INT4 量化和本地部署

ChatGLM3 (ChatGLM3-6B) 项目地址 https://github.com/THUDM/ChatGLM3大模型是很吃CPU和显卡的,所以,要不有一个好的CPU,要不有一块好的显卡,显卡尽量13G,内存基本要32GB。 清华大模型分为三种(ChatGLM3-6B-Base&…