并发的学习与使用系列 第八篇

AQS是AbstractQueuedSynchronizer的简称,是并发组件包java.util.concurrent也就是JUC(Java Util Concurrency)的核心,以及很多并发组件如前面几篇中介绍到的ReentrantLock,Condition,BlockingQueue以及线程池里使用的worker等都是基于其实现的,将很多复杂的,繁琐的并发控制过程封装起来,便于其他并发工具类来实现更多,方便的功能,其主要通过volatile和UnSafe类的原子操作(Atomic相关)来实现阻塞和同步,之前的文章并发的学习与使用系列提到里Lock的实现类ReentrantLock是一个可重入的,可实现公平的锁。下面通过ReentrantLock的源码来看看其是怎么实现的。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个抽象类,其他类主要通过重载其tryAcquire(int arg)来获取锁通过tryRealese()来释放锁。

非公平锁获取锁的过程

ReentrantLock的默认构造函数

1
2
3
4
5
6
7
8
9
10
11
public ReentrantLock() {
sync = new NonfairSync();
}
static final class NonfairSync extends Sync {
...
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

Sync是ReentrantLock实现公平与非公平锁的主要实现,所以默认请况下ReentrantLock是个非公平锁。一般通过ReentrantLock.lock()来获取锁,其实现是在Sync中完成的。下面是先以非公平锁的实现方式来分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NoFairSync类
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
AQS类
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

compareAndSetState(0, 1)是一个原子操作,其代表的是如果原来的值是0那就将其设为1,并且返回ture。那这个原来的值是指的谁的值呢?从compareAndSetState中并看不出来。那就从整体来看,在AQS中有个表示当前锁的状态的int值state,当state等于0时,表示锁可用,否则表示锁定状态,是否可用还需考虑其他情况如可重入性。

1
private volatile int state;

可以想到compareAndSetState(0, 1)应该就是设置这个state的状态,其实现原理是通过Unsafe类可直接操作内存的特性来实现的。

1
2
3
4
5
private static final Unsafe unsafe =Unsafe.getUnsafe();
private static final long stateOffset;
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));

所以这样就很好理解了,通过Unsafe直接得到state的内存地址然后直接操作内存,因为直接可以控制内存值,这也是Unsafe类名字的来源,后面还会介绍其改进。

在回来分析lock(),如果设置状态成功,也就是成功获取了锁,接下来是

1
2
3
4
setExclusiveOwnerThread(Thread.currentThread());
AQS类中
exclusiveOwnerThread = thread;

表示当前exclusiveOwnerThread占据着该锁,可重入性的实现就与其有关,后面介绍。这时就可以直接执行lock()后面的程序了。

如果获取锁失败进入AQS的acquire(int arg);

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire是在NonfairSync中实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
NonfairSync:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这个过程是先去判断锁的状态是否为可用,如果锁已被持有,则再判断持有锁的线程是否未当前线程,如果是则将锁的持有递增,这也是java层实现可重入性的原理。如果再次失败,则进入等待队列。

通过类Node来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS的线程阻塞队列是个双向队列,提供了FIFO先来先服务的公平性,用head节点表示队首,tail表示队尾。

  1. 节点Node维护一个volatile状态,维护一个prev指针指向前一个节点,根据前一个节点的状态来判断是否可获取锁

  2. 当线程释放锁时,只需要修改自身状态即可,后续节点会观察到volatile状态的改动而获取锁

接着看下获取锁失败后进入队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
//将新的节点加入到队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

未获取到锁将新建一个Node节点,然后将其加入到队尾,此时并未有将其阻塞,在acquireQueued中将再次尝试获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断前一个节点是否为头节点并且成功过去了锁,
如果是将当前节点设为头结点,也就是说明队列的头
结点就是当前获取锁的线程,可以看出一个节点是否
能获取锁只和他前面的节点有关
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果其前节点当前属于等待被唤醒的状态,返回值代表可以将当前节点阻塞。
return true;
if (ws > 0) {
//如果其前节点已取消,则向前继续找知道找到状态不是CANCELLED的作为新的前节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前节点状态是0(不要和AQS的锁的状态state弄混),或者其他状态,先将其前节点置为Node.SIGNAL,此时不阻塞,待下次循环中确认
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//如果shouldParkAfterFailedAcquire()返回false,及当且只有其前节点pred状态为Node.SIGNAL时,将当前节点node阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.java类:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
//这里记录线程阻塞在的对象,便于调试用
setBlocker(t, blocker);
//此步骤为将当前线程阻塞
unsafe.park(false, 0L);
setBlocker(t, null);
}

