分布式任务调度平台xxl-job源码学习

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

官网:https://www.xuxueli.com/xxl-job/

github:https://github.com/xuxueli/xxl-job/

调度中心

主要代码在 xxl-job-admin ,部分代码在 xxl-job-core 。

后台任务

后台任务启动入口在 com.xxl.job.admin.core.conf.XxlJobAdminConfig#afterPropertiesSet ,此方法是实现 InitializingBean 接口的,在 Bean 属性设置完成后被调用。

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private static XxlJobAdminConfig adminConfig = null;public static XxlJobAdminConfig getAdminConfig() {return adminConfig;}// ---------------------- XxlJobScheduler ----------------------private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();// Bean 设置完属性后初始化 xxlJobScheduler ,包含后台任务启动xxlJobScheduler.init();}@Overridepublic void destroy() throws Exception {// Bean 销毁后关闭后台任务xxlJobScheduler.destroy();}... ...
}
public class XxlJobScheduler  {private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);public void init() throws Exception {// init i18ninitI18n();// admin trigger pool startJobTriggerPoolHelper.toStart();// admin registry monitor runJobRegistryHelper.getInstance().start();// admin fail-monitor runJobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )JobCompleteHelper.getInstance().start();// admin log report startJobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper )JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}......
}

trigger 线程池初始化(JobTriggerPoolHelper)

调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性。

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();public static void toStart() {helper.start();}// 有快慢两个线程池private ThreadPoolExecutor fastTriggerPool = null;private ThreadPoolExecutor slowTriggerPool = null;// 初始化线程池public void start(){fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}});slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}});}private volatile long minTim = System.currentTimeMillis()/60000;     // ms > minprivate volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread poolThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);// 1分钟内如果任务触发时间超过 0.5 秒的次数大于10,就使用慢线程池,否则使用快线程池if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// triggertriggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// 1分钟内如果任务触发时间超过 0.5 秒,就累加次数// check timeout-count-maplong minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});

执行器注册处理任务(JobRegistryHelper)

任务注册, 任务自动发现:

自v1.5版本之后, 任务取消了"任务执行机器"属性, 改为通过任务注册和自动发现的方式, 动态获取远程执行器地址并执行。

AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;注册表: 见"xxl_job_registry"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;执行器注册: 任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat; 执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;为保证系统"轻量级"并且降低学习部署成本,没有采用Zookeeper作为注册中心,采用DB方式进行任务注册发现。
public void start(){// 执行器注册及删除线程池,// 执行器每30秒周期调用调度中心 REST 接口注册,// 执行器停止运行前调用调度中心 REST 接口删除,// 这两个接口都是此线程池中异步处理// for registry or removeregistryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});// for monitorregistryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// 查询 xxl_job_group 表中自动注册(address_type=0)的记录,xxl_job_group 记录在页面新增执行器或者数据库insert// auto registry groupList<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// 查询超过 3 个上报周期(90秒)没有上报注册信息的执行器节点信息,并将其从 xxl_job_registry 表中删除// remove dead address (admin/executor)List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// 查询周期正常上报注册信息的执行器节点,根据上报的appname将其分到不同的执行器中// fresh online address (admin/executor)HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// 将正常的执行器节点信息刷新到 xxl_job_group 表中// fresh group addressfor (XxlJobGroup group: groupList) {List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}// 处理一次后等待 30 秒再进行下一次处理try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});registryMonitorThread.setDaemon(true);registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");registryMonitorThread.start();}

对执行失败的任务处理(JobFailMonitorHelper)

  1. 查询执行失败的任务。
  2. 对需要重试的任务进行重试。
  3. 发送告警通知,当前只支持邮件通知,可以扩展新的通知方式,实现 JobAlarm 接口并注册为Spring的Bean即可。
public void start(){monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// monitorwhile (!toStop) {try {// 查询 alarm_status = 0 的记录List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (failLogIds!=null && !failLogIds.isEmpty()) {for (long failLogId: failLogIds) {// 将 alarm_status 设置为 -1,表示正在处理,调度中心如果是集群部署,其它调用中心节点就不会处理该记录// lock logint lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet < 1) {continue;}XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 对需要重试的任务进行重试// 1、fail retry monitorif (log.getExecutorFailRetryCount() > 0) {JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";log.setTriggerMsg(log.getTriggerMsg() + retryMsg);XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitorint newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败if (info != null) {// 发送告警通知boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);newAlarmStatus = alarmResult?2:3;} else {newAlarmStatus = 1;}XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");}});monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobFailMonitorHelper");monitorThread.start();}
    @Overridepublic void afterPropertiesSet() throws Exception {// 查找所有实现 JobAlarm 接口的 Bean,当前只有 EmailJobAlarm,如果要支持新的通知方式,实现 JobAlarm 接口并注册为Spring的Bean即可Map<String, JobAlarm> serviceBeanMap = applicationContext.getBeansOfType(JobAlarm.class);if (serviceBeanMap != null && serviceBeanMap.size() > 0) {jobAlarmList = new ArrayList<JobAlarm>(serviceBeanMap.values());}}/*** job alarm** @param info* @param jobLog* @return*/public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {boolean result = false;// 遍历所有通知方式发送告警通知if (jobAlarmList!=null && jobAlarmList.size()>0) {result = true;  // success means all-successfor (JobAlarm alarm: jobAlarmList) {boolean resultItem = false;try {resultItem = alarm.doAlarm(info, jobLog);} catch (Exception e) {logger.error(e.getMessage(), e);}if (!resultItem) {result = false;}}}return result;}

执行器掉线导致任务一直在运行中处理(JobCompleteHelper)

调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败

public void start(){// 任务回调处理线程池初始化,执行器执行完任务后调用调度中心 REST 接口(com.xxl.job.admin.controller.JobApiController#api)// 通知任务执行结果,接口在此线程池异步处理// for callbackcallbackThreadPool = new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");}});// for monitormonitorThread = new Thread(new Runnable() {@Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}// monitorwhile (!toStop) {try {// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;Date losedTime = DateUtil.addMinutes(new Date(), -10);List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);if (losedJobIds!=null && losedJobIds.size()>0) {for (Long logId: losedJobIds) {XxlJobLog jobLog = new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");}});monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");monitorThread.start();}

日志统计及清理(JobLogReportHelper)

  1. 统计 3 天内日志,统计出每天的任务运行数量、任务执行成功数量、任务执行失败数量。
  2. 清理日志,默认清理 30 天前的日志,每天清理一次。
public void start(){logrThread = new Thread(new Runnable() {@Overridepublic void run() {// last clean log timelong lastCleanLogTime = 0;while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {// 统计今天、昨天、前天的运行日志,有可能前两天的任务今天还在执行,更新执行成功和失败的数量for (int i = 0; i < 3; i++) {// todayCalendar itemDay = Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);Date todayFrom = itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);Date todayTo = itemDay.getTime();// refresh log-report every minuteXxlJobLogReport xxlJobLogReport = new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);// 查询 xxl_job_log 表Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (triggerCountMap!=null && triggerCountMap.size()>0) {int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// 先按统计日期更新 xxl_job_log_report 表,更新失败后再插入新记录// do refreshint ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret < 1) {XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);}}// xxl.job.logretentiondays默认值是30,小于7不清理。每天清理一次。// 2、log-clean: switch open & once each dayif (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {// expire-timeCalendar expiredDay = Calendar.getInstance();expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);Date clearBeforeTime = expiredDay.getTime();// 这里如果日志删除失败可能导致死循环吧?// clean expired logList<Long> logIds = null;do {logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);if (logIds!=null && logIds.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (logIds!=null && logIds.size()>0);// update clean timelastCleanLogTime = System.currentTimeMillis();}try {TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");}});logrThread.setDaemon(true);logrThread.setName("xxl-job, admin JobLogReportHelper");logrThread.start();}

调度任务(JobScheduleHelper)

有两个线程:

scheduleThread

此线程做的事:读取要执行的任务,直接触发或将任务放入 ring (时间轮)中。

具体执行过程如下:

  1. 读取下次执行时间 <= 当前时间 + 5秒的任务,每次最多读取 6000 条任务。
  2. 循环处理读取出来的每条任务:
    1. 下次执行时间过期超过 5 秒
      1. 如果任务的过期策略是“立即执行一次”,则立即触发任务。
      2. 刷新任务的下次执行时间。
    2. 下次执行时间过期小于 5 秒
      1. 立即触发任务。
      2. 刷新任务的下次执行时间。
      3. 如果刷新后的下次执行时间在 5 秒内到达,将任务放入 ring 中,再刷新任务的下次执行时间。
    3. 下次执行时间刚好是当前时间或者在 5 秒内到达
      1. 将任务放入 ring 中。
      2. 刷新任务的下次执行时间。

将任务放入 ring 的处理:

先计算任务下次执行时间所在的秒数,再添加到对应秒数的任务 List 中。

ringThread

此线程做的事:读取 ring 中的任务,触发任务执行。

ring 结构如下,其实就是时间轮:

具体执行过程如下:

  1. 获取当前时间的秒数,读取 ring 中对应秒及前一秒的任务队列中的数据。

    这里读取前一秒的任务队列的原因举例说明一下:当前时间的秒数是5,执行任务触发用了1秒多,进入下一个循环时的秒数已经过了6秒,会等待到第7秒进行处理,这时读取任务队列如果只读取7秒的任务队列的话,6秒的任务就会被跳过没有执行,所以要读取7秒和6秒的任务队列。
    
  2. 循环触发读取的每条任务,是在线程池中异步触发的,不会耗费很长时间。

public void start(){// schedule threadscheduleThread = new Thread(new Runnable() {@Overridepublic void run() {// 等待整秒时再进行后面的处理try {TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// 默认是6000// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);// 使用排它锁,确保多个 xxl-job admin 节点下只有1个节点执行后面的处理。preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// 下次执行时间过期超过 5 秒// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、misfire matchMisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2、fresh nextrefreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2、fresh nextrefreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read againif (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger infofor (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commitif (conn != null) {try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis()-start;// Wait seconds, align secondif (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// 等待整秒再进行后面的处理// align secondtry {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );if (ringItemData.size() > 0) {// do triggerfor (int jobId: ringItemData) {// 触发任务,在线程池中异步触发// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}

执行器

xxl-job-executor-sample-frameless 模块是不使用 Spring Boot 框架的执行器代码,不常用。重点分析使用 Spring Boot 框架的执行器代码,即 xxl-job-executor-sample-springboot 模块。

执行器初始化:

com.xxl.job.executor.core.config.XxlJobConfig#xxlJobExecutor

注意调度中心和执行器通讯的端口是 xxl.job.executor.port , server.port 是原有业务接口使用的端口。

@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}

com.xxl.job.core.executor.impl.XxlJobSpringExecutor 实现了 Spring 框架的 SmartInitializingSingleton 接口,在 Spring 所有单例 Bean 创建完成后会回调 afterSingletonsInstantiated 方法。

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);// start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// 初始化任务处理器名称和对应方法的对应关系// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// 暂时没看明白// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}}......
}

initJobHandlerMethodRepository 是初始化任务处理器名称和对应方法的对应关系,

例如任务处理器名称“demoJobHandler”对应的处理方法是 SampleXxlJob 的 demoJobHandler() 方法。

super.start() 启动执行器任务:

public void start() throws Exception {// 创建执行器日志目录// init logpathXxlJobFileAppender.initLogPath(logPath);// 将配置项xxl.job.admin.addresses初始化为执行器调用调度中心接口的客户端实例,可以配置多个,以逗号分隔// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// 启动执行器日志文件清理线程,每天执行1次,默认清理30天前的日志文件,通过 xxl.job.executor.logretentiondays 配置// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// 启动执行器回调线程,执行器执行完任务后,调用调度中心接口api/callback将执行结果上报给调度中心// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// 初始化执行器服务器,启动了1个Netty服务器,接收调度中心发送的消息// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);}
        // startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);
 try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL// 执行器服务器请求处理.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// 执行器向调度中心注册,默认每30秒注册一次,调用调度中心接口 api/registry 注册// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();}
            // 执行器启动的 Netty 服务器处理 HTTP 请求,调度中心调用执行器的接口在这里分发处理// services mappingtry {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);// 根据任务的 executorHandler 找对应方法处理case "/run":TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill":KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}}

任务执行线程在这里启动:

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}
if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/63954.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Macbookpro M1 IDEA中安装mysql

一&#xff1a;安装与连接数据库 1. 首先在mysql中创建一个初始数据库&#xff1a;idea_db&#xff0c;如示&#xff1a; 2.打开IDEA,如果最右侧没有database窗口&#xff0c;则在插件那里下载“Database navigator”,稍后重启一下即可&#xff1b; 点击最右侧Database---->…

Linux内核结构及源码概述

参考&#xff1a;深入分析LINUX内核源码 深入分析Linux内核源码 (kerneltravel.net) Linux 是一个庞大、高效而复杂的操作系统&#xff0c;虽然它的开发起始于 Linus Torvalds 一个人&#xff0c;但随着时间的推移&#xff0c;越来越多的人加入了 Linux 的开发和对它的不断完善…

[Unity] Text文本首行缩进两个字符

Text文本首行缩进两个字符的方法比较简单。通过代码把"\u3000\u3000"加到文本字符串前面即可。 比如&#xff1a; 效果&#xff1a; 代码&#xff1a; TMPtext1.text "\u3000\u3000" "选择动作类型&#xff1a;";

容器内部时间和Node时间不同步问题

之前在《 Docker及Kubernetes使用过程中出现的问题&#xff08;FAQ&#xff09;》中的“FAQ30 容器内部时间和Node时间不同”分享过容器内部和容器服务器之间时间不一致的问题&#xff1b;这通常会导致各种问题&#xff0c;比如日志记录、定时任务等。 问题原因&#xff1a;问…

AI开源南京分享会回顾录

AI 开源南京分享会&#xff0c;已于2024年11月30日下午在国浩律师&#xff08;南京&#xff09;事务所5楼会议厅成功举办。此次活动由 KCC南京、PowerData、RISC-Verse 联合主办&#xff0c;国浩律师&#xff08;南京&#xff09;事务所协办。 活动以“开源视角的 AI 对话”为主…

OpenCV 图像变换与处理实战

OpenCV快速通关 第一章&#xff1a;OpenCV 简介与环境搭建 第二章&#xff1a;OpenCV 图像基本操作 第三章&#xff1a;OpenCV 图像变换与处理实战 OpenCV 图像变换与处理实战 OpenCV快速通关OpenCV 图像变换与处理实战一、OpenCV 基础与图像处理概览二、图像变换理论精析三、…

Ubuntu22.04安装docker desktop遇到的bug

1. 确认已启用 KVM 虚拟化 如果加载了模块&#xff0c;输出应该如下图。说明 Intel CPU 的 KVM 模块已开启。 否则在VMware开启宿主机虚拟化功能&#xff1a; 2. 下一步操作&#xff1a; Ubuntu | Docker Docs 3. 启动Docker桌面后发现账户登陆不上去&#xff1a; Sign in | …

【深度学习入门】深度学习介绍

1.1 深度学习介绍 学习目标 目标 知道深度学习与机器学习的区别了解神经网络的结构组成知道深度学习效果特点 应用 无 区别 特征提取方面 机器学习的特征工程步骤是要靠手动完成的&#xff0c;而且需要大量领域专业知识深度学习通常由多个层组成&#xff0c;它们通常将更简…

实现按键按下(低电平)检测到下降沿

按照流程进行编程 步骤1&#xff1a; 初始化函数 包括时基工作参数配置 输入通道配置 更新中断使能 使能捕获、捕获中断及计数器 HAL_TIM_IC_Init(&ic_handle) //时基参数配置 HAL_TIM_IC_ConfigChannel(&ic_handle,&ic_config,TIM_CHANNEL_2) //输…

汽车车牌标记支持YOLO,COCO,VOC三种格式标记,4000张图片的数据集

本数据集支持YOLO&#xff0c;COCO&#xff0c;VOC三种格式标记汽车车牌&#xff0c;无论是新能源汽车还是油车都能识别标记&#xff0c;该数据集一共包含4000张图片 数据集分割 4000总图像数 训练组 70&#xff05; 2800图片 有效集 20&#xff05; 800图片 测…

游秦岭山感

巍乎高哉&#xff01; 悠悠大秦岭 佑吾华夏之根脉 八水润之 泽万物而赋予生机 于万山之山中 享自然之美于万物 西有昆仑祖龙脉 东有秦岭护关中 绿水青山国之本 万山长青谋发展 旭日东升耀中华 固我山河永泰安 你我同行共保护 关中龙脉更兴旺

阿里云-通义灵码:测试与实例展示

目录 一.引子 二.例子 三.优点 四.其他优点 五.总结 一.引子 在软件开发的广袤天地中&#xff0c;阿里云通义灵码宛如一座蕴藏无尽智慧的宝库&#xff0c;等待着开发者们去深入挖掘和探索。当我们跨越了入门的门槛&#xff0c;真正开始使用通义灵码进行代码生成和开发工作…

微信小程序中使用miniprogram-sm-crypto实现SM4加密攻略

在微信小程序开发过程中&#xff0c;数据安全至关重要。本文将为大家介绍如何在微信小程序中使用miniprogram-sm-crypto插件进行SM4加密&#xff0c;确保数据传输的安全性。 一、SM4加密简介 SM4是一种对称加密算法&#xff0c;由国家密码管理局发布&#xff0c;适用于商密领…

使用 Ansys Fluent 对气体泄漏检测进行建模

了解使用 Ansys Fluent 仿真气体泄漏和确保安全的前沿技术。 挑战 气体泄漏对人类安全和环境构成重大风险。及早检测气体泄漏可以防止潜在的灾难&#xff0c;包括爆炸、火灾和有毒物质暴露。有效的气体泄漏检测系统对于石油和天然气、化学加工和住宅基础设施等行业至关重要。…

QT图形/视图架构详解(一)

场景、视图与图形项 图形/视图架构主要由 3 个部分组成&#xff0c;即场景、视图和图形项&#xff0c;三者的关系如图所示&#xff1a; 场景、视图和图形项的关系 场景&#xff08;QGraphicsScene 类&#xff09; 场景不是界面组件&#xff0c;它是不可见的。场景是一个抽象的…

LLM之RAG实战(五十)| FastAPI:构建基于LLM的WEB接口界面

FastAPI是WEB UI接口&#xff0c;随着LLM的蓬勃发展&#xff0c;FastAPI的生态也迎来了新的机遇。本文将围绕FastAPI、OpenAI的API以及FastCRUD&#xff0c;来创建一个个性化的电子邮件写作助手&#xff0c;以展示如何结合这些技术来构建强大的应用程序。 下面我们开始分步骤操…

Maven学习(Maven项目模块化。模块间“继承“机制。父(工程),子项目(模块)间聚合)

目录 一、Maven项目模块化&#xff1f; &#xff08;1&#xff09;基本介绍。 &#xff08;2&#xff09;汽车模块化生产再聚合组装。 &#xff08;3&#xff09;Maven项目模块化图解。 1、maven_parent。 2、maven_pojo。 3、maven_dao。 4、maven_service。 5、maven_web。 6…

CNAS软件实验室认可费用清单,如何规划预算方案?

CNAS软件实验室在申请认可前&#xff0c;需要按照CNAS相关认可文件的要求&#xff0c;建立完善的CNAS软件实验室质量管理体系&#xff0c;试运行六个月&#xff0c;且覆盖全部质量要素后&#xff0c;向CNAS认可委提交申请&#xff0c;等待专家的审查。在前期的筹备工作中&#…

【2024 Dec 超实时】编辑安装llama.cpp并运行llama

首先讲一下环境 这是2024 年12月&#xff0c;llama.cpp 的编译需要cmake 呜呜呜 网上教程都是make 跑的。反正我现在装的时候make已经不再适用了&#xff0c;因为工具的版本&#xff0c;捣鼓了很久。 ubuntu 18 conda env内置安装。 以下是可以完美编译llama.cpp的测试工具版…

优化移动端H5:常见问题与解决方案

移动端H5开发中的“坑”与解决方案 本文介绍了开发中遇到的几个关于移动端H5开发中的小问题&#xff0c;以及解决的方法。 一、iOS滑动不流畅问题 在iOS设备上&#xff0c;H5页面的滑动效果有时会出现不流畅的情况&#xff0c;特别是在页面高度超过一屏时。这通常是由于iOS的…