XXL-JOB源码梳理——一文理清XXL-JOB实现方案

分布式定时任务调度系统

流程分析

一个分布式定时任务,需要具备有以下几点功能:

  • 核心功能:定时调度、任务管理、可观测日志
  • 高可用:集群、分片、失败处理
  • 高性能:分布式锁
  • 扩展功能:可视化运维、多语言、任务编排

在分布式环境下,一般会将定时任务拆解为任务调度部分和任务执行部分,各司其职。
调度中心就是总览全局的Leader,具体的执行器就是需要工作的workerLeader分配任务,worker负责任务执行。那么Leader会等worker执行完任务再去干其他事情吗?显然不行,这样效率太低了。

Leader:时间到了,你去执行任务
Worker:收到,我马上执行,任务完成给你反馈,不用一直等我。。。
Leader:任务执行完了
Worker:收到
Worker:执行器挂了??任务也标记失败吧。。。还得报告上级,任务失败了。。

核心问题

  1. 任务如何触发?触发失败的处理逻辑?
  2. 任务如何执行?任务结果如何反馈?反馈回调失败处理逻辑?任务日志查看?
  3. 任务失败判断逻辑与依据,任务失败后告警提示?
  4. 如何保证高可用?集群如何搭建?
  5. 调度和执行之间的通信和心跳?

同类产品对比

QuartZxxl-jobSchedulerX 2.0PowerJob
定时类型CRONCRONCRON、固定频率、固定延迟、OpenAPICRON、固定频率、固定延迟、OpenAPI
任务类型内置Java内置Java、GLUE Java、Shell、Python等脚本内置Java、外置Java(FatJar)、Shell、Python等脚本内置Java、外置Java(容器)、Shell、Python等脚本
分布式任务静态分片MapReduce 动态分片MapReduce 动态分片
在线任务治理不支持支持支持支持
日志白屏化不支持支持不支持支持
调度方式及性能基于数据库锁,有性能瓶颈基于数据库锁,有性能瓶颈不详无锁化设计,性能强劲无上限
报警监控邮件短信邮件,提供接口允许开发者扩展
系统依赖关系型数据库(MySQL、Oracle…)MySQL人民币任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…)
DAG 工作流不支持不支持支持支持

数据来源于PowerJob:https://www.yuque.com/powerjob/guidence/intro

XXL-JOB相关概念

调度中心xxl-job-admin ;统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台。
执行器:负责接收调度中心的调度并执行;可直接部署执行器,也可以将执行器集成到现有业务项目中。

XXL-JOB系统架构

逻辑架构

image.png

数据架构

xxl-job调度中心数据表

- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;

核心表E-R图

arrow-platform-xxl-job[xxl-job]-202383222945.png

执行流程

整体执行流程

  1. 执行器自动注册到调度中心,30秒执行一次,用于心跳检查。执行器销毁会取消注册。
  2. 调度中心根据触发时间触发调度任务。
  3. 执行器通过任务执行线程池执行任务,并记录执行日志,执行结果异步上报。
  4. 调度中心日志请求。

执行流程细化

执行器注册:

  1. 服务启动后ExecutorRegistryThread通过registryThread注册线程每30s向调度中心注册一次。服务销毁后,ExecutorRegistryThread调用一次取消注册接口,从调度中心删除当前节点的注册信息。
  2. JobRegistryHelper内部维护了一个registryOrRemoveThreadPool注册或者删除线程池,用于处理执行器客户端发送的注册或者删除请求,同时更新调度中心的执行器地址信息。
  3. JobRegistryHelper内部为了一个registryMonitorThread注册监听线程,每30s执行一次(与客户端注册频率一致),用于监听超过90s未主动注册的节点地址。超过90s就认为节点掉线。

调度中心任务调度:

  1. JobScheduleHelper主要负责任务调度逻辑判断与执行调度。内部维护了来个线程来执行任务调度。scheduleThread调度线程:主要负责扫描任务,将能够执行的任务放入时间轮,并计算下一次执行时间。ringThread时间轮线程:主要处理时间轮中的任务,调用JobTriggerPoolHelper进行任务触发。
  2. JobTriggerPoolHelper任务触发线程,由快慢线程池组成,根据任务触发时间来进行切换选择由哪一个线程池触发任务。任务触发器根据任务信息组装触发参数(包括基本信息和阻塞策略),任务触发器根据任务配置的路由策略进行路由寻址,然后通过远程调用进行任务触发。
  3. XxlJobTrigger主要负责任务触发执行动作。
  4. ExecutorBizClientExecutorBiz接口的客户端sdk实现,在调度中心使用,相当于执行器的sdk,调用执行器的Rest接口使用。同理ExecutorBizImpl就是ExecutorBiz执行器业务逻辑实现。
  5. 调度中心的http服务就是Spring Boot实现的JobApiController

执行器执行任务:

  1. 执行器中的http服务是通过netty搭建的。
  2. ExecutorBizImpl接收到触发任务后先根据阻塞策略和任务类型进行必要参数组装,组装完成后交给XxlJobExecutor处理,XxlJobExecutor通过registJobThread()方法获取执行线程同时启动线程,然后将触发任务信息放入任务队列,由线程消费处理。
  3. JobThread任务线程,负责执行任务,记录执行日志到**文件,**任务执行完毕后,将结果推送到TriggerCallbackThreadcallBackQueue回调队列中,由TriggerCallbackThread负责任务结果回调。
  4. TriggerCallbackThread主要负责任务执行结果回调,将执行结果反馈给调度中心。TriggerCallbackThread内部维护了triggerCallbackThreadtriggerRetryCallbackThread两个线程。triggerCallbackThread负责处理callBackQueue队列中的数据,回调失败将回调参数记录到回调日志文件中,一直执行。triggerRetryCallbackThread主要对回调失败的数据进行重试,每30s执行一次,主要动作:将回调日志读取出来,反序列化后执行调用。

