Quartz集群原理分析

概述

quartz是一个用java实现的开源任务调度框架,可以用来创建简单或者复杂的任务调度,并且可以提供许多企业级的功能,比如JTA以及集群等,是当今比较流行的JAVA任务调度框架。

可以用来做什么

Quartz是一个任务调度框架,当遇到以下问题时:

  • 想在每月25号,自动还款;
  • 想每隔1小时,备份一下自己的各种资料。

那么总结起来就是,在一个有规律的时间点做一些事情,并且这个规律可以非常复杂,复杂到了需要一个框架来帮助我们。Quartz的出现就是为了解决这个问题,定义一个触发条件,那么其负责到了特定的时间点,触发相应的job干活。

特点

  • 强大的调度功能,例如丰富多样的调度方法,可以满足各种常规和特殊需求;
  • 灵活的应用方式,比如支持任务调度和任务的多种组合,支持数据的多种存储(DB,RAM等);
  • 支持分布式集群,在被Terracotta收购之后,在原来基础上进行了进一步的改造。

基本原理

1. 核心元素

Quartz核心要素有Scheduler、Trigger、Job、JobDetail,其中trigger和job、jobDetail为元数据,而Scheduler为实际进行调度的控制器。

  • Scheduler为调度器负责整个定时系统的调度,内部通过线程池进行调度
  • Trigger用于定义调度任务的时间规则,主要有四种类型:SimpleTrigger、CornTrigger、DateIntervalTrigger、NthIncludedDayTrigger。
  • JobDetail为定时任务的信息载体,可以记录Job的名字、组及任务执行的具体类和任务执行所需要的参数
  • Job为任务的真正执行体,承载着具体的业务逻辑。

元素之间的关系如下:

先由SchedulerFactory创建Scheduler调度器后,由调度器去调取即将执行的Trigger,执行时获取到对于的JobDetail信息,找到对应的Job类执行业务逻辑。

2. 核心元素间关系

3. 主要线程

  • 执行线程

    • 通常用一个线程池维护
  • 调度线程

    • Regular Scheduler Thread(执行常规调度)
      • 轮询Trigger,如果有将要触发的Trigger,则从任务线程池中获取一个空闲线程,然后执行与该Trigger关联的job;
    • Misfire Scheduler Thread(执行错失的任务)
      • 扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理。

4.数据存储

Quartz中的trigger和job需要存储下来才能被使用。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用JobStoreSupport。

Quartz的集群部署方案在架构上是分布式的,没有负责集中管理的节点,而是利用数据库锁的方式来实现集群环境下进行并发控制。BTW,分布式部署时需要保证各个节点的系统时间一致。

Quartz数据库核心表如下:

Table Name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS 存储已配置的TriggerListener的信息
QRTZ_TRIGGERS 存储已配置的Trigger的信息

