schedule的使用
在Spring中,你可以使用@Scheduled
注解来标记一个方法,使其成为一个定时任务。一般情况下@Scheduled注解修饰的任务方法没有返回值也没有入参。
开启schedule
@Configuration
@EnableScheduling
public class ScheduleConfig {}
在配置类上使用@EnableScheduling注解开启schedule。
fixedDelay固定间隔
@Scheduled(fixedDelay = 2000)public void fixDelay(){long workTime = 1000*(new Random().nextInt(10));log.info("fixDelay,workTime:{}",workTime);try {Thread.sleep(workTime);} catch (InterruptedException e) {e.printStackTrace();}}
fixedDelay : 固定间隔时间执行,单位毫秒
这里固定间隔指:上次调用结束和下次调用开始之间的时间间隔。会确保两个任务之间有间隔,所以fixedDelay的任务不会有同时多个任务执行。
fixedRate固定周期
@Scheduled(fixedRate = 2000)
public void fixRate(){long workTime = 1000*(new Random().nextInt(10));log.info("fixRate,workTime:{}",workTime);try {Thread.sleep(workTime);} catch (InterruptedException e) {e.printStackTrace();}
}
fixedRate: 和fixedDelay差不多,固定频率执行。但是同样的任务不会并行执行,如果超过设定的频率,前一个任务还未完成则会等待前一个任务完成,再开始下一个任务。但是不能像fixedDelay一样保证每个任务之间又固定的间隔。比如:fixedRate设置的任务周期是5秒,如果一次任务的时间耗时是3秒,则下一任务会等到5秒间隔开始,但是如果惹我你执行耗时是6秒,则本次任务结束后会立即执行下一次任务,下一任务已经排队等候1秒了。
initialDelay参数:
@Scheduled(fixedDelay = 2000,initialDelay = 3000)public void initDelay(){log.info("initDelay");}
initialDelay可以设置任务延迟多长时间后开启。
cron表达式
@Scheduled(cron = "5 * * * * ?")
public void cron(){log.info("cron");
}
cron表达式指定任务频率。表达式格式:[秒 分 时 天 月 周]。如上面的表达式意思逢5秒时候执行,也即每一分钟执行一次。
时区设置:默认情况下spring使用系统默认时区,也可以通过zone参数来指定时区。例如
@Scheduled(cron="5 * * * * ?",zone = "Asia/Shanghai")
参数化任务时间
schedule不管是fixedRate、fixedDelay 还是cron表达式,其指定时间参数不仅可以直接在注解上配置固定值活表达式,还可以从properties参数中来进行动态加载。
@Scheduled(fixedDelayString = "${shchedule.fixDelay}");
@Scheduled(fixedRateString = "${shchedule.fixRate}")
@Scheduled(cron = "${shchedule.cron}")
这样可以通过SPEL表达式从容器的配置中进行加载,可以方便的修改任务的执行计划。
源码分析
@EnableScheduling注解import一个配置类SchedulingConfiguration,然后该配置类又引入一个ScheduledAnnotationBeanPostProcessor后置处理器。前面文章有说后置处理器的after方法在bean初始化完成后会被调用。
任务的解析
ScheduledAnnotationBeanPostProcessor是一个bean处理器,那就要看他的后置处理方法postProcessAfterInitialization()
public Object postProcessAfterInitialization(Object bean, String beanName) {//跳过线程池beanif (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!this.nonAnnotatedClasses.contains(targetClass) &&AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);});if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);}else {// Non-empty set of methodsannotatedMethods.forEach((method, scheduledAnnotations) ->scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));}}return bean;
}
找出所有带有@Schedules注解的方法Map<Method, Set> annotatedMethods ,然后逐个调用processScheduled()方法进行处理任务。
processScheduled()方法就是解析@Schedules上注解配置的信息,进行封装成任务,最后存放到 ScheduledAnnotationBeanPostProcessor的scheduledTasks变量里。
Map<Object, Set<ScheduledTask>> scheduledTasks;
ScheduledTaskRegistrar registrar = ScheduledTaskRegistrar();
不同的任务类型会封装成不同的Task类型(FixedDelayTask、FixedRateTask、CronTask)。
ScheduledAnnotationBeanPostProcessor中还有一个ScheduledTaskRegistrar类型的registrar变量,不同类型的任务在该类中也有一份存储:
ScheduledTaskRegistrar{private TaskScheduler taskScheduler;private List<TriggerTask> triggerTasks;private List<IntervalTask> fixedRateTasks;private List<CronTask> cronTasks;
}
这个时候任务只是被抽取出来了,还没有被执行。因为还没看到TaskScheduler任务执行器初始化,那么什么时候开始执行呢?
任务的执行
ScheduledAnnotationBeanPostProcessor不仅是一个后置处理器,还实现了ApplicationListener接口。当容器发布refresh事件时,会回调其onApplicationEvent方法。前面的文章也只读spring在初始化时候有一个最重要的onrefresh方法来进行所有bean的初始化,在最后完成时会发送该事件。
public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}
}
来分析下finishRegistration()源码逻辑,这里就不贴代码了
1、判断当前容器beanFacotry是否是ListableBeanFactory,如果是就拿出所有SchedulingConfigurer类型的bean依次调用其configureTasks()方法。
这里也是我们自定人任务执行一些参数的入口,即通过实现SchedulingConfigurer接口。
2、获取TaskScheduler
step1、首先从beanFacotry尝试获取TaskScheduler类型的的bean。
如果有多个TaskScheduler类型的bean。 则尝试按bean名称为taskScheduler的再进行获取。如果没有则认为没有。
step2、如果没有TaskScheduler则尝试获取ScheduledExecutorService类型的bean,同样多个再按scheduledExecutorService名称获取一次,没有则认为没有。
3、调用registrar.afterPropertiesSet()方法
这里最终会调用registrar的scheduleTasks();
如果此时taskScheduler还是null,即上面的第2步没有从容器中获取到TaskScheduler。则会默认创建一个taskScheduler。其对应的线程池是Executors.newSingleThreadScheduledExecutor()。
看到这里就能够明白为什么默认情况schedule任务都是一个个串行执行的了,因为默认线程池是singleThread。
还有延迟一定时间执行,周期执行这都是java ScheduledThreadPoolExecutor所支持的。
ScheduledTaskRegistrar#scheduleTasks代码有必要看以下。
protected void scheduleTasks() {if (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}
}
这里就是拿出所有的Task来进行执行了,最后任务执行都是通过taskScheduler来执行的。
ConcurrentTaskScheduler几个启动不同任务方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {long initialDelay = startTime.getTime() - this.clock.millis();return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {long initialDelay = startTime.getTime() - this.clock.millis();return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);}public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);}else {ErrorHandler errorHandler =(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();}}}
FixedRate和FixedDelay就不必看了,是之间丢给scheduledExecutor线程池去执行的,这里看下cron类型最后会调用schedule()方法。这里看其else分支使用ReschedulingRunnable类进行执行。
ReschedulingRunnable#schedule()
public ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {//根据cron表达式计算任务执行间隔this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {return null;}long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();//延迟执行任务this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}
}
最后也是交给线程池schedule方法执行。那么是怎么实现cron周期性执行的呢?
这里延迟执行的任务是this(ReschedulingRunnable),到时间就会调用其run方法,来看run方法逻辑
ReschedulingRunnable#run()
public void run() {Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());//原业务逻辑super.run();Date completionTime = new Date(this.triggerContext.getClock().millis());synchronized (this.triggerContextMonitor) {Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");//更新triggerthis.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);if (!obtainCurrentFuture().isCancelled()) {//任务未被取消schedule();//再次执行schedule}}
}
这里看到到时间run方法首先会调用原被包装的业务逻辑。然后更新trigger信息,再次调用schedule()方法生成下一周期任务到executor执行,到时间又会调用run方法这样循环往复完成cron周期性执行。
执行器配置
从任务的执行部分逻辑可以知道,我们有好几个位置可以设置taskScheduler。
1、通过实现SchedulingConfigurer接口
public class MySchedulingConfigurer implements SchedulingConfigurer {@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {//设置taskSchedulerThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(5);taskScheduler.setThreadNamePrefix("taskScheduler");taskScheduler.initialize();taskRegistrar.setTaskScheduler(taskScheduler);//动态添加任务taskRegistrar.addFixedRateTask(new IntervalTask(()->{log.info("new add task");},5000,0));}
}
这里再运行观察日志,会看到是使用的自定义线程池进行执行任务,最后新加的任务也正常执行。
17:49:47.368 [taskScheduler2] INFO com.cpx.service.schedule.MySchedulingConfigurer - new add task
17:49:52.368 [taskScheduler2] INFO com.cpx.service.schedule.MySchedulingConfigurer - new add task
17:49:57.369 [taskScheduler1] INFO com.cpx.service.schedule.MySchedulingConfigurer - new add task
17:50:02.369 [taskScheduler1] INFO com.cpx.service.schedule.MySchedulingConfigurer - new add task
17:50:05.008 [taskScheduler4] INFO com.cpx.service.schedule.ScheduleService - cron
2、通过直接定义executor类型bean的方式。
前面ScheduledAnnotationBeanPostProcessor后置处理器获取taskScheduler过程中可以知道。我们可以直接在Configuration中定义TaskScheduler类型或ScheduledExecutorService类型的bean即可。