通过源码分析Java开源任务调度框架Quartz的主要流程

通过源码分析Java开源任务调度框架Quartz的主要流程

从使用效果、调用链路跟踪、E-R图、循环调度逻辑几个方面分析Quartz。

github项目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 补充了SQL输出

系统说明:

IDE: IntelliJ

JDK:1.8

Quartz:2.2.1

使用效果

1.从github项目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取项目到本地,导入IDEA。

相信读者都有一定工作经验,这些细节不赘述。

2.本文采用Mysql数据库。

请执行 resources/scripts/tables_mysql_innodb.sql

3.修改jdbc.properties中数据库配置

4.通过IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

通过源码分析Java开源任务调度框架Quartz的主要流程

暴露的Restful 接口 /say-hello.do 以及添加好任务后的调用效果:

通过源码分析Java开源任务调度框架Quartz的主要流程

添加任务

在tomcat启动成功后,在首页点击“添加任务”,添加如下任务:

通过源码分析Java开源任务调度框架Quartz的主要流程

代码执行逻辑在SyncJobFactory类中,从Output中可以看到执行的输出信息,

调用链跟踪的最后会回到这个类来。

通过源码分析Java开源任务调度框架Quartz的主要流程

现在开始跟踪调用链路。

IDEA 快捷键:

进入方法: Ctrl + 鼠标左键

光标前进/后退: Ctrl + Shirt + 右方向键/左方向键

一、 调用链路跟踪

从配置文件applicationContext.xml配置中找到任务调度核心类SchedulerFactoryBean

resources/applicationContext.xml

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
...
</bean>

使用IDEA快捷键,点击进入SchedulerFactoryBean类,它实现了InitializingBean接口,

在Spring中凡是实现了InitializingBean接口的Bean,都会在Bean属性都设置完成后调用afterPropertiesSet()方法.

SchedulerFactoryBean.java

