深入理解 ReentrantLock

Java 中实现线程同步用得最多的是 synchronized ,但使用 synchronized 本质是锁升级的过程。当升级到重量级锁之后会有性能问题。而且无法当并发数下来之后,无法进行“锁降级”操作。

ReentrantLock 本质是 CAS 操作,都是在用户态进行,没有涉及到重量级锁的内核态指令,而且能够手动控制获取锁和释放锁。


公平锁 & 非公平锁

ReentrantLock 共提供两种初始化方法。一个是默认的无参构造方法,对应的是“非公平锁”;另一个是带一个 fair 参数的构造方法,当传入 true 的时候获取的是“公平锁”,false 为“非公平锁”。

公平 & 非公平有啥区别呢?做个实验测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ReentrantLockTest {

private static long count = 0L;

public static void main(String[] args) {
// 默认是使用"非公平锁"
final ReentrantLock reentrantLock = new ReentrantLock();

// 初始化通过制定 fair 参数,可使用"公平锁"
// final ReentrantLock reentrantLock = new ReentrantLock(true);

for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
public void run() {
for (int j = 0; j < 10; j++) {
reentrantLock.lock();
try {
count++;
System.out.println(Thread.currentThread().getName() + " : " + count);
} finally {
reentrantLock.unlock();
}
}
}
}).start();
}
}
}

公平和非公平都能实现同步效果(最终打印的 count 值为 1000),在此基础上观察打印结果可以发现:

当使用非公平的 ReentrantLock 的时候,同一线程的打印非常密集(某一个线程被连续调度);

而使用公平的 ReentrantLock 的时候,基本看不到同一线程的连续打印(同一线程不会被连续调度)。

所以,所谓的公平非公平其实就是是否允许连续调度吗?尝试从源码中获取答案。


加锁相关 API

ReentrantLock 提供了很多友好的 API。既能够获使用 tryLock 尝试上锁,并把结果进行返回,方便我们自定义加锁失败的处理;也能够使用 lockInterruptibly & interrupt 控制阻塞和中断 …


调用 tryLock 尝试上锁,并将上锁结果返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Main {

public static int count = 0;

public static void main(String[] args) throws InterruptedException {

final ReentrantLock lock = new ReentrantLock(); // 非公平锁
// final ReentrantLock lock = new ReentrantLock(true); // 公平锁

Thread t1 = new Thread(new Runnable() {
@SneakyThrows
public void run() {
lock.lock();
try {
Thread.sleep(3000);
System.out.println("T1: " + String.valueOf(++count));
} finally {
lock.unlock();
}
}
});

Thread t2 = new Thread(new Runnable() {
public void run() {
boolean isLocked = false;

// // 直到获取到锁为止
// for (;;){
// isLocked = lock.tryLock();
// if (isLocked) {
// break;
// }
// }

// 连续三次获取锁失败,放弃操作
for (int i = 0; i < 3; i++) {
// 马上尝试加锁
isLocked = lock.tryLock();
// 在 timeout 时间内尝试加锁
// isLocked = lock.tryLock(1000, TimeUnit.MILLISECONDS);
if (isLocked) {
break;
}
}
if (!isLocked) {
return;
}

try {
System.out.println("T2: " + String.valueOf(count));
} finally {
lock.unlock();
}
}
});

t1.start();
Thread.sleep(200);
t2.start();
}
}

不带参数调用 tryLock 不难理解,就是单纯的获取锁状态或者判断当前线程是否重入即可得出返回的上锁结果,查看源码可以发现在使用非公平锁的情况下最终调用的是 nonfairTryAcquire 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 检查锁的状态
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 检查当前线程是否可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

而公平锁的情况也是类似,只是多了一步检查,检查是否已经有线程正在排队。


而指定 timeout 参数的 tryLock 就比较有意思了,会在尝试获取锁失败后调用 doAcquireNanos 方法,在 timeout 时间段内不断的调用获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
...
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,如果获取不成功,则不间断地重新尝试获取
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
...
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;

