Java 多线程 & 锁原理

后端讲求的是高并发、高性能、高可用(3H),但是要实现 3H,通常是通过优化架构(横向分层,纵向分割)、使用缓存、分布式化和集群化等手段来实现。很少会自己写线程代码,日常开发在需要用到多线程的地方也大多都交给框架处理,对多线程和锁原理的理解一直不够深入。

趁着周末有时间,从简单的线程创建方式开始讲起,逐步深入了解关于线程的几种状态和锁原理。针对一些以前没写过独立文章的锁实现,还会展开来讲。


Java 创建线程的几种方式

先简单介绍一下创建线程的几种方式:

  • 继承 Thread

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    class SubThread extends Thread {
    @Override
    public void run() {
    System.out.println(getName());
    }

    public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
    SubThread subThread = new SubThread();
    subThread.start();
    }
    }
    }

  • 实现 Runnable 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class RunnableImpl implements Runnable {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
    RunnableImpl runnable = new RunnableImpl();
    for (int i = 0; i < 10; i++) {
    Thread thread = new Thread(runnable);
    thread.start();
    }
    }
    }

    由于 Runnable 接口只有一个 run 方法需要我们实现,所以也可以在创建 Thread 实例的时候传入 lambda 表达式,这里不再赘述。

  • 线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    class RunnableImpl implements Runnable {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
    final int count = 10;
    ExecutorService threadPool = Executors.newFixedThreadPool(count);
    RunnableImpl runnable = new RunnableImpl();
    for (int i = 0; i < count; i++) {
    threadPool.submit(runnable);
    }
    }
    }

    Executors 作为线程池工具类提供了诸多快速创建线程池的 API 。一部分底层使用的是 ThreadPoolExecutor ,一部分底层使用的是 ForkJoinPool

    使用线程池将不再需要显式创建线程,而只需要将任务提交到线程池(线程池内部使用一个 BlockingQueue 存放),线程池内部会自动创建/销毁线程来完成任务。

    更多与线程池相关的内容可以看:深入理解 Java ThreadPool


线程的几种状态

首先 Java 中的线程和操作系统的线程是一一对应的关系。线程状态可以通过调用线程的 getState() 方法获取,返回值是一个状态枚举:

1
2
3
4
5
6
7
8
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}

前面也说了创建线程的几种方式,无论是通过何种方式创建的线程,总离不开以下 6 种线程状态:

  1. New :一个线程被 New 出来之后,调用 start() 方法之前,就是 New 状态


  2. Runnable :一个被创建出来的线程调用 start() 方法,线程开始进入 Runnable 状态

    1. Ready :就绪状态,指线程可以被 CPU 执行,所有的处于 Ready 状态的线程会被存放在一个等待队列里。当线程被调度器选中之后,会从 Ready 状态转换为 Running 状态

    2. Running :运行状态,指线程正在被 CPU 执行。当线程被挂起或调用 Thread.yleid() ,会从 Runniing 切换为 Ready 状态


  3. TimedWaiting :线程在 Running 状态调用以下方法会进入 TimedWaiting 状态,等待时间到了之后,线程会重新变为 Ready 状态(回到就绪队列当中)

    • Thread.sleep(millis);

    • o.wait(timeout);

    • thread.join(millis);

    • LockSupport.parkNanos(nanos);

    • LockSupport.parkUntil(deadline);


  4. Waiting :线程在 Runnable 里的 Running 状态调用以下 ① 方法会进入 Waiting 状态,直到调用以下 ② 方法回到 Ready 状态

    1. Running 变为 Waiting 状态的方法:

      • o.wait();
      • thread.join();
      • LockSupport.park();
    2. Waiting 状态回到 Ready 状态的方法:

      • o.notify();

      • o.notifyAll();

      • LockSupport.unpark(currentThread);


  5. Blocked :一个处于 Running 状态的线程试图获取进入同步代码块的锁失败的时候,会进入 Blocked 状态。直到获取到进入同步代码块的锁,回到 Ready 状态


  6. Teminated :当线程任务正常完成后的线程状态

注意:当一个线程的处于 Teminated 状态时,不能通过调用 start() 重新回到 Runnable 状态。


AQS

在讲具体的锁之前,先来了解一下 AQS ( java.util.concurrent.locks.AbstractQueuedSynchronizer )。

AQS 为 Java 中的各种 CAS 锁提供了上层抽象,AQS 中最为核心的四部分内容:

  1. Node 内部类。由于 AQS 中使用双向链表存储想要获取锁的线程,Node 作为双向链表中的节点类,与线程进行绑定,同时记录前一位和后一位节点,同时设立了 Node 一些状态属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;
