AQS源码解析
[toc]
# 写在文章开头
本文将针对AQS源码设计的使用进行深入分析,希望对你有帮助。
# AQS简介
在JUC并发包下,有可重入锁(ReentrantLock)、倒计时门闩(CountDownLatch)等并发流程工具,其底层对于线程资源争抢以及获取执行权都是基于AQS实现的。
AQS的思想也很巧妙,将多线程抽象为一个个节点放到一个CLH双向队列中,争抢到的节点可以修改一个为state的状态值,这个state我们这里暂且可以理解为争抢到资源的标识,只有当前获取执行权的线程将state设置到某个值的时候才能进行临界资源操作,此时其余线程就会感知到这一点进入CLH队列中等待锁释放后的争抢:

可以说AQS在Doug Lea大神的操刀下使用起来非常方便,他把多线程当作一个个节点,实现了线程因争抢不到失败而进入等待队列,以及从等待队列中唤醒线程等细节都实现好了。我们只需要按需实现自己尝试获取锁和释放锁的逻辑即可。
# 基于AQS手写一个可重入锁
# 落地思路
我们希望编写一把可重入锁,他能做到同一个线程可以操作这把锁,例如线程1连续上锁5次,释放的时候也得连续释放5次,只有完全释放干净之后,其他争抢的线程才能操作这把锁。
目前我们初定的逻辑是,在可重入锁的实现一个内部类,这个内部集成AQS重写尝试取锁和尝试释放锁的逻辑。这里可能会有人有疑问,为什么我们只要实现尝试获取锁和尝试释放锁的逻辑呢?
因为AQS的已经为我们内置的如下的抽象逻辑,我们只需按需实现部分的规则尽可能基于AQS写出一款强大的并发工具:
定义了一个CLH双向链表队列,存放线程节点,对应的我们可以在AQS的源码中看到关于链表节点的定义:
static final class Node {
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
内置一个尝试获取锁的逻辑,这个锁用一个int标识即state表示,内容大致为:通过CAS尝试取锁,成功就执行则返回成功标识。取不到就返回失败:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2
3
4
实现一个释放锁的逻辑,尝试释放当前锁,成功了唤醒后继节点并返回true,反之返回false。
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
//成功后唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
可以看出AQS实现时通过模板方法封装的统一的抽象逻辑,将上锁、释放锁和唤醒CLH队列节点的逻辑都做了统一的封装,我们只要基于这些方法编写对应并发工具的上锁和释放锁的业务逻辑即可。
所以我们实现可重入锁的设计思路为,确保第一个将state设置为1的节点进行无限次重入并累加state的值,而其他节点则阻塞等待直到state因为锁释放变为0时再进行争抢:

# 基于AbstractQueuedSynchronizer实现逻辑抽象
按照上文描述,我们将AbstractQueuedSynchronizer 组合进来构成一个内部类,然后基于这个类实现尝试取锁和释放的抽象逻辑,可以看到这段逻辑本质上都是通过AbstractQueuedSynchronizer 内置的方法编写出来的,我们只需按照自己的逻辑按照可重入的锁实现设置state状态即可:
private class AQSSync extends AbstractQueuedSynchronizer {
/**
* 尝试取锁
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//获取当前状态值
int state = getState();
//获取当前线程
Thread currentThread = Thread.currentThread();
//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了
if (0 == state) {
if (compareAndSetState(0, arg)) {
//设置当前资源拥有者为当前线程
setExclusiveOwnerThread(currentThread);
return true;
}
} else if (getExclusiveOwnerThread() == currentThread) {//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可
int newState = arg + state;
if (newState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(newState);
return true;
}
return false;
}
/**
* 尝试释放锁
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
//如果进行释放的不是当前线程则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean flag = false;
int newState = getState() - arg;
//如果state扣减后为0说明当前线程完全释放资源了,其他线程可以开抢了
if (0 == newState) {
//设置资源拥有者为空
setExclusiveOwnerThread(null);
flag = true;
}
setState(newState);
return flag;
}
final Condition newCondition(){
return new ConditionObject();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 基于内置类对外暴露可重入锁
最终代码如下如下可以看到这就模板方法的好处,我们通过lock确定行为,基于AQS作为具体实现细节。那些取锁和释放锁的逻辑用AQS自带的即可。而尝试取锁、释放锁的逻辑用我们自己的实现的即可。
/**
* 自定义可重入锁
*/
public class ReentrantAQSLock implements Lock {
private AQSSync sync = new AQSSync();
@Override
public void lock() {
//调用基于AQS实现好的逻辑即可
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
//调用基于AQS实现好的逻辑即可
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
//调用我们实现的尝试取锁逻辑
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
//调用我们实现的尝试释放锁逻辑
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
private class AQSSync extends AbstractQueuedSynchronizer {
/**
* 尝试取锁
*
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//获取当前状态值
int state = getState();
//获取当前线程
Thread currentThread = Thread.currentThread();
//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了
if (0 == state) {
if (compareAndSetState(0, arg)) {
//设置当前资源拥有者为当前线程
setExclusiveOwnerThread(currentThread);
return true;
}
} else if (getExclusiveOwnerThread() == currentThread) {//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可
int newState = arg + state;
if (newState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(newState);
return true;
}
return false;
}
/**
* 尝试释放锁
*
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
//如果进行释放的不是当前线程则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean flag = false;
int newState = getState() - arg;
//如果state扣减后为0说明当前线程完全释放资源了,其他线程可以开抢了
if (0 == newState) {
//设置资源拥有者为空
setExclusiveOwnerThread(null);
flag = true;
}
setState(newState);
return flag;
}
final Condition newCondition() {
return new ConditionObject();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# 测试用例
我们希望使用可重入锁,实现一个线程获取锁两次后,对一个数字自增的逻辑,为了实现并发场景笔者写了下面这段代码
/**
* 可重入锁测试
*/
public class ReentrantAQSLockTest {
private static Logger logger = LoggerFactory.getLogger(ReentrantAQSLockTest.class);
private int count;
private ReentrantAQSLock lock = new ReentrantAQSLock();
public void incrementCount(){
try{
logger.info("尝试取锁");
lock.lock();
logger.info("第1次取锁成功");
lock.lock();
logger.info("第2次取锁成功");
++count;
}finally {
logger.info("尝试锁释放");
lock.unlock();
logger.info("第1次释放锁成功");
lock.unlock();
logger.info("第2次释放锁成功");
}
}
public static void main(String[] args) {
//500 个线程
ExecutorService threadPool = Executors.newFixedThreadPool(500);
ReentrantAQSLockTest reentrantAQSLock=new ReentrantAQSLockTest();
CountDownLatch countDownLatch=new CountDownLatch(1);
for (int i = 0; i < 500; i++) {
threadPool.submit(()->{
try {
//500 个线程全部等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
reentrantAQSLock.incrementCount();
});
}
//扣减倒计时门闩,500个线程同时尝试取锁,自增count
countDownLatch.countDown();
threadPool.shutdown();
while (!threadPool.isTerminated()){
}
logger.info("最终修改结果: "+reentrantAQSLock.count);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
输出结果为500,说明逻辑没有问题。
[main] INFO com.guide.thread.base.ReentrantAQSLockTest - 最终修改结果: 500
# 详解AQS底层数据结构
通过上面手写了一个可重入锁,我们大致对可重入锁有个大致的了解,所以我们就在这里更进一步的了解一下AQS。
我们之前说过,AQS就是一个同步器,把线程当作一个个节点放在一个双向队列里,而这里的资源其实就是state。以上文我们手写的可重入锁,state为0就代表没有线程获取这个资源,所有节点都可以基于CAS争抢。而不为1以及获取到state的线程不为自己,则说明资源被其他人拿了,这些线程都会被添加到双向队列中等待唤醒后进行资源争抢:

注意:这个队列的队首元素是AQS默认创建的一个Node节点,并没有存放实际线程,所以在后续资源争抢中是不参与的,这一点我们可以在上锁失败后第一个进入等待队列的线程所执行的方法enq印证。
可以看到该方法在队列全空的情况下通过CAS设置一个头节点然后才将node设置进去:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //队列全空情况下通过CAS创建头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//然后在第二轮循环将当前节点设置到等待队列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# AQS内部队列模式
实际上AQS有两种队列,一种是同步队列,就是上文所描述的。
另一种则是同步条件队列,原理和同步队列差不多,只不过,取锁是还得判断自己是否符合条件,若符合才能争抢资源。
# 状态位变量state
上文我们一直提到state,这里我们就可以展开探讨。我们不妨打开AQS(AbstractQueuedSynchronizer),可以看到这是一个volatile 变量,这就意味着对修改对每个node是保证可见性的。众所周知volatile无法保证原子性,所以我们实现类中对state的操作都是基于CAS的。而且我们在源码中也可以看到某些对于state的操作也是基于CAS实现。
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
// 通过CAS修改State的值
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2
3
4
5
6
7
# 节点类Node
接下来就是节点类了,它用于记录每个参与资源挣钱的线程的信息,其核心源码如下,可以看到每个Node都会记录自己前后节点以及自己是那条线程、以及等待状态、是否是独占模式等。
static final class Node {
//当前节点等待状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前节点记录的线程
volatile Thread thread;
//下一个等待者
Node nextWaiter;
//设定为共享模式,意味着多线程可以使用同一个资源
static final Node SHARED = new Node();
//设置为独占模式
static final Node EXCLUSIVE = null;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
上文提到了waitStatus,其实设计者也为它设置了几种规定状态
//表示当前线程已被取消
static final int CANCELLED = 1;
/** 表示后继的节点等待唤醒*/
static final int SIGNAL = -1;
/**表示当前线程等待某个条件被唤醒*/
static final int CONDITION = -2;
/**
* 表示当前场景可以执行后续尝试获取共享锁的逻辑
*/
static final int PROPAGATE = -3;
2
3
4
5
6
7
8
9
10
# 独占锁模式源码
# 取锁的逻辑
我们就基于我们手写的可重入锁了解一下独占锁实现细节,首先我们的代码入口为lock()
public void incrementCount(){
try{
....
lock.lock();
......
}finally {
.....
}
}
2
3
4
5
6
7
8
9
查看lock内部逻辑,不过是调用了我们的aqs的acquire方法:
@Override
public void lock() {
//调用基于AQS实现好的逻辑即可
sync.acquire(1);
}
2
3
4
5
步入其内部,从角度语义我们可以知道,这里首先会进行尝试取锁,若不成功则放到等待队列中,并且打断当前线程。
public final void acquire(int arg) {
//尝试取锁,若失败则放到等待队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//......
}
2
3
4
5
6
了解了核心逻辑后,我们就开始了解每个细节,笔者会从尝试取锁细节、放到等待队列细节、设置为独占锁细节、打断线程细节逐个讨论。
首先是尝试取锁的逻辑,实际上这就是我们重写的逻辑
/**
* 尝试取锁
*
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//获取当前状态值
int state = getState();
//获取当前线程
Thread currentThread = Thread.currentThread();
//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了
if (0 == state) {
if (compareAndSetState(0, arg)) {
//设置当前资源拥有者为当前线程
setExclusiveOwnerThread(currentThread);
return true;
}
} else if (getExclusiveOwnerThread() == currentThread) {//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可
int newState = arg + state;
if (newState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(newState);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
接下来就是取不到锁,放到等待队列的逻辑,代码如下
private Node addWaiter(Node mode) {
//为当前线程创建一个node节点
Node node = new Node(Thread.currentThread(), mode);
// 获取队尾节点,通过CAS讲当前节点添加到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若是第一次入队,则会走到该逻辑进行入队操作
enq(node);
return node;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 释放锁的逻辑
入口为笔者实现的unlock,其内部同样用到的AQS的release方法:
@Override
public void unlock() {
//调用我们实现的尝试释放锁逻辑
sync.release(1);
}
2
3
4
5
我们看看核心逻辑,尝试释放锁,若成功则获取头节点,若头节点不为空且不为0(0代表在等待队列中等待取锁),则调用unparkSuccessor尝试唤醒一个后继节点:
public final boolean release(int arg) {
//尝试解锁即通过CAS修改state成功
if (tryRelease(arg)) {
Node h = head;
//从头节点开始找到后继节点将其唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
了解核心逻辑后,我们展开聊聊尝试释放后的unparkSuccessor唤醒节点的逻辑,比较简单,若后继节点为空或者后继节点状态被取消则从尾节点开始倒叙遍历找到非取消的节点将其唤醒,反之直接将头节点的后继节点线程直接唤醒:
private void unparkSuccessor(Node node) {
//......
//获取头节点的后继节点,若为空或者大于0(上文提到取消状态CANCELLED = 1),说明这个节点被取消了,那么我们就需要从后向前找到可以被唤醒的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//如果节点不为空则唤醒节点
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 共享锁模式
# 取锁
代码如下所示,尝试取锁成功就进行doAcquireShared,我们不妨看看doAcquireShared做了什么
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
2
3
4
doAcquireShared逻辑则是判断当前节点的前驱是否为head,若是则获取资源,若成功则设置当前节点为队首,并看看资源还有没有剩下若有则通知其他线程可以继续获取:
private void doAcquireShared(int arg) {
//节点设置为共享锁
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//循环直到定位到头节点
if (p == head) {
//尝试取锁
int r = tryAcquireShared(arg);
if (r >= 0) {//若成功则将节点放到head,并将状态设置为propagate(表示可以取共享锁)
setHeadAndPropagate(node, r);
//辅助gc回收这个p节点
p.next = null; // help GC
//如果这个线程需要打断,则打断
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果因为获取资源失败,则判断p是否是SIGNAL状态,若是则直接打断,并设置 interrupted = true告知后节点也要一起等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 释放锁
同样的入口在releaseShared,或尝试释放从成功,则调用doReleaseShared:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2
3
4
5
6
7
doReleaseShared逻辑很简单,从队首开始遍历,将SIGNAL状态的节点设置为0意为可唤醒后续节点获取锁,然后调用unparkSuccessor通知队列中所有线程将其调用LockSupport.unpark唤醒抢锁:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//将头节点设置为0成功后尝试唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//......
}
if (h == head) // loop if head changed
break;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 相关面试题
# AQS是什么?
AQS全名AbstractQueuedSynchronizer即抽象队列同步器,用于实现多线程之间资源管理和调度的一个抽象类,类似ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask都是基于AQS实现的。
# AQS原理分析
AQS将线程当作一个个节点Node,存到CLH队列中。抢到资源的线程就会被设置为占有资源的线程,并且可以操作一个线程共有可见参数state,所有线程都会CAS查看这个状态的值从而判断资源是否被占有线程释放进而决定是否争抢。
# AQS对资源的两种共享方式
独占式(Exclusive):独占以为着某个时间段资源只能被一个线程持有。并且独占式锁争抢规则还分为公平和非公平两种,公平锁则是占有锁的线程释放锁后,根据队列顺序获取资源,非公平锁则是无视队列顺序所有线程集体争抢资源。共享式:单位时间内,多个线程可以获取资源,例如:CountDownLatch、Semaphore、CyclicBarrier、ReadWriteLock等。
# 小结
至此,我们对 AQS 的源码解析之旅已接近尾声。在这趟探索中,我们深入了解了 AQS 的核心架构与关键机制。AQS 作为 Java 并发框架的基础,通过一个 FIFO 队列来管理等待获取资源的线程,以 state 变量表示同步状态,精妙地实现了资源的同步控制。
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 参考
深入理解高并发编程:https://book.douban.com/subject/35928998/?icn=index-latestbook-subject (opens new window)
Java 并发常见面试题总结(下):https://javaguide.cn/java/concurrent/java-concurrent-questions-03.html#项目相关 (opens new window)
AQS中那些waitStatus(一) :https://blog.csdn.net/weixin_39313241/article/details/114954769 (opens new window)