调度中心任务结果处理:

  1. AdminBizImpl基本没做复杂逻辑,接收到客户端发送的回调结果后,直接交给JobCompleteHelper处理。
  2. JobCompleteHelper负责对任务执行结果处理,内部维护了一个线程池和一个线程。callbackThreadPool线程池主要负责异步处理执行结果。monitorThread主要处理未收到回调的任务,60s执行一次,判断条件:①任务状态处于运行中超过10min 并且 ②执行器不在线。也就是说在线的执行器,任务执行超过10min不会标记为失败。

服务端启动流程

服务端执行时序图

在这里插入图片描述

🔔主要流程:

  1. 任务执行调度器负责计算任务是否需要执行,将需要执行的任务添加到任务触发线程池中;
  2. 任务触发器由快慢线程池组成,根据任务触发时间来进行切换选择由哪一个线程池触发任务。任务触发器根据任务信息组装触发参数(包括基本信息和阻塞策略),任务触发器根据任务配置的路由策略进行路由寻址,然后通过远程调用进行任务触发。

初始化

首先找到配置类 XxlJobAdminConfig。该类实现InitializingBean接口和DisposableBean接口,主要用于xxl-job-admin初始化和销毁动作。
image.png
afterPropertiesSet执行初始化操作:

/*** 在Bean对象属性填充完成后调用*/
@Override
public void afterPropertiesSet() throws Exception {// 利用静态声明的只会加载一次的特性,初始化一个单例对象。adminConfig = this;// 初始化xxl-job调度器xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();
}

com.xxl.job.admin.core.scheduler.XxlJobScheduler#init初始化xxl-job调度器:

public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start --> 初始化触发器线程池JobTriggerPoolHelper.toStart();// admin registry monitor run --> 30秒执行一次,维护注册表信息,判断在线超时时间90s RegistryConfig类中配置JobRegistryHelper.getInstance().start();// admin fail-monitor run   --> 运行失败监视器,主要失败发送邮箱,重试触发器JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper ) -->  任务结果处理,包括执行器正常回调和任务结果丢失处理// 调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;JobCompleteHelper.getInstance().start();// admin log report start  --> 统计一些失败成功报表JobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper ) --> 执行调度器JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");
}

该方法主要做了如下事情:

  1. init i18n
  2. 初始化触发器线程池
  3. 维护注册表信息(30秒执行一次),保持心跳
  4. 将丢失主机信息调度日志更改状态
  5. 统计一些失败成功报表,删除过期日志
  6. 执行调度器

具体流程

I. 初始化i18n

主要是针对ExecutorBlockStrategyEnum枚举的title属性进行国际化赋值处理

private void initI18n() {// 枚举都是单例的,初始化调用一次赋值后即可for (ExecutorBlockStrategyEnum item : ExecutorBlockStrategyEnum.values()) {// SERIAL_EXECUTION=单机串行// DISCARD_LATER=丢弃后续调度// COVER_EARLY=覆盖之前调度item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));}
}

II. 初始化触发器线程池【JobTriggerPoolHelper快慢线程池】

JobTriggerPoolHelper主要维护了两个线程池。
image.png
主要由JobTriggerPoolHelper类完成触发器线程池的初始化

/*** 初始化* 调度器启动时,初始化了两个线程池,除了慢线程池的队列大一些以及最大线程数由用户自定义以外,其他配置都一致。* 快线程池用于处理时间短的任务,慢线程池用于处理时间长的任务*/
public void start() {// 核心线程数10,最大线程数来自配置,存活时间为60s,队列大小1000,线程工厂配置线程名。拒绝策略为AbortPolicy,直接抛出异常fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));// 慢线程池初始化  触发的任务在一分钟内超时10次,则采用慢触发器执行。拒绝策略为AbortPolicy,直接抛出异常slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2000),r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}

注意:这里分别初始化了2个线程池,一个快一个慢,优先选择快,当一分钟以内任务触发时间超时10次【超时时间为:500ms】,则加入慢线程池执行。

III. 维护注册表信息【JobRegistryHelper】(30秒执行一次)

JobRegistryHelper#start主要完成3件事情:

  1. 初始化注册或者删除线程池,主要负责客户端注册或者销毁到xxl_job_registry表异步处理,调度中心的apicom.xxl.job.admin.controller.JobApiController
  2. 初始化守护线程,每30秒执行一次。
    1. xxl_job_registry中删除超时的机器
    2. 更新xxl_job_group执行器地址列表
/*** 初始化*/
public void start() {// for registry or remove  --> 注册或者删除线程池初始化,拒绝策略是由父线程执行,同时会打印日志registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 注意:这里是父线程执行任务r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});// for monitor  --> 注册监控线程 30秒【sleep】执行一次,维护注册表信息, 判断在线超时时间90sregistryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// auto registry group --> 查询任务组数据。对应xxl-job-group表,有数据时校验自动任务执行器注册信息List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList != null && !groupList.isEmpty()) {// remove dead address (admin/executor)  --> 从xxl-job-registry中删除超时90s的机器,不分是否自动注册List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids != null && ids.size() > 0) {// 移除超时掉线的执行器。执行器的更新时间通过com.xxl.job.core.biz.AdminBiz.registry完成,也就是执行器和admin之间的心跳XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)  --> 从xxl-job-registry中获取执行器地址,刷新到xxl-job-group中。刷新在线地址 包含执行器注册的和adminHashMap<String, List<String>> appAddressMap = new HashMap<>();// 查询更新时间大于当前时间-90s的数据List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item : list) {if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {// group表的appname对应registry表的registrykey字段String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// fresh group addressfor (XxlJobGroup group : groupList) {List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList != null && !registryList.isEmpty()) {// 对地址进行排序Collections.sort(registryList);// 用逗号分隔 http:127.0.0.1:9092/,http://127.0.0.1:9903/StringBuilder addressListSB = new StringBuilder();for (String item : registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length() - 1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());// 更新xxl-job-group中的数据。注册信息中没有数据也会执行更新,将执行器地址更新为空XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:", e);}}try {//  30s执行一次,通过sleep实现TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});// 守护线程registryMonitorThread.setDaemon(true);registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");registryMonitorThread.start();
}

IV. 运行失败监视器【JobFailMonitorHelper】失败重试,告警邮件

JobFailMonitorHelper主要是失败任务重试,以及告警消息发送

  1. 失败重试

这里判断失败有2种情况(trigger_code表示任务触发状态,handle_code表示任务执行结果状态,200均表示成功,500表示失败)
第一种:trigger_code!=(0,200) 且 handle_code!=0
第二种:handle_code!=200

  1. 告警(这里可向spring注入JobAlarm),可自定义扩展

JobFailMonitorHelper内部初始化了一个守护线程monitorThread用于检测失败任务,并根据配置的重试规则进行重试和告警。

/*** 初始化任务失败监听类* <p>* 线程每10秒执行1次*/
public void start() {monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// monitorwhile (!toStop) {try {// 查询 1000 条失败任务List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (CollUtil.isNotEmpty(failLogIds)) {for (long failLogId : failLogIds) {// lock log --> 加锁,乐观修锁改alarm_status=-1int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet < 1) {continue;}XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor  失败重试if (log.getExecutorFailRetryCount() > 0) {// 执行重新触发操作JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY,(log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(), log.getExecutorParam(), null);// 追加日志String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_type_retry") + "<<<<<<<<<<< </span><br>";log.setTriggerMsg(log.getTriggerMsg() + retryMsg);XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitor 任务失败就告警int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败if (info != null) {// 发送告警,并获取发生送结果boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);logger.debug(">>>>>>>> xxl-job 任务执行失败,发送告警信息:jobId:{},重试次数:{}", info.getId(), log.getExecutorFailRetryCount());newAlarmStatus = alarmResult ? 2 : 3;} else {newAlarmStatus = 1;}XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:", e);}}try {// 10秒执行一次TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");}});// 守护线程monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobFailMonitorHelper");monitorThread.start();
}

其中XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);告警的发送,可以实现自定义。即实现JobAlarm接口,并注入Spring即可。

@Component
public class JobAlarmer implements ApplicationContextAware, InitializingBean {private static Logger logger = LoggerFactory.getLogger(JobAlarmer.class);private ApplicationContext applicationContext;private List<JobAlarm> jobAlarmList;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void afterPropertiesSet() throws Exception {Map<String, JobAlarm> serviceBeanMap = applicationContext.getBeansOfType(JobAlarm.class);if (MapUtil.isNotEmpty(serviceBeanMap)) {jobAlarmList = new ArrayList<>(serviceBeanMap.values());}}/*** job alarm** @param info   任务信息* @param jobLog 任务日志* @return 告警结果*/public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {boolean result = false;if (jobAlarmList != null && jobAlarmList.size() > 0) {result = true;  // success means all-successfor (JobAlarm alarm : jobAlarmList) {boolean resultItem = false;try {resultItem = alarm.doAlarm(info, jobLog);} catch (Exception e) {logger.error(e.getMessage(), e);}if (!resultItem) {result = false;}}}return result;}}

V. 任务结果处理【JobCompleteHelper】

主要职责:

