参考网址
目标:定时任务持久化到数据库,动态调整数据库里保存的cron表达式使定时任务可以跟随变化。
从SYS_QUARTZ_JOB表(通过反射创建任务)和SYS_QUARTZ_LOG表(主要就是记录日志)构建两个对应的实体类:QuartzJob和QuartzLog
1.看表结构
-- Create table
-- NOTE(review): the MyBatis mapper reads seq_sys_quartz_job.nextval to allocate job_id;
-- that sequence is not created by this script and must exist separately.
create table SYS_QUARTZ_JOB
(
  job_id              NUMBER(20) not null,
  bean_name           NVARCHAR2(255),
  cron_expression     NVARCHAR2(255),
  is_pause            CHAR(1),
  job_name            NVARCHAR2(255),
  method_name         NVARCHAR2(255),
  params              NVARCHAR2(255),
  description         NVARCHAR2(255),
  person_in_charge    NVARCHAR2(100),
  email               NVARCHAR2(100),
  sub_task            NVARCHAR2(100),
  pause_after_failure CHAR(1),
  create_by           NVARCHAR2(255),
  update_by           NVARCHAR2(255),
  create_time         DATE,
  update_time         DATE
)
tablespace USERS
  pctfree 10
  initrans 1
  maxtrans 255
  storage
  (
    initial 64K
    next 1M
    minextents 1
    maxextents unlimited
  );
-- Add comments to the table
comment on table SYS_QUARTZ_JOB
  is '定时任务';
-- Add comments to the columns
comment on column SYS_QUARTZ_JOB.job_id
  is 'ID';
comment on column SYS_QUARTZ_JOB.bean_name
  is 'Spring Bean名称';
comment on column SYS_QUARTZ_JOB.cron_expression
  is 'cron 表达式';
comment on column SYS_QUARTZ_JOB.is_pause
  is '状态:1暂停、0启用';
comment on column SYS_QUARTZ_JOB.job_name
  is '任务名称';
comment on column SYS_QUARTZ_JOB.method_name
  is '方法名称';
comment on column SYS_QUARTZ_JOB.params
  is '参数';
comment on column SYS_QUARTZ_JOB.description
  is '备注';
comment on column SYS_QUARTZ_JOB.person_in_charge
  is '负责人';
comment on column SYS_QUARTZ_JOB.email
  is '报警邮箱';
comment on column SYS_QUARTZ_JOB.sub_task
  is '子任务ID';
comment on column SYS_QUARTZ_JOB.pause_after_failure
  is '任务失败后是否暂停';
comment on column SYS_QUARTZ_JOB.create_by
  is '创建者';
comment on column SYS_QUARTZ_JOB.update_by
  is '更新者';
comment on column SYS_QUARTZ_JOB.create_time
  is '创建日期';
comment on column SYS_QUARTZ_JOB.update_time
  is '更新时间';
-- Create/Recreate indexes
create index INX_IS_PAUSE on SYS_QUARTZ_JOB (IS_PAUSE)
  tablespace USERS
  pctfree 10
  initrans 2
  maxtrans 255
  storage
  (
    initial 64K
    next 1M
    minextents 1
    maxextents unlimited
  );
-- Create/Recreate primary, unique and foreign key constraints
alter table SYS_QUARTZ_JOB
  add primary key (JOB_ID)
  using index
  tablespace USERS
  pctfree 10
  initrans 2
  maxtrans 255
  storage
  (
    initial 64K
    next 1M
    minextents 1
    maxextents unlimited
  );
图形化理解字段:
-- Create table
-- NOTE(review): the MyBatis mapper reads seq_sys_quartz_log.nextval to allocate log_id;
-- that sequence is not created by this script and must exist separately.
create table SYS_QUARTZ_LOG
(
  log_id           NUMBER(20) not null,
  bean_name        NVARCHAR2(255),
  create_time      DATE,
  cron_expression  NVARCHAR2(255),
  exception_detail NCLOB,
  is_success       VARCHAR2(1),
  job_name         NVARCHAR2(255),
  method_name      NVARCHAR2(255),
  params           NVARCHAR2(255),
  time             NUMBER(20)
)
tablespace USERS
  pctfree 10
  initrans 1
  maxtrans 255
  storage
  (
    initial 64K
    next 1M
    minextents 1
    maxextents unlimited
  );
-- Add comments to the table
comment on table SYS_QUARTZ_LOG
  is '定时任务日志';
-- Add comments to the columns
comment on column SYS_QUARTZ_LOG.log_id
  is 'ID';
-- Create/Recreate primary, unique and foreign key constraints
alter table SYS_QUARTZ_LOG
  add primary key (LOG_ID)
  using index
  tablespace USERS
  pctfree 10
  initrans 2
  maxtrans 255
  storage
  (
    initial 64K
    next 1M
    minextents 1
    maxextents unlimited
  );