jdk中unsafe.park调用native方法将线程阻塞,而在Android sdk中有不同的实现方式,后续介绍。

以上就是ReentrantLock的非公平锁调用lock()过程,首先去尝试改变AQS设为state的状态,改变成功就获取了锁,失败后再次通过判断当前的state是否为0,即未锁定状态,再次尝试改变state状态获取锁,如果state不为0,即锁已经被其他线程持有,则判断当前线程是不是已经持有该锁,如果是,则获取锁成功,且锁的次数增加。否则加入到Node队列,加入队列后在在for循环中通过判断其前节点的状态来决定是否需要阻塞,可以看出在加入队列前及阻塞前多次尝试去获取锁,而避免进入线程阻塞,这是因为阻塞、唤醒都需要cpu的调度,以及上下文切换,这是个重量级的操作,应尽量避免

公平锁获取锁的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
FairSync类:
final void lock() {
//先去判断锁的状态,而不是直接去获取
acquire(1);
}
AQS类:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
FairSync类:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors判断是否有前节点,如果有就不会尝试去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

看下实现过程,和非公平锁很相似,主要差别lock()的时候不是直接去获取锁,而是先看锁是否可用并且没有前节点,有前节点的话,即使锁是空闲也不会获取锁。

释放锁的过程

公平锁和非公平锁的释放过程是一样的,其实现都是在Sync父类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

代码很好理解,就是先去改变AQS的中代表锁状态的state值,改变后如果state为0,说明没有线程持有该锁,因为是可重入的,所以如果之前一个线程多次获取该锁,也需要释放多次。

锁释放后并没有唤醒之前阻塞的线程,所以还需要后续的唤醒操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void unparkSuccessor(Node node) {
//改变头结点的值,对唤醒后续节点没影响.
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport类:
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}

从lock()的过程可知,Head节点就是当前持有锁的线程节点,当释放锁时,从头结点的next来看,头结点的下一个节点如果不为null,且waitStatus不大于0,则跳过判断,否则从队尾向前找到最前的一个waitStatus的节点,然后通过LockSupport.unpark(s.thread)唤醒该节点线程。可以看出ReentrantLock的非公平锁只是在获取锁的时候是非公平的,如果进入到等待队列后,在head节点的线程unlock()时,会按照进入的顺序来得到唤醒,保证了队列的FIFO的特性。

await,signal实现线程间协作

显示锁(Lock)及Condition的学习与使用提到过ReentrantLock里有个函数newCondition(),该函数得到一个锁上的”条件”,用于实现线程间的通信,Condition拥有await(),signal(),signalAll(),await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。synchronized的wait()就是让出对象锁让其他线程持有,如果有多个线程wait,那么当调用

AQS总结

ReentrantLock和synchronized的选择

可以看出无论是获取锁还是释放锁的时候锁最多只有两个线程在竞争,而其他情况下,阻塞的线程不会被唤醒参与竞争,所以性能较高,因为阻塞和唤醒都是需要上下文切换,同时多个线的程竞争都会使CPU占用率升高,降低吞吐量。内置锁在最初的jdk版本中会有这个情况,但后续逐渐优化,所以选择ReentrantLock和synchronized的条件不应该是主要原因,而是应该考虑锁是否需要公平性,是否需要可中断,可共享等来作为选择依据。

有趣的Unsafe类

为什么叫Unsafe类呢?也就是不安全的,因为Unsafe类可以直接操作内存,这对java安全性是一个隐患,据说后续会逐渐改变其实现方式,而在Android的sdk-23中,Unsafe类已经有些改变了,Thread也和jdk的不同,之前通过unsafe.park()的native方法来阻塞一个线程,unsafe.unpark()来唤醒,而在Android中unsafe.park()会调用Thread的park方法,而Thread的park其实又会调用其wait(),而我们知道在使用wait之前必须要先持有这个对象的锁,即先用synchronized获取线程锁,然后在进行等待,这下就清晰了,就和普通的对象的wait()一样,然后unsafe.unpark()最终会调用到notify()方法。

所以源码一直实在变化的,尤其Android sdk和jdk也有一些区别,但掌握其思想就能很好的理解并掌握新的变化。这也是这一年来在做的,放下浮躁的心态,扎实基础,掌握原理。