AQS同步队列、条件队列源码解析

AQS详解

  • 前言
  • AQS几个重要的内部属性
    • 字段
    • 内部类 Node
    • 同步队列 | 阻塞队列
    • 等待队列 | 条件队列
  • 重要方法执行链
    • 同步队列的获取、阻塞、唤醒
      • 加锁代码流程
      • 解锁
    • 条件队列的获取、阻塞、唤醒
      • 大体流程
    • 调用await()方法
      • 1. 将节点加入到条件队列
      • 2. 完全释放独占锁
      • 3. 等待进入阻塞队列
      • 4. signal 唤醒线程,转移到阻塞队列
      • 唤醒后检查中断状态
      • 6. 获取独占锁
      • 7. 处理中断状态
      • * 带超时机制的 await
      • * 不抛出 InterruptedException 的 await

前言

在分析 Java 并发包 java.util.concurrent 源码的时候,少不了需要了解 AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,因为它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。

AQS几个重要的内部属性

字段

//共享变量,使用volatile修饰保证线程可见性
private volatile int state;// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

state 是用来记录同步状态的一个重要属性。在不同的AQS实现类上,往往有不同的含义。但是,总体有那么两种作用。

  • 共享锁:state的值代表着该临界资源的数量。如 :打印机的数量是2,state =2,那么最多允许同时打印两份文件。换算到线程中,就意味着,最多有 state 个线程同时进入。
  • 独占锁:对于独占锁而言,以state的初始值并不是1,而是0。这是因为在独占锁中,这个值的含义代表着重入次数,每重入一次加一,但值为0时,意味着这个这个临界资源并未被抢占。

内部类 Node

