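Table of Contents
- Preface
- 1. When executor registration and task-status callbacks are triggered
- 2. How the server receives executor and task-status requests
- 2.1 JobApiController
- 2.2 The AdminBiz interface
- 2.3 AdminBizImpl, the AdminBiz implementation class
- 2.4 Executor registration
- 2.5 Executor removal
- Summary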
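Preface
Earlier chapters showed that JobRegistryHelper contains the methods that register and remove executors, and that JobCompleteHelper updates the status reported by log callbacks. When are these methods actually called?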
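1. When executor registration and task-status callbacks are triggered
- Executor registration and removal: when the executor project starts and stops, it sends a registration request and a registration-removal request to the server, respectively.
- Task-status callback: in the executor project, the triggerCallbackThread thread of the TriggerCallbackThread class sends task-status requests to the server.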
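The two trigger points above can be sketched as a simplified executor lifecycle. This is only an illustration under stated assumptions: `AdminApiSketch` and `ExecutorLifecycleSketch` are hypothetical names, not XXL-JOB classes, and the callback queue is flushed by an explicit method call here, whereas the source says a background thread (triggerCallbackThread) does the sending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the executor's view of the admin API; not an XXL-JOB type. */
interface AdminApiSketch {
    void registry(String appName, String address);
    void registryRemove(String appName, String address);
    void callback(List<String> results);
}

/** Simplified lifecycle: register on start, report task results, deregister on stop. */
class ExecutorLifecycleSketch {
    private final AdminApiSketch admin;
    private final String appName;
    private final String address;
    // Results of finished tasks that still have to be reported to the server.
    private final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    ExecutorLifecycleSketch(AdminApiSketch admin, String appName, String address) {
        this.admin = admin;
        this.appName = appName;
        this.address = address;
    }

    /** Executor startup: send the registration request. */
    void start() {
        admin.registry(appName, address);
    }

    /** Called when a task finishes; the result is only queued, not sent yet. */
    void taskFinished(String result) {
        callbackQueue.add(result);
    }

    /** Drains the queue and sends one batched callback (done by a thread in the real executor). */
    void flushCallbacks() {
        List<String> batch = new ArrayList<>();
        callbackQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            admin.callback(batch);
        }
    }

    /** Executor shutdown: report what is left, then send the removal request. */
    void stop() {
        flushCallbacks();
        admin.registryRemove(appName, address);
    }
}
```

With a recording implementation of `AdminApiSketch`, calling `start()`, `taskFinished("ok")`, and `stop()` produces the sequence registry, callback, registryRemove, which mirrors the three request types the server handles in the next section.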
2. How the server receives executor and task-status requests
2.1 JobApiController
@Controller
@RequestMapping("/api")
public class JobApiController {

    @Resource
    private AdminBiz adminBiz;

    /**
     * api
     *
     * @param uri
     * @param data
     * @return
     */
    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit=false)
    public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

        // valid: only POST requests are accepted
        if (!"POST".equalsIgnoreCase(request.getMethod())) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri==null || uri.trim().length()==0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        // access token check (skipped when no token is configured on the server)
        if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
                && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
                && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // services mapping
        if ("callback".equals(uri)) {
            // log callback: callback
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
            // executor registration: registry
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
            // executor removal: registryRemove
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }

    }

}
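To make the controller's checks concrete, here is a rough sketch of what a matching registration request could look like from the client side. It only builds the request and does not send it. Several details are assumptions rather than facts taken from the code above: the header name `XXL-JOB-ACCESS-TOKEN` is assumed to be the value of `XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN`, the JSON field names are inferred from the `RegistryParam` getters, and the admin base URL, group, key, and address in the usage note are made-up examples. The class name `RegistryRequestSketch` is hypothetical. It uses `java.net.http`, so it needs Java 11 or later.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds (but does not send) a request for the /api/{uri} endpoint. */
class RegistryRequestSketch {

    // Assumed value of XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN; verify it against your version.
    static final String TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    /** Builds a POST to {adminBase}/api/{uri}; the controller rejects any other HTTP method. */
    static HttpRequest build(String adminBase, String uri, String token, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(adminBase + "/api/" + uri))
                .header("Content-Type", "application/json;charset=UTF-8")
                .header(TOKEN_HEADER, token)
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /**
     * JSON body with field names inferred from the RegistryParam getters.
     * No escaping is done, so the values must not contain quotes or backslashes.
     */
    static String registryJson(String group, String key, String value) {
        return String.format(
                "{\"registryGroup\":\"%s\",\"registryKey\":\"%s\",\"registryValue\":\"%s\"}",
                group, key, value);
    }
}
```

For example, `RegistryRequestSketch.build("http://localhost:8080/xxl-job-admin", "registry", "my-token", RegistryRequestSketch.registryJson("EXECUTOR", "demo-app", "http://10.0.0.5:9999/"))` targets the `registry` branch of the controller; replacing the uri with `registryRemove` targets the removal branch with the same body.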
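2.2 The AdminBiz interface
The controller hands each request to the AdminBiz interface, and the server-side implementation class does the actual data processing.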
public interface AdminBiz {

    // ---------------------- callback ----------------------

    /**
     * callback
     *
     * @param callbackParamList
     * @return
     */
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);

    // ---------------------- registry ----------------------

    /**
     * registry
     *
     * @param registryParam
     * @return
     */
    public ReturnT<String> registry(RegistryParam registryParam);

    /**
     * registry remove
     *
     * @param registryParam
     * @return
     */
    public ReturnT<String> registryRemove(RegistryParam registryParam);

    // ---------------------- biz (custom) ----------------------
    // group、job ... manage

}
2.3 AdminBizImpl, the AdminBiz implementation class
@Service
public class AdminBizImpl implements AdminBiz {

    // log callback
    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return JobCompleteHelper.getInstance().callback(callbackParamList);
    }

    // executor registration
    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return JobRegistryHelper.getInstance().registry(registryParam);
    }

    // executor removal
    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return JobRegistryHelper.getInstance().registryRemove(registryParam);
    }

}
2.4 Executor registration
public ReturnT<String> registry(RegistryParam registryParam) {

    // valid: all three registry fields must be non-blank
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            /**
             * <update id="registryUpdate" >
             *     UPDATE xxl_job_registry
             *     SET `update_time` = #{updateTime}
             *     WHERE `registry_group` = #{registryGroup}
             *       AND `registry_key` = #{registryKey}
             *       AND `registry_value` = #{registryValue}
             * </update>
             */
            // refresh the update time of the executor's registered address
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                // no row was updated, so this is the first registration: insert it
                /**
                 * INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
                 * VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
                 */
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}
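One consequence of the method above is worth spelling out: validation runs synchronously, but the database work is only queued on a thread pool, so the caller receives SUCCESS before anything has been written. The sketch below illustrates that validate-then-queue shape in isolation. `AsyncRegistrySketch`, the fixed pool of two threads, and the `Consumer` that stands in for the DAO calls are all assumptions made for the example; they do not describe how `registryOrRemoveThreadPool` is actually configured.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical illustration of "validate synchronously, persist asynchronously". */
class AsyncRegistrySketch {
    // Pool size is an arbitrary choice for this sketch.
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // Stand-in for the DAO work (update, insert, refresh) done in the real method.
    private final Consumer<String> persist;

    AsyncRegistrySketch(Consumer<String> persist) {
        this.persist = persist;
    }

    /** Returns "FAIL" for blank input; otherwise "SUCCESS" as soon as the work is queued. */
    String registry(String group, String key, String value) {
        if (isBlank(group) || isBlank(key) || isBlank(value)) {
            return "FAIL";
        }
        pool.execute(() -> persist.accept(group + "|" + key + "|" + value));
        return "SUCCESS"; // says nothing about whether the write has happened or will succeed
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Stops accepting work and waits up to five seconds for queued tasks to finish. */
    boolean shutdownAndWait() {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is that the HTTP response stays fast even when the database is slow, at the cost that a failed write is invisible to the executor that sent the request.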
2.5 Executor removal
public ReturnT<String> registryRemove(RegistryParam registryParam) {

    // valid: all three registry fields must be non-blank
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // remove the executor
            /**
             * <delete id="registryDelete" >
             *     DELETE FROM xxl_job_registry
             *     WHERE registry_group = #{registryGroup}
             *       AND registry_key = #{registryKey}
             *       AND registry_value = #{registryValue}
             * </delete>
             */
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
            if (ret > 0) {
                // fresh: a row was actually removed
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}
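Registration and removal both branch on the affected-row count returned by the DAO: registration tries an UPDATE first and inserts only when no row matched, and removal refreshes only when a row was actually deleted. The following in-memory sketch reproduces that decision logic without a database. `RegistryStoreSketch` and its map keyed by a single string are assumptions for illustration; the real table is keyed by the three columns shown in the SQL comments.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the xxl_job_registry decision logic. */
class RegistryStoreSketch {
    // key -> update time; stands in for rows of the registry table
    private final Map<String, Long> rows = new HashMap<>();

    /** Mimics registryUpdate: returns the number of rows touched (0 or 1). */
    private int update(String key, long now) {
        if (!rows.containsKey(key)) {
            return 0;
        }
        rows.put(key, now);
        return 1;
    }

    /** Mimics registryDelete: returns the number of rows removed (0 or 1). */
    private int delete(String key) {
        return rows.remove(key) != null ? 1 : 0;
    }

    /**
     * Update first, insert only if nothing was updated.
     * Returns true when a new row was inserted, the case in which the source refreshes group info.
     */
    synchronized boolean registry(String key, long now) {
        if (update(key, now) < 1) {
            rows.put(key, now); // first registration
            return true;
        }
        return false; // repeated registration: only the update time changed
    }

    /** Returns true only when a row was really deleted, the case in which the source refreshes. */
    synchronized boolean registryRemove(String key) {
        return delete(key) > 0;
    }

    synchronized Long updateTimeOf(String key) {
        return rows.get(key);
    }
}
```

The sketch serializes calls with `synchronized`. With a real database, an update-then-insert sequence is not atomic on its own, so two concurrent first registrations of the same address could in principle both reach the insert; whether that matters depends on the table's constraints, which the source does not show.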
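Summary
This article described when executor registration, executor removal, and task-status callbacks are triggered, and how the server receives them in JobApiController and dispatches them through AdminBiz.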