QRTZ_SCHEDULER_STATE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mysql> show create table QRTZ_SCHEDULER_STATE\G
*************************** 1. row ***************************
Table: QRTZ_SCHEDULER_STATE
Create Table: CREATE TABLE `QRTZ_SCHEDULER_STATE` (
`SCHED_NAME` varchar(120) NOT NULL,
`INSTANCE_NAME` varchar(200) NOT NULL,
`LAST_CHECKIN_TIME` bigint(13) NOT NULL,
`CHECKIN_INTERVAL` bigint(13) NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`INSTANCE_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.03 sec)

mysql> select * from QRTZ_SCHEDULER_STATE limit 1 \G
*************************** 1. row ***************************
SCHED_NAME: division
INSTANCE_NAME: job-di-1.bigdata.lf.hw.lan1638777369231
LAST_CHECKIN_TIME: 1643007986388
CHECKIN_INTERVAL: 5000
1 row in set (0.03 sec)

QRTZ_JOB_DETAILS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
mysql> show create table QRTZ_JOB_DETAILS\G
*************************** 1. row ***************************
Table: QRTZ_JOB_DETAILS
Create Table: CREATE TABLE `QRTZ_JOB_DETAILS` (
`SCHED_NAME` varchar(120) NOT NULL,
`JOB_NAME` varchar(200) NOT NULL,
`JOB_GROUP` varchar(200) NOT NULL,
`DESCRIPTION` varchar(250) DEFAULT NULL,
`JOB_CLASS_NAME` varchar(250) NOT NULL,
`IS_DURABLE` varchar(1) NOT NULL,
`IS_NONCONCURRENT` varchar(1) NOT NULL,
`IS_UPDATE_DATA` varchar(1) NOT NULL,
`REQUESTS_RECOVERY` varchar(1) NOT NULL,
`JOB_DATA` mediumblob,
PRIMARY KEY (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
KEY `IDX_QRTZ_J_REQ_RECOVERY` (`SCHED_NAME`,`REQUESTS_RECOVERY`),
KEY `IDX_QRTZ_J_GRP` (`SCHED_NAME`,`JOB_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

mysql> select * from QRTZ_JOB_DETAILS limit 1 \G
*************************** 1. row ***************************
SCHED_NAME: division
JOB_NAME: 12
JOB_GROUP: timing
DESCRIPTION: NULL
JOB_CLASS_NAME: com.sddi.quartz.SchedulerJob
IS_DURABLE: 0
IS_NONCONCURRENT: 0
IS_UPDATE_DATA: 0
REQUESTS_RECOVERY: 0
JOB_DATA: 0x

QRTZ_CRON_TRIGGERS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
mysql> show create table QRTZ_CRON_TRIGGERS\G;
*************************** 1. row ***************************
Table: QRTZ_CRON_TRIGGERS
Create Table: CREATE TABLE `QRTZ_CRON_TRIGGERS` (
`SCHED_NAME` varchar(120) NOT NULL,
`TRIGGER_NAME` varchar(200) NOT NULL,
`TRIGGER_GROUP` varchar(200) NOT NULL,
`CRON_EXPRESSION` varchar(120) NOT NULL,
`TIME_ZONE_ID` varchar(80) DEFAULT NULL,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
CONSTRAINT `qrtz_cron_triggers_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

mysql> select * from QRTZ_CRON_TRIGGERS limit 1\G
*************************** 1. row ***************************
SCHED_NAME: division
TRIGGER_NAME: 12
TRIGGER_GROUP: timing
CRON_EXPRESSION: 1 30 7 * * ? *
TIME_ZONE_ID: Asia/Shanghai
1 row in set (0.01 sec)

QRTZ_TRIGGERS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
mysql> show create table QRTZ_TRIGGERS\G
*************************** 1. row ***************************
Table: QRTZ_TRIGGERS
Create Table: CREATE TABLE `QRTZ_TRIGGERS` (
`SCHED_NAME` varchar(120) NOT NULL,
`TRIGGER_NAME` varchar(200) NOT NULL,
`TRIGGER_GROUP` varchar(200) NOT NULL,
`JOB_NAME` varchar(200) NOT NULL,
`JOB_GROUP` varchar(200) NOT NULL,
`DESCRIPTION` varchar(250) DEFAULT NULL,
`NEXT_FIRE_TIME` bigint(13) DEFAULT NULL,
`PREV_FIRE_TIME` bigint(13) DEFAULT NULL,
`PRIORITY` int(11) DEFAULT NULL,
`TRIGGER_STATE` varchar(16) NOT NULL,
`TRIGGER_TYPE` varchar(8) NOT NULL,
`START_TIME` bigint(13) NOT NULL,
`END_TIME` bigint(13) DEFAULT NULL,
`CALENDAR_NAME` varchar(200) DEFAULT NULL,
`MISFIRE_INSTR` smallint(2) DEFAULT NULL,
`JOB_DATA` mediumblob,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
KEY `IDX_QRTZ_T_J` (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
KEY `IDX_QRTZ_T_JG` (`SCHED_NAME`,`JOB_GROUP`),
KEY `IDX_QRTZ_T_C` (`SCHED_NAME`,`CALENDAR_NAME`),
KEY `IDX_QRTZ_T_G` (`SCHED_NAME`,`TRIGGER_GROUP`),
KEY `IDX_QRTZ_T_STATE` (`SCHED_NAME`,`TRIGGER_STATE`),
KEY `IDX_QRTZ_T_N_STATE` (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`,`TRIGGER_STATE`),
KEY `IDX_QRTZ_T_N_G_STATE` (`SCHED_NAME`,`TRIGGER_GROUP`,`TRIGGER_STATE`),
KEY `IDX_QRTZ_T_NEXT_FIRE_TIME` (`SCHED_NAME`,`NEXT_FIRE_TIME`),
KEY `IDX_QRTZ_T_NFT_ST` (`SCHED_NAME`,`TRIGGER_STATE`,`NEXT_FIRE_TIME`),
KEY `IDX_QRTZ_T_NFT_MISFIRE` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`),
KEY `IDX_QRTZ_T_NFT_ST_MISFIRE` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`,`TRIGGER_STATE`),
KEY `IDX_QRTZ_T_NFT_ST_MISFIRE_GRP` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`,`TRIGGER_GROUP`,`TRIGGER_STATE`),
CONSTRAINT `qrtz_triggers_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`) REFERENCES `QRTZ_JOB_DETAILS` (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.03 sec)

mysql> select * from QRTZ_TRIGGERS limit 1 \G
*************************** 1. row ***************************
SCHED_NAME: division
TRIGGER_NAME: 12
TRIGGER_GROUP: timing
JOB_NAME: 12
JOB_GROUP: timing
DESCRIPTION: NULL
NEXT_FIRE_TIME: 1643067001000
PREV_FIRE_TIME: 1642980601000
PRIORITY: 5
TRIGGER_STATE: WAITING
TRIGGER_TYPE: CRON
START_TIME: 1631936627000
END_TIME: 0
CALENDAR_NAME: NULL
MISFIRE_INSTR: 2
JOB_DATA: NULL
1 row in set (0.01 sec)

QRTZ_LOCKS

Quartz集群实现同步机制的行锁表,其表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mysql> show create table QRTZ_LOCKS\G;
*************************** 1. row ***************************
Table: QRTZ_LOCKS
Create Table: CREATE TABLE `QRTZ_LOCKS` (
`SCHED_NAME` varchar(120) NOT NULL,
`LOCK_NAME` varchar(40) NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

mysql> select * from QRTZ_LOCKS\G;
*************************** 1. row ***************************
SCHED_NAME: division
LOCK_NAME: STATE_ACCESS
*************************** 2. row ***************************
SCHED_NAME: division
LOCK_NAME: TRIGGER_ACCESS
2 rows in set (0.01 sec)

集群原理

主要流程

启动流程

QuartzSchedulerThread线程

  1. 先获取线程池中的可用线程数量(若没有可用的会阻塞,直到有可用的);

  2. 获取30m内要执行的trigger(即acquireNextTriggers): 获取trigger的锁,通过select …for update方式实现;获取30m内(可配置)要执行的triggers(需要保证集群节点的时间一致),若@ConcurrentExectionDisallowed且列表存在该条trigger则跳过,否则更新trigger状态为ACQUIRED(刚开始为WAITING);插入firedTrigger表,状态为ACQUIRED;(注意:在RAMJobStore中,有个timeTriggers,排序方式是按触发时间nextFireTime排的;JobStoreSupport从数据库取出triggers时是按照nextFireTime排序);

  3. 等待直到获取的trigger中最先执行的trigger在2ms内;

  4. triggersFired:

    1. 更新firedTrigger的status=EXECUTING;
    2. 更新trigger下一次触发的时间;
    3. 更新trigger的状态:无状态的trigger->WAITING,有状态的trigger->BLOCKED,若nextFireTime==null ->COMPLETE;
    4. commit connection,释放锁;
  5. 针对每个要执行的trigger,创建JobRunShell,并放入线程池执行:

    1. execute:执行job
    2. 获取TRIGGER_ACCESS锁
    3. 若是有状态的job:更新trigger状态:BLOCKED->WAITING,PAUSED_BLOCKED->BLOCKED
    4. 若@PersistJobDataAfterExecution,则updateJobData
    5. 删除firedTrigger
    6. commit connection,释放锁

调度过程中Trigger状态变化如图

调度过程中Trigger状态变化

MisfireHandler线程

下面这些原因可能造成 misfired job:

  1. 系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire;
  2. Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire;
  3. 线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire;
  4. 有状态任务在下次触发时间到达时,上次执行还没有结束;为了处理 misfired job,Quartz 中为 trigger 定义了处理策略,主要有下面两种:
    • MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:针对 misfired job 马上执行一次;
    • MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次触发;默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW线程默认1分钟执行一次;在一个事务中,默认一次最多recovery 20个;

执行流程:

  1. 若配置(默认为true,可配置)成获取锁前先检查是否有需要recovery的trigger,先获取misfireCount;
  2. 获取TRIGGER_ACCESS锁;
  3. hasMisfiredTriggersInState:获取misfired的trigger,默认一个事务里只能最大20个misfired trigger(可配置),misfired判断依据:status=waiting,next_fire_time < current_time-misfirethreshold(可配置,默认1min)
  4. notifyTriggerListenersMisfired
  5. updateAfterMisfire:获取misfire策略(默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根据策略更新nextFireTime;
  6. 将nextFireTime等更新到trigger表;
  7. commit connection,释放锁8.如果还有更多的misfired,sleep短暂时间(为了集群负载均衡),否则sleep misfirethreshold时间,后继续轮询;

ClusterManager集群管理线程

初始化:

failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤儿)

线程执行:

每个服务器会定时(org.quartz.jobStore.clusterCheckinInterval这个时间)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若这个字段远远超出了该更新的时间,则认为该服务器实例挂了;

注意:每个服务器实例有唯一的id,若配置为AUTO,则为hostname+current_time

线程执行的具体流程:

  1. 检查是否有超时的实例failedInstances;

  2. 更新该服务器实例的LAST_CHECKIN_TIME; 若有超时的实例:

  3. 获取STATE_ACCESS锁;

  4. 获取超时的实例failedInstances;

  5. 获取TRIGGER_ACCESS锁;

  6. clusterRecover:

    • 针对每个failedInstances,通过instanceId获取每个实例的firedTriggers;

    • 针对每个firedTrigger:

      • 更新trigger状态:
        • BLOCKED->WAITING
        • PAUSED_BLOCKED->PAUSED
        • ACQUIRED->WAITING
      • 若firedTrigger不是ACQUIRED状态(在执行状态),且jobRequestRecovery=true: 创建一个SimpleTrigger,存储到trigger表,status=waiting,MISFIRE_INSTR=MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY.
      • 删除firedTrigger