volatile Node next;
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { }

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

  1. state 属性值。一个被 volatile 修饰的 int 类型属性(保证了线程可见性)。提供了基本的 Setter & Getter。至于 state 代表的含义是什么要看具体的子类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}
...
}

  1. 各种 CAS 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
...

  1. 骨架逻辑。不难发现,AQS 封装了不少骨架逻辑,使得子类只需要实现部分方法即可以完成自定义锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
...
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...


有了以上的基础认知,现在可以来看看 Java 的几种锁类型。

synchronized

synchronized 是最常用的实现线程安全的通用方案,synchronized 本质是锁升级过程。

使用 synchronized 的方式有两种。一种是直接使用同步代码块;另一种是将 synchronized 加在方法上,如果修饰的方法是实例方法,则使用 this 作为锁对象,如果修饰的方法是静态方法,则使用所在类的类对象作为锁对象。整个锁升级过程为:偏向锁 -> 轻量级锁 -> OS 锁 。

关于 synchronized 更详细的内容可以查看:深入理解 synchronized


ReentrantLock

ReentrantLock : 基于 CAS ,相比于 synchronized 能够更好的控制锁的状态。 synchronized 只要进入代码块就代表上锁,离开代码块就是释放锁;而 ReentranLock 需要手动的 lock 和 unlock,同时还提供 tryLock 方法 ,能够让我们在尝试获取锁失败后进行自定义操作。

ReentrantLock 还支持在初始化的实时指定 fair 参数,代表是否使用公平策略。公平锁使用的是 ReentrantLock 内部的 FairSync ;非公平锁使用的是 NonfairSync 。两者都间接继承自 AQS ,对 AQS 中的 state 的运用是用作记录是否上锁以及当前重入次数。

而 tryLock 方法,能够指定一个 timeout 参数,会在指定时间内进行尝试加锁,并返回加锁结果,不会像 lock 方法那样一直阻塞直到获取成功为止。

关于 ReentrantLock 更详细的内容可以查看:深入理解 ReentrantLock


ReadWriteLock

ReadWriteLock : 读写锁,其实是包含读锁(共享锁)和写锁(排它锁)。

读锁(共享锁): 当添加的是读锁,允许其他的线程同样使用读锁进入(其他的读线程),不允许使用写锁进入(写线程)。即读读并发,读写不并发。

写锁(排它锁): 当添加的是写锁的时候,其他使用读锁或者写锁的线程都不能进入。

关于 synchronized 更详细的内容可以查看:深入理解 ReadWriteLock


LockSupport

LockSupport :

  • synchronized 的实现原理 & 锁升级问题

关于 LockSupport 更详细的内容可以查看:深入理解 LockSupport


CountDownLatch

CountDownLatch : 倒计时,初始化的时候指定一个倒计时数值,当倒计时结束后,调用 wait 的线程会往下执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws InterruptedException {
int count = 100;
// 设置倒计时数为 100
final CountDownLatch countDownLatch = new CountDownLatch(count);

for (int i = 0; i < count; i++) {
new Thread(new Runnable() {
@SneakyThrows
public void run() {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
// 每个线程调用一次 countDown 代表倒计时减 1
countDownLatch.countDown();
}
}).start();
}

System.out.println("countdown start");

// 在倒计时结束前(count 数为 0)一直阻塞,直到倒计时结束
countDownLatch.await();

System.out.println("countdown end");
}

没写过独立分析 CountDownLatch 的文章。这里简单分析下 CountDownLatch 的实现。CountDownLatch 的源码也十分简单,本质就是将 AQS 中的 state 作为计数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

/**
对当前 state 值固定减一,没有使用 release 参数
也就是说只能每次调用 countDown 来进行倒计时减一操作
而且只有 state 减为 0 ,才算是真正的锁释放,AQS 中的 doReleaseShared 方法才被执行,对线程执行 unpark 操作
*/
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