static final class Node {// 标识节点当前在共享模式下static final Node SHARED = new Node();// 标识节点当前在独占模式下static final Node EXCLUSIVE = null;// ======== 下面的几个int常量是给waitStatus用的 ===========/** waitStatus value to indicate thread has cancelled */// 代码此线程取消了争抢这个锁static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking */// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒或者说可以被唤醒static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition */// 条件队列标识static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/// 同样的不分析,略过吧static final int PROPAGATE = -3;// =====================================================// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,//    ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。volatile int waitStatus;// 前驱节点的引用volatile Node prev;// 后继节点的引用volatile Node next;// 这个就是线程本尊volatile Thread thread;//链接到等待条件或特殊值SHARED的下一个节点。即为,用于链接条件队列的指针Node nextWaiter;}

Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next + nextWaiter 五个属性而已,大家先要有这个概念在心里。

同步队列 | 阻塞队列

在多个线程竞争有限资源的情况下,一定会出现部分线程 得不到资源,即陷入阻塞状态。那么对于这些阻塞的线程,AQS会使用一个队列组织起来,用于后续线程的唤醒。这个队列就是CLH队列(Craig,Landin,and Hagersten),一个虚拟的双向链表(这个为什么形容为虚拟的有兴趣可以自己去了解一下),而队列的每一个节点就是一个Node节点,记录了阻塞线程的信息。
在这里插入图片描述

阻塞队列不包含 head 节点, head这个节点在逻辑上代表着 占有锁的这个节点

等待队列 | 条件队列

如何说同步队列是线程想要获取锁失败而入队的,那么条件队列就是已经获取到锁,但是没有满足某种条件而主动阻塞的。而这种情况的阻塞不是因为竞争锁而导致的,那么放在同步队列就不合适了。于是,引申出了条件队列,条件队列是一个单向链表
Condition 经常可以用在生产者-消费者的场景中,这里以 ReentrantLock 举例.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;class BoundedBuffer {final Lock lock = new ReentrantLock();// condition 依赖于 lock 来产生final Condition notFull = lock.newCondition();final Condition notEmpty = lock.newCondition();final Object[] items = new Object[100];int putptr, takeptr, count;// 生产public void put(Object x) throws InterruptedException {lock.lock();try {while (count == items.length)notFull.await();  // 队列已满,等待,直到 not full 才能继续生产items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去} finally {lock.unlock();}}// 消费public Object take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去return x;} finally {lock.unlock();}}
}

await() 方法会释放锁

在这里插入图片描述

是的,有一条线指向了同步队列,这是因为,当条件队列的条件满足时,线程理论上就可以继续执行了,但是需要重新获取锁。

condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。

重要方法执行链

同步队列的获取、阻塞、唤醒

在这里插入图片描述

ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。

加锁代码流程

static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;// 争锁final void lock() {acquire(1);}// 来自父类AQS,我直接贴过来这边,下面分析的时候同样会这样做,不会给读者带来阅读压力// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。// 否则,acquireQueued方法会将线程压到队列中public final void acquire(int arg) { // 此时 arg == 1// 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试// 因为有可能直接就成功了呢,也就不需要进队列排队了,// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)if (!tryAcquire(arg) &&// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/// 尝试直接获取锁,返回值是boolean,代表是否获取到锁// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// state == 0 此时此刻没有线程持有锁if (c == 0) {// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,// 看看有没有别人在队列中等了半天了if (!hasQueuedPredecessors() &&// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=// 因为刚刚还没人的,我判断过了compareAndSetState(0, acquires)) {// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁setExclusiveOwnerThread(current);return true;}}// 会进入这个else if分支,说明是重入了,需要操作:state=state+1// 这里不存在并发问题else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}// 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁// 回到上面一个外层调用方法继续看:// if (!tryAcquire(arg) //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //     selfInterrupt();return false;}// 假设tryAcquire(arg) 返回false,那么代码将执行://        acquireQueued(addWaiter(Node.EXCLUSIVE), arg),// 这个方法,首先需要执行:addWaiter(Node.EXCLUSIVE)/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/// 此方法的作用是把线程包装成node,同时进入到队列中// 参数mode此时是Node.EXCLUSIVE,代表独占模式private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后Node pred = tail;// tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)if (pred != null) { // 将当前的队尾节点,设置为自己的前驱 node.prev = pred; // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴if (compareAndSetTail(pred, node)) { // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,// 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了pred.next = node;// 线程入队了,可以返回了return node;}}// 仔细看看上面的代码,如果会到这里,// 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)// 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/// 采用自旋的方式入队// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的private Node enq(final Node node) {for (;;) {Node t = tail;// 之前说过,队列为空也会进来这里if (t == null) { // Must initialize// 初始化head节点// 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的// 还是一步CAS,你懂的,现在可能是很多线程同时进来呢if (compareAndSetHead(new Node()))// 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了// 这个时候有了head,但是tail还是null,设置一下,// 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了// 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return// 所以,设置完了以后,继续for循环,下次就到下面的else分支了tail = head;} else {// 下面几行,和上一个方法 addWaiter 是一样的,// 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// 现在,又回到这段代码了// if (!tryAcquire(arg) //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //     selfInterrupt();// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列// 所以当前节点可以去试抢一下锁// 这里我们说一下,为什么可以去试试:// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,// tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下stateif (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,// 要么就是tryAcquire(arg)没有抢赢别人,继续往下看if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 什么时候 failed 会为 true???// tryAcquire() 方法抛异常的情况if (failed)cancelAcquire(node);}}/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops.  Requires that pred == node.prev** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,// 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/// 仔细想想,如果进入到这个分支意味着什么// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3// 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0// 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// 这个方法返回 false,那么会再走一次 for 循序,//     然后再次进来此方法,此时会从第一个分支返回 truereturn false;}// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)// 这个方法结束根据返回值我们简单分析下:// 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒//        我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了// 如果返回false, 说明当前不需要被挂起,为什么呢?往后看// 跳回到前面是这个方法// if (shouldParkAfterFailedAcquire(p, node) &&//                parkAndCheckInterrupt())//                interrupted = true;// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,// 那么需要执行parkAndCheckInterrupt():// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}// 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况// 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。// 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:// => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。
}

我们可以看到 FairSync extends SYnc extends AbstractQueuedSynchronize,并对 tryAcquire(int acquires)tryRelease(int releases)进行了重写,而其他操作都是通过 AbstractQueuedSynchronizer 这个抽象类的共性操作来实现的。

解锁

最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。

