浅析JDK中的定时器Timer实现

2018-04-21
JDK SOURCE

在jdk中处理定时任务工具类中有2种:Timer和ScheduledExecutorService。前者是在java.util包中,从1.3版本开始,属于比较老的工具类了。而ScheduledExecutorService属于java.util.concurrent包,作者是老爷子Doug Lea,1.5版本开始才有。虽然现在大多使用ScheduledExecutorService,但是我觉得很有必要对其“同宗”Timer的实现进行解读。

举个栗子

官方文档中的解释是这样的:

A facility for threads to schedule tasks for future execution in a background thread. Tasks may be scheduled for one-time execution, or for repeated execution at regular intervals.

线程的工具,用于在后台线程中安排将来执行的任务。 可以安排一次性执行任务,或定期重复执行任务。

This class is thread-safe: multiple threads can share a single Timer object without the need for external synchronization.
This class does not offer real-time guarantees: it schedules tasks using the Object.wait(long) method.
Java 5.0 introduced the java.util.concurrent package and one of the concurrency utilities therein is the ScheduledThreadPoolExecutor which is a thread pool for repeatedly executing tasks at a given rate or delay. It is effectively a more versatile replacement for the Timer/TimerTask combination, as it allows multiple service threads, accepts various time units, and doesn’t require subclassing TimerTask (just implement Runnable). Configuring ScheduledThreadPoolExecutor with one thread makes it equivalent to Timer.

这个类是线程安全的,但不能保证是实时的,因为使用的是wait方法来调度任务。文档还说了建议使用JUC下的ScheduledThreadPoolExecutor(ScheduledExecutorService的实现)类来处理定时任务,这个类提供的功能更多,参数更灵活,而且是多线程的。当线程池size指定为一那就和Timer一个样了。

下面通过一个demo来展示一下相关api的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class JdkTimerDemo {
public static void main(String[] args) {
Timer timer = new Timer("timer-demo");
MyTask[] tasks = new MyTask[20];
for (int i = 0; i < tasks.length; i++) {
tasks[i] = new MyTask(i);
System.out.println("time no " + i + " task start at " + new Date());
timer.schedule(tasks[i], 2 * 1000, 3 * 1000);
}
System.out.println("timer started at: " + new Date());
}
private static class MyTask extends TimerTask {
int timeNo;
public MyTask(int timeNo) {
this.timeNo = timeNo;
}
/**
* 如果任务出现异常不被捕获,其他任务不会被执行
*/
@Override
public void run() {
if (timeNo == 8) {
throw new RuntimeException("boom");
}
System.out.println("time no " + timeNo);
}
}
}

这个demo展示一个Timer的弊端。当任务中出现未被捕获的异常,接下来的任务都不会被执行,定时器crash掉。这个demo中定义了20个task交给timer调度,timer启动2秒后开始执行任务,每隔3秒执行一次。接下来就跟着demo一步一步看看其中的实现原理。

源码

首先看看构造器:

1
2
3
4
5
6
7
8
9
10
public Timer(String name) {
thread.setName(name);
thread.start();
}
public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}

构造器的参数代表着timer的名字和这个timer是否是后台运行的(timer本质上是一个线程)。一旦实例化就将这个线程给启动了。这个线程是核心。

1
private final TimerThread thread = new TimerThread(queue);

可以看到这个成员属性TimerThread内部维护着一个queue。这个queue上什么稍后再讲。

实例化结束后就得调用它的schedule方法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
// 核心的是这段逻辑
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
// 防止溢出 如果周期比最大值的一半还大 那就将其除以2 道理何在?
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
synchronized(task.lock) {
// 如果任务状态不为新创建的 直接抛异常
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
// 设置任务下次执行时间 执行周期 以及 状态
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
// 任务实际上是放在队列中 并不是直接执行的
queue.add(task);
// 为什么需要queue.getMin() == task时才调用notify方法呢?
if (queue.getMin() == task)
queue.notify();
}
}

这段代码看似简单,但也是有点门道的。在设置任务属性的时候采用加锁,避免并发下把不同任务的执行周期等参数搞混乱了。添加任务到队列中也使用了加锁,实际上是针对add方法加的锁。保证add任务不出乱(这个队列不是安全的队列)。最后有个判断,为什么需要queue.getMin() == task时才调用notify方法呢?因为只有新加入的task是所有Task中要被最早执行的task时,才会需要打断TimeThread的等待状态。举个例子,当前队列中有两个task,分别是A(3分钟后到时间)、B(5分钟后到时间),此时TimerThread正在等待A的时间到来,所以会调用queue.wait(3min),这个时候,队列中新增一个任务C(1分钟后到时),如果不打断queue.wait(3min),那当wait(3min)自然结束时,C任务已经过期了… 但是如果新加入的C任务是需要在4分钟后执行,那就没必要打断wait(3min)的状态,因为就算wait(3min)自然结束时,C也还没到时间.

