聊聊PowerJob的AliOssService

本文主要研究一下PowerJob的AliOssService

DFsService

tech/powerjob/server/extension/dfs/DFsService.java

public interface DFsService {/*** 存储文件* @param storeRequest 存储请求* @throws IOException 异常*/void store(StoreRequest storeRequest) throws IOException;/*** 下载文件* @param downloadRequest 文件下载请求* @throws IOException 异常*/void download(DownloadRequest downloadRequest) throws IOException;/*** 获取文件元信息* @param fileLocation 文件位置* @return 存在则返回文件元信息* @throws IOException 异常*/Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;/*** 清理 powerjob 认为“过期”的文件* 部分存储系统自带生命周期管理(如阿里云OSS,则不需要单独实现该方法)* @param bucket bucket* @param days 天数,需要清理超过 X 天的文件*/default void cleanExpiredFiles(String bucket, int days) {}
}

DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法

AbstractDFsService

tech/powerjob/server/persistence/storage/AbstractDFsService.java

@Slf4j
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {protected ApplicationContext applicationContext;public AbstractDFsService() {log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());}abstract protected void init(ApplicationContext applicationContext);protected static final String PROPERTY_KEY = "oms.storage.dfs";protected static String fetchProperty(Environment environment, String dfsType, String key) {String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);return environment.getProperty(pKey);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());init(applicationContext);}
}

AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init

AliOssService

tech/powerjob/server/persistence/storage/impl/AliOssService.java

@Slf4j
@Priority(value = Integer.MAX_VALUE - 1)
@Conditional(AliOssService.AliOssCondition.class)
public class AliOssService extends AbstractDFsService {private static final String TYPE_ALI_OSS = "alioss";private static final String KEY_ENDPOINT = "endpoint";private static final String KEY_BUCKET = "bucket";private static final String KEY_CREDENTIAL_TYPE = "credential_type";private static final String KEY_AK = "ak";private static final String KEY_SK = "sk";private static final String KEY_TOKEN = "token";private OSS oss;private String bucket;private static final int DOWNLOAD_PART_SIZE = 10240;private static final String NO_SUCH_KEY = "NoSuchKey";//......void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);if (StringUtils.isEmpty(bucket)) {throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");}this.bucket = bucket;CredentialsProvider credentialsProvider;CredentialType credentialType = CredentialType.parse(mode);switch (credentialType) {case PWD:credentialsProvider = new DefaultCredentialProvider(ak, sk, token);break;case SYSTEM_PROPERTY:credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();break;default:credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();}this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");}//......    
}    

AliOssService继承了AbstractDFsService

store

    @Overridepublic void store(StoreRequest storeRequest) throws IOException {ObjectMetadata objectMetadata = new ObjectMetadata();PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);oss.putObject(putObjectRequest);}

store方法创建PutObjectRequest,使用oss.putObject进行上传

download

    @Overridepublic void download(DownloadRequest downloadRequest) throws IOException {FileLocation dfl = downloadRequest.getFileLocation();DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);try {FileUtils.forceMkdirParent(downloadRequest.getTarget());oss.downloadFile(downloadFileRequest);} catch (Throwable t) {ExceptionUtils.rethrow(t);}}

download方法则根据DownloadRequest指定的FileLocation创建DownloadFileRequest,然后通过oss.downloadFile(downloadFileRequest)进行下载

fetchFileMeta

    @Overridepublic Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {try {ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));return Optional.ofNullable(objectMetadata).map(ossM -> {Map<String, Object> metaInfo = Maps.newHashMap();metaInfo.putAll(ossM.getRawMetadata());if (ossM.getUserMetadata() != null) {metaInfo.putAll(ossM.getUserMetadata());}return new FileMeta().setLastModifiedTime(ossM.getLastModified()).setLength(ossM.getContentLength()).setMetaInfo(metaInfo);});} catch (OSSException oe) {String errorCode = oe.getErrorCode();if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {return Optional.empty();}ExceptionUtils.rethrow(oe);}return Optional.empty();}

fetchFileMeta方法通过oss.getObjectMetadata获取ObjectMetadata