图形化理解字段:
2.项目各个文件位置及代码
config目录下(全局配置)
/** */
package com.njry.modules.quartz.config;import com.njry.modules.quartz.domain.QuartzJob;
import lombok.RequiredArgsConstructor;
import com.njry.modules.quartz.mapper.QuartzJobMapper;
import com.njry.modules.quartz.utils.QuartzManage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.List;/*** @author* @date 2024-05-11*/
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {private static final Logger log = LoggerFactory.getLogger(JobRunner.class);private final QuartzJobMapper quartzJobMapper;private final QuartzManage quartzManage;/*** 项目启动时重新激活启用的定时任务** @param applicationArguments /*/@Overridepublic void run(ApplicationArguments applicationArguments) {List<QuartzJob> quartzJobs = quartzJobMapper.findByIsPauseIsFalse();quartzJobs.forEach(quartzManage::addJob);log.info("Timing task injection complete");}
}
/** */
package com.njry.modules.quartz.config;import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;/*** 定时任务配置* @author * @date */
@Configuration
public class QuartzConfig {/*** 解决Job中注入Spring Bean为null的问题*/@Component("quartzJobFactory")public static class QuartzJobFactory extends AdaptableJobFactory {private final AutowireCapableBeanFactory capableBeanFactory;public QuartzJobFactory(AutowireCapableBeanFactory capableBeanFactory) {this.capableBeanFactory = capableBeanFactory;}@Overrideprotected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {//调用父类的方法,把Job注入到spring中Object jobInstance = super.createJobInstance(bundle);capableBeanFactory.autowireBean(jobInstance);return jobInstance;}}
}
domain目录下(实体类和请求体类)
vo目录下
/***/
package com.njry.modules.quartz.domain.vo;import lombok.Data;
import java.sql.Timestamp;
import java.util.List;/*** @author* @date */
/**
 * Filter object shared by the job and job-log list queries.
 */
@Data
public class QuartzJobQueryCriteria {

    /** Job name fragment; the mapper XML matches it with LIKE. */
    private String jobName;

    /** Success flag; only the log query filters on it. */
    private Boolean isSuccess;

    /** Time range; the mapper XML reads element 0 as the lower and element 1 as the upper bound. */
    private List<Timestamp> createTime;
}
/** */
package com.njry.modules.quartz.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njry.base.BaseEntity;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;/*** @author * @date */
/**
 * Entity mapped to the {@code sys_quartz_job} table: one persisted scheduled job.
 */
@Getter
@Setter
@TableName("sys_quartz_job")
public class QuartzJob extends BaseEntity implements Serializable {

    /** Key under which the job entity is stored in the Quartz job data map. */
    public static final String JOB_KEY = "JOB_KEY";

    /** Primary key ({@code job_id}); supplied by the caller, required when updating. */
    @TableId(value = "job_id", type = IdType.INPUT)
    @NotNull(groups = {Update.class})
    private Long id;

    /** Not a table column; identifies one sub-job run. */
    @TableField(exist = false)
    @ApiModelProperty(value = "用于子任务唯一标识", hidden = true)
    private String uuid;

    /** Job name. */
    @ApiModelProperty(value = "定时器名称")
    private String jobName;

    /** Name of the Spring bean whose method is invoked. */
    @NotBlank
    @ApiModelProperty(value = "Bean名称")
    private String beanName;

    /** Name of the method to invoke on the bean. */
    @NotBlank
    @ApiModelProperty(value = "方法名称")
    private String methodName;

    /** Optional argument passed to the method. */
    @ApiModelProperty(value = "参数")
    private String params;

    /** Cron expression that drives the trigger. */
    @NotBlank
    @ApiModelProperty(value = "cron表达式")
    private String cronExpression;

    /** Paused flag; defaults to {@code false}. */
    @ApiModelProperty(value = "状态,暂时或启动")
    private Boolean isPause = false;

    /** Person in charge of the job. */
    @ApiModelProperty(value = "负责人")
    private String personInCharge;

    /** Alarm e-mail address(es). */
    @ApiModelProperty(value = "报警邮箱")
    private String email;

    /** Sub-job ids. */
    @ApiModelProperty(value = "子任务")
    private String subTask;

    /** Whether the job is paused after a failed run. */
    @ApiModelProperty(value = "失败后暂停")
    private Boolean pauseAfterFailure;

    /** Free-text description. */
    @NotBlank
    @ApiModelProperty(value = "备注")
    private String description;
}
/***/
package com.njry.modules.quartz.domain;import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.sql.Timestamp;/*** @author * @date */
/**
 * Entity mapped to the {@code sys_quartz_log} table: one execution record of a scheduled job.
 */
@Data
@TableName("sys_quartz_log")
public class QuartzLog implements Serializable {

    /** Primary key ({@code log_id}); supplied by the caller. */
    @TableId(value = "log_id", type = IdType.INPUT)
    @ApiModelProperty(value = "ID", hidden = true)
    private Long id;

    /** Name of the executed job. */
    @ApiModelProperty(value = "任务名称", hidden = true)
    private String jobName;

    /** Spring bean name the job targeted. */
    @ApiModelProperty(value = "bean名称", hidden = true)
    private String beanName;

    /** Method name the job targeted. */
    @ApiModelProperty(value = "方法名称", hidden = true)
    private String methodName;

    /** Argument passed to the method. */
    @ApiModelProperty(value = "参数", hidden = true)
    private String params;

    /** Cron expression of the job at execution time. */
    @ApiModelProperty(value = "cron表达式", hidden = true)
    private String cronExpression;

    /** Whether the run succeeded. */
    @ApiModelProperty(value = "状态", hidden = true)
    private Boolean isSuccess;

