数据库准备
准备一个MySQL数据库,版本为8.0,然后创建一个库,并从quartz官方的版本包中找到名称为tables_mysql_innodb.sql
的脚本执行进去(脚本内容文后也有提供)。
项目依赖说明
创建一个Maven项目,引入以下依赖
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.1</version>
</dependency>
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz-jobs</artifactId><version>2.2.1</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.16</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.17</version>
</dependency>
Quartz简单demo
首先创建一个Job,在Quartz当中通过实现Job来执行业务逻辑
package org.quartz.myexample;import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;import java.time.LocalDateTime;public class HelloJob implements Job {/*** 传递给 execute() 方法的 JobExecutionContext 对象中保存着该 job 运行时的一些信息 ,执行 job 的 scheduler 的引用,* 触发 job 的 trigger 的引用,JobDetail 对象引用,以及一些其它信息。*/@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {Object tv1 = context.getTrigger().getJobDataMap().get("t1");Object tv2 = context.getTrigger().getJobDataMap().get("t2");Object jv1 = context.getJobDetail().getJobDataMap().get("j1");Object jv2 = context.getJobDetail().getJobDataMap().get("j2");Object sv = null;try {sv = context.getScheduler().getContext().get("skey");} catch (SchedulerException e) {e.printStackTrace();}System.out.println(tv1 + ":" + tv2);System.out.println(jv1 + ":" + jv2);System.out.println(sv);System.out.println("hello:" + LocalDateTime.now());}
}
然后,获取Scheduler实例,创建一个Trigger和JobDetail,并使用scheduler进行调度,最后启动Scheduler,分别为下面源码中的第一、二、三、四步骤。
package org.quartz.myexample;import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;import java.util.Date;/*** -Dorg.quartz.properties=D:\Tools\activtiDemo\src\main\resources\quartz.properties*/
public class QuartzTest {public static void main(String[] args) {try {// 第一步// Grab the scheduler instance from the FactoryScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.getContext().put("skey", "svalue");// 第二步// Trigger the job to run now, and then repeat every 40 secondsTrigger trigger = TriggerBuilder.newTrigger().withIdentity("mytrigger", "group1").usingJobData("t1", "tv1")
// .startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();trigger.getJobDataMap().put("t2", "tv2");// define the job and tie it to our HelloJob classJobDetail job = JobBuilder.newJob(HelloJob.class).usingJobData("j1", "jv1").withIdentity("myjob", "group1").build();job.getJobDataMap().put("j2", "jv2");scheduler.deleteJob(job.getKey());// 第三步// Tell quartz to schedule the job using our trigger/*** 为什么既有 Job,又有 Trigger 呢?很多任务调度器并不区分 Job 和 Trigger。有些调度器只是简单地通过一个执行时间* 和一些 job 标识符来定义一个 Job;其它的一些调度器将 Quartz 的 Job 和 Trigger 对象合二为一。在开发 Quartz 的时候,* 我们认为将调度和要调度的任务分离是合理的。在我们看来,这可以带来很多好处。** 例如,Job 被创建后,可以保存在 Scheduler 中,与 Trigger 是独立的,同一个 Job可以有多个 Trigger;* 这种松耦合的另一个好处是,当与 Scheduler 中的 Job 关联的 trigger 都过期时,可以配置 Job 稍后被重新调度,* 而不用重新定义 Job;还有,可以修改或者替换 Trigger,而不用重新定义与之关联的 Job。*/scheduler.scheduleJob(job, trigger);// and start it// 第四步scheduler.start();Thread.sleep(Integer.MAX_VALUE);scheduler.shutdown();} catch (SchedulerException | InterruptedException e) {e.printStackTrace();}}
程序配置
编辑一个quartz.properties文件,内容如下
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
### 此调度程序的名称
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.skipUpdateCheck=trueorg.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.connectionProvider.class:com.alibaba.druid.support.quartz.DruidQuartzConnectionProvider
org.quartz.dataSource.myDS.driverClassName = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.url = jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8
org.quartz.dataSource.myDS.username = root
org.quartz.dataSource.myDS.password = root
org.quartz.dataSource.myDS.maxActive = 5#============================================================================
# Other Example Delegates
#============================================================================
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v6Delegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v7Delegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DriverDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.HSQLDBDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.MSSQLDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PointbaseDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.WebLogicDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.WebLogicOracleDelegate
程序执行
执行QuartzTest时,需要指定启动参数读取配置文件,比如
-Dorg.quartz.properties=D:\Tools\activtiDemo\src\main\resources\quartz.properties
源码分析
1. 启动流程
通过StdSchedulerFactory#getDefaultScheduler获取一个调度器对象。在此过程中会执行org.quartz.impl.StdSchedulerFactory#instantiate()
方法完成初始化操作。
此过程中会创建用于任务执行的工作线程
// Get ThreadPool Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());if (tpClass == null) {initException = new SchedulerException("ThreadPool class not specified. ");throw initException;
}try {tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {initException = new SchedulerException("ThreadPool class '"+ tpClass + "' could not be instantiated.", e);throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {setBeanProps(tp, tProps);
} catch (Exception e) {initException = new SchedulerException("ThreadPool class '"+ tpClass + "' props could not be configured.", e);throw initException;
}
线程池实现类使用的默认值,而线程池大小设置为10,线程优先级为正常值(NORM_PRIORITY)。
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
以及创建JobStore对象
// Get JobStore Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,RAMJobStore.class.getName());if (jsClass == null) {initException = new SchedulerException("JobStore class not specified. ");throw initException;
}try {js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {initException = new SchedulerException("JobStore class '" + jsClass+ "' could not be instantiated.", e);throw initException;
}SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {setBeanProps(js, tProps);
} catch (Exception e) {initException = new SchedulerException("JobStore class '" + jsClass+ "' props could not be configured.", e);throw initException;
}
对应配置为
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
默认使用的是内存数据库,但我们这里需要使用外部数据库。
配置了以上参数之后,首先会构造一个JobStoreTX
对象,然后分别设置对象以下的属性。
protected Class<? extends DriverDelegate> delegateClass = StdJDBCDelegate.class;
protected String dsName;
protected String tablePrefix = DEFAULT_TABLE_PREFIX;
这里可以看出来,其实driverDelegateClass和tablePrefix是可以不用在配置文件中指定,默认值就是上面配置的值。整个Quartz的调度都离不开中间数据,这些中间数据都是靠JobStore
来保存的。JobStoreTX
指定使用数据库来保存数据,肯定就离不开数据源了。
初始化JobStore之后,就来创建数据源了
// Set up any DataSources
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);// custom connectionProvider...if(cpClass != null) {ConnectionProvider cp = null;try {cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();} catch (Exception e) {initException = new SchedulerException("ConnectionProvider class '" + cpClass+ "' could not be instantiated.", e);throw initException;}try {// remove the class name, so it isn't attempted to be setpp.getUnderlyingProperties().remove(PROP_CONNECTION_PROVIDER_CLASS);setBeanProps(cp, pp.getUnderlyingProperties());cp.initialize();} catch (Exception e) {initException = new SchedulerException("ConnectionProvider class '" + cpClass+ "' props could not be configured.", e);throw initException;}dbMgr = DBConnectionManager.getInstance();dbMgr.addConnectionProvider(dsNames[i], cp);} // ... 其他场景省略
这里相关的配置参数为
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.connectionProvider.class:com.alibaba.druid.support.quartz.DruidQuartzConnectionProvider
org.quartz.dataSource.myDS.driverClassName = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.url = jdbc:mysql://191.168.1.60:3306/quartz?characterEncoding=utf-8
org.quartz.dataSource.myDS.username = tools_user
org.quartz.dataSource.myDS.password = xams_tools_20230714
org.quartz.dataSource.myDS.maxActive = 5
指定了一个名称为myDS数据源,类型为DruidQuartzConnectionProvider
,这个数据源是druid数据源专门为Quartz
准备的,从上面的源码看,自定义的数据源必须实现ConnectionProvider
接口,而DruidQuartzConnectionProvider正好实现了这个接口,并且继承了DruidDataSource
。
创建完这个连接提供者对象之后,就会交给DBConnectionManager
数据库连接管理器来管理。看起来这里跟JobStoreTX
没有任何关系,其实这种联系需要等到真正获取连接的时候,才会体现出来。JobStoreTX
每次获取连接的时候,会从连接管理器中通过DataSource名称来查找连接。
protected Connection getConnection() throws JobPersistenceException {Connection conn;try {conn = DBConnectionManager.getInstance().getConnection(getDataSource());
此时就会根据数据池名称(我们配置的为myDS)查找到对应的数据源对象获取连接了。
public Connection getConnection(String dsName) throws SQLException {ConnectionProvider provider = providers.get(dsName);if (provider == null) {throw new SQLException("There is no DataSource named '"+ dsName + "'");}return provider.getConnection();
}
所以以上这些配置最终就定义好了JobStore
要操作的数据库了。
2. 创建Job和Trigger并调度
通过构造者模式创建Job和Trigger对象然后调用scheduler.scheduleJob(job, trigger)
将Job绑定到Trigger,构造对象不涉及数据库操作,只是简单构造对象,这里不详细。一旦调用scheduleJob方法,在org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)
方法中,首先会针对Job和Trigger做各种属性检查,然后调用resources.getJobStore().storeJobAndTrigger(jobDetail, trig)
来持久化。对应的实现为
org.quartz.impl.jdbcjobstore.JobStoreSupport#storeJobAndTrigger
public void storeJobAndTrigger(final JobDetail newJob,final OperableTrigger newTrigger) throws JobPersistenceException {executeInLock((isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,new VoidTransactionCallback() {public void executeVoid(Connection conn) throws JobPersistenceException {storeJob(conn, newJob, false);storeTrigger(conn, newTrigger, newJob, false,Constants.STATE_WAITING, false, false);}});
}
这里会将storeJob和storeTrigger会放在一个事务当中进行操作,并且使用内存排他锁防止并发。保存Job和Tigger的操作都比较简单,无非是将结果保存到库当中,不过要注意的是,如果Job的JobKey
已经存在了,则会报错。
因为storeJob
中的第三个参数为false,指定不允许替换。最后执行org.quartz.impl.jdbcjobstore.StdJDBCDelegate#insertJobDetail
方法,将Job插入到数据库当中,对应源码如下
public int insertJobDetail(Connection conn, JobDetail job)throws IOException, SQLException {ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());PreparedStatement ps = null;int insertResult = 0;try {ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));ps.setString(1, job.getKey().getName());ps.setString(2, job.getKey().getGroup());ps.setString(3, job.getDescription());ps.setString(4, job.getJobClass().getName());setBoolean(ps, 5, job.isDurable());setBoolean(ps, 6, job.isConcurrentExectionDisallowed());setBoolean(ps, 7, job.isPersistJobDataAfterExecution());setBoolean(ps, 8, job.requestsRecovery());setBytes(ps, 9, baos);insertResult = ps.executeUpdate();} finally {closeStatement(ps);}return insertResult;}
可以看出,这里使用的是标准的原生JDBC来操作数据库,这里的常量org.quartz.impl.jdbcjobstore.StdJDBCConstants#INSERT_JOB_DETAIL
定义了操作的数据库SQL。内容为
// Table namesString TABLE_JOB_DETAILS = "JOB_DETAILS";String INSERT_JOB_DETAIL = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " (" + COL_SCHEDULER_NAME + ", " + COL_JOB_NAME+ ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "+ COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", " + COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", " + COL_REQUESTS_RECOVERY + ", "+ COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
操作的表为自定义表前缀加上JOB_DETAILS
,前缀为QRTZ_
,所以这里Job定义保存到了QRTZ_JOB_DETAILS
表当中。
StdJDBCConstants这个类中定义了Quartz当中所有操作数据库的SQL定义,很具有参考意义
保存Trigger同样不允许覆盖,另外内部会检查Job是否存在,这里传入的状态为WAITING
状态,代表任务处于等待的状态,当然在org.quartz.impl.jdbcjobstore.JobStoreSupport#storeTrigger
方法中会涉及到一些状态转换的情况,由于是第一次创建这里不涉及,所以最后保存的状态也是WAITING
,保存Trigger的表为QRTZ_TRIGGERS
,但Trigger通常没有Job简单,org.quartz.impl.jdbcjobstore.StdJDBCDelegate#insertTrigger
源码如下
try {ps = conn.prepareStatement(rtp(INSERT_TRIGGER));ps.setString(1, trigger.getKey().getName());ps.setString(2, trigger.getKey().getGroup());ps.setString(3, trigger.getJobKey().getName());ps.setString(4, trigger.getJobKey().getGroup());ps.setString(5, trigger.getDescription());if(trigger.getNextFireTime() != null)ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));elseps.setBigDecimal(6, null);long prevFireTime = -1;if (trigger.getPreviousFireTime() != null) {prevFireTime = trigger.getPreviousFireTime().getTime();}ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));ps.setString(8, state);TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);String type = TTYPE_BLOB;if(tDel != null)type = tDel.getHandledTriggerTypeDiscriminator();ps.setString(9, type);ps.setBigDecimal(10, new BigDecimal(String.valueOf(trigger.getStartTime().getTime())));long endTime = 0;if (trigger.getEndTime() != null) {endTime = trigger.getEndTime().getTime();}ps.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));ps.setString(12, trigger.getCalendarName());ps.setInt(13, trigger.getMisfireInstruction());setBytes(ps, 14, baos);ps.setInt(15, trigger.getPriority());insertResult = ps.executeUpdate();if(tDel == null)insertBlobTrigger(conn, trigger);elsetDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);} finally {closeStatement(ps);}
这里不但要计算下一次的触发事件nextFireTime,还需要通过TriggerPersistenceDelegate
来处理一些特殊Trigger
的额外属性。
protected void addDefaultTriggerPersistenceDelegates() {addTriggerPersistenceDelegate(new SimpleTriggerPersistenceDelegate());addTriggerPersistenceDelegate(new CronTriggerPersistenceDelegate());addTriggerPersistenceDelegate(new CalendarIntervalTriggerPersistenceDelegate());addTriggerPersistenceDelegate(new DailyTimeIntervalTriggerPersistenceDelegate());
}public TriggerPersistenceDelegate findTriggerPersistenceDelegate(OperableTrigger trigger) {for(TriggerPersistenceDelegate delegate: triggerPersistenceDelegates) {if(delegate.canHandleTriggerType(trigger))return delegate;}return null;
}
其实这里有四种特殊情况,还包括一种通用情况,从org.quartz.impl.jdbcjobstore.Constants
类中以上属性可以看出。
// TRIGGER TYPES
/** Simple Trigger type. */
String TTYPE_SIMPLE = "SIMPLE";/** Cron Trigger type. */
String TTYPE_CRON = "CRON";/** Calendar Interval Trigger type. */
String TTYPE_CAL_INT = "CAL_INT";/** Daily Time Interval Trigger type. */
String TTYPE_DAILY_TIME_INT = "DAILY_I";/** A general blob Trigger type. */
String TTYPE_BLOB = "BLOB";
比如org.quartz.impl.jdbcjobstore.CronTriggerPersistenceDelegate#canHandleTriggerType
的实现为
public boolean canHandleTriggerType(OperableTrigger trigger) {return ((trigger instanceof CronTriggerImpl) && !((CronTriggerImpl)trigger).hasAdditionalProperties());
}
CronTriggerImpl
是CronTrigger
的实现类,也属于一种Trigger
。如果当前传入的是这种类型的Trigger,会在保存数据到QRTZ_TRIGGERS
的同时,会调用CronTriggerPersistenceDelegate#insertExtendedTriggerProperties
方法将一些额外的参数保存到QRTZ_CRON_TRIGGERS
表当中。如下所示
CronTrigger cronTrigger = (CronTrigger)trigger;PreparedStatement ps = null;try {ps = conn.prepareStatement(Util.rtp(INSERT_CRON_TRIGGER, tablePrefix, schedNameLiteral));ps.setString(1, trigger.getKey().getName());ps.setString(2, trigger.getKey().getGroup());ps.setString(3, cronTrigger.getCronExpression());ps.setString(4, cronTrigger.getTimeZone().getID());return ps.executeUpdate();
} finally {Util.closeStatement(ps);
}
这其中就包含了Cron表达式以及时区编号。如果是最普通的Trigger,则将Trigger定义序列化保存到QRTZ_BLOB_TRIGGERS
表当中。
public int insertBlobTrigger(Connection conn, OperableTrigger trigger)throws SQLException, IOException {PreparedStatement ps = null;ByteArrayOutputStream os = null;try {// update the blobos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(os);oos.writeObject(trigger);oos.close();byte[] buf = os.toByteArray();ByteArrayInputStream is = new ByteArrayInputStream(buf);ps = conn.prepareStatement(rtp(INSERT_BLOB_TRIGGER));ps.setString(1, trigger.getKey().getName());ps.setString(2, trigger.getKey().getGroup());ps.setBinaryStream(3, is, buf.length);return ps.executeUpdate();} finally {closeStatement(ps);}
}
总结一下
Trigger Type | TriggerPersistenceDelegate | 操作表 |
---|---|---|
SIMPLE | SimpleTriggerPersistenceDelegate | QRTZ_SIMPLE_TRIGGERS |
CRON | CronTriggerPersistenceDelegate | QRTZ_CRON_TRIGGERS |
CAL_INT | CalendarIntervalTriggerPersistenceDelegate | QRTZ_SIMPROP_TRIGGERS |
DAILY_I | DailyTimeIntervalTriggerPersistenceDelegate | QRTZ_SIMPROP_TRIGGERS |
BLOB |
在当前案例中,通过以上操作之后,最终涉及三张表,
SELECT * FROM QRTZ_JOB_DETAILS;
SELECT * FROM QRTZ_TRIGGERS;
SELECT * FROM QRTZ_SIMPLE_TRIGGERS;
数据库中的数据情况如下
3. 启动任务调度器
通过scheduler.start()
操作会真实启动调度器,前面无论什么操作,创建的任务并不会真实执行,因为调度线程schedThread(org.quartz.core.QuartzScheduler#schedThread
)的paused
属性一直还是true,所以这个线程并不能真正进入任务的调度。当然scheduler.start()
并不仅仅只是将这个状态修改了,而且针对数据库中的任务调度数据进行了初始化。源码如下:
if (initialStart == null) {initialStart = new Date();this.resources.getJobStore().schedulerStarted(); startPlugins();
} else {resources.getJobStore().schedulerResumed();
}schedThread.togglePause(false);
可以看到,这里首先是调用了org.quartz.spi.JobStore#schedulerStarted
,然后开启了插件,再触发了paused
状态改变。
org.quartz.impl.jdbcjobstore.JobStoreSupport#schedulerStarted
中主要是通知JobStore做一些调度开始前的集群初始化或者恢复任务初始化状态,然后启动一个MisfireHandler线程来处理错过触发的任务。默认情况下,Quartz都是单机的,需要配置以下的参数才会是集群模式,所以这里不深入讲解。另外关于任务错过触发以及处理的逻辑这里也不是重点,先不介绍。
org.quartz.jobStore.isClustered=true
这里介绍一下任务恢复操作,对应的源码为org.quartz.impl.jdbcjobstore.JobStoreSupport#recoverJobs()
。
/*** Recover any failed or misfired jobs and clean up the data store as* appropriate.* * @throws JobPersistenceException if jobs could not be recovered*/
protected void recoverJobs() throws JobPersistenceException {executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,new VoidTransactionCallback() {public void executeVoid(Connection conn) throws JobPersistenceException {recoverJobs(conn);}}, null);
}
整个恢复工作还是比较复杂的。主要包含以下部分
- 将
QRTZ_TRIGGERS
表中BLOCKED
和ACQUIRED
状态恢复为WAITING
状态,PAUSED_BLOCKED
和PAUSED_BLOCKED
状态修改为PAUSED
。
// update inconsistent job states
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);rows += getDelegate().updateTriggerStatesFromOtherStates(conn,STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
- 处理错过触发的任务,主要是从
QRTZ_TRIGGERS
表中查找错过触发的trigger,然后根据触发器的misfireInstruction属性重新计算下一次触发事件,然后更新数据库的过程。这里和上面谈及的MisfireHandler
线程的操作基本一致,只是调用的recoverMisfiredJobs
方法中的recovering
参数一个为false,一个为true的差别。此处也不详述。
// clean up misfired jobs
recoverMisfiredJobs(conn, true);
- 查找
QRTZ_FIRED_TRIGGERS
表中REQUESTS_RECOVERY
字段为1(true)的数据,并重新计算下一次触发时间,然后新增一条Trigger数据插入到QRTZ_TRIGGERS
表中。
// recover jobs marked for recovery that were not fully executed
List<OperableTrigger> recoveringJobTriggers = getDelegate().selectTriggersForRecoveringJobs(conn);
getLog().info("Recovering "+ recoveringJobTriggers.size()+ " jobs that were in-progress at the time of the last shut-down.");for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {if (jobExists(conn, recoveringJobTrigger.getJobKey())) {recoveringJobTrigger.computeFirstFireTime(null);storeTrigger(conn, recoveringJobTrigger, null, false,STATE_WAITING, false, true);}
}
getLog().info("Recovery complete.");
- 删除已完成的触发器
从QRTZ_TRIGGERS表中查询状态为COMPLETE
的触发器,然后将其删除。如果触发器对应的任务是非持久的(isDurable
属性为false),任务也会被删除掉。
// remove lingering 'complete' triggers...
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
for(TriggerKey ct: cts) {removeTrigger(conn, ct);
}
getLog().info("Removed " + cts.size() + " 'complete' triggers.");// clean up any fired trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
总结一下:恢复工作就是将未完成的工作重新计算触发时间,并修改状态为WAITING
状态,对于已经完成的工作,则删除触发器,甚至是任务。
4. 任务调度
在QuartzSchedulerThread
线程启动之后,run方法中会一直处于等待状态,直到在上一步中paused
被设置为false。
while (paused && !halted.get()) {try {// wait until togglePause(false) is called...sigLock.wait(1000L);} catch (InterruptedException ignore) {}}
接下来只要工作线程池有可用线程时,就会进入任务查询和任务执行之中。
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
整体分为三个部分
- 查询下一波待触发的Triggers
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
可以看到这里查询的主要逻辑在org.quartz.spi.JobStore#acquireNextTriggers
方法中,这个方法涉及三个参数,分别为
- noLaterThan:满足条件的Trigger的下一次触发时间的最大值,因为任务肯定需要在被执行之前被查询出来,所以这个值必须大于零,要不然没有意义。默认值为30000+当前时间戳,也就是查询未来30s内要执行的Trigger,用户可以通过参数
org.quartz.scheduler.idleWaitTime
来指定。 - maxCount:本次查询的最大数量,这个值等于工作线程的可用数量,因为结果列表大于可用工作线程的数量,还是需要等待的
- timeWindow:时间窗口,其实是对noLaterThan的一个补充,默认值为0,可以通过参数
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
来配置。真正查询的时候,会将noLaterThan + timeWindow
作为DriverDelegate#selectTriggerToAcquire(java.sql.Connection, long, long, int)
中noLaterThan参数的值。 - noEarlierThan:这个参数倒不是
acquireNextTriggers
中的参数,而是DriverDelegate#selectTriggerToAcquire
的参数,和noLaterThan一样,也是用于限制Trigger的条件,只不过noLaterThan用于限制最大值,而noEarlierThan限制最小值。这个值默认为60000L
。如下图所示
private long misfireThreshold = 60000L; // one minutepublic long getMisfireThreshold() {return misfireThreshold;
}/*** The the number of milliseconds by which a trigger must have missed its* next-fire-time, in order for it to be considered "misfired" and thus* have its misfire instruction applied.* * @param misfireThreshold the misfire threshold to use, in millis*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setMisfireThreshold(long misfireThreshold) {if (misfireThreshold < 1) {throw new IllegalArgumentException("Misfirethreshold must be larger than 0");}this.misfireThreshold = misfireThreshold;
}protected long getMisfireTime() {long misfireTime = System.currentTimeMillis();if (getMisfireThreshold() > 0) {misfireTime -= getMisfireThreshold();}return (misfireTime > 0) ? misfireTime : 0;
}
可以通过参数org.quartz.jobStore.misfireThreshold
来配置,但是不能小于1。真正的查询方法如下
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
对应的SQL语句为
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
其中{1}是调度器的实例名字DefaultQuartzScheduler
,通过参数org.quartz.scheduler.instanceName
指定的。默认值为QuartzScheduler
。而TRIGGER_STATE
是在方法中写死的。如下所示
ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));// Set max rows to retrieve
if (maxCount < 1)maxCount = 1; // we want at least one trigger back.
ps.setMaxRows(maxCount);// Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
// Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
ps.setFetchSize(maxCount);ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
rs = ps.executeQuery();
所以以上的查询语句最后类似于
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY
FROM QRTZ_TRIGGERS
WHERE SCHED_NAME = 'DefaultQuartzScheduler'
AND TRIGGER_STATE = 'WAITING'
AND NEXT_FIRE_TIME <= #{noLaterThan}
AND ( MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= #{noEarlierThan}))
ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
这里MISFIRE_INSTR为-1则代表当任务错过触发之后会被立即触发,对应以下常量。
/*** Instructs the <code>{@link Scheduler}</code> that the * <code>Trigger</code> will never be evaluated for a misfire situation, * and that the scheduler will simply try to fire it as soon as it can, * and then update the Trigger as if it had fired at the proper time. * * <p>NOTE: if a trigger uses this instruction, and it has missed * several of its scheduled firings, then several rapid firings may occur * as the trigger attempt to catch back up to where it would have been. * For example, a SimpleTrigger that fires every 15 seconds which has * misfired for 5 minutes will fire 20 times once it gets the chance to * fire.</p>*/
public static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
对于非-1
的,需要额外的机制处理,这里限制在noEarlierThan
时间内的才算满足条件。
通过以上语句如果有值,则会再次验证触发器和Job是否存在。这里除了QRTZ_TRIGGERS
表之外,还有插入的时候涉及的相关表,比如QRTZ_SIMPLE_TRIGGERS
或是QRTZ_CRON_TRIGGERS
,取决于TRIGGER_TYPE
,前面总结过,这里顺带提一下。因为如果报错找不到,不一定是QRTZ_TRIGGERS
少了数据。如果以上都没有问题,则会执行以下操作。
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
这里首先会将QRTZ_TRIGGERS
表中的数据状态从WAITING
更改为ACQUIRED
,注意这里其实是一个CAS操作,如果失败了,任务不会被触发(continue操作了)。修改成功,则会往QRTZ_FIRED_TRIGGERS
表中插入一条数据,状态为ACQUIRED
。
UPDATE QRTZ_TRIGGERS
SET TRIGGER_STATE = 'ACQUIRED'
WHERE SCHED_NAME = 'DefaultQuartzScheduler'
AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
AND TRIGGER_STATE = 'WAITING'
INSERT INTO QRTZ_FIRED_TRIGGERS
(SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES
('DefaultQuartzScheduler', ?, ?, ?, ?, ?, ?, 'ACQUIRED', ?, ?, ?, ?, ?)
此时数据库中相关数据如下图
- 修改Trigger状态为executing
源码对应为
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
最终调用org.quartz.impl.jdbcjobstore.JobStoreSupport#triggerFired
,这里首先会检查Trigger的状态是否为ACQUIRED
,主要是保证没有被并发修改。
// Make sure trigger wasn't deleted, paused, or completed...
try { // if trigger was deleted, state will be STATE_DELETEDString state = getDelegate().selectTriggerState(conn,trigger.getKey());if (!state.equals(STATE_ACQUIRED)) {return null;}
} catch (SQLException e) {throw new JobPersistenceException("Couldn't select trigger state: "+ e.getMessage(), e);
}
还要检查Job是否存在,通常不会有问题。
try {job = retrieveJob(conn, trigger.getJobKey());if (job == null) { return null; }
} catch (JobPersistenceException jpe) {try {getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);getDelegate().updateTriggerState(conn, trigger.getKey(),STATE_ERROR);} catch (SQLException sqle) {getLog().error("Unable to set trigger state to ERROR.", sqle);}throw jpe;
}
修改QRTZ_FIRED_TRIGGERS
表中对应数据的状态为EXECUTING
(原来的状态为ACQUIRED
)。
try {getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);} catch (SQLException e) {throw new JobPersistenceException("Couldn't insert fired trigger: "+ e.getMessage(), e);}
还会根据任务是否支持并发、Trigger是否还有下一次触发时间(比如一次性触发不会再次触发),会修改Trigger的状态以及下一次触发时间。
// call triggered - to update the trigger's next-fire-time state...
trigger.triggered(cal);String state = STATE_WAITING;
boolean force = true;if (job.isConcurrentExectionDisallowed()) {state = STATE_BLOCKED;force = false;
// ... 并发情况省略
} if (trigger.getNextFireTime() == null) {state = STATE_COMPLETE;force = true;
}storeTrigger(conn, trigger, job, true, state, force, false);
本案例中会将Trigger的状态改为WAITING
,并修改下一次触发时间。最后QRTZ_FIRED_TRIGGERS
表中的状态为EXECUTING
,QRTZ_TRIGGERS
中除了下一次触发时间,其他不变。
- 将任务交给工作线程执行
JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}
}
这里会将触发器包装到一个JobRunShell
任务里面,这里JobRunShell
具体类型取决于JobRunShellFactory
类型,而这个工厂的类型在org.quartz.impl.StdSchedulerFactory#initialize()
方法中相关源码如下所示
boolean wrapJobInTx = false;
wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,wrapJobInTx);
// Fire everything up
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
try {JobRunShellFactory jrsf = null; // Create correct run-shell factory...if (userTXLocation != null) {UserTransactionHelper.setUserTxLocation(userTXLocation);}if (wrapJobInTx) {jrsf = new JTAJobRunShellFactory();} else {jrsf = new JTAAnnotationAwareJobRunShellFactory();}
// ... 其他省略
默认值为JTAAnnotationAwareJobRunShellFactory
,如果参数org.quartz.scheduler.wrapJobExecutionInUserTransaction
配置为true,则使用JTAJobRunShellFactory
。
这里会调用JTAAnnotationAwareJobRunShellFactory#createJobRunShell
创建一个JobRunShell
对象。源码如下
/*** <p>* Called by the <class>{@link org.quartz.core.QuartzSchedulerThread}* </code> to obtain instances of <code>* {@link org.quartz.core.JobRunShell}</code>.* </p>*/
public JobRunShell createJobRunShell(TriggerFiredBundle bundle)throws SchedulerException {ExecuteInJTATransaction jtaAnnotation = ClassUtils.getAnnotation(bundle.getJobDetail().getJobClass(), ExecuteInJTATransaction.class);if(jtaAnnotation == null)return new JobRunShell(scheduler, bundle);else {int timeout = jtaAnnotation.timeout();if (timeout >= 0) {return new JTAJobRunShell(scheduler, bundle, timeout);} else {return new JTAJobRunShell(scheduler, bundle);}}
}
由于Job实现类上面没有注解,所以会创建一个基础的JobRunShell
对象。接下来会执行初始化,初始化主要是通过反射创建一个Job对象,也就是本案例中的HelloJob
实例。这里创建对象又用到了工厂模式,不同的工厂逻辑不同,这里org.quartz.spi.JobFactory
主要有以下三个实现类。
其中SimpleJobFactory
只会创建一个HelloJob
实例,而org.quartz.simpl.PropertySettingJobFactory
则会通过反射尝试为这个实例设置属性。本案例中HelloJob
并没有属性,所以用哪个关系不大。SpringBeanJobFactory
则是Spring提供的一个实现,使用了Spring的依赖注入来填充属性。默认实现为PropertySettingJobFactory
,可以通过org.quartz.impl.StdScheduler#setJobFactory
方法进行修改。
JobRunShell
本身是一个Runnable实现类,最终工作线程会调用run方法来执行具体的Job。
其实现如下
public void run() {qs.addInternalSchedulerListener(this);try {OperableTrigger trigger = (OperableTrigger) jec.getTrigger();JobDetail jobDetail = jec.getJobDetail();do {JobExecutionException jobExEx = null;Job job = jec.getJobInstance();try {begin();} catch (SchedulerException se) {// ... 异常处理逻辑}// notify job & trigger listeners...try {if (!notifyListenersBeginning(jec)) {break;}} catch(VetoedException ve) {// ... 异常处理逻辑}long startTime = System.currentTimeMillis();long endTime = startTime;// execute the jobtry {log.debug("Calling execute on job " + jobDetail.getKey());job.execute(jec);endTime = System.currentTimeMillis();} catch (JobExecutionException jee) {// ... 异常处理逻辑} catch (Throwable e) {// ... 异常处理逻辑}jec.setJobRunTime(endTime - startTime);// notify all job listenersif (!notifyJobListenersComplete(jec, jobExEx)) {break;}CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;// update the triggertry {instCode = trigger.executionComplete(jec, jobExEx);} catch (Exception e) {// If this happens, there's a bug in the trigger...SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);qs.notifySchedulerListenersError("Please report this error to the Quartz developers.",se);}// notify all trigger listenersif (!notifyTriggerListenersComplete(jec, instCode)) {break;}// update job/trigger or re-execute jobif (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {jec.incrementRefireCount();try {complete(false);} catch (SchedulerException se) {qs.notifySchedulerListenersError("Error executing Job ("+ jec.getJobDetail().getKey()+ ": couldn't finalize execution.", se);}continue;}try {complete(true);} catch (SchedulerException se) {qs.notifySchedulerListenersError("Error executing Job ("+ jec.getJobDetail().getKey()+ ": couldn't finalize execution.", se);continue;}qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);break;} while (true);} finally {qs.removeInternalSchedulerListener(this);}
}
这里在真正执行任务的前后为子类保留了扩展(默认没有实现),如下所示
protected void begin() throws SchedulerException {
}protected void complete(boolean successfulExecution)throws SchedulerException {
}
另外还有一些监听器通知的操作,简化一下如下所示
begin();
notifyListenersBeginning(jec);
job.execute(jec); // 真正执行任务
notifyJobListenersComplete(jec, jobExEx);
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
instCode = trigger.executionComplete(jec, jobExEx);
notifyTriggerListenersComplete(jec, instCode);
complete(true);
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
其中job.execute(jec)
会执行真实Job实现逻辑,而trigger.executionComplete
会决定instCode
结果值,最后在qs.notifyJobStoreJobComplete
中会执行org.quartz.spi.JobStore#triggeredJobComplete
方法,会根据instCode
的值更新QRTZ_TRIGGERS
表中的状态值,
- SET_TRIGGER_COMPLETE 修改状态为COMPLETE
- SET_TRIGGER_ERROR 修改状态为ERROR
- SET_ALL_JOB_TRIGGERS_COMPLETE 修改状态为COMPLETE
- SET_ALL_JOB_TRIGGERS_ERROR 修改状态为 ERROR
- NOOP 不做修改 状态保持为
WAITING
并最终删除QRTZ_FIRED_TRIGGERS
表中的值。结果如下
总结一下触发器状态的变化如下表所示
表名 | scheduleJob | acquireNextTriggers | triggersFired | triggeredJobComplete |
---|---|---|---|---|
QRTZ_TRIGGERS | WAITING | (CAS)ACQUIRED | WAITING | WAITING |
QRTZ_FIRED_TRIGGERS | ACQUIRED | EXECUTING |
上面在
acquireNextTriggers
时修改QRTZ_TRIGGERS
状态使用的CAS操作防止并发操作。这里修改失败,也不会有后续操作。
附录一
tables_mysql_innodb.sql
脚本内容
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#
#
# By: Ron Cordell - roncordell
# I didn't see this anywhere, so I thought I'd post it here. This is the script from Quartz to create the tables in a MySQL database, modified to use INNODB instead of MYISAM.DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_SIMPROP_TRIGGERS( SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,STR_PROP_1 VARCHAR(512) NULL,STR_PROP_2 VARCHAR(512) NULL,STR_PROP_3 VARCHAR(512) NULL,INT_PROP_1 INT NULL,INT_PROP_2 INT NULL,LONG_PROP_1 BIGINT NULL,LONG_PROP_2 BIGINT NULL,DEC_PROP_1 NUMERIC(13,4) NULL,DEC_PROP_2 NUMERIC(13,4) NULL,BOOL_PROP_1 VARCHAR(1) NULL,BOOL_PROP_2 VARCHAR(1) NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);commit;