// 唤醒的代码还是比较简单的,你如果上面加锁的都看懂了,下面都不需要看就知道怎么回事了
public void unlock() {sync.release(1);
}public final boolean release(int arg) {// 往后看吧if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// 是否完全释放锁boolean free = false;// 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}/*** Wakes up node's successor, if one exists.** @param node the node*/
// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;// 如果head节点当前waitStatus<0, 将其修改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)// 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒线程LockSupport.unpark(s.thread);
}

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 刚刚线程被挂起在这里了return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了

条件队列的获取、阻塞、唤醒

大体流程

在这里插入图片描述

我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer 类中的 ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;// 条件队列的第一个节点// 不要管这里的关键字 transient,是不参与序列化的意思private transient Node firstWaiter;// 条件队列的最后一个节点private transient Node lastWaiter;......

prev 和 next用于实现阻塞队列的双向链表,这里的nextWaiter用于实现条件队列的单向链表
在这里插入图片描述

  1. 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;

  2. 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;

  3. 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;

  4. 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

这个图看懂后,下面的代码分析就简单了。

接下来,我们一步步按照流程来走代码分析。

调用await()方法

// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
// 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException {// 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态if (Thread.interrupted())throw new InterruptedException();// 添加到 condition 的条件队列中Node node = addConditionWaiter();// 释放锁,返回值是释放锁之前的 state 值// await() 之前,当前线程是必须持有锁的,这里肯定要释放掉int savedState = fullyRelease(node);int interruptMode = 0;// 这里退出循环有两种情况,之后再仔细分析// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 被唤醒后,将进入阻塞队列,等待获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

这个就是await 整体过程了,下面我们分步把上面的几个点用源码说清楚。

1. 将节点加入到条件队列

addConditionWaiter() 是将当前节点加入到条件队列,这种条件队列内的操作是线程安全的。

// 将当前线程对应的节点入队,插入队尾
private Node addConditionWaiter() {Node t = lastWaiter;// 如果条件队列的最后一个节点取消了,将其清除出去// 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?if (t != null && t.waitStatus != Node.CONDITION) {// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列unlinkCancelledWaiters();t = lastWaiter;}// node 在初始化的时候,指定 waitStatus 为 Node.CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);// t 此时是 lastWaiter,队尾// 如果队列为空if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}

上面的这块代码很简单,就是将当前线程进入到条件队列的队尾。

在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。

当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。

// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
// 纯属链表操作,很好理解,看不懂多看几遍就可以了
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;// 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}

2. 完全释放独占锁

回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node); 方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。

考虑一下这里的state 的值。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
// 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
//         那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
//         相应的,如果 lock 重入了 n 次,savedState == n
// 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}

这里注意,考虑一下,如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

3. 等待进入阻塞队列

释放掉锁以后,接下来是这段,这边会自旋,如果发现自己还没到阻塞队列,那么挂起,等待被转移到阻塞队列。

int interruptMode = 0;
// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node)) {// 线程挂起LockSupport.park(this);// 这里可以先不用看了,等看到它什么时候被 unpark 再说if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}

isOnSyncQueue(Node node) 用于判断节点是否已经转移到阻塞队列了:

// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
// 这个方法就是判断 node 是否已经移动到阻塞队列了
final boolean isOnSyncQueue(Node node) {// 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到// 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中// 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞(同步)队列了。条件队列是使用 nextWaiter 作为后继指针!!if (node.next != null) return true;// 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列// 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。// 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,// 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。return findNodeFromTail(node);
}// 从阻塞队列的队尾往前遍历,如果找到,返回 true
private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}
}

回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。

4. signal 唤醒线程,转移到阻塞队列

为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。

唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。