    /** Stack trace text of a failed run. */
    @ApiModelProperty(value = "异常详情", hidden = true)
    private String exceptionDetail;

    /** Elapsed time of the run. */
    @ApiModelProperty(value = "执行耗时", hidden = true)
    private Long time;

    /** Creation time; filled on insert. */
    @TableField(fill = FieldFill.INSERT)
    @ApiModelProperty(value = "创建时间", hidden = true)
    private Timestamp createTime;
}
mapper目录下
/***/
package com.njry.modules.quartz.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;import java.util.List;/*** @author * @date **/
/**
 * MyBatis mapper for {@link QuartzJob}; the statements are defined in the mapper XML.
 */
@Mapper
public interface QuartzJobMapper extends BaseMapper<QuartzJob> {

    /**
     * Paged job query.
     *
     * @param criteria filter conditions
     * @param page     paging parameters
     * @return one page of jobs
     */
    IPage<QuartzJob> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);

    /**
     * Unpaged job query.
     *
     * @param criteria filter conditions
     * @return all matching jobs
     */
    List<QuartzJob> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);

    /**
     * @return the jobs whose {@code is_pause} column is {@code '0'}
     */
    List<QuartzJob> findByIsPauseIsFalse();

    /**
     * @return the next value of the job id sequence
     */
    Long getSeq();
}
/***/
package com.njry.modules.quartz.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;/*** @author * @date **/
/**
 * MyBatis mapper for {@link QuartzLog}; the statements are defined in the mapper XML.
 */
@Mapper
public interface QuartzLogMapper extends BaseMapper<QuartzLog> {

    /**
     * Paged log query.
     *
     * @param criteria filter conditions
     * @param page     paging parameters
     * @return one page of log rows
     */
    IPage<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);

    /**
     * Unpaged log query.
     *
     * @param criteria filter conditions
     * @return all matching log rows
     */
    List<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);

    /**
     * @return the next value of the log id sequence
     */
    Long getSeq();
}
rest目录下(注意:下面贴出的代码与上文 mapper 目录下的 QuartzLogMapper 完全相同,疑似粘贴错误;此处本应是 rest 目录下的控制器代码)
/***/
package com.njry.modules.quartz.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;/*** @author * @date**/
/**
 * MyBatis mapper for {@link QuartzLog}; the statements are defined in the mapper XML.
 */
@Mapper
public interface QuartzLogMapper extends BaseMapper<QuartzLog> {