//---------------------------------------------------------------------
// Implementation of InitializingBean interface
// 实现 InitializingBean 接口
//---------------------------------------------------------------------
public void afterPropertiesSet() throws Exception {
 //...
 // Create SchedulerFactory instance.
 // 创建 SchedulerFactory 调度器工厂实例
 SchedulerFactory schedulerFactory = (SchedulerFactory)
 BeanUtils.instantiateClass(this.schedulerFactoryClass);
 initSchedulerFactory(schedulerFactory);
 //...
 // Get Scheduler instance from SchedulerFactory.
 // 通过调度器工厂 获取 调度器实例
 try {
 this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
 //...
}

SchedulerFactoryBean.java

/**
 * Create the Scheduler instance for the given factory and scheduler name.
 * 通过制定工厂和调度器名称创建调度器实例
 * Called by {@link #afterPropertiesSet}.
 * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code>
 * method. Can be overridden for custom Scheduler creation.
 */
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
 throws SchedulerException {
 //...
 try {
 SchedulerRepository repository = SchedulerRepository.getInstance();
 synchronized (repository) {
 Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
 Scheduler newScheduler = schedulerFactory.getScheduler();
 if (newScheduler == existingScheduler) {
 throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
 "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
 }
 //...
}

这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。

StdSchedulerFactory.java

/**
 * Returns a handle to the Scheduler produced by this factory.
 * 返回该工厂创造的调度器的句柄
 */
public Scheduler getScheduler() throws SchedulerException {
 if (cfg == null) {
 initialize();
 }
 SchedulerRepository schedRep = SchedulerRepository.getInstance();
 Scheduler sched = schedRep.lookup(getSchedulerName());
 //...
 sched = instantiate();
 return sched;
}

StdSchedulerFactory.java

private Scheduler instantiate() throws SchedulerException {
 //...
 //大量的配置初始化、实例化代码
 //...
 //第1298行代码
 qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
 //...
}

QuartzScheduler.java

/**
 * Create a <code>QuartzScheduler</code> with the given configuration
 * 根据给定的配置 创建Quartz调度器
 */
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
 throws SchedulerException {
 this.resources = resources;
 if (resources.getJobStore() instanceof JobListener) {
 addInternalJobListener((JobListener)resources.getJobStore());
 }
 //private QuartzSchedulerThread schedThread;
 this.schedThread = new QuartzSchedulerThread(this, resources);
 ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
 //通过线程池执行 Quartz调度器线程
 schedThreadExecutor.execute(this.schedThread);
 //...
}

QuartzSchedulerThread.java

/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * Quartz调度器线程的主循环逻辑
 * </p>
 */
@Override
public void run() {
 //while循环执行,只要调度器为被暂停
 while(!halted.get()){
 JobRunShell shell = null;
 try {
 shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
 shell.initialize(qs);
 }
 if (qsRsrcs.getThreadPool().runInThread(shell) == false){}
 }
}

JobRunShell.java

public void run() {
 //...
 Job job = jec.getJobInstance();
 //...
 try {
 log.debug("Calling execute on job " + jobDetail.getKey());
 //执行
 job.execute(jec);
 endTime = System.currentTimeMillis();
 }
 //...
 //更新Trigger触发器状态,删除FIRED_TRIGGERS触发记录
 instCode = trigger.executionComplete(jec, jobExEx);
 //...
}

QuartzJobBean.java

/**
 * This implementation applies the passed-in job data map as bean property
 * values, and delegates to <code>executeInternal</code> afterwards.
 * 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法
 */
public final void execute(JobExecutionContext context) throws JobExecutionException {
 try {
 //执行
 executeInternal(context);
}

SyncJobFactory.java

//回到了我们的业务类SyncJobFactory的executeInternal方法,
//里面执行我们的业务代码
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
 try {
 LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
 }
 //...
 System.out.println("jobName:" + scheduleJob.getJobName() + " " + scheduleJob);
 //...
}

二、E-R图

梳理6张主要的Quartz表:

通过源码分析Java开源任务调度框架Quartz的主要流程

QRTZ_TRIGGERS 触发器表

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键

JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键

JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键

TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING

NEXT_FIRE_TIME, 下次触发时间:

MISFIRE_INSTR,执行失败后的指令,

非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;

失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器

PRIORITY,优先级

QRTZ_CRON_TRIGGERS cron类型触发器表

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键

JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键

JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键

CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ?

QRTZ_JOB_DETAILS 任务详细表

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

JOB_NAME,任务名。自定义值。 联合主键

JOB_GROUP,任务组。 自定义值。联合主键

JOB_DATA,blob类型,任务参数

QRTZ_FIRED_TRIGGERS 任务触发表

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

ENTRY_ID,entry id,联合主键

JOB_NAME,任务名。自定义值。

JOB_GROUP,任务组。 自定义值。

FIRED_TIME, 任务触发时间

STATE,状态

INSTANCE_NAME, 服务器实例名

PRIORITY,优先级

QRTZ_SCHEDULER_STATE

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

INSTANCE_NAME,服务器实例名。联合主键

LAST_CHECKIN_TIME,上次检查时间

CHECKIN_INTERVAL,检查间隔

QRTZ_LOCKS 全局锁

SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键

三、循环调度逻辑

主要流程如下:

通过源码分析Java开源任务调度框架Quartz的主要流程

源码如下:

QuartzSchedulerThread.java

public void run() {
 //...
 while (!halted.get()) {
 try {
 //合理休眠
 //...
 //获取接下来的触发器
 //1.状态为WAITING
 //2.触发时间在30秒内
 //3.不是错过执行的或者错过了但是时间不超过两分钟
 triggers = qsRsrcs.getJobStore().acquireNextTriggers(
 now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
 
 //... 
 //触发任务
 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
 //...
 JobRunShell shell = null;
 //...
 //执行代码
 if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
 //...
 } // while (!halted)
 //..
 }

JobRunShell.java

protected QuartzScheduler qs = null;
 
 public void run() {
 qs.addInternalSchedulerListener(this);
 try {
 //...
 do {
 Job job = jec.getJobInstance();
 // execute the job
 try {
 //执行任务代码
 job.execute(jec);
 //更新触发器,删除触发记录
 qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
 break;
 } while (true);
 } 
 //...
 }

四、扩展

除了对主线程 QuartzSchedulerThread 的分析

继续分析JobStoreSupport类的两个线程 ClusterManager 和 MisfireHandler 的分析, 它们维护触发器的MISFIRE_INSTR状态,和调度器状态QRTZ_SCHEDULER_STATE。

相关推荐