// 唤醒等待了最久的线程
// 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
public final void signal() {// 调用 signal 方法的线程必须持有当前的独占锁if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}// 从条件队列队头往后遍历,找出第一个需要转移的 node
// 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
private void doSignal(Node first) {do {// 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了// 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 nullif ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);// 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
}// 将节点从条件队列转移到阻塞队列
// true 代表成功转移
// false 代表在 signal 之前,节点已经取消了
final boolean transferForSignal(Node node) {// CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,// 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点// 否则,将 waitStatus 置为 0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// enq(node): 自旋进入阻塞队列的队尾// 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点Node p = enq(node);int ws = p.waitStatus;// ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释// 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节LockSupport.unpark(node.thread);return true;
}

正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true,所以一般也不会进去 if 语句块中唤醒 node 对应的线程。然后这个方法返回 true,也就意味着 signal 方法结束了,节点进入了阻塞队列。

假设发生了阻塞队列中的前驱节点取消等待,或者 CAS 失败,只要唤醒线程,让其进到下一步即可。

唤醒后检查中断状态

上一步 signal 之后,我们的线程由条件队列转移到了阻塞队列,之后就准备获取锁了。只要重新获取到锁了以后,继续往下执行。

等线程从挂起中恢复过来,继续往下看

int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 线程挂起LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}

先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0

  • REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
  • THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
  • 0 :说明在 await 期间,没有发生中断

有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:

  • 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
  • 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
  • signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
  • 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。

// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}

Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。

看看怎么判断是 signal 之前还是之后发生的中断:

// 只有线程处于中断状态,才会调用此方法
// 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true:如果此线程在 signal 之前被取消,
final boolean transferAfterCancelledWait(Node node) {// 用 CAS 将节点状态设置为 0 // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {// 将节点放入阻塞队列// 这里我们看到,即使中断了,依然会转移到阻塞队列enq(node);return true;}// 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0// signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成// 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断while (!isOnSyncQueue(node))Thread.yield();return false;
}

这里再说一遍,即使发生了中断,节点依然会转移到阻塞队列。

到这里,大家应该都知道这个 while 循环怎么退出了吧。要么中断,要么转移成功。

这里描绘了一个场景,本来有个线程,它是排在条件队列的后面的,但是因为它被中断了,那么它会被唤醒,然后它发现自己不是被 signal 的那个,但是它会自己主动去进入到阻塞队列。

6. 获取独占锁

while 循环出来以后,下面是这段代码:

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;

由于 while 出来后,我们确定节点已经进入了阻塞队列,准备获取锁。

这里的 acquireQueued(node, savedState) 的第一个参数 node 之前已经经过 enq(node) 进入了队列,参数 savedState 是之前释放锁前的 state,这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState了。

注意,前面我们说过,不管有没有发生中断,都会进入到阻塞队列,而 acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。

继续往下:

if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();
if (interruptMode != 0)reportInterruptAfterWait(interruptMode);

本着一丝不苟的精神,这边说说 node.nextWaiter != null 怎么满足。我前面也说了 signal 的时候会将节点转移到阻塞队列,有一步是 node.nextWaiter = null,将断开节点和条件队列的联系。

可是,在判断发生中断的情况下,是 signal 之前还是之后发生的? 这部分的时候,我也介绍了,如果 signal 之前就中断了,也需要将节点进行转移到阻塞队列,这部分转移的时候,是没有设置 node.nextWaiter = null 的。

之前我们说过,如果有节点取消,也会调用 unlinkCancelledWaiters 这个方法,就是这里了。

7. 处理中断状态

到这里,我们终于可以好好说下这个 interruptMode 干嘛用了。

  • 0:什么都不做,没有被中断过;
  • THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断;
  • REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();
}

* 带超时机制的 await

经过前面的 7 步,整个 ConditionObject 类基本上都分析完了,接下来简单分析下带超时机制的 await 方法。

public final long awaitNanos(long nanosTimeout) throws InterruptedException
public final boolean awaitUntil(Date deadline)throws InterruptedException
public final boolean await(long time, TimeUnit unit)throws InterruptedException

这三个方法都差不多,我们就挑一个出来看看吧:

public final boolean await(long time, TimeUnit unit)throws InterruptedException {// 等待这么多纳秒long nanosTimeout = unit.toNanos(time);if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);// 当前时间 + 等待时长 = 过期时间final long deadline = System.nanoTime() + nanosTimeout;// 用于返回 await 是否超时boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {// 时间到啦if (nanosTimeout <= 0L) {// 这里因为要 break 取消等待了。取消等待的话一定要调用 transferAfterCancelledWait(node) 这个方法// 如果这个方法返回 true,在这个方法内,将节点转移到阻塞队列成功// 返回 false 的话,说明 signal 已经发生,signal 方法将节点转移了。也就是说没有超时嘛timedout = transferAfterCancelledWait(node);break;}// spinForTimeoutThreshold 的值是 1000 纳秒,也就是 1 毫秒// 也就是说,如果不到 1 毫秒了,那就不要选择 parkNanos 了,自旋的性能反而更好if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;// 得到剩余时间nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;
}