    /**
     * Paged log query.
     *
     * @param criteria filter conditions
     * @param page     paging parameters
     * @return one page of log rows
     */
    IPage<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);

    /**
     * Unpaged log query.
     *
     * @param criteria filter conditions
     * @return all matching log rows
     */
    List<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);

    /**
     * @return the next value of the log id sequence
     */
    Long getSeq();
}
service目录下
impl目录下
/***/
package com.njry.modules.quartz.service.impl;import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njry.exception.BadRequestException;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.utils.QuartzManage;
import com.njry.utils.*;
import lombok.RequiredArgsConstructor;
import com.njry.modules.quartz.mapper.QuartzJobMapper;
import com.njry.modules.quartz.mapper.QuartzLogMapper;
import com.njry.modules.quartz.service.QuartzJobService;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import com.njry.utils.*;
import org.quartz.CronExpression;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;/*** @author * @date */
@RequiredArgsConstructor
@Service(value = "quartzJobService")
public class QuartzJobServiceImpl extends ServiceImpl<QuartzJobMapper, QuartzJob> implements QuartzJobService {private final QuartzJobMapper quartzJobMapper;private final QuartzLogMapper quartzLogMapper;private final QuartzManage quartzManage;private final RedisUtils redisUtils;@Overridepublic PageResult<QuartzJob> queryAll(QuartzJobQueryCriteria criteria, Page<Object> page){return PageUtil.toPage(quartzJobMapper.findAll(criteria, page));}@Overridepublic PageResult<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria, Page<Object> page){return PageUtil.toPage(quartzLogMapper.findAll(criteria, page));}@Overridepublic List<QuartzJob> queryAll(QuartzJobQueryCriteria criteria) {return quartzJobMapper.findAll(criteria);}@Overridepublic List<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria) {return quartzLogMapper.findAll(criteria);}@Override@Transactional(rollbackFor = Exception.class)public void create(QuartzJob resources) {if (!CronExpression.isValidExpression(resources.getCronExpression())){throw new BadRequestException("cron表达式格式错误");}resources.setId(quartzJobMapper.getSeq());save(resources);quartzManage.addJob(resources);}@Override@Transactional(rollbackFor = Exception.class)public void update(QuartzJob resources) {if (!CronExpression.isValidExpression(resources.getCronExpression())){throw new BadRequestException("cron表达式格式错误");}if(StringUtils.isNotBlank(resources.getSubTask())){List<String> tasks = Arrays.asList(resources.getSubTask().split("[,,]"));if (tasks.contains(resources.getId().toString())) {throw new BadRequestException("子任务中不能添加当前任务ID");}}saveOrUpdate(resources);quartzManage.updateJobCron(resources);}@Override@Transactional(rollbackFor = Exception.class)public void updateIsPause(QuartzJob quartzJob) {if (quartzJob.getIsPause()) {quartzManage.resumeJob(quartzJob);quartzJob.setIsPause(false);} else {quartzManage.pauseJob(quartzJob);quartzJob.setIsPause(true);}saveOrUpdate(quartzJob);}@Overridepublic void execution(QuartzJob quartzJob) 
{quartzManage.runJobNow(quartzJob);}@Override@Transactional(rollbackFor = Exception.class)public void delete(Set<Long> ids) {for (Long id : ids) {QuartzJob quartzJob = getById(id);quartzManage.deleteJob(quartzJob);removeById(quartzJob);}}@Async@Override@Transactional(rollbackFor = Exception.class)public void executionSubJob(String[] tasks) throws InterruptedException {for (String id : tasks) {if (StrUtil.isBlank(id)) {// 如果是手动清除子任务id,会出现id为空字符串的问题continue;}QuartzJob quartzJob = getById(Long.parseLong(id));// 执行任务String uuid = IdUtil.simpleUUID();quartzJob.setUuid(uuid);// 执行任务execution(quartzJob);// 获取执行状态,如果执行失败则停止后面的子任务执行Boolean result = (Boolean) redisUtils.get(uuid);while (result == null) {// 休眠5秒,再次获取子任务执行情况Thread.sleep(5000);result = (Boolean) redisUtils.get(uuid);}if(!result){redisUtils.del(uuid);break;}}}@Overridepublic void download(List<QuartzJob> quartzJobs, HttpServletResponse response) throws IOException {List<Map<String, Object>> list = new ArrayList<>();for (QuartzJob quartzJob : quartzJobs) {Map<String,Object> map = new LinkedHashMap<>();map.put("任务名称", quartzJob.getJobName());map.put("Bean名称", quartzJob.getBeanName());map.put("执行方法", quartzJob.getMethodName());map.put("参数", quartzJob.getParams());map.put("表达式", quartzJob.getCronExpression());map.put("状态", quartzJob.getIsPause() ? 
"暂停中" : "运行中");map.put("描述", quartzJob.getDescription());map.put("创建日期", quartzJob.getCreateTime());list.add(map);}FileUtil.downloadExcel(list, response);}@Overridepublic void downloadLog(List<QuartzLog> queryAllLog, HttpServletResponse response) throws IOException {List<Map<String, Object>> list = new ArrayList<>();for (QuartzLog quartzLog : queryAllLog) {Map<String,Object> map = new LinkedHashMap<>();map.put("任务名称", quartzLog.getJobName());map.put("Bean名称", quartzLog.getBeanName());map.put("执行方法", quartzLog.getMethodName());map.put("参数", quartzLog.getParams());map.put("表达式", quartzLog.getCronExpression());map.put("异常详情", quartzLog.getExceptionDetail());map.put("耗时/毫秒", quartzLog.getTime());map.put("状态", quartzLog.getIsSuccess() ? "成功" : "失败");map.put("创建日期", quartzLog.getCreateTime());list.add(map);}FileUtil.downloadExcel(list, response);}
}
/** */
package com.njry.modules.quartz.service;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.utils.PageResult;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Set;/*** @author* @date*/
/**
 * Operations on persisted scheduled jobs and their execution logs.
 */
public interface QuartzJobService extends IService<QuartzJob> {

    /**
     * Paged job query.
     *
     * @param criteria filter conditions
     * @param page     paging parameters
     * @return one page of jobs
     */
    PageResult<QuartzJob> queryAll(QuartzJobQueryCriteria criteria, Page<Object> page);

    /**
     * Unpaged job query.
     *
     * @param criteria filter conditions
     * @return all matching jobs
     */
    List<QuartzJob> queryAll(QuartzJobQueryCriteria criteria);

    /**
     * Paged log query.
     *
     * @param criteria filter conditions
     * @param page     paging parameters
     * @return one page of log rows
     */
    PageResult<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria, Page<Object> page);

    /**
     * Unpaged log query.
     *
     * @param criteria filter conditions
     * @return all matching log rows
     */
    List<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria);

    /**
     * Creates a job.
     *
     * @param resources job to create
     */
    void create(QuartzJob resources);

    /**
     * Edits a job.
     *
     * @param resources job to update
     */
    void update(QuartzJob resources);

    /**
     * Deletes jobs.
     *
     * @param ids ids of the jobs to delete
     */
    void delete(Set<Long> ids);

    /**
     * Changes the paused/running state of a job.
     *
     * @param quartzJob job whose state is changed
     */
    void updateIsPause(QuartzJob quartzJob);

    /**
     * Runs a job immediately.
     *
     * @param quartzJob job to run
     */
    void execution(QuartzJob quartzJob);

    /**
     * Exports jobs.
     *
     * @param queryAll jobs to export
     * @param response HTTP response the export is written to
     * @throws IOException if writing the export fails
     */
    void download(List<QuartzJob> queryAll, HttpServletResponse response) throws IOException;

    /**
     * Exports job logs.
     *
     * @param queryAllLog log rows to export
     * @param response    HTTP response the export is written to
     * @throws IOException if writing the export fails
     */
    void downloadLog(List<QuartzLog> queryAllLog, HttpServletResponse response) throws IOException;

    /**
     * Runs sub-jobs.
     *
     * @param tasks sub-job ids
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void executionSubJob(String[] tasks) throws InterruptedException;
}
task目录下
这个类的目的:
定义几个用于测试的示例任务,供定时任务通过反射调用。
/***/
package com.njry.modules.quartz.task;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;/*** 测试用* @author * @date */
/**
 * Sample bean with three methods that only log a line; used as targets for test jobs.
 */
