quartz 2.2.x 源码学习 基本执行流程分析
quartz 官网中给出了一些基本概念,请先阅读官网相关概念。
http://www.quartz-scheduler.org/documentation/quartz-2.2.x/tutorials/
下面对最简单的一个任务调度工作进行分析,下面的代码每隔三秒中不断重复执行任务SimpleJob。
public class JobStart {
public static void main(String[] args) throws SchedulerException {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
sched.clear();
String instanceId = sched.getSchedulerInstanceId();
JobDetail detail = JobBuilder.newJob(SimpleJob.class)
.withIdentity("Ins Id " + instanceId, instanceId)
.requestRecovery()
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("Ins Id " + instanceId,instanceId)
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();
sched.scheduleJob(detail, trigger);
sched.start();
}
}
@DisallowConcurrentExecution
public class SimpleJob implements Job{
private static Logger _log = LoggerFactory.getLogger(SimpleJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
if(context.isRecovering()){
_log.info("is recovering");
}else{
_log.info("not recovering");
}
try {
_log.info("do time consumed job start");
Thread.sleep(5000);
_log.info("do time consumed job end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
quartz 执行任务可以总结为创建JobDetail描述待执行任务,创建Trigger描述什么时候去触发任务。最后将JobDetail与Trigger注册在调度器中,调度器启动后遍开始对任务调度。
因此了解其基本过程需要明白 创建调度器完成哪些工作?调度工作什么时候开始?任务如何被执行?
示例代码中,首先获取一个调度器。
Scheduler sched = sf.getScheduler();
1
进入该方法内部
org.quartz.impl.StdSchedulerFactory.getScheduler()
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
获取调度器对象时,首先查找缓存,如果有就直接拿,若不存在则调用instantiate()方法来创建调度器对象。
进入instantiate方法内部,定义了许多变量。
org.quartz.impl.StdSchedulerFactory.instantiate()
private Scheduler instantiate() throws SchedulerException {
if (cfg == null) {
initialize();
}
if (initException != null) {
throw initException;
}
JobStore js = null;
ThreadPool tp = null;
QuartzScheduler qs = null;
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
......
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPool tp为线程池对象,在该方法内部可以看到该线程池的创建 是通过从属性文件quartz.properties中拿到org.quartz.threadPool所定义的线城池类来创建。本文使用的是 org.quartz.simpl.SimpleThreadPool。而最终任务通过线程池来管理。
public static final String PROP_THREAD_POOL_PREFIX = "org.quartz.threadPool";
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
initException = new SchedulerException(
"ThreadPool class not specified. ");
throw initException;
}
try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' could not be instantiated.", e);
throw initException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
后续代码中会创建QuartzScheduler对象,最终来启动调动线程。
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
1
在该构造器内部,会创建QuartzSchedulerThread对象,而QuartzSchedulerThread负责对我们定义的Job进行调度。当创建完后
提交给执行器对象schedThreadExecutor.execute(this.schedThread);准备执行。
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
当sf.getScheduler();执行完毕后,线程池,执行器,调度器,负责调度Job的线程都已创建完毕。而任务调度线程QuartzSchedulerThread也准备执行。
可以看到sf.getScheduler();调用完毕后,任务调度线程处于暂停状态,并不断检查状态等待恢复并执行任务。
org.quartz.core.QuartzSchedulerThread.run()
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在 QuartzSchedulerThread 构造方法中会设置两个变量初始值。
paused = true;
halted = new AtomicBoolean(false);
1
2
当我们获取到调度器对象,并指定trigger与detail后,调用start方法。
当调用执行器的 org.quartz.core.QuartzScheduler.start()方法在该方法内部会调用方法 schedThread.togglePause(false);将paused变量设置为false,并唤醒QuartzSchedulerThread线程。随QuartzSchedulerThread线程开始执行调度任务。
org.quartz.core.QuartzScheduler.start()
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
至此任务调度工作准备开始,Job等待被调用。
QuartzSchedulerThread的run方法中检测到变量状态被修改,开始调度工作。
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
... 省略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
run方法中的执行过程可粗略归纳为
1.查找将要触发的Trigger
2.检测条件,等待执行
3.执行任务。
4.释放Trigger
在执行时,任务被包装为JobRunShell来运行。
当qsRsrcs.getThreadPool().runInThread(shell)被调用时,我们自己的Job被提交,并等待线程池调度执行。
JobRunShell shell = null;
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在 qsRsrcs.getThreadPool().runInThread(shell)方法中会创建一个线程来运行我们定义的Job,而Job的信息则是通过JobDetail来获取。经过上述过程就形成了quartz 进行任务调度的基本过程。
最后总结,启动时的三个主要方法,及工作概述如下。
Scheduler sched = sf.getScheduler();
完成了线程池创建,调度线程创建,调度线程初始处理暂停状态。
sched.scheduleJob(detail, trigger);
指定JobDetail与Trigger。
sched.start();
修改状态未,使调度线程开始运行。任务以JobRunShell的形式被执行。
工艺流程图图