超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁。

* 不抛出 InterruptedException 的 await

public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/37582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小程序安卓手机点击uni-data-select 下拉框选择器会出现蓝色阴影

解决方法&#xff1a;在导入的包中找到uni-data-select.vue&#xff0c;接着找到.uni-stat__select样式&#xff0c;把cursor: pointer去掉。 如果出现穿透问题&#xff0c;uni-select__selector的z-index加高&#xff0c;默认是2。

zdppy_api+vue3+antd开发前后端分离的预加载卡片实战案例

后端代码 import api import upload import timesave_dir "uploads"async def rand_content(request):key api.req.get_query(request, "key")time.sleep(0.3)return api.resp.success(f"{key} " * 100)app api.Api(routes[api.resp.get(&qu…

UnityUGUI之二 CameraTargetTexture

在我们需要将3D物体呈现在2D视角时就可以使用TargetTexture&#xff0c;若想只显示3D物体则需改变背景颜色&#xff0c;并且得再增加一个相机

智慧城市新利器:免费可视化工具助力高效管理

在智慧城市的建设中&#xff0c;实现高效的统筹管理是至关重要的。通过免费可视化工具“山海鲸可视化”&#xff0c;这一目标可以轻松达成。山海鲸可视化是一款免费可视化工具&#xff0c;具备二三维融合、易用性、安全性以及高质量画面渲染等特色&#xff0c;是制作智慧城市可…

数据结构—选择题

01-数据结构—判断题 71.在数据结构中&#xff0c;从逻辑上可以把数据结构分为&#xff08; &#xff09;。 A. 动态结构和静态结构 B. 紧凑结构和非紧凑结构 C. 线性结构和非线性结构 D. 内部结构和外部结构 答案&#xff1a;C 72.当输入规模为n时&#xff0c;下列算法…

如何获得更高质量的回答-chatgpt

在与技术助手如ChatGPT进行交互时&#xff0c;提问的方式直接影响到你获得的答案质量。以下是几个关键的提问技巧&#xff0c;可以帮助你在与ChatGPT的互动中获得更有效的回答&#xff1a; 1. 清晰明了的问题 技巧&#xff1a;确保问题清晰明了&#xff0c;避免含糊不清或模糊的…

短信群发平台:验证码在不同行业的应用

1、手机号码真实性验证&#xff1a;用户注册会员时&#xff0c;为了获取用户真实的手机号码&#xff0c;需要通过手机短信验证功能&#xff0c;确保用户填写的手机号码的真实性。 2、用户找回密码&#xff1a;用户有可能会忘记掉自己的密码甚至用户名&#xff0c;如果该会员已…

Python自定义线程池,这么高效,是不是开了挂?

目录 1、线程池基础 🏗️ 1.1 线程池概念与优势 1.2 Python标准库concurrent.futures简介 示例代码:使用ThreadPoolExecutor执行简单任务 2、利用ThreadPoolExecutor定制 🎛️ 2.1 创建自定义线程池类 示例代码:自定义ThreadPoolExecutor子类 2.2 设置线程池参数与…

四.iOS核心动画 - 图层的视觉效果

引言 在前几篇博客中我们讨论了图层的frame,bounds,position以及让图层加载图片。但是图层事实上不仅可以显示图片&#xff0c;或者规则的矩形块&#xff0c;它还有一系列内建的特性来创建美丽优雅的页面元素。在这篇博客中我们就来探索一下CALayer的视觉效果。 视觉效果 图…

转化分析|一位数据分析师的实验田复盘

花3个月时间&#xff0c;吭哧吭哧写了80页草稿的《投资——1. 知己知彼》&#xff0c;发布之前豪言壮语“2000阅读量”&#xff0c;到现在累计72&#xff0c;真是piapia打脸&#xff01;心态那个崩啊&#xff01;&#xff01; 朋友们吐槽内容太长、定位不明确、分析深度不够&am…

手持式雷达流速仪的工作原理

TH-LS5手持式雷达流速仪基于雷达技术和多普勒效应进行工作。它发射一束微波信号到水体表面&#xff0c;当信号遇到水流时&#xff0c;会发生多普勒频移。发射器发出的高频电磁波信号(通常是微波信号)遇到流体后&#xff0c;部分信号会被反射回来。接收器接收到反射回来的信号&a…

开发一套java语言的智能导诊需要什么技术?java+ springboot+ mysql+ IDEA互联网智能3D导诊系统源码

开发一套java语言的智能导诊需要什么技术&#xff1f;java springboot mysql IDEA互联网智能3D导诊系统源码 医院导诊系统是一种基于互联网和3D人体的智能化服务系统&#xff0c;旨在为患者提供精准、便捷的医院就诊咨询服务。该系统整合了医院的各种医疗服务资&#xff1b;智慧…

【机器学习】机器学习与医疗健康在疾病预测中的融合应用与性能优化新探索

文章目录 引言第一章&#xff1a;机器学习在医疗健康中的应用1.1 数据预处理1.1.1 数据清洗1.1.2 数据归一化1.1.3 特征工程 1.2 模型选择1.2.1 逻辑回归1.2.2 决策树1.2.3 随机森林1.2.4 支持向量机1.2.5 神经网络 1.3 模型训练1.3.1 梯度下降1.3.2 随机梯度下降1.3.3 Adam优化…

【你也能从零基础学会网站开发】(了解)关系型数据库的基本架构体系结构与概念理解

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;程序猿、设计师、技术分享 &#x1f40b; 希望大家多多支持, 我们一起学习和进步&#xff01; &#x1f3c5; 欢迎评论 ❤️点赞&#x1f4ac;评论 &#x1f4c2;收藏 &#x1f4c2;加关注 关系型数据库的…

【第五节】C/C++数据结构之图

目录 一、图的基本概念 1.1 图的定义 1.2 图的其他术语概念 二、图的存储结构 2.1 邻接矩阵 2.2 邻接表 三、图的遍历 3.1 广度优先遍历 3.2 深度优先遍历 四、最小生成树 4.1 最小生成树获取策略 4.2 Kruskal算法 4.3 Prim算法 五、最短路径问题 5.1 Dijkstra算…

INFINI Easysearch尝鲜Hands on

INFINI Easysearch 是一个分布式的近实时搜索与分析引擎&#xff0c;核心引擎基于开源的 Apache Lucene。Easysearch 的目标是提供一个自主可控的轻量级的 Elasticsearch 可替代版本&#xff0c;并继续完善和支持更多的企业级功能。 与 Elasticsearch 相比&#xff0c;Easysear…

熊猫烧香是什么?

熊猫烧香&#xff08;Worm.WhBoy.cw&#xff09;是一种由李俊制作的电脑病毒&#xff0c;于2006年底至2007年初在互联网上大规模爆发。这个病毒因其感染后的系统可执行文件图标会变成熊猫举着三根香的模样而得名。熊猫烧香病毒具有自动传播、自动感染硬盘的能力&#xff0c;以及…

vue 组件下 img 标签动态传入不展示

效果 解决办法&#xff1a; require() <titleComponent:title"业务工作概览":src"require(/assets/imgs/evaluation/overviewStatistics.png)"></titleComponent> 效果&#xff1a;

Github 上 Star 数最多的大模型应用基础服务 Dify 深度解读(一)

背景介绍 接触过大模型应用开发的研发同学应该都或多或少地听过 Dify 这个大模型应用基础服务&#xff0c;这个项目自从 2023 年上线以来&#xff0c;截止目前&#xff08;2024-6&#xff09;已经获得了 35k 多的 star&#xff0c;是目前大模型应用基础服务中最热门的项目之一…

从0到1搭建微服务框架

目录 1.技术栈&#xff1a; 2.模块介绍: 3.关键代码讲解 3.1基础公共模块(common)依赖&#xff1a; 3.3授权模块(auth)依赖: 3.4授权模块核心配置类(AuthrizatonConfig): 3.4 SecurityConfig.java 3.5 bootstrap的核心配置文件(其他服务配置类似这个)&#xff1a; 3.6n…