@Slf4j
@Service
public class TestTask {

    /** Parameterless sample task. */
    public void run() {
        log.info("run 执行成功");
    }

    /**
     * Sample task that takes one argument.
     *
     * @param str value written to the log
     */
    public void run1(String str) {
        log.info("run1 执行成功,参数为: {}", str);
    }

    /** Second parameterless sample task. */
    public void run2() {
        log.info("run2 执行成功");
    }
}
utils目录下
这个类目的:
通过bean容器获取QuartzLogMapper实现日志记录,
通过bean容器获取QuartzJobService判断子任务,或者任务是否存在一类的
自定义线程池,异步执行通过反射获取的任务——这样做会遇到子线程拿不到主线程 SecurityContext 的问题
(即在子线程中调用 SecurityContextHolder.getContext() 获取不到 Security 上下文数据)。
/***/
package com.njry.modules.quartz.utils;import cn.hutool.extra.template.Template;
import cn.hutool.extra.template.TemplateConfig;
import cn.hutool.extra.template.TemplateEngine;
import cn.hutool.extra.template.TemplateUtil;
import com.njry.modules.tools.domain.vo.EmailVo;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.tools.service.EmailService;
import com.njry.utils.RedisUtils;
import com.njry.utils.SpringContextHolder;
import com.njry.utils.StringUtils;
import com.njry.utils.ThrowableUtil;
import com.njry.modules.quartz.mapper.QuartzLogMapper;
import com.njry.modules.quartz.service.QuartzJobService;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor;import java.util.*;
import java.util.concurrent.*;/*** 参考人人开源,<a href="https://gitee.com/renrenio/renren-security">...</a>* @author * @date */
@Async
public class ExecutionJob extends QuartzJobBean {private final Logger logger = LoggerFactory.getLogger(this.getClass());// 此处仅供参考,可根据任务执行情况自定义线程池参数private final ThreadPoolTaskExecutor executor = SpringContextHolder.getBean("elAsync");// private final DelegatingSecurityContextAsyncTaskExecutor delegateExecutor= SpringContextHolder.getBean("elAsyncAgain");@Overridepublic void executeInternal(JobExecutionContext context) {// 获取任务//1. 通过context.getJobDetail().getJobDataMap()方式分别获得Job、Trigger参数//2. 通过context.getMergedJobDataMap()合并方式获得参数
//Ps:若同名则优先获取Trigger类型的数据,屏蔽了JobDetail类型数据。QuartzJob quartzJob = (QuartzJob) context.getMergedJobDataMap().get(QuartzJob.JOB_KEY);// 获取spring beanQuartzLogMapper quartzLogMapper = SpringContextHolder.getBean(QuartzLogMapper.class);QuartzJobService quartzJobService = SpringContextHolder.getBean(QuartzJobService.class);RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class);String uuid = quartzJob.getUuid();QuartzLog log = new QuartzLog();log.setJobName(quartzJob.getJobName());log.setBeanName(quartzJob.getBeanName());log.setMethodName(quartzJob.getMethodName());log.setParams(quartzJob.getParams());long startTime = System.currentTimeMillis();log.setCronExpression(quartzJob.getCronExpression());try {// 执行任务QuartzRunnable task = new QuartzRunnable(quartzJob.getBeanName(), quartzJob.getMethodName(), quartzJob.getParams());Future<?> future = executor.submit(task);
// Future<?> future = delegateExecutor.submit(task);future.get();long times = System.currentTimeMillis() - startTime;log.setTime(times);if(StringUtils.isNotBlank(uuid)) {redisUtils.set(uuid, true);}// 任务状态log.setIsSuccess(true);logger.info("任务执行成功,任务名称:" + quartzJob.getJobName() + ", 执行时间:" + times + "毫秒");// 判断是否存在子任务if(StringUtils.isNotBlank(quartzJob.getSubTask())){String[] tasks = quartzJob.getSubTask().split("[,,]");// 执行子任务quartzJobService.executionSubJob(tasks);}} catch (Exception e) {if(StringUtils.isNotBlank(uuid)) {redisUtils.set(uuid, false);}logger.error("任务执行失败,任务名称:" + quartzJob.getJobName());long times = System.currentTimeMillis() - startTime;log.setTime(times);// 任务状态 0:成功 1:失败log.setIsSuccess(false);log.setExceptionDetail(ThrowableUtil.getStackTrace(e));// 任务如果失败了则暂停if(quartzJob.getPauseAfterFailure() != null && quartzJob.getPauseAfterFailure()){quartzJob.setIsPause(false);//更新状态quartzJobService.updateIsPause(quartzJob);}if(quartzJob.getEmail() != null){EmailService emailService = SpringContextHolder.getBean(EmailService.class);// 邮箱报警if(StringUtils.isNoneBlank(quartzJob.getEmail())){EmailVo emailVo = taskAlarm(quartzJob, ThrowableUtil.getStackTrace(e));emailService.send(emailVo, emailService.find());}}} finally {log.setId(quartzLogMapper.getSeq());quartzLogMapper.insert(log);}}private EmailVo taskAlarm(QuartzJob quartzJob, String msg) {EmailVo emailVo = new EmailVo();emailVo.setSubject("定时任务【"+ quartzJob.getJobName() +"】执行失败,请尽快处理!");Map<String, Object> data = new HashMap<>(16);data.put("task", quartzJob);data.put("msg", msg);TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH));Template template = engine.getTemplate("taskAlarm.ftl");emailVo.setContent(template.render(data));List<String> emails = Arrays.asList(quartzJob.getEmail().split("[,,]"));emailVo.setTos(emails);return emailVo;}
}
这个类目的:
实现定时任务增删的基本操作
/** */
package com.njry.modules.quartz.utils;import com.njry.exception.BadRequestException;
import com.njry.modules.quartz.domain.QuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import static org.quartz.TriggerBuilder.newTrigger;/*** @author * @date */
@Slf4j
@Component
public class QuartzManage {private static final String JOB_NAME = "TASK_";@Resourceprivate Scheduler scheduler;public void addJob(QuartzJob quartzJob){try {// 构建job信息JobDetail jobDetail = JobBuilder.newJob(ExecutionJob.class).withIdentity(JOB_NAME + quartzJob.getId()).build();//通过触发器名和cron 表达式创建 TriggerTrigger cronTrigger = newTrigger().withIdentity(JOB_NAME + quartzJob.getId()).startNow().withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression())).build();// 这里保存调度任务信息,在ExecutionJob取出来,用QuartzRunnable反射找到对应执行job,但是交给子线程异步执行cronTrigger.getJobDataMap().put(QuartzJob.JOB_KEY, quartzJob);//重置启动时间((CronTriggerImpl)cronTrigger).setStartTime(new Date());//执行定时任务scheduler.scheduleJob(jobDetail,cronTrigger);// 暂停任务if (quartzJob.getIsPause()) {pauseJob(quartzJob);}} catch (Exception e){log.error("创建定时任务失败", e);throw new BadRequestException("创建定时任务失败");}}/*** 更新job cron表达式* @param quartzJob /*/public void updateJobCron(QuartzJob quartzJob){try {TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 如果不存在则创建一个定时任务if(trigger == null){addJob(quartzJob);trigger = (CronTrigger) scheduler.getTrigger(triggerKey);}CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression());trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();//重置启动时间((CronTriggerImpl)trigger).setStartTime(new Date());trigger.getJobDataMap().put(QuartzJob.JOB_KEY,quartzJob);scheduler.rescheduleJob(triggerKey, trigger);// 暂停任务if (quartzJob.getIsPause()) {pauseJob(quartzJob);}} catch (Exception e){log.error("更新定时任务失败", e);throw new BadRequestException("更新定时任务失败");}}/*** 删除一个job* @param quartzJob /*/public void deleteJob(QuartzJob quartzJob){try {JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());scheduler.pauseJob(jobKey);scheduler.deleteJob(jobKey);} catch (Exception e){log.error("删除定时任务失败", e);throw new 
BadRequestException("删除定时任务失败");}}/*** 恢复一个job* @param quartzJob /*/public void resumeJob(QuartzJob quartzJob){try {TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 如果不存在则创建一个定时任务if(trigger == null) {addJob(quartzJob);}JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());scheduler.resumeJob(jobKey);} catch (Exception e){log.error("恢复定时任务失败", e);throw new BadRequestException("恢复定时任务失败");}}/*** 立即执行job* @param quartzJob /*/public void runJobNow(QuartzJob quartzJob){try {TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 如果不存在则创建一个定时任务if(trigger == null) {addJob(quartzJob);}JobDataMap dataMap = new JobDataMap();dataMap.put(QuartzJob.JOB_KEY, quartzJob);JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());scheduler.triggerJob(jobKey,dataMap);} catch (Exception e){log.error("定时任务执行失败", e);throw new BadRequestException("定时任务执行失败");}}/*** 暂停一个job* @param quartzJob /*/public void pauseJob(QuartzJob quartzJob){try {JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());scheduler.pauseJob(jobKey);} catch (Exception e){log.error("定时任务暂停失败", e);throw new BadRequestException("定时任务暂停失败");}}
}
这个类目的:
通过反射的方式调用定时任务,这样就不用手动生成每个Quartz的Job
/** */
package com.njry.modules.quartz.utils;import com.njry.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;/*** 执行定时任务* @author */
/**
 * Callable that invokes a named method on a Spring bean through reflection, so no dedicated
 * Quartz job class is needed per task.
 */
