聊聊PowerJob的HttpProcessor

本文主要研究一下PowerJob的HttpProcessor

BasicProcessor

tech/powerjob/worker/core/processor/sdk/BasicProcessor.java

public interface BasicProcessor {/*** 核心处理逻辑* 可通过 {@link TaskContext#getWorkflowContext()} 方法获取工作流上下文** @param context 任务上下文,可通过 jobParams 和 instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数* @return 处理结果,msg有长度限制,超长会被裁剪,不允许返回 null* @throws  Exception 异常,允许抛出异常,但不推荐,最好由业务开发者自己处理*/ProcessResult process(TaskContext context) throws Exception;
}

BasicProcessor是适用于单机运行的基础处理器,它定于了process方法

CommonBasicProcessor

tech/powerjob/official/processors/CommonBasicProcessor.java

@Slf4j
public abstract class CommonBasicProcessor implements BasicProcessor {@Overridepublic ProcessResult process(TaskContext ctx) throws Exception {OmsLogger omsLogger = ctx.getOmsLogger();String securityDKey = getSecurityDKey();if (SecurityUtils.disable(securityDKey)) {String msg = String.format("%s is not enabled, please set '-D%s=true' to enable it", this.getClass().getSimpleName(), securityDKey);omsLogger.warn(msg);return new ProcessResult(false, msg);}String status = "unknown";Stopwatch sw = Stopwatch.createStarted();omsLogger.info("using params: {}", CommonUtils.parseParams(ctx));try {ProcessResult result = process0(ctx);omsLogger.info("execute succeed, using {}, result: {}", sw, result);status = result.isSuccess() ? "succeed" : "failed";return result;} catch (Throwable t) {status = "exception";omsLogger.error("execute failed!", t);return new ProcessResult(false, ExceptionUtils.getMessage(t));} finally {log.info("{}|{}|{}|{}|{}", getClass().getSimpleName(), ctx.getJobId(), ctx.getInstanceId(), status, sw);}}protected abstract ProcessResult process0(TaskContext taskContext) throws Exception;protected String getSecurityDKey() {return null;}
}

CommonBasicProcessor是一个抽象类,它声明实现BasicProcessor接口,它定义了process0抽象方法,同时还提供了getSecurityDKey方法,其process方法先通过SecurityUtils.disable(securityDKey)先判断是否开启,没有开启直接返回;接着使用try catch来执行process0,最后finally的时候打印一下结果

HttpProcessor

public class HttpProcessor extends CommonBasicProcessor {/*** Default timeout is 60 seconds.*/private static final int DEFAULT_TIMEOUT = 60;private static final int HTTP_SUCCESS_CODE = 200;private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();@Overridepublic ProcessResult process0(TaskContext taskContext) throws Exception {OmsLogger omsLogger = taskContext.getOmsLogger();HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);if (httpParams == null) {String message = "httpParams is null, please check jobParam configuration.";omsLogger.warn(message);return new ProcessResult(false, message);}if (StringUtils.isEmpty(httpParams.url)) {return new ProcessResult(false, "url can't be empty!");}if (!httpParams.url.startsWith("http")) {httpParams.url = "http://" + httpParams.url;}omsLogger.info("request url: {}", httpParams.url);// set default methodif (StringUtils.isEmpty(httpParams.method)) {httpParams.method = "GET";omsLogger.info("using default request method: GET");} else {httpParams.method = httpParams.method.toUpperCase();omsLogger.info("request method: {}", httpParams.method);}// set default mediaTypeif (!"GET".equals(httpParams.method)) {// set default request bodyif (StringUtils.isEmpty(httpParams.body)) {httpParams.body = new JSONObject().toJSONString();omsLogger.warn("try to use default request body:{}", httpParams.body);}if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) {httpParams.mediaType = "application/json";omsLogger.warn("try to use 'application/json' as media type");}}// set default timeoutif (httpParams.timeout == null) {httpParams.timeout = DEFAULT_TIMEOUT;}omsLogger.info("request timeout: {} seconds", httpParams.timeout);OkHttpClient client = getClient(httpParams.timeout);Request.Builder builder = new Request.Builder().url(httpParams.url);if (httpParams.headers != null) {httpParams.headers.forEach((k, v) -> {builder.addHeader(k, v);omsLogger.info("add header {}:{}", k, v);});}switch (httpParams.method) {case "PUT":case "DELETE":case "POST":MediaType mediaType = MediaType.parse(httpParams.mediaType);omsLogger.info("mediaType: {}", mediaType);RequestBody requestBody = RequestBody.create(mediaType, httpParams.body);builder.method(httpParams.method, requestBody);break;default:builder.get();}Response response = client.newCall(builder.build()).execute();omsLogger.info("response: {}", response);String msgBody = "";if (response.body() != null) {msgBody = response.body().string();}int responseCode = response.code();String res = String.format("code:%d, body:%s", responseCode, msgBody);boolean success = true;if (responseCode != HTTP_SUCCESS_CODE) {success = false;omsLogger.warn("{} url: {} failed, response code is {}, response body is {}",httpParams.method, httpParams.url, responseCode, msgBody);}return new ProcessResult(success, res);}//......
}    

HttpProcessor继承了CommonBasicProcessor,其process0方法使用CommonUtils.parseParams(taskContext)获取参数,解析为HttpParams类型,然后构建Request.Builder,针对PUT、DELETE、POST构建RequestBody,然后通过client.newCall(builder.build()).execute()执行请求获取返回结果,根据http status来判断是否success

CommonUtils.parseParams

tech/powerjob/official/processors/util/CommonUtils.java

public class CommonUtils {private CommonUtils() {}public static String parseParams(TaskContext context) {// 工作流中的总是优先使用 jobParamsif (context.getWorkflowContext().getWfInstanceId() != null) {return context.getJobParams();}if (StringUtils.isNotEmpty(context.getInstanceParams())) {return context.getInstanceParams();}return context.getJobParams();}
}

CommonUtils的parseParams方法针对使用工作流返回jobParams,否则优先返回instanceParams

HttpParams

    @Datapublic static class HttpParams {/*** POST / GET / PUT / DELETE*/private String method;/*** the request url*/private String url;/*** application/json* application/xml* image/png* image/jpeg* image/gif*/private String mediaType;private String body;private Map<String, String> headers;/*** timeout for complete calls*/private Integer timeout;}

HttpParams定义了method、url、mediaType、body、headers、timeout属性

getClient

    private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();private static OkHttpClient getClient(Integer timeout) {return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder().connectTimeout(Duration.ZERO).readTimeout(Duration.ZERO).writeTimeout(Duration.ZERO).callTimeout(timeout, TimeUnit.SECONDS).build());}

getClient将相同timeout的OkHttpClient进行了缓存

小结

PowerJob的HttpProcessor继承了CommonBasicProcessor,它接收HttpParams参数,然后使用okhttp进行请求;CommonBasicProcessor是一个抽象类,它声明实现BasicProcessor接口,它定义了process0抽象方法,同时还提供了getSecurityDKey方法,其process方法先通过SecurityUtils.disable(securityDKey)先判断是否开启,没有开启直接返回;接着使用try catch来执行process0,最后finally的时候打印一下结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/591649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows CPU部署llama2量化模型并实现API接口

目录 模型部署本地运行llama2使用fastapi实现API接口常用git仓库 模型部署 从huggingface下载模型 https://huggingface.co/ 放在本地文件夹&#xff0c;如下 本地运行llama2 from ctransformers import AutoModelForCausalLMllm AutoModelForCausalLM.from_pretrained(&q…

请求转发和重定向的区别

当客户端向服务器发送一个请求时&#xff0c;服务器可以通过请求转发和重定向两种方式来处理请求。这两种方式有着不同的实现机制和应用场景。 请求转发&#xff08;Forward&#xff09;&#xff1a; 请求转发是指服务器接收到一个客户端的请求后&#xff0c;将该请求转发给另…

微软好听的tts语音包下载,粤语,韩语,日语

微软的 tts 语音库&#xff0c;都是离线的&#xff0c;所以速度非常快 但资源比较少&#xff0c;比如粤语&#xff0c;韩语&#xff0c;日语 我发现一个老牌语音技术供应商。 资源丰富&#xff0c;可自行下载免费或收费语音包。 网站&#xff1a;正版用户专用配套播音员下载…

gitee添加仓库人员

1.进入gitee项目&#xff0c;点击管理 2.点击仓库成员管理&#xff0c;展开仓库成员管理节点&#xff0c;选择所有或者开发者 3.点击添加仓库成员 4. 邀请用户

Superset二次开发之环境部署(Docker版)

目录结构: /data/superset ├── 3.x-build.sh – docker build 命令脚本 ├── 3.x-run.sh – docker run 命令脚本 ├── src …

1214:八皇后 深度优先搜索算法

1214&#xff1a;八皇后 时间限制: 1000 ms 内存限制: 65536 KB 提交数: 22901 通过数: 14116 【题目描述】 会下国际象棋的人都很清楚&#xff1a;皇后可以在横、竖、斜线上不限步数地吃掉其他棋子。如何将8个皇后放在棋盘上&#xff08;有8 8个方格&#xff09;&#xff0c…

深度生成模型之GAN的评估 ->(个人学习记录笔记)

文章目录 深度生成模型之GAN的评估图像翻译的应用1. 风格迁移2. 数据增强3. 经典图像任务4. 内容创作5. 人脸图像编辑6. 人体图像编辑 图像翻译模型1. 有监督图像翻译模型2. 无监督图像翻译模型3. 多域图像翻译模型 深度生成模型之GAN的评估 图像翻译的应用 1. 风格迁移 各类…

2024年腾讯云服务器租用价格表_优惠活动大全_实时更新

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…