  1. 初始化线程池和守护线程
  2. 守护线程每60秒执行一次,将执行器客户端失联的任务状态标记为完成【两个条件:a.超过10分钟都处于运行中;b.失联】
  3. 线程池主要用于异步处理执行器的任务结果回调

image.png

/*** 初始化*/
public void start() {// for callback   --> 回调线程callbackThreadPool = new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3000),r -> new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()),(r, executor) -> {// 超过最大数量后,父线程执行任务r.run();log.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");});// for monitor  -> 监听线程。每60秒执行一次monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {// 首次运行,暂停50毫秒,目的是为了让JobTriggerPoolHelper先初始化完成TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {log.error(e.getMessage(), e);}}// monitor --> 监听while (!toStop) {try {// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;// 两个条件:1.运行中状态超过10min 2.心跳不在线Date losedTime = DateUtil.addMinutes(new Date(), -10);List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);if (CollUtil.isNotEmpty(losedJobIds)) {for (Long logId : losedJobIds) {XxlJobLog jobLog = new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);// 任务结果丢失,标记失败jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail"));XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {log.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:", e);}}try {// 每60秒执行一次TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {log.error(e.getMessage(), e);}}}log.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");}});// 守护线程monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");monitorThread.start();
}

com.xxl.job.admin.core.complete.XxlJobCompleter#updateHandleInfoAndFinish处理任务结果,有子任务触发子任务

/*** 任务结果刷新入口* common fresh handle entrance (limit only once)** @param xxlJobLog 任务信息*/
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {// finish 处理任务结果,有子任务执行子任务finishJob(xxlJobLog);// text最大64kb 避免长度过长if (xxlJobLog.getHandleMsg().length() > 15000) {xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg().substring(0, 15000));}// fresh handlereturn XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

VI. 报表统计与日志清理【JobLogReportHelper】

  1. 按天统计报表数据(xxl_job_log_report)1分钟执行一次
  2. 定时清理日志信息(xxl_job_log)24小时执行一次
/*** 初始化,启动一个守护线程处理任务报表* 每分钟执行一次*/
public void start() {// 每一分钟执行一次logrThread = new Thread(() -> {// 上次清理日志时间long lastCleanLogTime = 0;while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {// 分别统计今天,昨天,前天0~24点的数据  每天开始时间为 00:00:00.000 结束时间为:23:59:59.999for (int i = 0; i < 3; i++) {// 获取当前迁移i天的开始时间数据。Calendar itemDay = Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);// 开始时间,getTime() 是通过new Date()返回的。Date todayFrom = itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);// 结束时间Date todayTo = itemDay.getTime();XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);// 查询当前数据 开始时间为 00:00:00.000 结束时间为:23:59:59.999Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (MapUtil.isNotEmpty(triggerCountMap)) {// 触发总数int triggerDayCount = Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault("triggerDayCount", 0)));// 运行中 trigger_code in (0, 200) and handle_code = 0int triggerDayCountRunning = Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault("triggerDayCountRunning", 0)));// 成功 handle_code = 200int triggerDayCountSuc = Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault("triggerDayCountSuc", 0)));// 失败数据int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// do refresh 先执行更新,无数据才插入,能在一定程度上解决调度器执行器多节点并发问题// 旧数据执行更新,新数据执行保存。更新返回的是变动行数,小于1则表示库里不存在 。根据报表时间更新数据int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret < 1) {// 这里还是有很小的可能会同时执行到,导致数据有多份的情况XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {log.error(">>>>>>>>>>> xxl-job, job log report thread error:", e);}}// 2、log-clean: switch open & once each day 开关打卡并且每24小时执行一次// 设置了保留日志天数,并且有效时(小于7为-1),距离上次清理超过24小时if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0&& System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {// expire-time 获取开始清理时间。例如配置了7天,今天是2023-08-12 那么clearBeforeTime就是2023-08-05 00:00:00.000Calendar expiredDay = Calendar.getInstance();expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);Date clearBeforeTime = expiredDay.getTime();// clean expired logList<Long> logIds;do {// 每次1000条 执行清理,mysql in最多1000个logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);if (CollUtil.isNotEmpty(logIds)) {XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (CollUtil.isNotEmpty(logIds));// update clean timelastCleanLogTime = System.currentTimeMillis();}try {// 每1分钟钟执行一次TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {log.error(e.getMessage(), e);}}}log.info(">>>>>>>>>>> xxl-job, job log report thread stop");});logrThread.setDaemon(true);logrThread.setName("xxl-job, admin JobLogReportHelper");logrThread.start();
}

VII. 执行调度器【JobScheduleHelper】(核心)

执行调度器主要由包含了两个线程。一个线程(scheduleThread)负责加锁查询任务信息,对任务按照触发时间分类,并按照具体策略执行或者计算下次调度时间。对于执行时间间隔非常短的任务会根据具体的策略放入时间轮,然后由另一个线程(ringThread)进行任务触发处理。
image.png

scheduleThread执行周期:

  1. 扫描超时(大于1000ms),不等待,直接继续执行。
  2. 预读数据不为空,执行周期为:0-1000ms。预读数据为空,执行周期为4000-5000ms
// Wait seconds, align second  耗时小于1秒,-->数据少。可以sleep一会。数据多的情况下。一直执行
if (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}
}

scheduleThread会加锁查询出下次执行时间在未来5秒以内的所有任务,默认一次最多获取6000条。然后根据过期时间会分成三种对应处理。