cleanExpiredFiles

    @Overridepublic void cleanExpiredFiles(String bucket, int days) {/*阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54*/}

cleanExpiredFiles则是空操作

init

    protected void init(ApplicationContext applicationContext) {Environment environment = applicationContext.getEnvironment();String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);try {initOssClient(endpoint, bkt, ct, ak, sk, token);} catch (Exception e) {ExceptionUtils.rethrow(e);}}

init则是通过environment获取相关属性,然后执行initOssClient

小结

DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init;AliOssService继承了AbstractDFsService,通过ossClient实现了store、download、fetchFileMeta方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/637065.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C - Monotonically Increasing

很妙的dfs&#xff1a;记录层数以及前一个数是多少。 代码&#xff1a; int ans[11]; int n,m;void dfs(int u,int pre){if(un1){for(int i1;i<n;i)cout<<ans[i]<< ;cout<<endl;return;}for(int ipre1;i(n-u)<m;i){ans[u]i;dfs(u1,i);} }void solve(…

Linux问题 apt-get install时 无法解析域名“cn.archive.ubuntu.com”

问题描述: 在安装程序时会出现无法解析域名的错误 解决办法: 1、编辑文件 sudo vim /etc/resolv.conf 2、在最后加上(按键 i 进入编辑模式) nameserver 8.8.8.8 3、保存退出(:wq)

Upload靶场通关教程(旧版20关)

文件上传类型&#xff1a; 前端验证&#xff1a;1 MIME类型验证&#xff1a;2 黑名单验证&#xff1a;3~10&#xff0c;19 大小写绕过、空格绕过、解析后缀数字绕过、点绕过、/绕过、::$DATA绕过 白名单验证&#xff1a;11~18&#xff0c;20 %00截断、二次渲染、文件包含、…

网络安全B模块(笔记详解)- Apache安全配置

1.打开服务器场景(A-Server),通过命令行清除防火墙规则。在服务器场景上查看apache版本,将查看到的服务版本字符串完整提交; 2.检测服务器场景中此版本apache是否存在显示banner信息漏洞,如果验证存在,修改配置文件将此漏洞进行加固,并重启Apache服务,将此加固项内容…

力扣每日一练(24-1-20)

大脑里的第一想法是排列组合&#xff0c;直接给出超级准确的最优解。 但不适用&#xff0c;hhh 只要连续的n个元素大于或者等于target就可以了 题目比自己想象的要好解决 解法是使用滑动窗口算法。这个算法的基本思想是维护一个窗口&#xff0c;使得窗口内的元素总和大于等于目…

代码随想录算法训练营第二十五天| 216.组合总和III、17.电话号码的字母组合

216.组合总和III 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 解题思路&#xff1a;依旧是正常遍历&#xff0c;过程中记录遍历的所有节点之和&#xff0c;如果当前元素之和已经大于所给定的值&#xff0c;退回上一节点 ja…

算法常用思路总结

思路 1. 求数组中最大最小值思路代码 2. 计算阶乘思路&#xff1a;代码&#xff1a; 3. 得到数字的每一位思路代码 4. 计算时间类型5. 最大公约数、最小公倍数6. 循环数组的思想题目&#xff1a;猴子选大王代码 补充经典例题1. 复试四则运算题目内容题解 2. 数列求和题目内容题…

专升本-拓展部分-信息安全

信息安全&#xff1a; 1.信息本身的安全&#xff0c;也是信息安全的基本属性&#xff1a;保密性&#xff0c;完整性&#xff0c;可用性 信息本身的安全是指保证信息的保密性&#xff08;非授权用户不能访问信息&#xff09;&#xff0c;完整性&#xff08;信息正确&#xff0c…

Pytest 测试框架与Allure 测试报告——Allure2测试报告-L3

目录&#xff1a; allure2报告中添加附件-图片 Allure2报告中添加附件Allure2报告中添加附件&#xff08;图片&#xff09;应用场景Allure2报告中添加附件&#xff08;图片&#xff09;-Python代码示例&#xff1a;allure2报告中添加附件-日志 Allure2报告中添加附件&#xff…

Flink处理函数(3)—— 窗口处理函数

