本文将介绍AQS的简单原理。
首先有个整体认识,全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。常用的ReentrantLock、Semaphore、CountDownLatch等都有实现它。
本文参考:
深入理解AbstractQueuedSynchronizer只需15张图_abstractqueuedsynchronizer流程图-CSDN博客
黑马程序员深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili
从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队 (meituan.com)
Java AQS 核心数据结构-CLH 锁 (qq.com)
AQS关键组成部分
state
state用来表示资源的状态(独占模式和共享模式),子类需要控制这个变量,就能够获取锁或者释放锁。
独占模式:只允许一个线程访问资源。
共享模式:允许多个线程访问资源。
CLH队列
这个队列保存的是没有获得锁资源进入阻塞的线程。使用的是头节点和尾结点来创建出这个双向队列。这个队列是先进先出的,不支持优先级。这里的CLH队列是AQS对于CLH锁的升级改后的应用。
该队列是对自旋锁的改进。那么自旋锁有上面缺点呢?
自旋锁
public class SpinLock {private AtomicReference<Thread> owner = new AtomicReference<Thread>();public void lock() {Thread currentThread = Thread.currentThread();// 如果锁未被占用,则设置当前线程为锁的拥有者// 这里就是自旋锁的核心部分while (!owner.compareAndSet(null, currentThread)) {}}public void unlock() {Thread currentThread = Thread.currentThread();// 只有锁的拥有者才能释放锁owner.compareAndSet(currentThread, null);}
}
第一个是锁饥饿问题。在锁竞争激烈的情况下,可能存在一个线程一直被其他线程”插队“而一直获取不到锁的情况。
第二是性能问题。在实际的多处理上运行的自旋锁在锁竞争激烈时性能较差。
CLH队列就把上述的两个问题解决了。
CLH队列过程(加解锁)
- 初始化一个 Tail 指向一个状态为false的空节点。
- 当t1线程获取锁,tail就指向t1线程,同时修改状态为true
- 当t2也想获得锁,此时t1还没释放,tail就指向t2,同时t1对应的节点。此时t2检查到t1节点为false,就开始轮询上一个节点的状态
- 当t1结束后,就把值修改为false
- 当t2轮询到值为false后,就是获取锁成功。
AQS升级CLH锁
- 扩展了每个节点的状态(waitStatus),从简单的true和false,扩展到如下部分。
- 维护前驱后继节点
原始版本的 CLH 锁中,节点间甚至都没有互相链接。但是,通过在节点中显式地维护前驱节点,CLH 锁就可以处理“超时”和各种形式的“取消”:如果一个节点的前驱节点取消了,这个节点就可以滑动去使用前面一个节点的状态字段。 - 用阻塞等待代替自选操作
当前一个节点释放锁之后,它会通知后一个节点获取锁。
条件变量
如果线程不满足条件变量后,那么它就要进入另外一种队列进行等待。每一个条件变量都会创建出一个队列。其中这个队列是单向的。
锁资源获取与释放
- 独占式
- acquire获取资源
- release释放资源
- 共享式
- acquireShared获取资源
- releaseShared释放资源
获得独占锁
acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
释放独占锁
AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
获得共享锁
acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
释放共享锁
AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点)
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
自定义锁
下面我们自定义一个锁,该锁内部的同步器是不可重入的独占锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;// 自定义锁
public class MyLock implements Lock {// 独占锁 同步器类public class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int acquires) {if (compareAndSetState(0, 1)) {// 加锁,并设置成当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int acquires) {setExclusiveOwnerThread(null);// state是被volatile修饰的,exclusiveOwnerThread 没有被volatile修饰// 所以设置exclusiveOwnerThread成null要放在前面setState(0);return true;}protected Condition newCondition() {return new ConditionObject();}// 是否持有独占锁@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}}private MySync sync = new MySync();// 加锁,不成功进入等待队列@Overridepublic void lock() {sync.acquire(1);}// 加锁,不成功进入等待队列,可被中断@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}// 尝试加锁一次,不成功直接返回@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}// 尝试加锁(带超时的),不成功进入等待队列,可被中断@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}// 解锁@Overridepublic void unlock() {// 这里release本质上调用的是tryRelease方法 解锁并唤醒等待队列中的线程// 如果调用的是release方法,就不会唤醒等待的线程sync.release(1);}// 创建一个Condition@Overridepublic Condition newCondition() {return sync.newCondition();}
}class Test {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();long now = System.currentTimeMillis();try {System.out.println("t1 获取锁");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("t1 释放锁");System.out.println("t1 耗时:" + (System.currentTimeMillis() - now));lock.unlock();}}, "t1").start();new Thread(() -> {lock.lock();try {System.out.println("t2 获取锁");} finally {System.out.println("t2 释放锁");lock.unlock();}}, "t2").start();}
}