任务是什么

说了这么久,还没弄清楚这个任务是个什么玩意?按照直觉说到任务第一想到就是Runnable对象,但是也有其他情况下不是这样的。然而这个任务确实是个Runnable对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class TimerTask implements Runnable {
// 对象锁
final Object lock = new Object();
// 任务状态 新建 已调度 已执行 已取消
// 默认是新建
int state = VIRGIN;
static final int VIRGIN = 0;
static final int SCHEDULED = 1;
static final int EXECUTED = 2;
static final int CANCELLED = 3;
// 下次执行的时间
long nextExecutionTime;
// 周期
long period = 0;
protected TimerTask() {
}
public abstract void run();
public boolean cancel() {
synchronized(lock) {
boolean result = (state == SCHEDULED);
state = CANCELLED;
return result;
}
}
public long scheduledExecutionTime() {
synchronized(lock) {
return (period < 0 ? nextExecutionTime + period
: nextExecutionTime - period);
}
}
}

TimeTask对Runnable进行简单封装。暴露出抽象的run方法让用户的具体实现类去完成。同时提供了取消任务的方法。和JUC中不同的是这里状态的转化非常简单,没有JUC中动不动就用CAS的骚操作。老版本的代码就是好理解些。

任务的调度(定时器的核心)

既然都知道任务是什么了,接下来看看队列中的任务是怎么被取出来和执行的。首先得看看这个线程是个什么样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class TimerThread extends Thread {
// 标记这个线程是不是要被挂起
boolean newTasksMayBeScheduled = true;
// 维护一个队列 用于存放任务的队列
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
// 具体执行核心逻辑 开死循环跑
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
// 如果队列中没有任务,而且定时器没有被取消(默认为true,只有将timer取消cancel方法调用的时候将其置为false,还有一个地方) 就得将这个线程挂起 不然就造成了死循环 cpu直接上100%
// 而唤起的地方只有cancel方法和threadReaper
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
// 任务队列空了 定时器取消了 跳出循环 线程结束
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
// 从队列中取出最delay时间最小的任务 得最早执行的任务
task = queue.getMin();
synchronized(task.lock) {
// 任务取消了,将其移出队列 继续取下一个
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
// currentTimeMillis()返回以毫秒为单位的当前时间,返回的是当前时间与1970 年 1 月 1 日午夜之间的时间差
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
// 如果给定的到期时间小于当前时间(1970-01-01到现在的差值)说明任务到期了 需要被执行 把taskFired置为true
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
// 不是周期执行的任务 直接从队列中移除掉 将任务状态置为已执行
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
// 是周期任务 重新计算下次执行的时间 即当前时间+间隔时间为下次任务执行的时间
// 这里是➖因为之前传进来的是一个负值
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// 如果没到期 就等一个delay长的时间
// executionTime = System.currentTimeMillis()+delay
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
// 任务启动 执行业务逻辑
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}
小疑惑

整个定时器的核心逻辑在代码注释中都一一解释了。其中一个细节:变量newTasksMayBeScheduled用来做什么的。
首先得看看它的值被置为false的情形在哪些地方出现。

  • Timer#cancel方法
  • TimerThread的mainloop执行完了finally块中
  • Timer的成员属性threadReaper的finalize方法中

前两者都不必多解释,值得注意的是最后一种情况。这种写法我还是第一次见,还不明白其中的玄机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* This object causes the timer's task execution thread to exit
* gracefully when there are no live references to the Timer object and no
* tasks in the timer queue. It is used in preference to a finalizer on
* Timer as such a finalizer would be susceptible to a subclass's
* finalizer forgetting to call it.
*/
private final Object threadReaper = new Object() {
protected void finalize() throws Throwable {
synchronized(queue) {
thread.newTasksMayBeScheduled = false;
queue.notify(); // In case queue is empty.
}
}
};

当queue为空,并且没人调用add或cancel方法时,TimerThread永远都不会stop,那么还有别的可能吗?

上述的做法就提供了另外的思路。当队列中没有任务的时候,TimerThread会wait,如果不手动调用cancel这个线程一直会挂起。聪明的jdk就提供了上述的方法。当在GC的时候会触发finalize方法调用,那什么时候会触发GC呢?当Timer对象没有被任何对象引用的时候如果有GC那么这段代码被调用:newTasksMayBeScheduled置为false同时将挂起的TimerThread唤醒,这时候mainloop死循环就跳出了,TimerThread线程结束!

具体验证可以使用Jprofile这个工具。事实上确实可行。但是通过这段代码让我联想到一个这个定时器的弊端:没做到像ExecutorService能够等到任务全部执行完成后再将其关闭。手动关闭只能通过cancel这种粗暴的方式,还好jdk工程师提供这样一个“后门”,交给jvm来管理。这也不失为一种补救措施,但是对于之后的JUC而言,这种做法显得有点“小儿科”了。但是对于学习者而言,这种代码组织显得更容易读懂,设计思想很容易呈现在我们面前。时代不断进步,总会有好的设计来取代老的旧的东西。并不是意味着老的旧的真正被取代,而是以另外一种价值呈现在我们后来人面前。通过比较,我们可能会更加理解什么是好的设计。

补充

整个Timer定时任务的核心逻辑就梳理完了。其中有一些细节被忽略掉了,比如这个任务队列queue的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* This class represents a timer task queue: a priority queue of TimerTasks,
* ordered on nextExecutionTime. Each Timer object has one of these, which it
* shares with its TimerThread. Internally this class uses a heap, which
* offers log(n) performance for the add, removeMin and rescheduleMin
* operations, and constant time performance for the getMin operation.
*/
class TaskQueue {
// 维护一个长度为128的数组
private TimerTask[] queue = new TimerTask[128];
// 队列长度
private int size = 0;
int size() {
return size;
}
/**
* Adds a new task to the priority queue.
* 当前队列长度为最大长度-1的时候就进行扩容 新队列长度为当前2倍
* 为什么要在最大长度-1的时候扩容呢?因为不提前扩容当前的元素加不进去😂
*/
void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);
queue[++size] = task;
fixUp(size);
}
/**
* 获取第一个元素 到期时间最近的
*/
TimerTask getMin() {
return queue[1];
}
// 下标从1开始取
TimerTask get(int i) {
return queue[i];
}
/**
* 移除第一个元素 到期时间最近的
*/
void removeMin() {
queue[1] = queue[size];
queue[size--] = null; // Drop extra reference to prevent memory leak
fixDown(1);
}
void quickRemove(int i) {
assert i <= size;
queue[i] = queue[size];
queue[size--] = null; // Drop extra ref to prevent memory leak
}
// 改第一个元素的到期时间属性
void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}
boolean isEmpty() {
return size==0;
}
void clear() {
// Null out task references to prevent memory leak
for (int i=1; i<=size; i++)
queue[i] = null;
size = 0;
}
/**
* Establishes the heap invariant (described above) assuming the heap
* satisfies the invariant except possibly for the leaf-node indexed by k
* (which may have a nextExecutionTime less than its parent's).
*
* This method functions by "promoting" queue[k] up the hierarchy
* (by swapping it with its parent) repeatedly until queue[k]'s
* nextExecutionTime is greater than or equal to that of its parent.
*/
private void fixUp(int k) {
while (k > 1) {
int j = k >> 1;
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
/**
* Establishes the heap invariant (described above) in the subtree
* rooted at k, which is assumed to satisfy the heap invariant except
* possibly for node k itself (which may have a nextExecutionTime greater
* than its children's).
*
* This method functions by "demoting" queue[k] down the hierarchy
* (by swapping it with its smaller child) repeatedly until queue[k]'s
* nextExecutionTime is less than or equal to those of its children.
*/
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
/**
* Establishes the heap invariant (described above) in the entire tree,
* assuming nothing about the order of the elements prior to the call.
*/
void heapify() {
for (int i = size/2; i >= 1; i--)
fixDown(i);
}
}

这个队列是一个优先队列,以task的到期时间来排序,时间越小越靠前。这个队列是用堆结构实现的(废话,优先队列本身就是堆),实际上堆本质上就是完全二叉树。对于add remove操作的复杂度为log(n)。最核心的操作就是fixDown fixUp。堆也分为大根堆和小根堆,这是一个小根堆,最小的放到最上面,也就是下标为1的位置(只是Timer中的实现将下标为0的位置给弃用了)。每次添加元素都得调整堆结构,同理移除的时候也得这样做。

本文中的demo

小结

  • 源码读起来没那么难受,仔细推敲还是很有意思的。
  • 相比老爷子Doug Lea的骚代码,这种中规中矩的写法看起来更让人容易理解。
  • 建议使用ScheduledExecutorService,毕竟Timer的适用场景很局限。

留言: