Java中的“锁”事之ReentrantLock

2020-07-12
AQS 笔记

谈谈“锁”

说起Java的锁,脑袋里第一反应就是关键字synchronized.这是Java提供的基于语言级别的锁,底层是通过cup指令来实现的。对于使用者来说非常简单,容易上手。然而也有一些小缺陷。在早期的jvm中synchronized性能不是太好,而且加锁和释放锁不是很灵活,比如只能在程序正常执行完成和抛出异常时释放锁,对锁的持有很“执着”,获取锁的时候没法设置超时时间等。

除了jvm层面实现的锁之外,JDK中也提供了另外的锁实现。下面从一个例子说起。

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void test0() throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
boolean b = lock.hasQueuedThreads();
System.out.println("t1"+ b);
lock.lock();
System.out.println("t1 start working...");
try {
for (int i = 0; i < 10; i++) {
System.out.println("t1 do working...");
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
boolean b = lock.hasQueuedThreads();
System.out.println("t2"+ b);
lock.lock();
System.out.println("t2 start working...");
try {
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("t2 do working... ");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
Thread t3 = new Thread(() -> {
boolean b = lock.hasQueuedThreads();
System.out.println("t3"+ b);
lock.lock();
System.out.println("t3 start working...");
try {
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("t3 do working... ");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
System.out.println("++++finished++++");
}

很容易看出,demo中使用了ReentrantLock来作为锁来对三个线程进行协调,确保三个线程顺序执行。使用方式也很简单:在需要保护的代码前后使用lockunlock即可。

既然ReentrantLock能提供和synchronized一样的锁机制,那必须得看看到底这个“锁”有什么黑魔法。

ReentrantLock和AbstractQueuedSynchronizer之加锁

加锁其实是一个很容易理解的过程,其中我认为有点绕的是node结点之间链的摘除和建立,毕竟数据结构的基础还是比较弱,稍微多绕几圈就被整蒙圈了。
在研究AQS锁实现之前得聊一下什么是“公平”和“非公平”锁。所谓公平锁遵循先来的先获得锁,翻译成白话就是大家都是在排队的;而非公平锁则反之,只要有获取锁的机会,那就不顾一切去抢,不排队。

ReentrantLock默认的实现为非公平锁。理论上来说非公平锁比公平锁效率更高。当然也可以通过指定参数来区分是否使用公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// ReentrantLock中的lock方法其实
public void lock() {
sync.lock();
}
static final class NonfairSync extends Sync {
//...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
//...
}

NonfairSync作为内部类继承自Sync,而Sync继承自AbstractQueuedSynchronizer
说白了其实就是个模版方法,AQS提供基础实现,子类根据自己需要去自定义不同的逻辑。

接下来根据demo中的几个关于锁的基本操作(lock)来看看其实现细节。

首先lock方法中的compareAndSetState(0, 1)语义是如果当前的值为0,那就更新为1.这是一个基于cpu指令的原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* The synchronization state.
*/
private volatile int state;
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

如果更新成功,那就返回true。而这个原子更新的字段为AQS的state。这个字段简单理解为获取锁的标志,整个锁的核心都是围绕着这个字段来完成的。
如果更新成功,那么将当前线程置为exclusiveOwnerThread。这个变量表示当前持有锁的线程。
完整的语义即:当某个线程中的逻辑调用lock方法后,lock对象中的state字段由0更新为1,当前线程持有锁。
那这个线程没执行完操作,还没释放掉锁,后续的线程怎么办?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// AQS中的实现 必须得由子类重写
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// NonfairSync中的重写
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果在某个线程获取到了锁之后还没释放,其他线程也执行到lock方法,这时候由于lock对象中state为1,因此没办法更新,所以执行acquire逻辑。而acquire调用nonfairTryAcquire方法。
首先获取state的值,在我们的demo中由于之前的线程没有释放掉锁,这里的c的值为1,而当前线程和lock对象中持有的线程不一样(getExclusiveOwnerThread返回之前持有锁的线程对象)因此这里直接返回false。
当线程中执行的任务很短的时候,短到几纳秒,获取到锁的线程马上释放掉了。这个state值从1变成了0,这里其他线程就有机会再次去“争夺”一次锁,同样使用cas操作将state值从0到1,同时将当前线程置为lock对象的exclusiveOwnerThread字段。最后返回true,表示获取到了锁。
还有一种情形,一个线程多次去lock,这里lock对象中持有的线程锁同一个线程,因此进入到current==getExclusiveOwnerThread()逻辑。做法也很简单,将state再加1即可,这个线程依旧能获取到锁。这就是所谓的可重入(Reentrant),即可以多次获取一个锁。

tryAcquire方法返回为真时,表示当前线程成功获取到了锁,整个lock逻辑已经完成,后面的acquireQueued方法就直接忽略掉。
这里小结一下:

  • AQS使用state变量来标记锁是否被线程获取,使用变量exclusiveOwnerThread标记获取锁的线程;
  • 锁可以被多次获取,这样的锁叫做可重入锁(Reentrant),通过state标记获取锁的次数,同理锁被获取多少次就得释放多少次,不然锁不会被释放;

接下来看看acquireQueued方法的实现。上面说道,当尝试获取锁成功的时候,lock方法就结束了,如果尝试获取锁失败呢?如果失败就进入到acquireQueued方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其实这里是两个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先调用addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

看到这里,出现了新的数据结构Node,为了更加方便理解,现在不得不对这个数据结构进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

首先是俩静态变量,这个变量仅仅是一个标记,并没有实际用途:

1
2
3
4
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

因为AQS有两种模式:独占和共享。独占模式例如demo中的ReentrantLock,而共享模式如并发工具包中的CountDownLatch。接着就是waitStatus变量,同时指定了几个枚举。然后就是thread当前线程,以及前驱后继结点。不难看出这是一个双端链表结构。nextWaiter字段暂时按下不表。

继续看addWaiter方法,由于传入的mode为Node.EXCLUSIVE,因此这里创建的node的nextWaiter字段的值为null,将当前要获取锁的线程也放进node里,然后尝试去“操作”这个node。实际上就是看这个AQS中node队列除了当前创建的还有没有别的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

这里的tailhead都是AQS中的变量,用于操控node链表。他们的更新都是使用cas实现的,保证原子性。如果AQS中没有node链表(没有形成),head和tail都是null,直接走enq逻辑,然后将新创建的这个node返回,如果AQS中有结点存在呢,那就直接将创建的node变成tail。compareAndSetTail(pred, node)的语义为,如果当前tail的值为pred,那么将其更新为node。然后修改后继指针,返回node结点。

再看看AQS中的结点为空的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

逻辑很清晰,这里进一步判断了一下tail是否为空,如果是真为空,那就新建一个node结点作为头结点,同时将tail指向头结点。这时候头结点就是尾结点,且结点内没有数据,只是作为一个标志而已。然而并没有返回,因为是个死循环,头尾结点初始化成功之后,继续走else逻辑,同理将新创建的结点的前驱指向刚才新建的空结点,然后把tail指向自己(Node node = new Node(Thread.currentThread(), mode);)的结点。最后修改后继指针并返回。这个逻辑看起来比较绕,尤其是指针的操作让人眼花缭乱,通过画图会更容易理解。总结一句话就是:创建node链表,初始化tail和head指针,且head指针指向的是一个空node(仅仅有意义的事waitStatus=0,因为没给值,默认就是0)。而返回的新创建的node作为acquireQueued参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

核心逻辑又是一个死循环,首先获取刚才创建的node结点的前驱结点,如果前驱结点为head结点(空的结点),可以再给这个线程一次机会,尝试获取锁。tryAcquire之前说过,这里不赘述。如果运气好,获取到了(state从0到1)返回true,将当前结点设置为head,同时摘除链表关系,也就是那个空结点被释放了,这个时候head结点可不是空结点了,而是Node node = new Node(Thread.currentThread(), mode);创建出来的。最后返回false,获取锁成功。为什么要来这一出呢?因为如果之前获取锁的线程执行任务的时候,其他线程在尝试着排队的时候还是有机会去抢一下的,说不定哪一瞬间任务结束释放了锁其他线程刚好抢到了呢?当然这也是有前提的,当线程决定去排队,且是排第一个的时候才能有多一次机会去抢锁。这里有疑问了,这个不是非公平的么?为啥还得排第一个才能抢?其实并不矛盾,因为每个线程都至少有一次机会去抢锁,通过tryAcquire。只有没抢到的,打算排队的,排到第一个的线程有第二次机会。当然,就算某个线程排第一,多一次抢锁机会,也不一定必然抢到呀,因为别的线程依旧和这个线程一样,同样是通过tryAcquire来抢的,因此是公平的,严格来说不公平,因为排第一的线程多了一次机会。

如果这个排第一的倒霉鬼还是没获取到锁,那就很难受了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

这里我们先考虑前驱结点为空结点的情况,之前提到,空结点的waitStatus没有赋值,默认为0,因此这里直接走compareAndSetWaitStatus(pred, ws, Node.SIGNAL);逻辑,将其置为-1;最后返回false,然而for死循环的逻辑还没结束,还会继续尝试获取一下锁,如果还是没获取到,那就再次进入到shouldParkAfterFailedAcquire中,因为第一次循环中将其waitStatus从0设置为了-1,因此这里直接返回true,所以,当在ASQ内部中“排队”的线程数第一个,是有两次次额外的获取锁的机会的。
接着就是parkAndCheckInterrupt逻辑了:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

park为停车的意思,这里理解为挂起也没太大毛病。说白了就等着呗,等到什么时候为止呢?那就得从unlock说起了。

ReentrantLock和AbstractQueuedSynchronizer之释放锁

demo中的解锁方法unlock对应的实现逻辑为release

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

类似地,先尝试去释放一下:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

先把state值减回去,判断一下当前的线程是不是AQS中锁持有的线程,如果不是那就说明有问题。如果当state还原为0了说明锁被释放掉了,同时将当前AQS持有的线程置为空。最后将当前state值更新(更新为减1后的,这里并不一定是0)。正如上文提到过,如果加多次锁,那么也得释放多次。如果没获释放掉,那就说明当前锁依旧被持有。

如果更新state成功,那么还需要做的一件事就是处理node结点。如果AQS中的头结点不为空,且状态不是默认的初始化的0,那么就去唤醒后继结点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这里逻辑也十分清晰,先更新结点的waitStatus,将其置为0.然后找到后面的结点,如果不是空,那就将其唤醒,和之前是park一一对应。这里还多了一段判断锁被取消的情况,注释中也写得很清晰,意思就是从node链表的尾部开始找,一直找到符合要求的结点将其唤醒。

唤醒了还没完,因为等待锁的线程被park了后还得继续执行后续的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 锁释放之后 这里会继续执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

parkAndCheckInterrupt返回的值为当前线程的中断状态,如果当前(获取锁的)线程被设置了中断标记,那么这个方法就直接返回true。即interrupted=true
由于是死循环,同样的当前线程(被唤醒的)获取一下锁,因为AQS中的state已经还原了,所以这里能拿到,将当前结点设置为头结点,获取锁完成。由于可能存在当前获取锁的线程由于某种情况被设置了中断标记,那么就将其中断(也只是设置中断标记)。

相比获取锁的操作,释放锁容易很多。

小结

本文基于ReentrantLock锁的基础实现,对AQS的大致原理进行了比较粗略的分析。如AQS的底层结构,核心的API等。通过锁的基础操作,如加锁和释放锁背后的逻辑进行了详细解读。当然还有很多没有涉及到的地方,如条件队列,共享模式的实现,公平和非公平的体现等。当知道了AQS的原理之后,去理解这些主题也是非常轻松的。总的来说,AQS的代码量不算太多,读起来不是很吃力。

PS:在云笔记中发现18年的时候也写过一篇AQS的文章,现在居然一点印象都没有了,时间啊,是真残酷。2年前的笔记

参考资料

Java并发锁框架AQS(AbstractQueuedSynchronizer)原理从理论到源码透彻解析

Java里一个线程调用了Thread.interrupt()到底意味着什么?


留言: