引言
AQS 是AbstractQuenedSynchronizer 的缩写,抽象的队列式同步器,它是除了java自带的synchronized关键字之外的锁机制。是 JUC 下的重要组件。
相关产物有:ReentrantLock、CountDownLatch、Semaphore、ReadWriteLock等。
一、AQS的设计思想
AbstractQuenedSynchronizer 维护了一个 volatile int state 变量,代表共享资源。
若state 是0,代表资源空闲,当前线程将 0 改为 1,表示上锁,当前线程置为工作线程;
若state不为0,代表资源占用,当前线程依然会 acquire() 一个资源,如果恰好是当前的工作线程,那么state 累加,以此描述“重入性”;如果当前线程并不是工作线程,则会被安置在一个由AQS维护的资源等待队列。
AQS队列会让第一个线程Node自旋获取资源,而后面的线程,则通过 LockSupport.park(this) 方法将线程置为 WAITING 状态等待被唤醒。
如果第一个线程获取到了资源,那么就将它设置为队列的 head 节点,原 head 就会被移出队列。
AQS的设计中用到了模板方法模式,不同的资源共享机制如互斥或共享可以由子类自定义实现:
Exclusive:如ReentrantLock、
Share:如信号量、闭锁、读写锁等。
AQS 的另一个特点是自旋+CAS。
在请求资源和入列等操作中,经常会看到 for(;;) 、compareAndSetState、compareAndSetTail等操作,这与synchronized的实现有很大区别。
通过比较并设置的方式,可以有效提高资源获取的效率,但同时也会消耗额外的CPU资源。
二、两种资源访问策略的代表
在AQS中维护了一个 Node 节点,它有两种等待模式,同时也表示资源的两种不同的访问策略:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
由此,衍生出两类不同的子类实现,第一类是以ReentrantLock为代表的互斥锁,它在语义上与synchronized实现了相同的互斥性和可重入性;另一类是以闭锁CountDownLatch 为代表的线程同步工具。
2.1 ReentrantLock
首先 AQS 中的 state 为 0.
A 线程在执行 lock() 方法后,以独占方式 CAS state 为 1。AQS 会记录 A 线程为当前的独占线程,其他线程如果再尝试获取资源,就会进入等待队列,直到 A 线程调用 unlock() 方法,释放了资源,即 state 回归 0 状态。
在“重入性”方面,如果A线程第二次尝试取锁,state 会累加。也就是说,上锁的次数一定等于释放锁的次数。
2.2 CountDownLatch
CountDownLatch 翻译为 “闭锁”或“门闩”,这是一种非常好用的同步工具,可以延迟线程的进度直到终止状态。
与 ReentrantLock 不同的是,在构造 CountDownLatch 对象的时候,会先设定一个 state 大小:
CountDownLatch latch = new CountDownLatch(3);
这个 3 就是 state 变量的初始值,然后线程使用 countDown() 方法递减这个计数,直到 state = 0,放行所有 waiting 中的线程。
CountDownLatch 维护的 state 表示的是事件数量,当指定数量的事件执行完毕后,就会 unpark() 主调线程,继续后续动作。
在使用CountDownLatch时有一个误区是,state 的值就代表了线程的数量,认为我 state = 3 ,就需要 3 个线程去执行任务,其实,state = 10 也依然可以使用 一个线程去执行,关键要区分事件与并行任务的概念。
三、ReentrantLock 源码
作为补充 synchronized 的锁机制,ReentrantLock 显示锁的功能非常强大,但这里不打算全面分析ReentrantLock的奇技淫巧,而是从 lock() 方法出发,分析一下 AQS 是如何实现资源的锁定和等待队列的维护的。
3.1 acquire
acquire 是 AQS 的顶层入口,他表示获取锁资源。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
Doug Lea 的代码非常简洁,根本没有一句废话,就连代码结构也非常精简,如果是分析源码的话,我们可以尝试去改写一下这个方法,让其可读性更强一些:
public final void acquire(int arg) {if (!tryAcquire(arg)) {Node newWaiter = addWaiter(Node.EXCLUSIVE);boolean needInterrupt = acquireQueued(newWaiter, arg);if (needInterrupt) {selfInterrupt();}}
}
从方法中的一系列方法名和判断逻辑来看。
尝试获取资源,如果成功,则直接返回。如果不成功,addWaiter 添加一个独占模式的等待者,acquireQueued 以排队的方式去获取资源。
3.2 tryAcquire
tryAcquire 在 ReentrantLock 中有两种实现,分别是:
FairSync 中的公平锁实现;
NonfairSync 中的非公平锁实现
当然,公平与非公平并不是重点,就以非公平的实现来看一下,
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
当前线程会 CAS state 0->1,或累加重入,成功返回true,失败返回false。
3.3 addWaiter
在acquire上锁操作失败后,会执行这个方法:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
addWaiter ,添加一个等待者,它只完成一项工作,就是向等待队列中添加一个 Node:
1、将当前线程封装为一个队列 Node;
2、取得队列的尾节点 tail,并CAS 新的节点设置为新的 tail
3、设置新 tail 成功,直接返回
4、若设置新 tail 不成功,或者干脆,原tail 就不存在,执行 enq 方法,自旋操作以上步骤,直到成功。
enq方法是 enqueue 的缩写,意思是“使队列化”,它就是一个 while-true ,如果队列不存在,就创建一个队列,如果队列已经存在,就把 node 放到最后一个:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
这里就用到了自旋操作,每次自旋都会获取当前的 tail 节点,避免在设置的过程中间被其他线程加塞,却又不知道。
刚进入方法的时候,肯定需要走初始化的逻辑,这会创建一个 空的 Node 节点作为 head,所以由此我们也知道,AQS 队列中的头结点实际上就是一个没有实际意义的功能型节点,里边是没有线程的,真正封装了线程的节点是从第二个节点开始。
总体来看,addWaiter 完全就是一个 do-while 循环,先执行一次 CASTail,失败后循环执行CASTail,直到成功后返回该 node,同时也是新的 tail 节点。
3.4 acquireQueued
在 addWaiter 添加了新的 tail 后,需要做哪些事情呢?acquireQueued!
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
这里需要说明一下,该方法的逻辑兼顾了中断的操作,如果对中断机制不太了解,可以暂时不去理会。
该方法同样是一个 while-true 循环,当且仅当,当前节点是队列中第二个节点(addWaiter中已经很明确,AQS队列中的head 节点就是一个空的 Node),并且 tryAcquire 成功,才会返回。在返回之前,仅做了一些队列的维护工作:设置新的head 节点。
如果没有“当且仅当”,那么执行 park:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
也就是说,除了第二个节点以后的节点,都要进行 park,即线程切换为 WAITING 状态。
四、AQS acquire 流程
经过了上一节的源码分析,我们已经大概清楚了 lock() 方法调用之后发生的事情,接下来就需要总结一下 acquire 流程步骤,提炼一下 AQS 队列的工作原理:
总结
AQS 使用了大量的 CAS 操作,避免上锁,你在ReentrantLock中看不到一句 synchronized 。
通过CAS 和自旋的配合可以一定程度上提高同步代码的性能。
state 以 volatile 类型修饰,可以在多线程之间提供可见性。
ReentrantLock 和 CountDownLatch 对 state 的访问方式分为独占和共享两种,本文虽然没有解析 CountDownLatch 的源码,但通过上面源码的分析,可以想到其大致实现流程。