窗口处理函数包括&#xff1a;ProcessWindowFunction 和 ProcessAllWindowFunction 基础用法 stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction()) 这里的MyProcessWindowFunction就是ProcessWi…

《WebKit 技术内幕》之五(4): HTML解释器和DOM 模型

4 影子&#xff08;Shadow&#xff09;DOM 影子 DOM 是一个新东西&#xff0c;主要解决了一个文档中可能需要大量交互的多个 DOM 树建立和维护各自的功能边界的问题。 4.1 什么是影子 DOM 当开发这样一个用户界面的控件——这个控件可能由一些 HTML 的标签元素…

【前端学习笔记2】javaScript基础

是什么&#xff1a; 是一种运行在客户端&#xff08;服务器的编程语言&#xff09; javacript分为行内JavaScript&#xff0c;内部JavaScript&#xff0c;外部JavaScript 内部JavaScript 直接写在html中body里面 alert&#xff08;“hello&#xff0c;world”&#xff09;…

Flutter 入门

什么是Flutter Flutter 只是一个用来解决跨平台的UI框架&#xff0c;最终还是要使用原生平台进行绘制&#xff0c;对于大部分和系统API无关的页面都可以使用Flutter处理,但是有一些获取系统信息的页面比如某个页面获取Android是否打开了通知栏权限&#xff1f;获取手机电池电量…

React16源码: React中的IndeterminateComponent的源码实现

IndeterminateComponent 1 &#xff09;概述 这是一个比较特殊的component的类型&#xff0c; 就是还没有被指定类型的component在一个fibrer被创建的时候&#xff0c;它的tag可能会是 IndeterminateComponent在 packages/react-reconciler/src/ReactFiber.js 中&#xff0c;有…

Impala:基于内存的MPP查询引擎

Impala查询引擎 1、Impala概述 1、Impala概述 Impala是Cloudera公司主导研发的高性能、低延迟的交互式SQL查询引擎&#xff0c;它提供SQL语义&#xff0c;能查询存储在Hadoop的HDFS和HBase中的PB级大数据。Impala是CDH平台首选的PB级大数据实时交互式查询分析引擎 2015年11月&…

使用Sobel算子把视频转换为只剩边缘部分

效果展示 原始视频 修改后的视频 整体代码 import cv2vc cv2.VideoCapture(test.mp4)if vc.isOpened():open, frame vc.read() else:open Falsei 0 while open:ret, frame vc.read()if frame is None:breakif ret True:i 1# 转换为灰度图gray cv2.cvtColor(frame, cv…

实现分布式锁

背景 分布式锁是一种用于协调分布式系统中多个节点之间并发访问共享资源的机制。在分布式系统中&#xff0c;由于存在多个节点同时访问共享资源的可能性&#xff0c;需要使用分布式锁来保证数据的一致性和正确性。 今天要实现的是分布式场景中的互斥类型的锁。 下面时分布…

Tensorflow 入门基础——向LLM靠近一小步

进入tensflow的系统学习&#xff0c;向LLM靠拢。 目录 1. tensflow的数据类型1.1 数值类型1.2 字符串类型1.3 布尔类型的数据 2. 数值精度3. 类型转换3.1 待优化的张量 4 创建张量4.1 从数组、列表对象创建4.2 创建全0或者1张量4.3 创建自定义数值张量 5. 创建已知分布的张量&…

luceda ipkiss教程 56:画多端口螺旋线

案例分享&#xff1a;画多端口螺旋线 注&#xff1a;spiral的长度不是真实长度&#xff0c;具体可以参考教程28 代码如下&#xff1a; from si_fab import all as pdk import ipkiss3.all as i3 import numpy as np from scipy.constants import piclass SpiralCircular(i3.P…

linux perf工具使用

参考文章Linux性能调优之perf使用方法_perf交叉编译-CSDN博客 perf是一款Linux性能分析工具。比如打流性能优化的时候&#xff0c;就能够看到是哪些函数消耗的cpu高 那么linux如何编译perf工具呢&#xff1f; perf工具编译 进入perf目录下linux-3.16/tools/perf make ARCH…