Preface
This article takes a look at PowerJob's HttpProcessor.
BasicProcessor
tech/powerjob/worker/core/processor/sdk/BasicProcessor.java
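public interface BasicProcessor {

    /**
     * Core processing logic.
     * The workflow context can be obtained via {@link TaskContext#getWorkflowContext()}.
     *
     * @param context task context; jobParams and instanceParams give the parameters configured in the console and the task instance parameters passed through OpenAPI, respectively
     * @return the processing result; msg has a length limit and is truncated when too long; returning null is not allowed
     * @throws Exception throwing is allowed but not recommended; it is best for the business developer to handle exceptions themselves
     */
    ProcessResult process(TaskContext context) throws Exception;
}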
BasicProcessor is the basic processor for standalone (single-machine) execution; it defines the process method.
CommonBasicProcessor
tech/powerjob/official/processors/CommonBasicProcessor.java
@Slf4j
public abstract class CommonBasicProcessor implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext ctx) throws Exception {
        OmsLogger omsLogger = ctx.getOmsLogger();
        String securityDKey = getSecurityDKey();
        if (SecurityUtils.disable(securityDKey)) {
            String msg = String.format("%s is not enabled, please set '-D%s=true' to enable it", this.getClass().getSimpleName(), securityDKey);
            omsLogger.warn(msg);
            return new ProcessResult(false, msg);
        }

        String status = "unknown";
        Stopwatch sw = Stopwatch.createStarted();
        omsLogger.info("using params: {}", CommonUtils.parseParams(ctx));
        try {
            ProcessResult result = process0(ctx);
            omsLogger.info("execute succeed, using {}, result: {}", sw, result);
            status = result.isSuccess() ? "succeed" : "failed";
            return result;
        } catch (Throwable t) {
            status = "exception";
            omsLogger.error("execute failed!", t);
            return new ProcessResult(false, ExceptionUtils.getMessage(t));
        } finally {
            log.info("{}|{}|{}|{}|{}", getClass().getSimpleName(), ctx.getJobId(), ctx.getInstanceId(), status, sw);
        }
    }

    protected abstract ProcessResult process0(TaskContext taskContext) throws Exception;

    protected String getSecurityDKey() {
        return null;
    }
}
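CommonBasicProcessor is an abstract class that implements the BasicProcessor interface. It declares the abstract method process0 and provides a getSecurityDKey method that returns null by default. Its process method first calls SecurityUtils.disable(securityDKey) to check whether the processor is enabled, and returns a failed ProcessResult immediately if it is not. It then runs process0 inside a try/catch, turning any Throwable into a failed ProcessResult, and in the finally block logs the processor name, job id, instance id, status, and elapsed time.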
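The control flow described above is a template method: the base class owns the guard, the error handling, and the logging, while subclasses only fill in process0. Below is a minimal, dependency-free sketch of that shape. SketchResult, GuardedProcessor, the three subclasses, and the property name sketch.processor.enabled are all hypothetical stand-ins and not PowerJob classes. The sketch also assumes the switch is a JVM system property that must equal "true", which the '-D%s=true' hint in the message suggests but which the code shown here does not confirm, and it treats a null key as "no switch".

```java
// Hypothetical stand-in for PowerJob's ProcessResult (only the two fields used here).
final class SketchResult {
    final boolean success;
    final String msg;

    SketchResult(boolean success, String msg) {
        this.success = success;
        this.msg = msg;
    }
}

// Hypothetical base class mirroring the shape of CommonBasicProcessor.process.
abstract class GuardedProcessor {

    // Template method: check the switch, delegate to process0, never let a Throwable escape.
    public final SketchResult process(String params) {
        String key = securityKey();
        // Assumption: the switch is a JVM system property that must equal "true".
        if (key != null && !Boolean.getBoolean(key)) {
            return new SketchResult(false, getClass().getSimpleName()
                    + " is not enabled, please set '-D" + key + "=true' to enable it");
        }
        String status = "unknown";
        long startNanos = System.nanoTime();
        try {
            SketchResult result = process0(params);
            status = result.success ? "succeed" : "failed";
            return result;
        } catch (Throwable t) {
            status = "exception";
            return new SketchResult(false, String.valueOf(t.getMessage()));
        } finally {
            // One summary line per execution, whatever the outcome.
            long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
            System.out.println(getClass().getSimpleName() + "|" + status + "|" + elapsedMs + "ms");
        }
    }

    protected abstract SketchResult process0(String params) throws Exception;

    // Assumption of this sketch: null means "no switch", so the processor always runs.
    protected String securityKey() {
        return null;
    }
}

// Succeeds and echoes its input.
final class EchoProcessor extends GuardedProcessor {
    @Override
    protected SketchResult process0(String params) {
        return new SketchResult(true, "echo:" + params);
    }
}

// Throws, to show that the base class converts the exception into a failed result.
final class FailingProcessor extends GuardedProcessor {
    @Override
    protected SketchResult process0(String params) throws Exception {
        throw new IllegalStateException("boom");
    }
}

// Only runs when -Dsketch.processor.enabled=true is set.
final class SwitchedProcessor extends GuardedProcessor {
    @Override
    protected SketchResult process0(String params) {
        return new SketchResult(true, "ran");
    }

    @Override
    protected String securityKey() {
        return "sketch.processor.enabled";
    }
}
```

Because the catch clause handles Throwable, callers of process in this sketch always receive a result object rather than an exception, which matches how the process method shown earlier behaves.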
HttpProcessor
public class HttpProcessor extends CommonBasicProcessor {

    /**
     * Default timeout is 60 seconds.
     */
    private static final int DEFAULT_TIMEOUT = 60;
    private static final int HTTP_SUCCESS_CODE = 200;
    private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

    @Override
    public ProcessResult process0(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);

        if (httpParams == null) {
            String message = "httpParams is null, please check jobParam configuration.";
            omsLogger.warn(message);
            return new ProcessResult(false, message);
        }

        if (StringUtils.isEmpty(httpParams.url)) {
            return new ProcessResult(false, "url can't be empty!");
        }

        if (!httpParams.url.startsWith("http")) {
            httpParams.url = "http://" + httpParams.url;
        }
        omsLogger.info("request url: {}", httpParams.url);

        // set default method
        if (StringUtils.isEmpty(httpParams.method)) {
            httpParams.method = "GET";
            omsLogger.info("using default request method: GET");
        } else {
            httpParams.method = httpParams.method.toUpperCase();
            omsLogger.info("request method: {}", httpParams.method);
        }

        // set default mediaType
        if (!"GET".equals(httpParams.method)) {
            // set default request body
            if (StringUtils.isEmpty(httpParams.body)) {
                httpParams.body = new JSONObject().toJSONString();
                omsLogger.warn("try to use default request body:{}", httpParams.body);
            }
            if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) {
                httpParams.mediaType = "application/json";
                omsLogger.warn("try to use 'application/json' as media type");
            }
        }

        // set default timeout
        if (httpParams.timeout == null) {
            httpParams.timeout = DEFAULT_TIMEOUT;
        }
        omsLogger.info("request timeout: {} seconds", httpParams.timeout);
        OkHttpClient client = getClient(httpParams.timeout);

        Request.Builder builder = new Request.Builder().url(httpParams.url);
        if (httpParams.headers != null) {
            httpParams.headers.forEach((k, v) -> {
                builder.addHeader(k, v);
                omsLogger.info("add header {}:{}", k, v);
            });
        }

        switch (httpParams.method) {
            case "PUT":
            case "DELETE":
            case "POST":
                MediaType mediaType = MediaType.parse(httpParams.mediaType);
                omsLogger.info("mediaType: {}", mediaType);
                RequestBody requestBody = RequestBody.create(mediaType, httpParams.body);
                builder.method(httpParams.method, requestBody);
                break;
            default:
                builder.get();
        }

        Response response = client.newCall(builder.build()).execute();
        omsLogger.info("response: {}", response);
        String msgBody = "";
        if (response.body() != null) {
            msgBody = response.body().string();
        }

        int responseCode = response.code();
        String res = String.format("code:%d, body:%s", responseCode, msgBody);
        boolean success = true;
        if (responseCode != HTTP_SUCCESS_CODE) {
            success = false;
            omsLogger.warn("{} url: {} failed, response code is {}, response body is {}",
                    httpParams.method, httpParams.url, responseCode, msgBody);
        }
        return new ProcessResult(success, res);
    }

    //......
}
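HttpProcessor extends CommonBasicProcessor. Its process0 method obtains the parameters via CommonUtils.parseParams(taskContext) and parses them into an HttpParams object, filling in defaults for the method (GET), the request body and media type of non-GET requests, and the timeout (60 seconds). It then builds a Request.Builder, creates a RequestBody for PUT, DELETE, and POST, executes the request with client.newCall(builder.build()).execute(), and decides success from the HTTP status: only a 200 response counts as success.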
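To make the defaulting rules in process0 easier to see in isolation, here is a small sketch that reproduces them as pure functions, without OkHttp or fastjson. The class name HttpDefaults and its method names are hypothetical and exist only for illustration; the rules themselves are read off the code shown earlier.

```java
import java.util.Locale;

// Hypothetical helper that isolates the defaulting rules of HttpProcessor.process0.
final class HttpDefaults {

    static final int DEFAULT_TIMEOUT_SECONDS = 60;

    private HttpDefaults() {
    }

    // Prefix "http://" unless the url already starts with the literal string "http".
    static String normalizeUrl(String url) {
        return url.startsWith("http") ? url : "http://" + url;
    }

    // Empty method falls back to GET; otherwise it is upper-cased.
    // (The original calls toUpperCase() with the default locale; Locale.ROOT is used here.)
    static String normalizeMethod(String method) {
        return (method == null || method.isEmpty()) ? "GET" : method.toUpperCase(Locale.ROOT);
    }

    // A missing timeout falls back to 60 seconds.
    static int normalizeTimeout(Integer timeoutSeconds) {
        return timeoutSeconds == null ? DEFAULT_TIMEOUT_SECONDS : timeoutSeconds;
    }

    // Only these three methods get a request body; everything else goes through builder.get().
    static boolean sendsBody(String normalizedMethod) {
        return "PUT".equals(normalizedMethod)
                || "DELETE".equals(normalizedMethod)
                || "POST".equals(normalizedMethod);
    }

    // Success means exactly 200; other 2xx codes are reported as failures.
    static boolean isSuccess(int responseCode) {
        return responseCode == 200;
    }
}
```

Two consequences follow directly from these rules. The prefix check is a plain startsWith("http"), so a scheme-less host such as httpbin.org/get is left unprefixed because the host name itself begins with "http". And a method outside PUT, DELETE, and POST (for example PATCH) falls into the default branch of the switch and is sent as a GET.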
CommonUtils.parseParams
tech/powerjob/official/processors/util/CommonUtils.java
public class CommonUtils {

    private CommonUtils() {
    }

    public static String parseParams(TaskContext context) {
        // in a workflow, always prefer jobParams
        if (context.getWorkflowContext().getWfInstanceId() != null) {
            return context.getJobParams();
        }
        if (StringUtils.isNotEmpty(context.getInstanceParams())) {
            return context.getInstanceParams();
        }
        return context.getJobParams();
    }
}
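CommonUtils.parseParams returns jobParams when the task runs inside a workflow; otherwise it prefers instanceParams and falls back to jobParams when instanceParams is empty.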
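The precedence can be written as a three-argument function, which makes the three cases easy to check by hand. This is only a sketch: ParamPriority and pick are hypothetical names, and the workflow instance id is modelled as a Long, which is an assumption about its type.

```java
// Hypothetical helper mirroring the precedence in CommonUtils.parseParams.
final class ParamPriority {

    private ParamPriority() {
    }

    static String pick(Long wfInstanceId, String instanceParams, String jobParams) {
        // Inside a workflow, jobParams always wins.
        if (wfInstanceId != null) {
            return jobParams;
        }
        // Outside a workflow, non-empty instanceParams take priority.
        if (instanceParams != null && !instanceParams.isEmpty()) {
            return instanceParams;
        }
        // Otherwise fall back to jobParams.
        return jobParams;
    }
}
```

For example, pick(null, "fromApi", "fromConsole") yields "fromApi", while pick(42L, "fromApi", "fromConsole") yields "fromConsole".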
HttpParams
@Data
public static class HttpParams {
    /**
     * POST / GET / PUT / DELETE
     */
    private String method;
    /**
     * the request url
     */
    private String url;
    /**
     * application/json
     * application/xml
     * image/png
     * image/jpeg
     * image/gif
     */
    private String mediaType;

    private String body;

    private Map<String, String> headers;

    /**
     * timeout for complete calls
     */
    private Integer timeout;
}
HttpParams defines the method, url, mediaType, body, headers, and timeout properties.
getClient
private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

private static OkHttpClient getClient(Integer timeout) {
    return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder()
            .connectTimeout(Duration.ZERO)
            .readTimeout(Duration.ZERO)
            .writeTimeout(Duration.ZERO)
            .callTimeout(timeout, TimeUnit.SECONDS)
            .build());
}
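getClient caches one OkHttpClient per timeout value, so requests with the same timeout reuse the same client. The connect, read, and write timeouts are set to zero (no timeout in OkHttp), leaving callTimeout as the single limit on the whole call.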
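The caching idea can be tried without OkHttp on the classpath. In the sketch below, StubClient is a hypothetical stand-in for OkHttpClient, and TimeoutClientCache and the BUILT counter are likewise invented for illustration; only the ConcurrentHashMap.computeIfAbsent pattern is taken from the code shown earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache keyed by timeout, following the computeIfAbsent pattern of getClient.
final class TimeoutClientCache {

    // Hypothetical stand-in for OkHttpClient; it only remembers its call timeout.
    static final class StubClient {
        final int callTimeoutSeconds;

        StubClient(int callTimeoutSeconds) {
            this.callTimeoutSeconds = callTimeoutSeconds;
        }
    }

    // Counts how many clients were actually constructed.
    static final AtomicInteger BUILT = new AtomicInteger();

    private static final Map<Integer, StubClient> STORE = new ConcurrentHashMap<>();

    private TimeoutClientCache() {
    }

    static StubClient get(int timeoutSeconds) {
        // computeIfAbsent runs the factory at most once per key, even under concurrent access.
        return STORE.computeIfAbsent(timeoutSeconds, t -> {
            BUILT.incrementAndGet();
            return new StubClient(t);
        });
    }
}
```

One design consequence of keying by timeout is that the map holds one client per distinct timeout value ever requested and never evicts, which is fine when jobs use a handful of timeouts.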
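Summary
PowerJob's HttpProcessor extends CommonBasicProcessor; it accepts HttpParams and sends the request with OkHttp. CommonBasicProcessor is an abstract class that implements the BasicProcessor interface. It declares the abstract method process0 and provides a getSecurityDKey method; its process method first calls SecurityUtils.disable(securityDKey) to check whether the processor is enabled and returns immediately if it is not, then runs process0 inside a try/catch and logs the outcome in the finally block.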