// 计算出到期时间 deadline
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 在到期时间到达前,使用死循环不断的获取锁
for (;;) {
// 如果前面的节点是头结点(代表自身快要排到队了),尝试获取锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}

// 计算剩余时间,如果到达过期时间,返回结果:获取锁失败
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;

/**
如果还没获取锁而且尚未没到达过期时间,同时距离到期还早(剩余时间大于 spinForTimeoutThreshold = 1000 纳秒)
使用 LockSupport 中的 parkNanos 方法,先将线程的状态改为阻塞状态(目的是为了保证 CPU 不会对其进行调度,一定程度下释放 CPU 资源)
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);

// 如果线程被打断,则抛出异常,结束不断获取锁的逻辑
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

总结一下,不带参数的 tryLock 方法会马上进行一次尝试获取锁操作,并将尝试获取锁结果进行返回;带 timeout 参数的 tryLock 方法会在第一次尝试获取锁失败后调用 doAcquireNanos 方法。在 doAcquireNanos 方法中使用死循环不断尝试获取锁,同时如果距离到期时间还早(距离到期时间超过 1000 纳秒),会先将线程进行 park (加入阻塞队列)。


使用 lockInterruptibly & interrupt 控制阻塞和中断

使用 ReentrantLocklock 方法会阻塞,直到获取到锁为止,而且这种阻塞是无法被中断。因为无论是公平锁还是非公平锁在使用 lock 进行获取锁操作的时候,都是使用死循环不断的获取锁。

如果希望这种阻塞能够被外部中断,则需要使用 lockInterruptibly 代替 lock ,并在希望中断的地方调用线程的 interrupt 终止等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static void main(String[] args) throws InterruptedException {

final ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(new Runnable() {
@SneakyThrows
public void run() {
lock.lock();
try {
System.out.println("thread-01 started");
Thread.sleep(Integer.MAX_VALUE);
} finally {
lock.unlock();
}
}
});
t1.start();

// 保证 t1 已经获取到锁
Thread.sleep(200);

Thread t2 = new Thread(new Runnable() {
public void run() {
try {
// 阻塞,获取锁
lock.lockInterruptibly();
System.out.println("thread-02 started");
} catch (InterruptedException e) {
System.out.println("thread-02 interrupted");
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
});
t2.start();

// 保证 t2 已经开始尝试获取锁
Thread.sleep(200);

// 终止 t2 的等待
t2.interrupt();
}

在了解过 ReentrantLock 中的 doAcquireNanos 方法之后,不难猜到 lockInterruptibly 的实现其实也是类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
...
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 如果线程被中断(线程对象调用了 interrupt 方法),则抛出异常,终止阻塞等锁的操作
if (Thread.interrupted())
throw new InterruptedException();

// 先尝试进行一次获取锁,不成功则调用 doAcquireInterruptibly 方法
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
...
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 使用死循环不断的获取锁
for (;;) {
final Node p = node.predecessor();
// 如果前面的节点是头结点(代表自身快要排到队了),尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}

// 再次检查线程是否被中断,中断则抛出异常,终止阻塞等锁的操作
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

可见 lockInterruptibly 的作用就是用来代替 lock 方法,配合线程对象的 interrupt 实现外部可中断等待的操作。原理就是在 acquireInterruptiblydoAcquireInterruptibly 中有对线程的打断状态进行检查。


更多细节

深入理解 ReentrantLock 的最好方式就是阅读源码,而阅读源码最好是带着目标去阅读。

我们可以尝试带着以下几个问题去阅读。

如何区分公平非公平

要回答这个问题,可以从 ReentrantLock 的两个初始化方法入手。

1
2
3
4
5
6
7
8
9
10
11
12
13
...

private final Sync sync;

public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

...

当使用公平锁时对应的是 FairSync 的 AQS 子类对象,非公平锁则对应名为 NonfairSync 的 AQS 子类对象。

FairSyncNonfairSync 都是 ReentrantLock 中的内部类,同时它们还有一个共同的父类 Sync ,也是 ReentrantLock 中的内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
abstract static class Sync extends AbstractQueuedSynchronizer {

abstract void lock();

/**
尝试以非公平的形式获取锁,代表尝试马上获取锁
这个方法没有留在 NonfairSync 类实现,是以为 ReentrantLock 有一个 tryLock 方法,马上尝试获取锁
也就是无论是 NonfairSync 还是 FairSync 都有马上获取锁的逻辑。以此封装到了父类
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果获取的 state 为 0 ,代表当前锁没被占用,调用 CAS 尝试获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 获取锁成功,记录当前线程为锁占用线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前锁已被占用,判断获得锁的线程是否为当前线程,如果是的话则允许重入,并更新 state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 释放锁操作
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只有当前线程为占用锁的线程才能调用此方法
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;

// 由于存在线程可重入的情况,只有当 state 减到 0(调用获取锁和释放锁的次数一致),才是真正的释放完锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}

// 更新 state
setState(c);
return free;
}

// 省略其他方法:获取占用锁的线程、获取当前是否有被占用锁 ...
}

// 非公平锁
static final class NonfairSync extends Sync {

final void lock() {
// 使用非公平锁获取锁时,会直接使用 CAS 尝试获取锁
if (compareAndSetState(0, 1))
// 获取锁成功,记录当前线程为锁占用线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试再次获取锁,或者从等待队列中获取锁,下面讲到
acquire(1);
}

/**
当非公平锁尝试获取锁时,会马上尝试获取锁,无论在此之前是否有别的线程先尝试获取锁
也就是如果多个线程同时调用 tryAcquire,会出现锁抢夺
先调用的线程未必获取成功,后调用的线程未必获取失败,这也就是非公平锁不公平的地方
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

// 公平锁
static final class FairSync extends Sync {

final void lock() {
// 尝试获取锁,或者从等待队列中获取锁,下面讲到
acquire(1);
}

/**
公平锁和非公平锁尝试获取锁不同的在于,哪怕
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/**
只有在锁还没被占用,并且也没有线程正在获取锁的情况,当前线程才尝试获取锁
保证了先调用获取锁的线程比后调用获取锁的线程有更大的概率获取到锁,这就是公平锁公平的地方
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

读到这里可以知道,ReentrantLock 是通过两个内部类 FairSyncNonfairSync 来实现公平锁和非公平锁的。

同时这两个内部类都使用从 AQS 的 state 属性来记录是否获取锁和重入次数。

所谓的公平与否其实就是在还没有线程占用锁的时候,是否先判断已经有正在排队获取锁的 Node (线程)再尝试获取锁。

注:state 为 0 代表未获取锁,state 不为 0 时代表重入次数。为保证线程可见性,state 被 volatile 修饰。


公平锁是如何实现“公平”调度的

结果上面的“如何区分公平非公平”可以知道,公平锁只有在锁没被占用并且没有正在排队的 Node(线程)才会去尝试上锁。

但这些被拒绝(调用 tryAcquire 返回 false)的线程是如何处理?我们可以从非公平锁调用 lock() 获取锁失败后的 acquire(1) 进行入手:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
/**
由于存在高并发的情况,所以有可能虽然上一步调用 lock() 获取锁失败,但马上又有线程释放了锁,所以首先会再次尝试获取锁
这里
并且使用的是短路与,如果再次获取锁成功的话,将不会执行 && 的右半部分
如果再次获取锁失败,则执行 && 的右半部分 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
/**
再次获取锁失败,并且进入等待队列成功,将线程进行中断(等待下次被节点唤醒)
Thread.currentThread().interrupt();
*/
selfInterrupt();
}
...
private Node addWaiter(Node mode) {
// 以排他锁的形式,为当前线程创建一个 Node
Node node = new Node(Thread.currentThread(), mode);

// 取出双向链表中的尾节点
Node pred = tail;
if (pred != null) {
// 先将当前节点的 prev 指向当前链表的最终节点,因为这样哪怕当前节点成为尾节点不成功,也不影响当前链表
node.prev = pred;
// 使用 CAS 的方式,将当前线程对应的 Node 设置为尾结点,如果成功再将原本的尾结点的 next 指针指向当前节点,这样当前节点彻底成为链表的尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
...
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的上一个节点
final Node p = node.predecessor();
/**
如果当前节点的上一个节点是头节点,说明将要轮到当前节点获取锁
由于存在高并发,有可能在判断完上一节点是头节点后,上一节点就释放锁了,所以这里的短路与会在判断完上一节点是头结点后,马上尝试获取锁
*/
if (p == head && tryAcquire(arg)) {
// 获取成功后将当前节点设置为头结点,并解除上一节点的对链表的引用,使其成为游离节点,方便 GC 回收
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
如果不是前一位不是头结点,或者前一节点还没释放锁,让线程先进入 wait 状态,等待将来被别的线程唤醒
底层是使用 LockSupport.park(this) 的方式
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可见,AQS 是通过将当前获取锁的线程添加到一个双向链表的尾部,然后按照从头部往后的方式(只有前一位是头节点的 Node 才会执行 tryAcquire,其余的继续排队)。所以公平锁在高并发场景下,能够实现相对“公平”的调度。


总结

ReentrantLock 就是使用各种 CAS 操作来处理高并发,并且使用 Node 封装了一个双向链表来接受那些正在等待锁的线程。

ReentrantLock 的非公平锁的非公平在于不考虑线程获取锁的先后顺序,每个调用 lock() 的线程都会有一个执行 CAS 获取锁的机会。所以先来的线程不一定能够成功获取锁,后来的线程也不一定获取锁失败。而那些 CAS 获取锁失败的线程会被加入等待队列(双向链表实现)尾部。

ReentrantLock 的公平锁的公平在于考虑了线程获取锁的先后顺序,那些在“锁已经被占用”或者“队列已经有线程正在排队”之后才来的线程根本没有执行 CAS 获取锁的机会,而是直接进入等待队列的尾部。只要那些初次涌入(锁还没占用并且队列还没线程排队)的线程有机会执行 CAS 获取锁,在这些初次涌入的线程中只有一个能够成功获取锁,其余仍然是进入链表尾部。

缓存一致性 由 async & await 引发的协程思考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×