一、导包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency>
二、创建定时任务
创建一个定时任务如下,需要传入定时任务执行的class、定时任务名字、定时任务组名字、cron表达式、执行参数、是否初始化创建(做持久化时用到)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();public void createJob(Class<? extends Job> jobClass, String jobName, String jobGroupName, String cronExpression, JSONObject params, boolean isInit) {// 创建scheduler,调度器, 策略采用错过之后立即执行一次CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionFireAndProceed();Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName).startNow().withSchedule(scheduleBuilder).build();// 定义一个JobDetailJobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();trigger.getJobDataMap().putAll(params);try {scheduler.scheduleJob(jobDetail, trigger);// 启动任务调度scheduler.start();} catch (Exception e) {log.error("创建定时任务失败,jobName:{},jobGroupName:{}", jobName, jobGroupName);throw new InterfaceException(ErrorCodeEnum.CREATE_QUARTZ_JOB_ERROR.getErrorCode(), ErrorCodeEnum.CREATE_QUARTZ_JOB_ERROR.getErrorMessage());}}
上述的定时任务类需要实现Job接口,实现的execute方法就是具体定时任务需要做的内容:
package com.supcon.mare.tankinfo.service.closedpath.quartz.job;import com.supcon.mare.SpringTool;
import com.supcon.mare.tankinfo.service.closedpath.ClosedPathConstants;
import com.supcon.mare.tankinfo.service.closedpath.TagWriteVO;
import com.supcon.mare.tankinfo.service.closedpath.VfTagHelper;
import com.supcon.mare.tankinfo.service.closedpath.quartz.mananer.impl.QuartzManagerImpl;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.stereotype.Service;import java.util.List;@Slf4j
@Service
public class CancelWriteClosedEqpColor implements Job {public CancelWriteClosedEqpColor() throws SchedulerException {}@Overridepublic void execute(JobExecutionContext jobExecutionContext) {//获取参数JobDataMap mergedJobDataMap = jobExecutionContext.getMergedJobDataMap();// 创建的时候塞了什么参数就get什么String modelName = mergedJobDataMap.get("modelName").toString(); // todo }
}
创建的时候直接调用:
// Schedule CancelWriteClosedEqpColor under the given name and group with the cron expression and
// parameters; the last argument (isInit) is false because this is a fresh creation, not a startup reload.
quartzManager.createJob(CancelWriteClosedEqpColor.class, modelName, ClosedPathConstants.QUARTZ_GROUP_CLOSED_WRITE_COLOR,cronExpression, params, false);
三、定时任务持久化
如果创建了一个定时任务,但是这时候服务突然崩掉了,再起来的时候该做的事情并没有做(比如要对某些数据进行回退),就会留下脏数据在系统里面,因此需要在系统启动的时候把之前没删除的定时任务再装载进来继续执行
创建任务的时候需要把任务信息存储到数据库;系统启动初始化时,如果数据库中存在尚未删除的定时任务,则需要重新创建(此时isInit为true,不再重复入库)。持久化框架我用的是JPA,如下:
@Override@Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)public void createJob(Class<? extends Job> jobClass, String jobName, String jobGroupName, String cronExpression, JSONObject params, boolean isInit) {// 创建scheduler,调度器, 策略采用错过之后立即执行一次CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionFireAndProceed();Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName).startNow().withSchedule(scheduleBuilder).build();// 定义一个JobDetailJobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();trigger.getJobDataMap().putAll(params);try {scheduler.scheduleJob(jobDetail, trigger);// 启动任务调度scheduler.start();} catch (Exception e) {log.error("创建定时任务失败,jobName:{},jobGroupName:{}", jobName, jobGroupName);throw new InterfaceException(ErrorCodeEnum.CREATE_QUARTZ_JOB_ERROR.getErrorCode(), ErrorCodeEnum.CREATE_QUARTZ_JOB_ERROR.getErrorMessage());}if (!isInit) {// 持久化到数据库ScheduledJobEntity scheduledJobEntity = new ScheduledJobEntity();scheduledJobEntity.setJobName(jobName);scheduledJobEntity.setJobGroupName(jobGroupName);scheduledJobEntity.setJobClassName(jobClass.getName());scheduledJobEntity.setCronExpression(cronExpression);scheduledJobEntity.setValid(Constants.VALID_TRUE);scheduledJobEntity.setParams(params.toJSONString());schedulerJobRepository.save(scheduledJobEntity);}log.info("创建定时任务成功,jobName:{},jobGroupName:{}", jobName, jobGroupName);}
初始化执行的方式是实现ApplicationRunner接口并重写run方法。我这里的定时任务是五秒钟之后执行一次就可以,这个需要根据需求自己写
@Overridepublic void run(ApplicationArguments args) throws Exception {// 启动自动装配定时任务,执行之前未执行的任务List<ScheduledJobEntity> scheduledJobEntities = schedulerJobRepository.findByAttr("valid", Constants.VALID_TRUE.toString());for (ScheduledJobEntity scheduledJobEntity : scheduledJobEntities) {String jobName = scheduledJobEntity.getJobName();String jobGroupName = scheduledJobEntity.getJobGroupName();// 执行参数String paramsString = scheduledJobEntity.getParams();JSONObject params = JSONObject.parseObject(paramsString);Calendar calendar = Calendar.getInstance();calendar.add(Calendar.SECOND, 5);// 几分钟之后回退String cronExpression = Utils.formatDateByPattern(calendar.getTime(), "ss mm HH dd MM ? yyyy");createJob(CancelWriteClosedEqpColor.class, jobName, jobGroupName,cronExpression, params, true);}}
定时任务实体类:
package com.supcon.mare.tankinfo.entity;import lombok.Data;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;import javax.persistence.*;
import java.util.Date;@Entity
@Table(name = "scheduled_job")
@Data
@EntityListeners(AuditingEntityListener.class)
public class ScheduledJobEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;

    /** Job name. */
    private String jobName;

    /** Job group name. */
    private String jobGroupName;

    /** Fully qualified name of the job class. */
    private String jobClassName;

    /** Cron expression. */
    private String cronExpression;

    /** Job parameters. */
    @Column(columnDefinition = "text")
    private String params;

    /** Whether the job is in effect (0/1). */
    private Integer valid;

    /** Creation time. */
    @CreatedDate
    @Column(name = "gmt_create", updatable = false)
    @Temporal(TemporalType.TIMESTAMP)
    private Date gmtCreate;

    /** Last modification time. */
    @LastModifiedDate
    @Column(name = "gmt_modified", insertable = false)
    @Temporal(TemporalType.TIMESTAMP)
    private Date gmtModified;
}
定时任务创建、重启装载就完成了,以下是删除定时任务以及激活一次定时任务:
@Override@Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)public void deleteJob(String jobName, String jobGroupName) {try {scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));} catch (Exception e) {log.error("删除定时任务失败,jobName:{},jobGroupName:{}", jobName, jobGroupName);}List<ScheduledJobEntity> scheduledJobEntities = schedulerJobRepository.findByJobNameAndJobGroupNameAndValid(jobName, jobGroupName, Constants.VALID_TRUE);if (CollectionUtil.isNotEmpty(scheduledJobEntities)) {for (ScheduledJobEntity scheduledJobEntity : scheduledJobEntities) {scheduledJobEntity.setValid(Constants.VALID_FALSE);}schedulerJobRepository.saveAll(scheduledJobEntities);log.info("删除定时任务成功,jobName:{},jobGroupName:{}", jobName, jobGroupName);}}@Overridepublic void activeJob(String jobName, String jobGroupName) {try {Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MILLISECOND, 100);// 几分钟之后自动取消String cronExpression = Utils.formatDateByPattern(calendar.getTime(), "ss mm HH dd MM ? yyyy");TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);// 表达式调度构建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionFireAndProceed();CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 根据Cron表达式构建一个Triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();// 按新的trigger重新设置job执行scheduler.rescheduleJob(triggerKey, trigger);// 等300毫秒激活定时任务,等待时间需要看定时任务执行一次需要多久时间Thread.sleep(300);deleteJob(jobName, jobGroupName);} catch (Exception e) {log.error("激活定时任务失败,message:{},jobName:{},jobGroupName:{}", e.getMessage(), jobName, jobGroupName);}}