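I. Scheduled tasks in the JDK
In Spring projects we usually enable scheduled tasks with @Scheduled.
The JDK also ships its own scheduling thread pool, ScheduledThreadPool, which we can obtain directly from the Executors utility class: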
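import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Create a ScheduledThreadPool with a core pool size of 2
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

// Task 1: first run immediately, then every 3 seconds
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("Running task 1, thread: " + Thread.currentThread().getName());
    }
}, 0, 3, TimeUnit.SECONDS);

// Task 2: same schedule, sharing the same pool
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("Running task 2, thread: " + Thread.currentThread().getName());
    }
}, 0, 3, TimeUnit.SECONDS);

The pool has two core threads available for the scheduled tasks. The output looks like this:
Running task 1, thread: pool-1-thread-1
Running task 2, thread: pool-1-thread-1
Running task 1, thread: pool-1-thread-1
Running task 2, thread: pool-1-thread-2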
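The snippet above never shuts the pool down, and the threads created by Executors are non-daemon by default, so the JVM keeps running. As a minimal sketch, the following shows one way to run a fixed-rate task a bounded number of times and then stop the pool. The class name FixedRateDemo and the method runAtLeast are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK or Spring.
final class FixedRateDemo {

    /** Runs a fixed-rate task until it has executed at least {@code times} times, then stops the pool. */
    static int runAtLeast(int times, long periodMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            // First run immediately, then once per period.
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                done.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);

            // Wait until the task has run often enough (with a safety timeout).
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run often enough in time");
            }
            future.cancel(false); // stop further executions of this task
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // release the worker threads so the JVM can exit
        }
    }
}
```

Calling FixedRateDemo.runAtLeast(3, 50) returns a count of at least 3; the exact number can be slightly higher because one more run may start before the task is cancelled.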
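II. Scheduled tasks in Spring
To use Spring's scheduled tasks, that is, the @Scheduled annotation, scheduling must first be enabled with @EnableScheduling.
Let's look at how this works at the source-code level.
1. Spring's default TaskScheduler
A Spring Boot application contains a default taskScheduler object, which is a ThreadPoolTaskScheduler.
It is contributed by TaskSchedulingAutoConfiguration, which registers a ThreadPoolTaskScheduler in the Spring container under the bean name "taskScheduler":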
@ConditionalOnClass(ThreadPoolTaskScheduler.class)
@AutoConfiguration(after = TaskExecutionAutoConfiguration.class)
@EnableConfigurationProperties(TaskSchedulingProperties.class)
public class TaskSchedulingAutoConfiguration {

    @Bean
    @ConditionalOnBean(name = "internalScheduledAnnotationProcessor")
    @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
    public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
        // Build the ThreadPoolTaskScheduler via TaskSchedulerBuilder.build()
        return builder.build();
    }

    @Bean
    @ConditionalOnMissingBean
    public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
            ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
        // Create the TaskSchedulerBuilder
        TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
        // Set the pool size (number of core threads)
        builder = builder.poolSize(properties.getPool().getSize());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        // Set the thread name prefix
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        builder = builder.customizers(taskSchedulerCustomizers);
        // Return the configured TaskSchedulerBuilder
        return builder;
    }
}
2. @Scheduled
First we need to find out which class parses the @Scheduled annotation:
/**
 * The annotation's Javadoc tells us that parsing happens in ScheduledAnnotationBeanPostProcessor,
 * so that is the class we need to focus on.
 *
 * @see EnableScheduling
 * @see ScheduledAnnotationBeanPostProcessor
 * @see Schedules
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {

    long fixedDelay() default -1;

    long fixedRate() default -1;

    String cron() default "";

    // other attributes omitted
}
So where does ScheduledAnnotationBeanPostProcessor get registered in the Spring container? It is registered through @EnableScheduling.
3. @EnableScheduling
@EnableScheduling imports a configuration class, SchedulingConfiguration, into the Spring container:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
// Import SchedulingConfiguration into the Spring container
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
SchedulingConfiguration in turn registers a ScheduledAnnotationBeanPostProcessor in the container:
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    // Registers a ScheduledAnnotationBeanPostProcessor in the container
    // under the bean name "internalScheduledAnnotationProcessor".
    // Because this bean exists, the @ConditionalOnBean condition on the
    // auto-configured taskScheduler is satisfied and taskScheduler gets registered as well.
    @Bean(name = "internalScheduledAnnotationProcessor")
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
That completes the setup. Parsing @Scheduled is the job of ScheduledAnnotationBeanPostProcessor, so that is the class we look at next.
III. ScheduledAnnotationBeanPostProcessor
- ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor; its postProcessAfterInitialization() parses the @Scheduled annotations on each bean.
- ScheduledAnnotationBeanPostProcessor implements ApplicationListener; its onApplicationEvent(ContextRefreshedEvent event) is called once Spring has finished refreshing the container, and it starts the scheduled tasks.
We will walk through these two methods.
1. postProcessAfterInitialization()
postProcessAfterInitialization() looks like this:
// -------------------- ScheduledAnnotationBeanPostProcessor ---------------------
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler
            || bean instanceof ScheduledExecutorService) {
        return bean;
    }
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // 1. Walk every method of the class and collect those annotated with @Scheduled
    if (AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        Map<Method, Set<Scheduled>> annotatedMethods;
        // (construction of annotatedMethods omitted)
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
        } else {
            // 2. Iterate over annotatedMethods and call processScheduled()
            //    for every @Scheduled annotation on every method
            annotatedMethods.forEach((method, scheduledAnnotations) ->
                    scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
        }
    }
    return bean;
}
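To make the scanning step concrete, here is a minimal sketch in plain Java of collecting annotated methods by reflection. It is not Spring's implementation: the @Every annotation, the ReportJob bean, and findAnnotatedMethods are hypothetical stand-ins for @Scheduled, a user bean, and the method-collection logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class illustrating annotation scanning; not Spring code.
final class AnnotationScanDemo {

    /** Hypothetical stand-in for Spring's @Scheduled. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** Hypothetical bean with one annotated and one plain method. */
    static final class ReportJob {
        @Every(millis = 1000)
        public void sendReport() { }

        public void helper() { }
    }

    /** Returns "methodName@period" for every method of the bean's class that carries @Every. */
    static List<String> findAnnotatedMethods(Object bean) {
        List<String> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.add(method.getName() + "@" + every.millis());
            }
        }
        return found;
    }
}
```

For a ReportJob instance, findAnnotatedMethods returns a single entry for sendReport, and helper is skipped because it carries no annotation. A real post-processor would then turn each match into a task rather than a string.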
In processScheduled(scheduled, method, bean), scheduled is the @Scheduled annotation instance found on the method. Let's follow processScheduled(), looking only at the most commonly used case, the cron expression:
// -------------------- ScheduledAnnotationBeanPostProcessor ---------------------
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    // 1. Wrap bean and method into a Runnable
    Runnable runnable = createRunnable(bean, method);
    boolean processedSchedule = false;
    Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

    // 2. Parse the cron expression
    String cron = scheduled.cron();
    if (StringUtils.hasText(cron)) {
        String zone = scheduled.zone();
        if (this.embeddedValueResolver != null) {
            cron = this.embeddedValueResolver.resolveStringValue(cron);
            zone = this.embeddedValueResolver.resolveStringValue(zone);
        }
        processedSchedule = true;
        if (!Scheduled.CRON_DISABLED.equals(cron)) {
            TimeZone timeZone;
            if (StringUtils.hasText(zone)) {
                timeZone = StringUtils.parseTimeZoneString(zone);
            } else {
                timeZone = TimeZone.getDefault();
            }
            // 3. Add a CronTask to this.registrar
            //    (this.registrar is a ScheduledTaskRegistrar)
            tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
        }
    }
    // (fixedDelay / fixedRate handling omitted)
}

// ------------------------ ScheduledTaskRegistrar -------------------------
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        // 1. On the first call, create the ScheduledTask
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    // 2. On the first call this.taskScheduler is still null, so the else branch runs
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        // 3. Store the task in the registrar's cronTasks list for later
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}
At this point every @Scheduled method on the bean has been wrapped as a CronTask and stored in the ScheduledTaskRegistrar, but nothing has been started yet.
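The "buffer now, start later" idea behind the registrar can be sketched with JDK classes only. This is a simplified illustration under assumed names (DeferredRegistrarDemo, register, setScheduler), not the ScheduledTaskRegistrar code itself: tasks registered before a scheduler exists are kept in a pending list and are started once the scheduler is supplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a simplified model of deferred task registration.
final class DeferredRegistrarDemo {

    private static final class Task {
        final Runnable body;
        final long periodMillis;

        Task(Runnable body, long periodMillis) {
            this.body = body;
            this.periodMillis = periodMillis;
        }
    }

    private final List<Task> pending = new ArrayList<>();
    private final List<ScheduledFuture<?>> started = new ArrayList<>();
    private ScheduledExecutorService scheduler; // null until one is supplied

    /** Starts the task if a scheduler is available, otherwise buffers it. */
    void register(Runnable body, long periodMillis) {
        Task task = new Task(body, periodMillis);
        if (scheduler != null) {
            start(task);
        } else {
            pending.add(task);
        }
    }

    /** Supplies the scheduler and starts everything that was buffered so far. */
    void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
        for (Task task : pending) {
            start(task);
        }
        pending.clear();
    }

    private void start(Task task) {
        started.add(scheduler.scheduleAtFixedRate(task.body, 0, task.periodMillis, TimeUnit.MILLISECONDS));
    }

    /** Returns {pending before, started before, pending after, started after}. */
    static int[] demo() {
        DeferredRegistrarDemo registrar = new DeferredRegistrarDemo();
        registrar.register(() -> { }, 100);
        registrar.register(() -> { }, 100);
        int pendingBefore = registrar.pending.size();
        int startedBefore = registrar.started.size();

        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            registrar.setScheduler(pool);
            return new int[] {pendingBefore, startedBefore, registrar.pending.size(), registrar.started.size()};
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In demo(), both tasks are pending and none is started before the scheduler is set; afterwards the pending list is empty and both tasks have been handed to the pool.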
2. onApplicationEvent(ContextRefreshedEvent)
Spring calls this method once it has finished refreshing the container, and this is where the scheduled tasks are actually started:
// -------------------- ScheduledAnnotationBeanPostProcessor ---------------------
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Delegate to finishRegistration()
        finishRegistration();
    }
}

// -------------------- ScheduledAnnotationBeanPostProcessor ---------------------
private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
    // 1. Look up SchedulingConfigurer beans first and let each one configure the registrar
    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        try {
            // 2. Look up a TaskScheduler bean by type
            //    and set it on this.registrar as the scheduling pool
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        } catch (NoUniqueBeanDefinitionException ex) {
            // 2.1 More than one TaskScheduler bean exists: look it up by name instead.
            //     This usually resolves to the default "taskScheduler" bean
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
        } catch (NoSuchBeanDefinitionException ex) {
            try {
                // 3. No TaskScheduler bean: look up a ScheduledExecutorService bean by type
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            } catch (NoUniqueBeanDefinitionException ex2) {
                // 3.1 More than one ScheduledExecutorService bean exists: look it up by name
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
            } catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No TaskScheduler/ScheduledExecutorService bean found");
            }
        }
    }
    // 4. The scheduler (if any) has now been set on this.registrar;
    //    call this.registrar.afterPropertiesSet()
    this.registrar.afterPropertiesSet();
}
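The lookup order in finishRegistration() (by type first, by name only when the type is ambiguous) can be modelled with a plain map of bean names to objects. The sketch below is only an illustration: SchedulerLookupDemo, FakeScheduler, and resolve are hypothetical, and a real bean factory signals the ambiguous and missing cases with exceptions rather than null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; models "by type, then by name" resolution with a plain map.
final class SchedulerLookupDemo {

    static final String DEFAULT_NAME = "taskScheduler";

    /** Hypothetical scheduler type standing in for Spring's TaskScheduler. */
    static final class FakeScheduler {
        final String label;

        FakeScheduler(String label) {
            this.label = label;
        }
    }

    /** Returns the unique bean of the type, else the one named "taskScheduler", else null. */
    static <T> T resolve(Map<String, Object> beans, Class<T> type) {
        List<T> byType = new ArrayList<>();
        for (Object bean : beans.values()) {
            if (type.isInstance(bean)) {
                byType.add(type.cast(bean));
            }
        }
        if (byType.size() == 1) {
            return byType.get(0); // unique match by type, the name does not matter
        }
        if (byType.size() > 1) {
            Object named = beans.get(DEFAULT_NAME); // ambiguous: fall back to the default name
            return type.isInstance(named) ? type.cast(named) : null;
        }
        return null; // no bean of this type at all
    }

    private static String label(FakeScheduler scheduler) {
        return scheduler == null ? "none" : scheduler.label;
    }

    /** Resolves against three sample registries: one match, two matches, no match. */
    static List<String> demo() {
        Map<String, Object> single = new LinkedHashMap<>();
        single.put("myScheduler", new FakeScheduler("only"));

        Map<String, Object> several = new LinkedHashMap<>();
        several.put("reportScheduler", new FakeScheduler("reports"));
        several.put(DEFAULT_NAME, new FakeScheduler("default"));

        Map<String, Object> none = new LinkedHashMap<>();
        none.put("someBean", "not a scheduler");

        return Arrays.asList(
                label(resolve(single, FakeScheduler.class)),
                label(resolve(several, FakeScheduler.class)),
                label(resolve(none, FakeScheduler.class)));
    }
}
```

demo() shows the three cases side by side: a single scheduler is picked whatever its name, two schedulers are disambiguated by the name "taskScheduler", and a registry without any scheduler yields nothing.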
Now let's see what this.registrar.afterPropertiesSet() does:
// ------------------------ ScheduledTaskRegistrar -------------------------
public void afterPropertiesSet() {
    scheduleTasks();
}

// ------------------------ ScheduledTaskRegistrar -------------------------
protected void scheduleTasks() {
    // 1. If this.taskScheduler == null, fall back to a single-threaded
    //    ScheduledExecutorService as the scheduling pool.
    //    Normally this.taskScheduler is not null at this point
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    // 2. this.cronTasks is not null: iterate over it and call scheduleCronTask(task).
    //    This is the second time we enter scheduleCronTask(), and it behaves
    //    slightly differently from the first call
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

// ------------------------ ScheduledTaskRegistrar -------------------------
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    // 1. On the second call this.taskScheduler is not null:
    //    this.taskScheduler.schedule() actually starts the scheduled task
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}
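The call schedule(runnable, trigger) differs from a fixed-rate schedule in that the trigger is asked for the next execution time after each run. As a rough sketch of that idea using only the JDK, the task below reschedules itself after every run. ReschedulingDemo and its LongSupplier "trigger" are assumptions made for illustration: a real CronTrigger derives the next time from the cron expression, whereas here a fixed delay is used and a negative value means "stop".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical demo class; a simplified model of trigger-driven rescheduling.
final class ReschedulingDemo {

    /** Runs a self-rescheduling task {@code maxRuns} times (maxRuns >= 1) and returns the run count. */
    static int run(int maxRuns, long delayMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);

        // Hypothetical trigger: delay until the next run, or -1 when no further run is due.
        LongSupplier trigger = () -> runs.get() < maxRuns ? delayMillis : -1;

        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            runs.incrementAndGet();            // the actual work would go here
            long next = trigger.getAsLong();   // ask the trigger when to run again
            if (next < 0) {
                finished.countDown();
            } else {
                pool.schedule(self[0], next, TimeUnit.MILLISECONDS);
            }
        };

        try {
            pool.schedule(self[0], delayMillis, TimeUnit.MILLISECONDS);
            if (!finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because each run schedules at most one successor, ReschedulingDemo.run(3, 20) executes the task exactly three times before the trigger reports that no further run is due.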
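That completes the walkthrough of how Spring processes @Scheduled.
IV. Configuring the scheduling thread pool
1. Through the configuration file
As we saw in TaskSchedulingAutoConfiguration, the default ThreadPoolTaskScheduler is built from the TaskSchedulingProperties configuration, and its pool size defaults to 1.
It can be configured as follows: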
spring:
  task:
    scheduling:
      pool:
        size: 5
      thread-name-prefix: my-schedule-
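Why the pool size matters can be seen with the JDK pool alone. The following is a minimal sketch, with PoolSizeDemo and threadsUsed as hypothetical names: two slow tasks are submitted at the same time and the names of the threads that ran them are recorded. With a single thread, the second task has to wait for the first one.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing how pool size affects which threads run the tasks.
final class PoolSizeDemo {

    /** Submits two slow tasks to a pool of the given size and returns the thread names used. */
    static Set<String> threadsUsed(int poolSize) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(2);

        Runnable slowTask = () -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(200); // simulate a long-running job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };

        try {
            pool.schedule(slowTask, 0, TimeUnit.MILLISECONDS);
            pool.schedule(slowTask, 0, TimeUnit.MILLISECONDS);
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

threadsUsed(1) always reports a single thread name, since both tasks run one after the other on the same thread. With a pool size of 2 the two tasks would typically run on different threads in parallel, which is the reason to raise the pool size when several @Scheduled methods may overlap.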
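2. Custom ThreadPoolTaskScheduler
We can also register our own ThreadPoolTaskScheduler bean in the Spring container. It is best to name the bean taskScheduler: as finishRegistration() shows, the scheduler is looked up by type first, and the name "taskScheduler" is what decides the lookup when more than one TaskScheduler bean exists.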
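import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class ThreadPoolConfig {

    // The method name makes the bean name "taskScheduler".
    // Declaring the concrete return type (rather than Executor) lets
    // type-based lookups and conditions see that this bean is a TaskScheduler.
    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(3);
        taskScheduler.setThreadNamePrefix("my-schedule-task-");
        taskScheduler.initialize();
        return taskScheduler;
    }
}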