/**
尝试获取锁,如果获取失败,对线程进行 park 操作
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}
}

所以 CountDownLatch 顾名思义,就是将 AQS 中的 state 作为计数器使用,每次调用 countDown 则对 state 进行减一操作,只有 state 减到 0 才算释放锁。


CyclicBarrier

CyclicBarrier : 循环障碍。对调用了 await 方法的线程进行等待,直到有达到数量的等待线程之后再集体释放。

设定一个循环阈值和 Runnable 对象,每达到一次循环阈值,执行一次 Runnable 对象的 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
int batchNumber = 10;

final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchNumber, new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + ": " + "数量达到规定,集体释放等待队列中的线程。");
}
});

for (int i = 0; i < batchNumber; i++) {
new Thread(new Runnable() {
@SneakyThrows
public void run() {
for (int j = 0; j < 100; j++) {
System.out.println(Thread.currentThread().getName() + ": " + String.valueOf(j));
// 调用该方法会将当前线程放入 trip 等待队列中,直到队列中的数量达到阈值,再集体唤醒队列中的所有线程
cyclicBarrier.await();
}
}
}).start();
}
}

上述实验的打印结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Thread-0: 0
Thread-4: 0
Thread-3: 0
Thread-2: 0
Thread-1: 0
Thread-6: 0
Thread-5: 0
Thread-7: 0
Thread-8: 0
Thread-9: 0
Thread-9: 数量达到规定,集体释放等待队列中的线程。
Thread-9: 1
Thread-0: 1
Thread-3: 1
Thread-6: 1
Thread-8: 1
Thread-1: 1
Thread-2: 1
Thread-4: 1
Thread-7: 1
Thread-5: 1
Thread-5: 数量达到规定,集体释放等待队列中的线程。
...

每次线程打印一次当前计数之后被加入 trip 等待队列,等待其他线程同样的值打印完了相同的值之后(这时候 trip 达到规定数量),才会继续执行(达到规定数量的 trip 队列会对所有线程集体释放)。

阅读 CyclicBarrier 相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class CyclicBarrier {
// 一个帮助记录是否打断控制的内部类,主要用作跳出死循环
private static class Generation {
boolean broken = false;
}

// 是否打断控制的变量
private Generation generation = new Generation();

// 使用 ReentrantLock 作为锁对象
private final ReentrantLock lock = new ReentrantLock();
// 使用 lock.newCondition() 来创建一个 trip 队列
private final Condition trip = lock.newCondition();
// 保存调用构建函数时指定的循环数值
private final int parties;
// 保存调用构建函数时指定的 Runnable
private final Runnable barrierCommand;
// 当前的实际计数数值,当 count = parties,代表需要执行 Runnable 的 run 方法了
private int count;

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
...
// 每次调用 await 方法(指定超时或者不指定超时),都会执行该方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 每次都会对 count 进行减一操作
int index = --count;
// 达到阈值(trip 队列中线程数量达到 parties 个)
if (index == 0) {
boolean ranAction = false;
try {
// 执行 Runnable 的 run 方法,通常在最后一位添加到 trip 的线程中执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 该方法的作用是集体释放 trip 等待队列队列中的线程,重置 count 值和 generation 控制变量
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 如果 trip 队列中线程还没达到阈值,使用死循环一直对等待队列执行 await
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

// 当数量满足阈值,generation 会被重置,跳出死循环
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
}

总结一下,CyclicBarrier 的底层使用 ReentrantLock 作为锁,并调用 newCondition 方法来创建 trip 等待队列,在等待队列的线程数没达到阈值时,调用 await 方法让队列中的线程集体等待,直到等待队列中线程数达到阈值,调用 signalAll 来对队列中的线程集体唤醒,同时重置计数器进行下一次的循环计数控制。


Phaser

Phaser : 相位器,也称为阶段器。可以看作是一个分段的 CyclicBarrier

当一个线程调用 arriveAndAwaitAdvance 的时候,代表该线程到达当前阶段,进入等待队列,直到所有线程都到达当前阶段(等待队列满了),再集体释放,进入下一阶段或结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
int workerNum = 3;
int phases = 4;

Phaser phaser = new Phaser(workerNum) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("phase: %s - parties: %s", phase, registeredParties));
return registeredParties == 0;
}
};

for (int i = 0; i < workerNum; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < phases; j++) {
System.out.println(Thread.currentThread().getName());
// 到达并等待前进,是指该线程到达这个阶段,等待其他线程到达这个阶段再一同前进。
phaser.arriveAndAwaitAdvance();
}
}
}).start();
}
}

当任务涉及多个阶段,并需要某一个阶段的任务全部完成后才能开始下一阶段的任务的时候,可以考虑使用 Phaser


Semaphore

Semaphore : 信号量,指定允许多少个线程同时执行(获得锁)。使用上与 ThreadPoolExecutor 的 maximumPoolSize 参数类似。最多允许有多少任务同时执行。

ReentrantLock 类似,支持公平锁和非公平锁。通过构造函数的 fair 参数进行指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
int count = 5;
Semaphore semaphore = new Semaphore(count);

for (int i = 0; i < count * 2; i++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
try {
// 阻塞直到获取锁
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} finally {
// 释放锁
semaphore.release();
}
}
}).start();
}
}

Semaphore 在实现上也 ReentrantLock 大致相同,内部有一个继承自 AQS 的内部类 Sync 。将 AQS 中的 state 作为计数器使用,记录当前剩下多少个可运行名额。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
...
abstract static class Sync extends AbstractQueuedSynchronizer {

Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// remaining >= 0 代表还有可用名额,使用 CAS 尝试获取
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 释放锁就是归还可用名额
if (compareAndSetState(current, next))
return true;
}
}

final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

static final class NonfairSync extends Sync {

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

static final class FairSync extends Sync {

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 相比只要有可用名额,调用 acquire 就有机会调用 CAS 尝试获取锁的非公平版本不同
// 公平版本会在多一步检查,检查是否已经有别的线程正在排队
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}
...


Exchanger

Exchanger : 用作线程数据交换。

调用 exchange 方法后当前线程会阻塞,直到别的线程也调用 exchange 完成交换,才会往下执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
Exchanger<Object> exchanger = new Exchanger<>();

new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String s = (String) exchanger.exchange("String from T1");
System.out.println(Thread.currentThread().getName() + ": " + s);
}
}, "T1").start();

new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String s = (String) exchanger.exchange("String from T2");
System.out.println(Thread.currentThread().getName() + ": " + s);
}
}, "T2").start();
}

查阅 Exchanger 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
// 用作交换数据的 Node 类,还用了 @sun.misc.Contended 注解保证一个对象能够在一个独立的缓存行里
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}

/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
...


CAS

  • ABA 问题

上面说到的几种锁,除了 synchronized ,其余都是基于 CAS 实现的。也就是其实现线程同步的方式都是:

  1. 取出当前的值,赋值给临时变量;或者直接指定希望的初始值
  2. 计算要设置的值;或者直接指定希望的目标值
  3. 将当前最新值和之前赋值的临时变量进行比较
    1. 如果相同,说明期间没有被其他线程修改过,将目标资源更新为要设置的值
    2. 如果不同,说明期间有其他线程修改过,重新从步骤一开始执行

ReentrantLock 的获取锁过程为例进行说明:无论是初始化是否指定 fair 参数的公平锁或者非公平锁(公平锁时 sync 为内部类 FairSync 实例;非公平锁时 sync 为内部类的 NonfairSync 实例),获取锁的时候都是调用 compareAndSetState(0, 1) (意思为期望当前状态为无锁状态 0 ,期望设置成有锁状态 1)。

但这流程当中有个问题:在高并发环境下,如果在计算要设置的值期间,有其他线程将目标资源从 A 修改为 B,再从 B 重新修改为 A ,是能够通过步骤三的判断的。但资源又确实是被其他线程修改过的。

这时候如果目标资源是基本数据类型,其实并不影响。例如我要 compareAndSetInt(0,1) ,那么代表我只关心初始值为 0 ,设置为 1 的条件,至于在我获取初始值(步骤一)和进行比较(步骤三)过程中发生了什么。并不需要关心。

如果是引用类型的话呢?引用对象没改变,但是对象中的某个属性发生了改变又该如何处理?当然是重写 equals 和 hashCode 方法,在步骤三中调用 equals 进行比较我们关心的属性值。

还有一个问题,如果我们确实需要保证在步骤一和步骤三之间没有被修改过,彻底避免 ABA 问题,能怎么处理?两种方案,一是为目标资源绑定一个 version,在步骤三中对 version 也进行比较;二是使用修改时间戳,同样在步骤三中进行比较。

  • 高并发场景

记得在我第一次梳理 CAS 流程的时候,就在想这难道就没有并发问题了吗?其实单纯从字面意思去理解步骤三的话是会出现并发问题的,也就是在对比了初始值和最新值之后,设置目标资源的目标值之前,是有可能被其他线程修改的?那为什么 CAS 这么普遍被使用呢?

其实在这两个步骤之间操作系统是有一个上锁的动作的,为的就是解决这个并发问题。在比较值和设置值这个操作,也就是 compareAndSet 这个动作发生时,系统会给总线或者缓存上锁,确保在多核环境下,不会被其他线程并发修改。但 CAS 无法解决一个问题是,只能保证保证一个共享变量是线程安全的。

  • 循环时间过长

我们知道 CAS 本质其实就是在一个 for 循环里将当前值和原来值做对比并尝试修改值。但当长时间 CAS 不成功的时候,将会为 CPU 带来巨大压力,特别是多个线程都尝试对一个共享变量进行 CAS 操作。如果 CAS 搭配处理器的 pause 指定一定程度能够缓解这个问题。pause 指令有两个作用:

  1. 可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  2. 可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
Java Runtime Data Area 初探 Nacos

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×