🏷️个人主页:牵着猫散步的鼠鼠
🏷️系列专栏:Java全栈-专栏
🏷️个人学习笔记,若有缺误,欢迎评论区指正
目录
1. 前言
2. AOS、AQS、AQLS的区别
3. AQS的底层原理
3.1. 核心思想
3.2. 数据结构
3.2.1. state
3.2.2 CLH 双向队列
3.3. 资源共享
3.3.1. 独占模式
3.3.2. 共享模式
3.4 Node节点
3.5. 获取资源与释放资源
3.5.1. 获取资源
3.5.2. 释放资源
4.总结
1. 前言
AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。AQS为Java的并发包提供了强大的同步支持。通过内置的FIFO队列来完成资源获取线程的排队工作,并且利用一个被volatile关键字修饰的int类型的变量表示同步状态。AQS 为构建锁和同步器提供了一些通用功能的实现,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore等。
2. AOS、AQS、AQLS的区别
AOS(AbstractOwnableSynchronizer) : JDK1.6时发布的,是AQS和AQLS的父类,这个类的主要作用是表示持有者与锁之间的关系。
AQS(AbstractQueuedSynchronizer) :JDK1.5时发布,抽象队列同步器,是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,诸如:ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue等等皆是基于 AQS 的。AQS 内部使用了一个 volatile 的变量 state(int类型) 来作为资源的标识。
AQLS(AbstractQueuedLongSynchronizer) :这个类诞生于JDK1.6,原因时上述的int类型的state资源,在当下的业务场景中,资源数量有可能超过int范围,因此,便诞生了这个类,采用Long类型的state。
//AQS中共享变量,使用volatile修饰保证线程可见性
private volatile int state;//AQLS中共享变量,采用long类型
private volatile long state;
3. AQS的底层原理
以上我们大致的介绍了一下AQS是什么,其实在很多面试中都会提及AQS,可能被问到最多的就是:“麻烦介绍一下AQS的底层原理?”,很多同学都浅尝辄止,今天我就来总结下AQS的底层实现
3.1. 核心思想
核心思想大概如下:在多线程访问共享资源时,若标识的共享资源空闲,则将当前获取到共享资源的线程设置为有效工作线程,共享资源设置为锁定状态(独占模式下),其他线程没有获取到资源的线程进入阻塞队列,等待当前线程释放资源后,继续尝试获取。
3.2. 数据结构
其实AQS的实现主要基于两个内容,分别是 state 变量和基于FIFO结构的 CLH 双向队列
3.2.1. state
state 变量由 volatile 修饰,保证了多线程环境下的可见性,用于展示当前临界资源的获锁情况。
/*** The synchronization state.*/private volatile int state;
AQS内部还提供了获取和修改state的方法,注意,这里的方法都是final修饰的,意味着不能被子类重写!
// 获取stateprotected final int getState() {return state;}// 修改stateprotected final void setState(int newState) {state = newState;}// Cas操作修改stateprotected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
3.2.2 CLH 双向队列
我们在上面提到了独占模式下,没有获取资源的线程会被放入队列,然后阻塞、唤醒、锁的重分配机制,就是基于CLH实现的。CLH 锁 (Craig, Landin, and Hagersten locks)是一种自旋锁的改进,是一个虚拟的双向队列,所谓虚拟是指没有队列的实例,内部仅存各结点之间的关联关系。
AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个节点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
3.3. 资源共享
在AQS的框架中对于资源的获取有两种方式:
- 独占模式(Exclusive) :资源是独有的,每次只能一个线程获取,如ReentrantLock;
- 共享模式(Share) :资源可同时被多个线程获取,具体可获取个数可通过参数设定,如CountDownLatch(我们后续也会出这个的工具类的源码解读),Semaphore信号量(上篇文章讲了)。
3.3.1. 独占模式
以ReentrantLock为例,其内部维护了一个state字段,用来标识锁的占用状态,初始值为0,当线程1调用lock()方法时,会尝试通过tryAcquire()方法(钩子方法)独占该锁,并将state值设置为1,如果方法返回值为true表示成功,false表示失败,失败后线程1被放入等待队列中(CLH队列),直到其他线程释放该锁。
但需要注意的是,在线程1获取到锁后,在释放锁之前,自身可以多次获取该锁,每获取一次state加1,这就是锁的可重入性,这也说明ReentrantLock是可重入锁,在多次获取锁后,释放时要释放相同的次数,这样才能保证最终state为0,让锁恢复到未锁定状态,其他线程去尝试获取!
3.3.2. 共享模式
CountDownLatch(倒计时器)就是基于AQS共享模式实现的同步类,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 countDown() 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 state 的值减少 1。当所有的子线程都执行完毕后(即 state 的值变为 0),CountDownLatch 会调用 unpark() 方法,唤醒主线程。这时,主线程就可以从 await() 方法(CountDownLatch 中的await() 方法而非 AQS 中的)返回,继续执行后续的操作。
Semaphore信号量也是基于基于AQS共享模式实现的同步类,也是通过CAS操作维护AQS内部的state值来记录许可证数,详细可见博文Semaphore信号量源码解读与使用-CSDN博客
3.4 Node节点
上述的两种共享模式、线程的引用、前驱节点、后继节点等都存储在Node对象中,我们接下来就走进Node的源码中一探究竟!
static final class Node {// 标记一个结点(对应的线程)在共享模式下等待static final Node SHARED = new Node();// 标记一个结点(对应的线程)在独占模式下等待static final Node EXCLUSIVE = null;// waitStatus的常量值,表示该结点(对应的线程)已被取消static final int CANCELLED = 1;// waitStatus的常量值,表示后继结点(对应的线程)需要被唤醒static final int SIGNAL = -1;// waitStatus的常量值,表示该结点(对应的线程)在等待某一条件static final int CONDITION = -2;/*waitStatus的常量值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/static final int PROPAGATE = -3;// 等待状态,取值范围,-3,-2,-1,0,1volatile int waitStatus;volatile Node prev; // 前驱结点volatile Node next; // 后继结点volatile Thread thread; // 结点对应的线程Node nextWaiter; // 等待队列里下一个等待条件的结点// 判断共享模式的方法final boolean isShared() {return nextWaiter == SHARED;}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 其它方法忽略,可以参考具体的源码
}// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {// 使用了Node的这个构造函数Node node = new Node(Thread.currentThread(), mode);// 其它代码省略
}
其中我们需要着重注意waitStatus这个变量,他可取以下五个值
- CANCELLED:这个变量表示当前节点(对应的线程)已被取消。当等待超时或被中断,会触发进入为此状态,进入该状态后节点状态不再变化;
- SIGNAL: 后面节点等待当前节点唤醒;
- CONDITION: Condition 中使用,当前线程阻塞在Condition,如果其他线程调用了Condition的signal方法,这个节点将从等待队列转移到同步队列队尾,等待获取同步锁;
- PROPAGATE: 共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去;
- 0:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。
3.5. 获取资源与释放资源
有了以上的知识积累后,我们再来看一下AQS中关于获取资源和释放资源的实现吧。
3.5.1. 获取资源
在AQS中获取资源的是入口是acquire(int arg)方法,arg 是要获取的资源个数,在独占模式下始终为 1。
public final void accquire(int arg) {// tryAcquire 再次尝试获取锁资源,如果尝试成功,返回true,尝试失败返回falseif (!tryAcquire(arg) &&// 走到这,代表获取锁资源失败,需要将当前线程封装成一个Node,追加到AQS的队列中//并将节点设置为独占模式下等待acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 线程中断selfInterrupt();
}
tryAcquire()是一个可被子类具体实现的钩子方法,用以在独占模式下获取锁资源,如果获取失败,则把线程封装为Node节点,存入等待队列中,实现方法是addWaiter(),我们继续跟入源码去看看。
private Node addWaiter(Node mode) {//创建 Node 类,并且设置 thread 为当前线程,设置为排它锁Node node = new Node(Thread.currentThread(), mode);// 获取 AQS 中队列的尾部节点Node pred = tail;// 如果 tail == null,说明是空队列,// 不为 null,说明现在队列中有数据,if (pred != null) {// 将当前节点的 prev 指向刚才的尾部节点,那么当前节点应该设置为尾部节点node.prev = pred;// CAS 将 tail 节点设置为当前节点if (compareAndSetTail(pred, node)) {// 将之前尾节点的 next 设置为当前节点pred.next = node;// 返回当前节点return node;}}enq(node);return node;
}// 自旋CAS插入等待队列
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
在这部分源码中,将获取资源失败的线程封装后的Node节点存入队列尾部,考虑到多线程情况下的节点插入问题,这里提供了CAS的方式保证插入操作的原子性,通过自旋方式(一直循环,不让线程挂起,减少线程状态切换带来的开销)来减少性能损耗。
等待队列中的所有线程,依旧从头结点开始,一个个的尝试去获取共享资源,这部分的实现可以看acquireQueued()方法,我们继续跟入。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// interrupted用于记录线程是否被中断过boolean interrupted = false;for (;;) { // 自旋操作// 获取当前节点的前驱节点final Node p = node.predecessor();// 如果前驱节点是head节点,并且尝试获取同步状态成功if (p == head && tryAcquire(arg)) {// 设置当前节点为head节点setHead(node);// 前驱节点的next引用设为null,这时节点被独立,垃圾回收器回收该节点p.next = null; // 获取同步状态成功,将failed设为falsefailed = false;// 返回线程是否被中断过return interrupted;}// 如果应该让当前线程阻塞并且线程在阻塞时被中断,则将interrupted设为trueif (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {// 如果获取同步状态失败,取消尝试获取同步状态if (failed)cancelAcquire(node);}
}
在这个方法中,从等待队列的head节点开始,循环向后尝试获取资源,获取失败则继续阻塞,头结点若获取资源成功,则将后继结点设置为头结点,原头结点从队列中回收掉 ,这里我们就可以发现堵塞队列中线程中的唤醒是有序的(上次面试被问到是有序还是无序,好在这一块记忆比较深刻)
如果是无序唤醒,会引起羊群效应,多个线程在竞争state资源,竞争锁的过程中,就会造成资源的浪费。
3.5.2. 释放资源
相对于获取资源,AQS中的资源释放就简单多啦,我们直接上源码!
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}private void unparkSuccessor(Node node) {// 如果状态是负数,尝试把它设置为0int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 得到头结点的后继结点head.nextNode s = node.next;// 如果这个后继结点为空或者状态大于0// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消(只有 Node.CANCELLED(=1) 这一种状态大于0)if (s == null || s.waitStatus > 0) {s = null;// 从尾部开始倒着寻找第一个还未取消的节点(真正的后继者)for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果后继结点不为空,if (s != null)LockSupport.unpark(s.thread);
}
这里的tryRelease(arg)通过是个钩子方法,需要子类自己去实现,比如在ReentrantLock中的实现,会去做state的减少操作int c = getState() - releases;,毕竟这是一个可重入锁,直到state的值减少为0,表示锁释放完毕!
接下来会检查队列的头结点。如果头结点存在并且waitStatus不为0,这意味着还有线程在等待,那么会调用unparkSuccessor(Node h)方法来唤醒后续等待的线程。
4.总结
AQS (AbstractQueuedSynchronizer) 是抽象队列同步器,是Java并发包的根基,像Java的Lock,信号量(Semaphore)都用到了AQS。
AQS提供了独占模式和共享模式两种方式获取共享资源,其中ReentrantLock就用到独占模式实现了排他锁,CountDownLatch和Semaphore都用到了AQS的共享模式实现了共享锁。
AQS内部通过维护一个 volatile 修饰的state变量作为竞态条件,以及一个基于FiFO结构的CLH 双向队列队列来存放需要等待获取锁的线程。 多个线程通过对state这个变量修改来实现竞态条件,竞争失败的线程会加入等待队列并阻塞,当锁释放后会有序唤醒队列中的线程进行锁竞争。
我们在后续可能会基于AQS实现一个同步器,将AQS用起来,感兴趣的话就多多支持吧