NA原理及配置

在IP地址空间中&#xff0c;a&#xff1b;b&#xff1b;c类地址中各有一部分地址&#xff0c;被称为私有IP地址&#xff08;私网地址&#xff09;&#xff0c;其余的为公有IP地址&#xff08;公网地址&#xff09; A&#xff1a;10.0.0.0 - 10.255.255.255 --- 相当于1条A类网段…

PyTorch官网demo解读——第一个神经网络(4)

上一篇&#xff1a;PyTorch官网demo解读——第一个神经网络&#xff08;3&#xff09;-CSDN博客 上一篇我们聊了手写数字识别神经网络的损失函数和梯度下降算法&#xff0c;这一篇我们来聊聊激活函数。 大佬说激活函数的作用是让神经网络产生非线性&#xff0c;类似人脑神经元…

十年磨一剑,花为缘享奢app打造行业的又一颠覆性创新

随着国内生活质量的提高&#xff0c;人们对于奢侈品的消费需求也在不断增长。消费者对于高品质、高价值的商品和服务的需求日益增长。2022年我国内地消费者奢侈品市场规模约为4700亿元&#xff0c;预计2023年我国内地消费者奢侈品消费预计将达到5500亿元&#xff0c;呈现出强劲…

旅游平台网页前后端

功能清单 游客功能 用户注册、登录登录权限拦截按名称搜索房间支付流程查看订单信息和状态评论预定过的房间&#xff0c;并自动修改订单状态查看统计剩余房间数量&#xff0c;数量为0时不可预定 管理员功能 房间分类管理 类型的删除、修改、查询&#xff08;准备添加增添功能…

Debezium日常分享系列之:Debezium 通知

Debezium日常分享系列之&#xff1a;Debezium 通知 一、概论二、Debezium通知格式三、Debezium 有关初始快照状态的通知四、Debezium 有关增量快照进度的通知五、启用 Debezium 通知六、访问 Debezium JMX 通知七、自定义通知渠道八、应用案例 一、概论 Debezium 通知提供了一…

Wnmp本地部署结合内网穿透实现任意浏览器远程访问本地服务

最近&#xff0c;我发现了一个超级强大的人工智能学习网站。它以通俗易懂的方式呈现复杂的概念&#xff0c;而且内容风趣幽默。我觉得它对大家可能会有所帮助&#xff0c;所以我在此分享。点击这里跳转到网站。 文章目录 前言1.Wnmp下载安装2.Wnmp设置3.安装cpolar内网穿透3.1…

测开基础概念

小王学习录 前言测试1. 什么是测试2. 测试和调试的区别3. 优秀测试人员应该具备的素质 需求1. 需求的定义2. 测试人员眼里的需求 测试用例1. 什么是测试用例(case)2. 什么是Bug 开发测试模型1. 软件的生命周期2. 瀑布模型3. 螺旋模型3. 增量和迭代4. 敏捷开发5. scrum6. 瀑布模…

chromium通信系统-ipcz系统(九)-ipcz系统代码实现-跨Node通信-代理和代理消除

chromium通信系统-ipcz系统(六)-ipcz系统代码实现-跨Node通信-基础通信 一文我们分析了跨Node的基础通信过程。 a进程和b进程通信的过程。 但在程序中a进程将自己打开的一对portal中的一个portal传递给了b进程。由于篇幅问题这个过程我们并没有分析&#xff0c;这篇文章我们就来…

如何使用甘特图进行项目管理?

或许你在工作中或项目启动会议上听说过“甘特图”一词&#xff0c;但对此了解不多。虽然这些图表可能变得相当复杂&#xff0c;但基础知识并不难掌握。通过本文&#xff0c;你将清楚地了解什么是甘特图、何时使用甘特图、创建甘特图的技巧等等。 什么是甘特图&#xff1f; 甘特…

第二十三章 反射(reflection)

一、反射机制&#xff08;重点&#xff09;&#xff08;P711&#xff09; 1. Java Reflection &#xff08;1&#xff09;反射机制允许程序在执行期借助 Reflection API 取得任何类的内部信息&#xff08;比如成员变量、构造器、成员方法等等&#xff09;&#xff0c;并能操作…

虚幻UE 材质-边界混合之PDO像素深度偏移量

2024年的第一天&#xff01;&#xff01;&#xff01;大家新年快乐&#xff01;&#xff01;&#xff01; 可能是长大了才知道 当你过得一般 你的亲朋好友对你真正态度只可能是没有表露出来的冷嘲热讽了 希望大家新的一年平安、幸福、 永远活力满满地追求自己所想做的、爱做的&…

Nginx(十四) 配置文件详解 - 负载均衡(超详细)

1. upstream Syntax: upstream name { ... } Default: — Context: http upstream块定义了一个上游服务器的集群&#xff0c;便于反向代理中的proxy_pass使用。 2. server Syntax: server address [parameters]; Default: — Context: upstream server指定一…