【AGC】云存储服务端使用方法

 【集成准备】

1、Python环境配置

下载Python和PyCharm并安装。

cke_125677.png

使用安装的python本身作为解释器。

cke_134497.png

安装AGC Python SDK。

cke_139799.png​云存储包安装完成。

2、AGC环境配置

在AGC创建项目和应用

cke_148779.png

开通云存储服务。

返回项目设置界面,选择Server SDK 页签,在认证凭据处点击创建按钮,然后下载认证凭据。

cke_154152.png

将认证凭据导入到项目中

【布局设计】

本次测试的Demo是一个Python服务,所以没有界面UI。

【功能实现】

引入AGC与云存储模块

from agconnect.cloud_storage import AGCCloudStorageExceptionfrom agconnect.cloud_storage import GetFilesOptionsfrom agconnect.cloud_storage import Metadatafrom agconnect.cloud_storage import StorageManagementfrom agconnect.common_server import AGCClient, CredentialParser, logger

将下载的凭据文件放入项目中,调用AGCClient.initialize方法初始化AGCClient实例

将配置开发环境中获取的认证凭据放置到自定义的目录,通过initialize方法初始化对应认证凭据的AGCClient实例。

通过StorageManagement()来初始化存储实例。

bucket_name = 'cloudstoragepython-kyuv2'file_name = 'test.txt' # for example: "test.txt"config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'agc-apiclient-1157238089186298880-7233693580609187129.json'))credential = CredentialParser.to_credential(config_path)AGCClient.initialize(credential=credential)storage = StorageManagement()bucket = storage.bucket(bucket_name)

上传文件

使用bucket.upload方法将文件上传到云端。

def upload_file(bucket):config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'C:\Users\kwx1075489\Desktop\agcserversdk-python-1.3.0.300\test.txt'))if not os.path.exists(config_path):logger.error("file does not exist")loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)result = loop.run_until_complete(bucket.upload(path_str=config_path))logger.info(f"Upload file response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the upload file process: {e}")finally:loop.close()

调用file.get_metadata方法获取设置在云端的元数据

def get_file_metadata(bucket, file_name):file = bucket.file(file_name)loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)result = loop.run_until_complete(file.get_metadata())res_json = loop.run_until_complete(result.json(content_type='text/plain'))logger.info(f"Get file metadata: {res_json}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file metadata: {e}")finally:loop.close()

调用file.set_metadata方法将文件属性的元数据和自定义的元数据覆盖到云端。

def update_file_metadata(bucket, file_name):file = bucket.file(file_name)loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)get_result = loop.run_until_complete(file.get_metadata())get_result_text = loop.run_until_complete(get_result.text())get_result_json = json.loads(get_result_text)metadata = Metadata(content_language='en-US', custom_metadata={'test': 'test'},content_type=get_result_json.get('contentType'))set_result = loop.run_until_complete(file.set_metadata(metadata))set_result_json = loop.run_until_complete(set_result.json(content_type='text/plain'))logger.info(f"Update file metadata response: {set_result_json}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the update file metadata: {e}")finally:loop.close()

调用file.download方法将云端文件数据写入本地文件中。

def download_file(bucket, file_name):def progress_callback(progress: Dict[str, int] = None):logger.info(f"Downloaded {progress['writtenBytes']} bytes out of {progress['totalBytes']}")loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)try:local_file = os.path.normpath(os.path.join(os.path.dirname(__file__), ' C:\Users\kwx1075489\Desktop'))if not os.path.exists(local_file):logger.error("file does not exist")remote_file = bucket.file(file_name)text, resp = loop.run_until_complete(remote_file.download(local_file, on_download_progress=progress_callback))logger.info(f"Download file response: {resp}")assert text == "File downloaded successfully. "except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the download file process: {e}")finally:loop.close()

调用file.download方法获取当前目录下的文件与子目录。

def get_files(bucket):loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)options = GetFilesOptions(delimiter="/")try:result = loop.run_until_complete(bucket.get_files(options=options))logger.info(f"Get file list response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file list: {e}")finally:loop.close()

调用file.delete方法删除云端文件。

def get_files(bucket):loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)options = GetFilesOptions(delimiter="/")try:result = loop.run_until_complete(bucket.get_files(options=options))logger.info(f"Get file list response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file list: {e}")finally:loop.close()

【功能测试】

执行python main.py命令,服务依次执行:

上传“test.txt”文件到云端:

cke_165928.png

从云端下载“test.txt”文件到本地目录:

从云端删除“cloudstoragepython-kyuv2”存储区的“test.txt”文件:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/162666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

双系统Ubuntu-22.04.3安装编译kaldi

Ubuntu物理内存要求85-100G以上,运行内存5-6G以上(如果第一次安装的Ubuntu物理内存不够,请勿进行扩容,扩容易出现黑屏、蓝屏、死机的情况,应该卸载Ubuntu重新安装,在安装过程中进行内存分配;运行…

4.22每日一题(累次积分的计算:交换次序)

注:因为 是积不出的函数,所以先不用算,最后发现,出现dx与dy可以相互抵消,即可算出答案

为企业解决设备全生命周期需求,凌雄科技凸显DaaS增长价值

企业成长离不开投资,但毫无疑问的是,投资最有价值的部分在业务。相比之下,诸如办公设备之类的固定资产投资,很容易变成企业现金流的吞噬者。从购买、运维到保养、折旧、回收,现代企业在越来越大的办公设备规模面前&…

工具 | docker删除不使用的容器

工具 | docker删除不使用的容器 Docker 清理命令

数据库|TiDB v7.1.0 资源管控功能是如何降低运维难度和成本

目录 一、前言 二、资源管控流程图 三、资源管控 (Resource Control)测试 1)测试集群环境 2)Request Unit (RU) 概念 3)资源管控参数 4)评估实际负载所需容量 4.1 根据实际负载估算容量 方法一 or: 方法二 4.2 基于硬件…

