缘起
每门编程语言基本都离不开并发问题,Java亦如此。谈到Java的并发就离不开Doug lea老爷子贡献的juc包,而AQS又是juc里面的佼佼者
因此今天就一起来聊聊AQS
概念
AQS是什么,这里借用官方的话
Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out wait queues
AQS的全程是AbstractQueuedSynchronizer,在这里咱们进行咬文嚼字一下。
Abstract:这是AQS采用模板设计模式的基础,AQS中将定义了大部分同步的流程,仅将加解锁的操作留给子类根据需求进行自定义(这也就是为什么使用AQS可以快速开发锁或者同步器的主要原因)
Queued:这里指的是CLH队列;当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH是一个虚拟的双向链表实现的队列,获取不到锁资源时会被AQS封装成Node节点并加入队列。每个线程执行结束都会唤醒其下一个节点
Synchronizer:同步控制,AQS的同步控制实现有两种。一种是volatile+CAS的乐观锁设计,另一种是LockSupport+CAS的悲观锁设计
切入点
- 模板方法设计
- 可重入设计
- CLH队列设计
- 公平/非公平锁设计
模板方法设计
我们来看看AQS的设计流程
从图中可看到,AQS设计并且实现了一个同步器/锁的完整流程;
但是将tryAcquire/tryAcquireShared和tryRelease/tryReleaseShared这些经常改动的操作设置为抽象方法,留给子类自行拓展
acquire方法剖析
这个方法采用门面设计模式,将CAS获取锁,封装线程以及添加CLH队列的操作封装成一个方法并对外提供
我们常用的ReentrantLock.lock方法实际上就是调用此方法
public final void acquire(int arg) {// 尝试通过CAS获取锁,若获取成功则执行同步代码块// 获取失败则通过addWaiter将当前线程封装成AQS的Node节点并加入CLH队列,同时中断当前线程if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
addWaiter方法剖析
Node是AQS的内部类,Node是组成CLH队列的节点
申请公平/非公平锁失败都会被加入CLH队列
private Node addWaiter(Node mode) {// 1. 将当前线程跟Node关联起来,方便AQS根据队列顺序唤醒获取锁Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 2. 追加新建节点到双向链表尾if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 3. 初始化链表enq(node);return node;
}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
acquireQueued方法剖析
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 1. 获得当前Node的前驱节点,也就是当前任务的前一个任务final Node p = node.predecessor();// 2. 如果前一个任务是head则尝试通过CAS来获取锁if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//3. 通过UNSAGE.park来阻塞当前线程来等待许可if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
小结
通过这小节我们可以看到AQS是如何通过模板方法设计模式大大简化了同步器/锁开发的
可重入设计
ReentrantLock是可重入锁的典型设计,在这里就基于它进行分析
ReentrantLock的锁是组合设计,通过内部类Sync、NonfairSync和FairSync来实现
这里看看非公平锁NonfairSync的实现
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//1. 如果当前锁空闲,则获取锁并设置锁的Owner为当前线程if (c == 0) { if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 2. 如果当前锁已经被获取,则判断锁的Owner是否是当前线程else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}protected final boolean tryRelease(int releases) {// 1. 进行锁释放时对state进行减法,由于加锁和释放锁的操作都是配对出现的。那么重入2次时状态为3,那么释放锁的时候也会释放3次直到状态state为0时才释放锁int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;// 2. 释放锁的时候先将锁的Owner置空setExclusiveOwnerThread(null);}// 3. 释放锁setState(c);return free;
}
锁Owner实现
AQS的锁Owner是通过继承父类AbstractOwnableSynchronizer来实现的
AbstractOwnableSynchronizer只有一个成员属性exclusiveOwnerThread,用来标识独占锁场景下是哪个线程持有锁,方便可重入判断
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}
小结
通过分析我们可以看到,AQS的可重入设计是通过父类AbstractOwnableSynchronizer+volatile修饰的state作为计数器来实现的
CLH队列设计
当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH队列保证了锁可以高效进行分配,避免了无意义的唤醒阻塞未获得锁线程
CLH的入口在于AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,它是一个虚拟的双向链表,addWaiter会将请求锁的线程封装成Node节点,Node节点中通过存储前后序节点来维护双向队列
源码剖析
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 1. 获取前序节点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;// 将当前节点所有直接前驱节点从CLH队列中移除if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}private final boolean parkAndCheckInterrupt() {//请求锁的线程加入CLH队列中,通过调用LockSupport、UNSAFE来进行线程的阻塞,直至CLH链表中上一个节点释放锁后才会主动唤醒LockSupport.park(this);return Thread.interrupted();
}
LockSupport源码实现
public static void park(Object blocker) {Thread t = Thread.currentThread();// 这是做什么用的?setBlocker(t, blocker);// 最终调用UNSAFE来执行UNSAFE.park(false, 0L);setBlocker(t, null);
}// 在UNSAFE中显示这是个本地方法,是由C++来实现的
public native void park(boolean var1, long var2);// C++通过给当前线程添加一个互斥量0,每次该线程获得CPU时间片都会判断该互斥量,为0则将当前线程切换为阻塞状态
// 直到其他线程通过unpark来将此互斥量修改为1,该方法才会继续往下执行
小结
CLH队列给AQS维护了一个高效率的锁分配/释放的基础
公平/非公平锁设计
JDK中公平锁和非公平锁的具体实现分别是FairSync和NonfairSync
FairSync的获取锁流程
NonfairSync的获取锁流程
小结
通过分析可看到公平锁和非公平锁的获取锁/释放锁逻辑几乎一致,这都要归功于AQS的模板方法设计模式
两者不同的地方在于获取公平锁的请求会直接加入到CLH队列中等待锁,获取非公平锁的请求都会尝试直接获取锁,获取失败再加入CLH队列进行等待
非公平锁的性能相对而言更好,一般也是首选,但是会存在锁饥饿的现象;公平锁可以有效解决锁饥饿的问题,但性能相对而言会差一些
总结
- 在AQS中可以看到不少优秀的设计,这都要归功于Doug Lea老爷子;除了AQS,在juc里还有很多优秀的设计,如并发性能最好的字典ConcurrentHashMap、无锁高性能队列ConcurrentLinkedQueue等等
在品完源码后,你会发现其设计思想丝毫不逊色于各个大数据组件 - “曾经想征服全世界,到最后回头才发现,这世界点点滴滴全部都是你” ——致JDK