  1. 触发器下次执行时间过期时间 > 5S
  2. 触发器下次执行时间过期时间 < 5S
  3. 触发器下次执行时间在未来5S以内。


ringThread主要处理时间轮中的定时任务,执行周期为:0-1000ms。
时间轮出自Netty中的HashedWheelTimer,是一个环形结构,可以用时钟来类比,钟面上有很多bucket,每一个bucket上可以存放多个任务,使用一个List保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应bucket上所有到期的任务。任务通过取模决定应该放入哪个bucket。和HashMap的原理类似,newTask对应put,使用List来解决 Hash 冲突。
xxl-job中一个时间轮有60个bucket,从0-59。用于存储当前秒执行的任务列表。
image.png
以上图为例,假设一个bucket是1秒,则指针转动一轮表示的时间段为60s,假设当前指针指向0,此时需要调度一个3s后执行的任务,显然应该加入到(0+3=3)的方格中,指针再走3s次就可以执行了;

具体代码如下:

package com.xxl.job.admin.core.thread;import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.admin.core.util.CollUtil;
import lombok.extern.slf4j.Slf4j;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** 任务执行调度* <p>* 工作流程:* 周期性的遍历所有的jobInfo这个表,通过数据库的行锁和事务一致性,通过for update 来保证多个调度中心集群在同一时间内只有一个调度中心在调度任务* <p>* 周期性的遍历所有的jobInfo这个表,读取触发时间小于nowtime+5s这个时间之前的所有任务,然后进行引入以下触发机制判断* <p>* 三种触发任务机制:* <ol>*     <li>nowtime-TriggerNextTime()>PRE_READ_MS(5s) 既超过有效误差内【5秒内】,则查看当前任务的失效调度策略,若为立即重试一次,则立即触发调度任务,且触发类型为misfire</li>*     <li>nowtime-TriggerNextTime()<PRE_READ_MS(5s) 既没有超过有效误差【过5秒】,则立即调度调度任务</li>*     <li>nowtime<TriggerNextTime() 则说明这个任务马上就要触发了,放到一个时间轮上(https://blog.csdn.net/zalu9810/article/details/113396131),</li>* </ol>* <p>* 随后将快要触发的任务放到时间轮上,时间轮由key(将要触发的时间s),value(在当前触发s的所有任务id集合),然后更新这个任务的下一次触发时间* <p>* 这个时间轮的任务遍历交由第二个线程处理ringThread,周期在1s之内周期的扫描这个时间轮,然后执行调度任务** @author xuxueli 2019-05-21*/
@Slf4j
public class JobScheduleHelper {private static JobScheduleHelper instance = new JobScheduleHelper();public static JobScheduleHelper getInstance() {return instance;}/*** 预读误差时间,5秒*/public static final long PRE_READ_MS = 5000;/*** 调度线程,执行周期:【0-1000ms】、【4000-5000ms】内的随时时间执行*/private Thread scheduleThread;/*** 时间轮线程,主要处理ringData中的任务数据。并触发任务。注意这里执行周期 0-1000ms*/private Thread ringThread;/*** 默认调度线程停止标志*/private volatile boolean scheduleThreadToStop = false;/*** 时间轮线程停止标志*/private volatile boolean ringThreadToStop = false;/*** 时间轮,环上数据长度为60。即key的范围是0-59秒。value是在具体秒数需要执行的任务ID*/private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start() {// schedule threadscheduleThread = new Thread(() -> {try {// sleep 4000-5000毫秒,时间返回内随机,避免各调度中心节点同时执行TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}log.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)// 每个触发器花费50ms,每个线程单位时间(秒)内处理20任务,默认最多同时处理300*20=6000任务int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;boolean connAutoCommit = true;PreparedStatement preparedStatement = null;// 查询成功标志,判断有无数据boolean preReadSuc = true;try {// 设置手动提交conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);// 获取任务调度锁表内数据信息,加写锁preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();// 查询条件:1. 下次触发时间小于当前时间+5s and 2.triggerStatus为1(调度状态:0-停止,1-运行)  and 3. 数据量(默认取值为6000条【根据配置变动】)// 任务调度错过触发时间时的可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (CollUtil.isNotEmpty(scheduleList)) {// 2、push time-ringfor (XxlJobInfo jobInfo : scheduleList) {// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-time  --> 任务过期超过5秒 当前时间-任务执行时间>5秒  -->按照过期策略处理并刷新下一次触发时间log.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = {}", jobInfo.getId());// 1、misfire match 过期处理策略-->FIRE_ONCE_NOW:立即执行一次MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);log.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId());}// 2、fresh next  刷新下一次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time   任务过期小于5秒 --> 直接触发任务并计算下次触发时间// 1、triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);log.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId());// 2、fresh nextrefreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again  下次触发时间在未来5秒内,这块跟else中逻辑一致,目的是为了避免下次扫描时漏掉数据if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second 时间转化为秒 时间轮为长度为60  (如果执行时间为 2023/08/29 17:03:26  则返回26)int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring 将当前时间添加到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next 刷新下一次触发时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring secondint ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info 更新任务信息long currentTime = System.currentTimeMillis();for (XxlJobInfo jobInfo : scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}log.debug(">>>>>>>>> xxl-job,更新任务信息耗时统计,count:{}, Time-consuming:{}ms", scheduleList.size(), System.currentTimeMillis() - currentTime);} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {log.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:", e);}} finally {// commitif (conn != null) {try {// 提交事务conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}try {// 设置为自动提交conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}try {// 关闭连接conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis() - start;// Wait seconds, align second  耗时小于1秒,-->数据少。可以sleep一会。数据多的情况下。一直执行if (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}}log.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring thread 时间轮ringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {// 执行周期 0-1000msTimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {log.error(e.getMessage(), e);}}try {// second data// 时间轮上的数据集合。即任务ID集合List<Integer> ringItemData = new ArrayList<>();// 避免处理耗时太长,跨过刻度,向前校验一个刻度;int nowSecond = Calendar.getInstance().get(Calendar.SECOND);for (int i = 0; i < 2; i++) {//  (nowSecond + 60 - i) % 60 和 (nowSecond  - i) % 60 加60的目的,避免为负数List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlog.debug(">>>>>>>>>>> xxl-job, time-ring beat : {} = {}", nowSecond, Collections.singletonList(ringItemData));// do triggerfor (int jobId : ringItemData) {// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();} catch (Exception e) {if (!ringThreadToStop) {log.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);}}}log.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}/*** 计算任务下一次触发时间** @param jobInfo  任务信息* @param fromTime 当前时间* @throws Exception exp*/private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {Date nextValidTime = generateNextValidTime(jobInfo, fromTime);if (nextValidTime != null) {jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());jobInfo.setTriggerNextTime(nextValidTime.getTime());} else {// 调度状态:0-停止,1-运行jobInfo.setTriggerStatus(0);jobInfo.setTriggerLastTime(0);jobInfo.setTriggerNextTime(0);log.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}", jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());}}/*** 添加任务到时间轮** @param ringSecond 时间【秒】* @param jobId      任务id*/private void pushTimeRing(int ringSecond, int jobId) {// push async ring// 时间轮不存在对应时间时就新建一个list,存在取值。list中添加任务idList<Integer> ringItemData = ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>());ringItemData.add(jobId);log.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : {} = {}", ringSecond, Collections.singletonList(ringItemData));}public void toStop() {// 1、stop schedulescheduleThreadToStop = true;try {TimeUnit.SECONDS.sleep(1);  // wait} catch (InterruptedException e) {log.error(e.getMessage(), e);}if (scheduleThread.getState() != Thread.State.TERMINATED) {// interrupt and waitscheduleThread.interrupt();try {scheduleThread.join();} catch (InterruptedException e) {log.error(e.getMessage(), e);}}// if has ring databoolean hasRingData = false;if (!ringData.isEmpty()) {for (int second : ringData.keySet()) {List<Integer> tmpData = ringData.get(second);if (tmpData != null && tmpData.size() > 0) {hasRingData = true;break;}}}if (hasRingData) {try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {log.error(e.getMessage(), e);}}// stop ring (wait job-in-memory stop)ringThreadToStop = true;try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {log.error(e.getMessage(), e);}if (ringThread.getState() != Thread.State.TERMINATED) {// interrupt and waitringThread.interrupt();try {ringThread.join();} catch (InterruptedException e) {log.error(e.getMessage(), e);}}log.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");}// ---------------------- tools ----------------------/*** 根据当前时间计算下次执行时间** @param jobInfo  任务信息* @param fromTime 当前时间* @return 下次执行时间* @throws Exception Exp*/public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {// 返回满足cron表达式的给定日期/时间之后的下一个日期/时间return new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);} else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {// 当前时间之后的下一次时间 固定速率return new Date(fromTime.getTime() + Integer.parseInt(jobInfo.getScheduleConf()) * 1000L);}return null;}}

执行器启动流程

在这里插入图片描述

执行器启动流程时序图

🧊主要流程:

  1. XxlJobSpringExecutor初始化时执行相关的方法;
  2. 解析标有XxlJob注解的方法,标有Lazy的类不处理。将标注有XxlJob注解的方法转化为MethodJobHandler类,并存储到XxlJobExecutor#jobHandlerRepository属性中。
  3. 初始化SpringGlueFactory
  4. 初始化日志路径,XxlJobFileAppender主要用于处理日志
  5. 初始化admin-client,用于进行任务回调以及心跳检查
  6. 初始化日志清理线程JobLogFileCleanThread
  7. 初始化任务回调线程TriggerCallbackThread
  8. 启动内嵌服务EmbedServer,基于netty实现

初始化

客户端执行器的核心接口是XxlJobExecutor,主要有两个实现类,XxlJobSimpleExecutorXxlJobSpringExecutor。其中XxlJobSpringExecutor主要是针对spring框架的。

xxl-job整合Spring场景下,需要手动配置XxlJobSpringExecutor实例,并注册为bean

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;
}

XxlJobExecutor接口中主要实现了执行器客户端的启动和销毁、admin-client远程调用初始化、executor-server远程调用初始化、JobHandler任务缓存、jobthread任务线程缓存。
image.png

image.png
接口继承体系,XxlJobSpringExecutor注入了 ApplicationContext 对象。以及实现了 SmartInitializingSingleton 接口,实现该接口的当spring容器初始完成,紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean,最后在实例化阶段结束时触发回调接口。
com.xxl.job.core.executor.impl.XxlJobSpringExecutor#afterSingletonsInstantiated主要完成三件事:

  1. 初始化调度器资源管理器(从spring容器中将标记了XxlJob注解的方法,将其封装并添加到map中)
  2. 刷新GlueFactory
  3. 启动服务,接收服务器请求等
    // start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method)  初始化任务 标记XxlJob注解的方法类型的initJobHandlerMethodRepository(applicationContext);// refresh GlueFactory 舒心GlueFactoryGlueFactory.refreshInstance(1);// super start 调用父类接口,启动服务try {super.start();} catch (Exception e) {throw new RuntimeException(e);}}

具体流程

I. 初始化JobHandler

com.xxl.job.core.executor.impl.XxlJobSpringExecutor#initJobHandlerMethodRepository该方法主要做了如下事情:

  1. spring容器获取所有对象,并遍历查找方法上标记XxlJob注解的方法。
  2. xxljob配置的jobname作为key,根据初始化和销毁方法配置数据构造MethodJobHandler作为value注册jobHandlerRepository

任务执行接口IJobHandler,之前版本是自动注册IJobHandler接口的实现类的,后续版本改为了注册标记了@XxlJob注解的方法。如果有IJobHandler实现类形式,需要自己注册。
image.png
com.xxl.job.core.executor.impl.XxlJobSpringExecutor#initJobHandlerMethodRepository方法比较简单。主要流程:

  1. 加载所有非懒加载Bean
  2. 找出标记了XxlJob注解的方法,并解析初始化和销毁属性,并构造MethodJobHandler
  3. 注册MethodJobHandlerjobHandlerRepository 缓存中。MethodJobHandler任务最终是通过反射调用执行的。
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {// get beanObject bean = null;Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);if (onBean!=null){logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);continue;}else {bean = applicationContext.getBean(beanDefinitionName);}// filter methodMap<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}// generate and regist method job handlerfor (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// registregistJobHandler(xxlJob, bean, executeMethod);}}}

com.xxl.job.core.executor.XxlJobExecutor#registJobHandler(com.xxl.job.core.handler.annotation.XxlJob, java.lang.Object, java.lang.reflect.Method)方法完成注册

    protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}String name = xxlJob.value();//make and simplify the variables since they'll be called several times laterClass<?> clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}// execute method/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}*/executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}

II. 刷新GlueFactory

com.xxl.job.core.glue.GlueFactory#refreshInstance刷新GlueFactory,工厂模式

	public static void refreshInstance(int type){if (type == 0) {glueFactory = new GlueFactory();} else if (type == 1) {glueFactory = new SpringGlueFactory();}}

III. 核心启动类【XxlJobExecutor】

该方法主要做了如下事情:

  1. 初始化日志文件
  2. 封装调度中心请求路径,用于访问调度中心
  3. 清除过期日志
  4. 回调调度中心任务执行状态
  5. 执行内嵌服务

com.xxl.job.core.executor.XxlJobExecutor#start方法

public void start() throws Exception {// init logpath 日志路径初始化XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client admin-client初始化,initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);
}
初始化日志文件【XxlJobFileAppender】

XxlJobFileAppender主要用于处理执行日志信息。包括日志路径初始化、创建日志文件、追加日志、读取日志信息等。
方法都比较简单,这里不过多介绍。

初始化调度中心客户端【AdminBizClient】

AdminBizClient封装调度中心请求路径,用于访问调度中心。

    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}
执行器日志文件清理【JobLogFileCleanThread】

JobLogFileCleanThread日志文件清理线程,主要用于日志文件清理。需要注意的是:配置参数小于3天不执行清理。每天执行一次清理。
代码也非常简单

public void start(final long logRetentionDays) {// limit min valueif (logRetentionDays < 3) {return;}localThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs != null && childDirs.length > 0) {// todayCalendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY, 0);todayCal.set(Calendar.MINUTE, 0);todayCal.set(Calendar.SECOND, 0);todayCal.set(Calendar.MILLISECOND, 0);Date todayDate = todayCal.getTime();for (File childFile : childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {// 每天执行一次TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");}});localThread.setDaemon(true);localThread.setName("xxl-job, executor JobLogFileCleanThread");localThread.start();}
回调调度中心反馈任务结果【TriggerCallbackThread】

TriggerCallbackThread主要用于处理任务回调,以及任务回调失败后的重试操作。

服务注册与心跳检测

服务注册主要是指执行器客户端每隔30s向调度中心定时发送执行器配置信息(appNameaddress)等,在执行器中主要通过ExecutorRegistryThread类来完成。注册过程通过调用调度中心的api接口来完成注册信息传递。
在调度中心也会检测执行器是否失联(超过90s未上报数据),失联的执行器地址会被清理。

主要的核心类包括:

  • 执行器客户端:
    1. ExecutorRegistryThread执行器注册线程,每隔30s向调度中心注册一次。通过AdminBizClient发送出注册请求,都是post请求。
  • 调度中心:
    1. AdminBizImpl接收到请求不出特殊处理,转交给JobRegistryHelper完成注册
    2. JobRegistryHelper内部维护了registryOrRemoveThreadPool注册或者移除线程池,用于异步处理客户端的注册请求。
    3. JobRegistryHelper内部还维护了registryMonitorThread监控线程,用于处理超过90s未进行注册更新的执行器,每30s处理一次。

参考资料

  1. 分布式任务调度平台XXL-JOB
  2. xxl-job源码解析(看这一篇就够了,超简约且详细)_Nuan_Feng的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95256.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络笔记八】应用层(五)HTTPS

什么是 HTTPS HTTPS 解决了 HTTP 不安全的问题 HTTP 整个传输过程数据都是明文的&#xff0c;任何人都能够在链路中截获、修改或者伪造请求&#xff0f;响应报文&#xff0c;数据不具有可信性。 ① HTTPS 使用加密算法对报文进行加密&#xff0c;黑客截获了也看不懂 ② HTTP…

Play Beyond:Sui让优秀的游戏变得更好

自问世以来&#xff0c;视频游戏就紧随着文化产业发展。从Pong和Space Invaders的时代到Animal Crossing和Among Us&#xff0c;伟大的游戏总有能力吸引玩家&#xff0c;并推动娱乐产业发展。根据Grand View Research的数据&#xff0c;全球视频游戏市场在2022年估计为2170.6亿…

CUDA C编程权威指南:1.1-CUDA基础知识点梳理

主要整理了N多年前&#xff08;2013年&#xff09;学习CUDA的时候开始总结的知识点&#xff0c;好长时间不写CUDA代码了&#xff0c;现在LLM推理需要重新学习CUDA编程&#xff0c;看来出来混迟早要还的。 1.CUDA 解析&#xff1a;2007年&#xff0c;NVIDIA推出CUDA&#xff08…

Docker 日志管理 - ELK

Author&#xff1a;rab 目录 前言一、Docker 日志驱动二、ELK 套件部署三、Docker 容器日志采集3.1 部署 Filebeat3.2 配置 Filebeat3.3 验证采集数据3.4 Kibana 数据展示3.4.1 创建索引模式3.4.2 Kibana 查看日志 总结 前言 如何查看/管理 Docker 运行容器的日志&#xff1f;…

图像拼接后丢失数据,转tiff报错rasterfile failed: an unknown

图像拼接后丢失数据 不仅是数据丢失了&#xff0c;还有个未知原因报错 部分数据存在值不存在的情况 原因 处理遥感数据很容易&#xff0c;磁盘爆满了 解决方案 清理一些无用数据&#xff0c;准备买个2T的外接硬盘用着了。 然后重新做处理

redis高可用(主从复制,哨兵,集群)

目录 一、主从复制&#xff1a; 1.主从复制介绍&#xff1a; 2.主从复制的作用&#xff1a; 3.主从复制流程&#xff1a; 4.搭建Redis 主从复制&#xff1a; 4.1 环境准备&#xff1a; 4.2 安装redis&#xff1a; 4.3 master节点修改 Redis 配置文件&#xff1a; 4.4 slave节点…

Linux学习之悟空派上实现OLED的无线网IP及CPU温度显示【守护进程】

起因 最近各种网购平台似乎都在推送99元的悟空派全志H3的开发板&#xff0c;出于好奇就买了一块来试试水&#xff0c;由于这块板子基本上和orangepi-Zero的硬件结构一模一样&#xff0c;所以设备树、boot这些就用orangepi现成的部件了。 因为本人比较喜欢使用SSH操作&#xff…

C++——list(2)

作者&#xff1a;几冬雪来 时间&#xff1a;2023年9月28日 内容&#xff1a;C——list内容讲解 目录 前言&#xff1a; list的const迭代器&#xff1a; const的iterator&#xff1a; const迭代器&#xff1a; operator->: 拷贝构造&#xff1a; 迭代器接口补充&…

【LittleXi】【MIT6.S081-2020Fall】Lab: locks

【MIT6.S081-2020Fall】Lab: locks 【MIT6.S081-2020Fall】Lab: locks内存分配实验内存分配实验准备实验目的1. 举一个例子说明修改前的**kernel/kalloc.c**中如果没有锁会导致哪些进程间竞争(races)问题2. 说明修改前的kernel/kalloc.c中锁竞争contention问题及其后果3. 解释a…

Elasticsearch安装访问

Elasticsearch 是一个开源的、基于 Lucene 的分布式搜索和分析引擎&#xff0c;设计用于云计算环境中&#xff0c;能够实现实时的、可扩展的搜索、分析和探索全文和结构化数据。它具有高度的可扩展性&#xff0c;可以在短时间内搜索和分析大量数据。 Elasticsearch 不仅仅是一个…

【云笔记篇】Microsoft OneNote笔记插件推荐OneMore

【云笔记篇】Microsoft OneNote笔记插件推荐OneMore OneMore插件是一款非常强大&#xff0c;多达一百多个扩展功能的OneNote笔记插件&#xff0c;而且免费开源&#xff0c;不断更新的优秀插件—【蘇小沐】 1、实验 【OneMore官网&#xff1a;OneMore - a OneNote add-in (on…

C++——类和对象

文章目录 1.面向过程和面向对象的区别2.类的定义3.类的特点3.1封装性3.2继承性3.3多态性 4.类的访问限定符及封装4.1访问限定符4.2封装 5.类的作用域6.类的实例化7.计算类对象大小8.this指针8.1this指针的引出8.2this指针的特性 9.类的6个默认成员函数9.1默认构造函数9.1.1概念…

Elastic SQL 输入:数据库指标可观测性的通用解决方案

作者&#xff1a;Lalit Satapathy, Ishleen Kaur, Muthukumar Paramasivam Elastic SQL 输入&#xff08;metricbeat 模块和输入包&#xff09;允许用户以灵活的方式对许多支持的数据库执行 SQL 查询&#xff0c;并将结果指标提取到 Elasticsearch。 本博客深入探讨了通用 SQL …

单调队列---数据结构与算法

简介 队列也是一种受限制的线性表和栈相类似&#xff0c;栈是先进后出&#xff0c;而队列是先进先出&#xff0c;就好像一没有底的桶&#xff0c;往里面放东西&#xff0c;如图 在这里也是用数组来实现队列&#xff0c;用数组实现的叫做顺序队列 队列的数组模拟 const int N…

学习笔记|ADC反推电源电压|扫描按键(长按循环触发)|课设级实战练习|STC32G单片机视频开发教程(冲哥)|第十八集:ADC实战

文章目录 1.ADC反推电源电压测出Vref引脚电压的意义?手册示例代码分析复写手册代码Tips&#xff1a;乘除法与移位关系为什么4096后面还有L 2.ADC扫描按键(长按循环触发)长按触发的实现 3.实战小练1.初始状态显示 00 - 00 - 00&#xff0c;分别作为时&#xff0c;分&#xff0c…

buuctf-[GXYCTF2019]禁止套娃 git泄露,无参数rce

用dirsearch扫一下&#xff0c;看到flag.php 访问一下没啥东西&#xff0c;使用githack python2 GitHack.py http://8996e81f-a75c-4180-b0ad-226d97ba61b2.node4.buuoj.cn/.git/查看index.php <?php include "flag.php"; echo "flag在哪里呢&#xff1f;…

Net相关的各类开源项目

Net相关的各类开源项目 WPFHandyControlLive-ChartsWPFDeveloperswpf-uidesignStylet WebScheduleMasterYiShaAdminBlog.CoreNebula.AdminNewLife.CubeOpenAuth UnityuGUIUnityCsReferenceEpitomeMyUnityFrameWorkKSFrameworkTowerDefense-GameFramework-Demo 通用ClientServer…

数据库安全与保护

数据库安全与保护 文章目录 第一节 数据库完整性一、完整性约束条件的作用对象1.列级约束2.元组约束3.表级约束 二、定义与实现完整性约束1、实体完整性2、参照完整性3、自定义完整性非空约束 三、命名完整性约束四、更新完整性约束1、删除约束2、添加约束 第二节 触发器一、创…

NUWA论文阅读

论文链接&#xff1a;NUWA: Visual Synthesis Pre-training for Neural visUal World creAtion 文章目录 摘要引言相关工作视觉自回归模型视觉稀疏自注意 方法3D数据表征3D Nearby Self-Attention3D编码器-解码器训练目标 实验实现细节与SOTA比较T2I微调T2V微调V2V微调Sketch-t…