鸿蒙原生应用/元服务开发-AGC分发如何配置签名信息

使用制作的私钥(.p12)文件、在AGC申请的证书文件和Profile(.p7b)文件,在DevEco Studio配置工程的签名信息,以构建携带发布签名信息的APP。 1.打开DevEco Studio,菜单选择“File > Project S…

va-Q-tec实现温度敏感产品运输过程质量控制温控无忧

摘要:温度敏感产品运输对供应链全流程的温度质量要求较高,往往需要借助特殊的温湿度监测技术产品。va-Q-tec与虹科Comet合作,采用虹科Comet的U系列温度记录仪,为集装箱运输过程提供完整的温控包装解决方案。 一、客户背景 va-Q-…

钴电解液中净化除镍除铜树脂

#钴电解液中净化除镍除铜树脂 钴是生产各种合金及电池不可或缺的原材料,钴资源供给主要来自于大型铜矿和镍矿的伴生开采,钴的主要应用领域为动力电池、3C消费电池、各种耐热合金、硬质合金、防腐合金、磁性合金及各种钴盐等。其中,电池领域是…

【洛谷算法题】P5714-肥胖问题【入门2分支结构】

👨‍💻博客主页:花无缺 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5714-肥胖问题【入门2分支结构】🌏题目描述🌏输入格式&a…

给项目快速接入链路追踪

为什么需要链路追踪? 我们程序员在日常工作中,最常做事情之一就是修bug了。如果程序只是运行在单机上,我们最常用的方式就是在程序上打日志,然后程序运行的过程中将日志输出到文件上,然后我们根据日志去推断程序是哪一…

redis-cluster集群

1.redis-cluster集群 redis3.0引入的分布式存储方案 集群由多个node节点组成,redis数据分布在这些节点之中。 在集群之中分为主节点和从节点 集群模式当中,主从一一对应,数据写入和读取与主从模式一样,主负责写,从…

网页小游戏的开发流程

网页小游戏的开发流程可以分为几个关键步骤。这只是一个一般性的流程概述,具体的步骤可能会根据项目的规模和要求而有所不同。此外,还要考虑法律和版权问题,确保你的游戏开发过程是合法的。下面是一个简要的概述,希望对大家有所帮…

Android WorldWind加载shapefile格式文件形成三维效果

目录 1 前言2 实现思路3 绘制Polygons4 读取shapefile文件5 加载立体模型6 问题1 前言 在项目中有时会加载shapefile格式的数据,要形成三维立体效果。但是查看worldwind NASA官网,在worldwind android的使用教程中并没用加载shapefile格式的教程,然后源码中也没有开发加载s…

单片机调试技巧--栈回溯

在启动文件中修改 IMPORT rt_hw_hard_fault_exceptionEXPORT HardFault_Handler HardFault_Handler PROC; get current contextTST lr, #0x04 ; if(!EXC_RETURN[2])ITE EQMRSEQ r0, msp ; [2]0 > Z1, get fault context from h…

java--static修饰成员变量

1.static 叫静态,可以修饰成员变量、成员方法。 2.成员变量按照有无static修饰,分为两种: ①类变量:有static修饰,属于类,在计算机里只有一份,会被类的全部对象共享(不管那个类调用的&#x…

CyberRT-共享内存实现

CyberRT共享内存类图 共享内存消息发布 数据用共享内存发布时,首先会创建ShmTransmitter对象,包含两个主要成员segment和notifier,Segment用于创建共享内存(上面绿色部分),Notifer 最终构建ReadableInfo通…

Navicat 技术指引 | 适用于 GaussDB 的自动运行功能

Navicat Premium(16.2.8 Windows版或以上) 已支持对 GaussDB 主备版的管理和开发功能。它不仅具备轻松、便捷的可视化数据查看和编辑功能,还提供强大的高阶功能(如模型、结构同步、协同合作、数据迁移等),这…

tp8 使用rabbitMQ

php8.0 使用 rabbitmq 要使用 3.6版本以上的&#xff0c; 并且还要开启 php.ini中的 socket 扩展 php think make:command SimpleMQProduce //创建一个生产者命令行 php think make:command SimpleMQConsumer //创建一个消费者命令行 生产者代码 <?php declare (strict_ty…

一个ETL流程搞定数据脱敏

数据脱敏是什么&#xff1f; 数据脱敏是指在数据处理过程中&#xff0c;通过一系列的技术手段去除或者替换敏感信息&#xff0c;以保护个人隐私和敏感信息的安全的过程。数据脱敏通常在数据共享、数据分析和软件测试等场景下使用&#xff0c;它旨在降低数据泄露和滥用的风险。…

运动戴什么耳机好?运动无线耳机哪个品牌比较好?运动耳机推荐

​如果你是一名户外运动爱好者&#xff0c;那么一款高品质的运动耳机是必不可少的。它们具备好音质、高稳固性舒适度、防尘防水等多项防护功能&#xff0c;让你在恶劣的天气条件下也能保持音乐的陪伴。面对市面上越来越多的运动耳机&#xff0c;到底哪款更值得入手&#xff1f;…