文章目录
- 前言
- 一、JobTriggerPoolHelper 作用:
- 二、JobTriggerPoolHelper 源码介绍:
- 2.1. start() 方法:
- 2.2 任务触发:
- 2.3 XxlJobTrigger.trigger 任务执行:
- 2.4 processTrigger 任务的执行:
- 2.5 runExecutor 任务的执行:
- 总结
前言
本文介绍 JobTriggerPoolHelper 的职责及其触发任务的完整流程。
一、JobTriggerPoolHelper 作用:
JobTriggerPoolHelper 创建快、慢两个线程池(ThreadPoolExecutor)。任务被触发时,触发逻辑会提交到其中一个线程池中异步执行:默认使用快线程池;若同一任务在 1 分钟内有超过 10 次触发耗时大于 500ms,则改用慢线程池,避免慢任务拖累其他任务的触发。
二、JobTriggerPoolHelper 源码介绍:
2.1. start() 方法:
/** Logger for this helper. */
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

// ---------------------- trigger pool ----------------------

// Fast and slow trigger pools. They are built the same way and differ only in
// their maximum thread count and in the capacity of their work queue.
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

/**
 * Creates the fast and the slow trigger pool.
 *
 * <p>The fast pool takes its maximum size from {@code getTriggerPoolFastMax()} and queues up to
 * 1000 tasks; the slow pool takes its maximum size from {@code getTriggerPoolSlowMax()} and
 * queues up to 2000 tasks. Per the original notes these map to the properties
 * {@code xxl.job.triggerpool.fast.max} (default 200) and {@code xxl.job.triggerpool.slow.max}
 * (default 100).
 */
public void start() {
    fastTriggerPool = newTriggerPool(
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            1000,
            "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-");

    slowTriggerPool = newTriggerPool(
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            2000,
            "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-");
}

/**
 * Builds one trigger pool: 10 core threads, 60 seconds keep-alive, a bounded queue, and threads
 * named {@code threadNamePrefix + runnable.hashCode()}.
 *
 * @param maxPoolSize      maximum number of threads in the pool
 * @param queueCapacity    capacity of the bounded work queue
 * @param threadNamePrefix prefix of every thread name created for the pool
 * @return the newly created pool
 */
private static ThreadPoolExecutor newTriggerPool(int maxPoolSize, int queueCapacity, final String threadNamePrefix) {
    ThreadFactory namedThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, threadNamePrefix + r.hashCode());
        }
    };
    return new ThreadPoolExecutor(
            10,
            maxPoolSize,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity),
            namedThreads);
}

/**
 * Shuts both trigger pools down immediately (via {@code shutdownNow()}) when the application
 * stops, then logs the shutdown.
 */
public void stop() {
    fastTriggerPool.shutdownNow();
    slowTriggerPool.shutdownNow();
    logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
2.2 任务触发:
/**
* add trigger 执行任务
*/
public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool// 快慢线程池 的选择 1分钟 内同一个任务(jobId 相同),超过10次 执行的时间超过 500ms 就放入到慢线程池中执行ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// trigger 线程池 任务的执行triggerPool_.execute(new Runnable() {@Overridepublic void run() {// 记录开始时间,方便统计 本次job 执行的耗时 long start = System.currentTimeMillis();try {// do trigger 执行任务XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-maplong minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {// 如果不是同一分钟,则重新开始计数minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) { // ob-timeout threshold 500ms// 大于 500ms 任务超时 进行计数AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});
}// ---------------------- helper ----------------------private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();public static void toStart() {helper.start();
}
public static void toStop() {helper.stop();
}/**
* @param jobId
* @param triggerType
* @param failRetryCount
* >=0: use this param
* <0: use param from job info config
* @param executorShardingParam
* @param executorParam
* null: use job param
* not null: cover job param
* 触发任务的执行 调用本类中的 addTrigger
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
2.3 XxlJobTrigger.trigger 任务执行:
/*** trigger job** @param jobId* @param triggerType* @param failRetryCount* >=0: use this param* <0: use param from job info config* @param executorShardingParam* @param executorParam* null: use job param* not null: cover job param* @param addressList* null: use executor addressList* not null: cover*/
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data// 根据job id 从 xxl_job_info 获取job /*** SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.id = #{id}**/XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}// 执行器,任务参数if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}// 改任务设置的 任务执行失败次数,如果到时间正常触发 则failRetryCount 为-1 ;// 如果是失败的补充,则为改任务剩余补偿次数// 到时间第一次触发任务 则获取xxl_job_info executor_fail_retry_count 设置的重试次数int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 获取改任务对应的执行器信息/***SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.id = #{id}**/XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList// 执行器地址覆盖,如果重新传入了执行器地址 则进行覆盖;否则取 XxlJobGroup 的执行器地址if (addressList!=null && addressList.trim().length()>0) {// 执行器地址类型:0=自动注册、1=手动录入group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param// 执行器任务分片参数 格式如 1/2int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}// 判断路由策略if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {// 如果是: 分片广播,则需要为注册的每个执行器都 发送任务for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, 
group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 非分片广播 ,则直接执行任务processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}
2.4 processTrigger 任务的执行:
/**
* @param group job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount* @param triggerType* @param index sharding index* @param total sharding index*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param// 获取阻塞处理策略 ,如果从任务中的 阻塞处理策略 不在ExecutorBlockStrategyEnum 枚举中,则默认为 单机率行// "SERIAL EXECUTION":单机率行; "DISCARD LATER":云弃后续调度;"COVER EARLY":獲盖之前调度ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy// 获取任务路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id 记录log 日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-param 创建任务执行器的 triggerParam 对象TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 根据任务的路由策略,从执行器地址中,选择出一个执行器地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && 
!group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executor 获取到执行器地址,runExecutor 执行任务ReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) 
{triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info 更新日志信息jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
2.5 runExecutor 任务的执行:
/**
* run executor* @param triggerParam* @param address* @return*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {// 根据执行器的地址,获取包装执行器的接口对象ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);// 远程rpc 调用 执行器端,进行任务的执行runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}// 返沪执行的信息StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;
}
executorBiz.run 任务的执行:
/**
 * Sends the trigger parameter to the executor by posting it to {@code addressUrl + "run"}.
 *
 * @param triggerParam parameter object posted as the request body
 * @return whatever {@code XxlJobRemotingUtil.postBody} returns for the call
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    final String runEndpoint = addressUrl + "run";
    return XxlJobRemotingUtil.postBody(runEndpoint, accessToken, timeout, triggerParam, String.class);
}
总结
本文对 JobTriggerPoolHelper 的工作内容进行介绍。