@Slf4j
public class QuartzRunnable implements Callable<Object> {

    private final Object target;
    private final Method method;
    private final String params;

    /**
     * Resolves the bean and the method to call. With non-blank {@code params} the method taking
     * a single {@code String} is looked up, otherwise the parameterless one.
     *
     * @param beanName   name of the Spring bean
     * @param methodName name of the method declared on the bean's class
     * @param params     optional argument
     * @throws NoSuchMethodException if the bean's class declares no matching method
     * @throws SecurityException     if reflective access is denied
     */
    QuartzRunnable(String beanName, String methodName, String params)
            throws NoSuchMethodException, SecurityException {
        this.target = SpringContextHolder.getBean(beanName);
        this.params = params;
        Class<?> beanType = target.getClass();
        this.method = StringUtils.isNotBlank(params)
                ? beanType.getDeclaredMethod(methodName, String.class)
                : beanType.getDeclaredMethod(methodName);
    }

    /**
     * Invokes the resolved method, passing {@code params} when it is non-blank.
     *
     * @return always {@code null}
     * @throws Exception whatever the reflective call throws
     */
    @Override
    @SuppressWarnings("all")
    public Object call() throws Exception {
        ReflectionUtils.makeAccessible(method);
        boolean withArgument = StringUtils.isNotBlank(params);
        if (!withArgument) {
            method.invoke(target);
            return null;
        }
        method.invoke(target, params);
        return null;
    }
}
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.njry.modules.quartz.mapper.QuartzJobMapper">
    <!-- Column-to-property mapping for sys_quartz_job. -->
    <resultMap id="BaseResultMap" type="com.njry.modules.quartz.domain.QuartzJob">
        <id column="job_id" property="id" jdbcType="BIGINT"/>
        <result column="job_name" property="jobName" jdbcType="VARCHAR"/>
        <result column="bean_name" property="beanName" jdbcType="VARCHAR"/>
        <result column="method_name" property="methodName" jdbcType="VARCHAR"/>
        <result column="params" property="params" jdbcType="VARCHAR"/>
        <result column="cron_expression" property="cronExpression" jdbcType="VARCHAR"/>
        <result column="is_pause" property="isPause" jdbcType="TINYINT"/>
        <result column="person_in_charge" property="personInCharge" jdbcType="VARCHAR"/>
        <result column="email" property="email" jdbcType="VARCHAR"/>
        <result column="sub_task" property="subTask" jdbcType="VARCHAR"/>
        <result column="pause_after_failure" property="pauseAfterFailure" jdbcType="TINYINT"/>
        <result column="description" property="description" jdbcType="VARCHAR"/>
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP"/>
        <result column="update_time" property="updateTime" jdbcType="TIMESTAMP"/>
        <result column="create_by" property="createBy" jdbcType="VARCHAR"/>
        <result column="update_by" property="updateBy" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        job_id, job_name, bean_name, method_name, params, cron_expression, is_pause, person_in_charge, email, sub_task, pause_after_failure, description, create_time, update_time, create_by, update_by
    </sql>

    <!-- Job list, optionally filtered by name fragment and time range, newest id first. -->
    <select id="findAll" resultMap="BaseResultMap">
        SELECT
        <include refid="Base_Column_List"/>
        FROM sys_quartz_job
        <where>
            <if test="criteria.jobName != null and criteria.jobName != ''">
                AND job_name LIKE CONCAT('%'||#{criteria.jobName},'%')
            </if>
            <!-- Both bounds are indexed below, so require two elements. -->
            <!-- NOTE(review): the criteria field is named createTime but the filter is on update_time; confirm this is intended. -->
            <if test="criteria.createTime != null and criteria.createTime.size() > 1">
                AND update_time BETWEEN #{criteria.createTime[0]} AND #{criteria.createTime[1]}
            </if>
        </where>
        ORDER BY job_id DESC
    </select>

    <!-- Jobs that are not paused (is_pause = '0'). -->
    <select id="findByIsPauseIsFalse" resultMap="BaseResultMap">
        SELECT
        <include refid="Base_Column_List"/>
        FROM sys_quartz_job
        WHERE is_pause = '0'
    </select>

    <!-- Next job id from the sequence. -->
    <select id="getSeq" resultType="Long">
        select seq_sys_quartz_job.nextval user_user_id from dual
    </select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.njry.modules.quartz.mapper.QuartzLogMapper">
    <!-- Column-to-property mapping for sys_quartz_log. -->
    <resultMap id="BaseResultMap" type="com.njry.modules.quartz.domain.QuartzLog">
        <id column="log_id" property="id" jdbcType="BIGINT"/>
        <result column="job_name" property="jobName" jdbcType="VARCHAR"/>
        <result column="bean_name" property="beanName" jdbcType="VARCHAR"/>
        <result column="method_name" property="methodName" jdbcType="VARCHAR"/>
        <result column="params" property="params" jdbcType="VARCHAR"/>
        <result column="cron_expression" property="cronExpression" jdbcType="VARCHAR"/>
        <result column="is_success" property="isSuccess" jdbcType="VARCHAR"/>
        <!-- The column is NCLOB and the property a String; BIGINT was a copy-paste slip. -->
        <result column="exception_detail" property="exceptionDetail" jdbcType="NCLOB"/>
        <result column="time" property="time" jdbcType="BIGINT"/>
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP"/>
    </resultMap>

    <sql id="Base_Column_List">
        log_id, job_name, bean_name, method_name, params, cron_expression, is_success, exception_detail, time, create_time
    </sql>

    <!-- Log list, optionally filtered by name fragment, success flag and time range, newest id first. -->
    <select id="findAll" resultMap="BaseResultMap">
        SELECT
        <include refid="Base_Column_List"/>
        FROM sys_quartz_log
        <where>
            <if test="criteria.jobName != null and criteria.jobName != ''">
                AND job_name LIKE CONCAT('%'||#{criteria.jobName},'%')
            </if>
            <if test="criteria.isSuccess != null">
                AND is_success = #{criteria.isSuccess}
            </if>
            <!-- Both bounds are indexed below, so require two elements. -->
            <if test="criteria.createTime != null and criteria.createTime.size() > 1">
                AND create_time BETWEEN #{criteria.createTime[0]} AND #{criteria.createTime[1]}
            </if>
        </where>
        ORDER BY log_id DESC
    </select>

    <!-- Next log id from the sequence. -->
    <select id="getSeq" resultType="Long">
        select seq_sys_quartz_log.nextval user_user_id from dual
    </select>
</mapper>
看几个测试任务存入表里
3.项目代码用到的全局几个工具类
这个类目的:
获取全局上下文
/***/
package com.njry.utils;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;/*** @author* @date*/
@Slf4j
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {private static ApplicationContext applicationContext = null;private static final List<CallBack> CALL_BACKS = new ArrayList<>();private static boolean addCallback = true;/*** 针对 某些初始化方法,在SpringContextHolder 未初始化时 提交回调方法。* 在SpringContextHolder 初始化后,进行回调使用** @param callBack 回调函数*/public synchronized static void addCallBacks(CallBack callBack) {if (addCallback) {SpringContextHolder.CALL_BACKS.add(callBack);} else {log.warn("CallBack:{} 已无法添加!立即执行", callBack.getCallBackName());callBack.executor();}}/*** 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.*/@SuppressWarnings("unchecked")public static <T> T getBean(String name) {assertContextInjected();return (T) applicationContext.getBean(name);}/*** 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.*/public static <T> T getBean(Class<T> requiredType) {assertContextInjected();return applicationContext.getBean(requiredType);}/*** 获取SpringBoot 配置信息** @param property 属性key* @param defaultValue 默认值* @param requiredType 返回类型* @return /*/public static <T> T getProperties(String property, T defaultValue, Class<T> requiredType) {T result = defaultValue;try {result = getBean(Environment.class).getProperty(property, requiredType);} catch (Exception ignored) {}return result;}/*** 获取SpringBoot 配置信息** @param property 属性key* @return /*/public static String getProperties(String property) {return getProperties(property, null, String.class);}/*** 获取SpringBoot 配置信息** @param property 属性key* @param requiredType 返回类型* @return /*/public static <T> T getProperties(String property, Class<T> requiredType) {return getProperties(property, null, requiredType);}/*** 检查ApplicationContext不为空.*/private static void assertContextInjected() {if (applicationContext == null) {throw new IllegalStateException("applicaitonContext属性未注入, 请在applicationContext" +".xml中定义SpringContextHolder或在SpringBoot启动类中注册SpringContextHolder.");}}/*** 
清除SpringContextHolder中的ApplicationContext为Null.*/private static void clearHolder() {log.debug("清除SpringContextHolder中的ApplicationContext:"+ applicationContext);applicationContext = null;}@Overridepublic void destroy() {SpringContextHolder.clearHolder();}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (SpringContextHolder.applicationContext != null) {log.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);}SpringContextHolder.applicationContext = applicationContext;if (addCallback) {for (CallBack callBack : SpringContextHolder.CALL_BACKS) {callBack.executor();}CALL_BACKS.clear();}SpringContextHolder.addCallback = false;}/*** 获取 @Service 的所有 bean 名称* @return /*/public static List<String> getAllServiceBeanName() {return new ArrayList<>(Arrays.asList(applicationContext.getBeanNamesForAnnotation(Service.class)));}
}
这个类目的:
自定义线程池
package com.njry.config.thread;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** 创建自定义的线程池* @author njry* @date 2024-05-11**/
@Configuration
public class CustomExecutorConfig {/*** 自定义线程池,用法 @Async* @return Executor*/@Bean@Primarypublic Executor elAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(AsyncTaskProperties.corePoolSize);executor.setMaxPoolSize(AsyncTaskProperties.maxPoolSize);executor.setQueueCapacity(AsyncTaskProperties.queueCapacity);executor.setThreadNamePrefix("el-async-");executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}/*** 自定义线程池,用法 @Async* @return Executor*/@Bean@Primarypublic Executor elAsyncAgain() {SecurityContext context = SecurityContextHolder.getContext();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(AsyncTaskProperties.corePoolSize);executor.setMaxPoolSize(AsyncTaskProperties.maxPoolSize);executor.setQueueCapacity(AsyncTaskProperties.queueCapacity);executor.setThreadNamePrefix("el-async-");executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();
// Constructor threw exception; nested exception is java.lang.ClassCastException:
// org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor cannot be cast to
// org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
// 这里不能类型转换DelegatingSecurityContextAsyncTaskExecutor delegatingSecurityContextAsyncTaskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(executor,context);return delegatingSecurityContextAsyncTaskExecutor;}/*** 自定义线程池,用法 @Async("otherAsync")* @return Executor*/@Beanpublic Executor otherAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(15);executor.setQueueCapacity(50);executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);executor.setThreadNamePrefix("el-task-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}