🚀 优质资源分享 🚀
学习路线指引(点击解锁) | 知识定位 | 人群定位 |
---|---|---|
🧡 Python实战微信订餐小程序 🧡 | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
💛Python量化交易实战💛 | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
什么是AbstractQueuedSynchronizer(AQS)
字面意思是抽象队列同步器
,使用一个voliate
修饰的int类型
的同步状态
,通过一个FIFO队列
完成资源获取的排队工作,把每个参与资源竞争的线程封装成一个Node节点
来实现锁的分配。
AbstractQueuedSynchronizer源码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable {private transient volatile Node head;//链表头private transient volatile Node tail;//链表尾private transient Thread exclusiveOwnerThread;//持有锁的线程private volatile int state;//同步状态,0表示当前没有线程获取到锁static final class Node {//链表的Node节点类volatile int waitStatus;//当前节点在队列中的状态volatile Node prev;//前置节点volatile Node next;//后置节点volatile Thread thread;//当前线程}
}
AQS同步队列的基本结构
Node.waitStatus的说明
状态值 | 描述 |
---|---|
0 | 节点默认的初始值 |
SIGNAL=-1 | 线程已经准备好,等待释放资源 |
CANCELLED=1 | 获取锁的请求线程被取消 |
CONDITION=-2 | 节点在队列中,等待唤醒 |
state为什么要用volatile修饰?
- 可见性,一个线程对变量的修改可以立即被别的线程感知到
- 有序性,禁止指令重排
AQS获取锁步骤
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
- 当一个线程获取锁时,首先判断state状态值是否为0
- 如果state==0,则通过CAS的方式修改为非0状态
- 修改成功,则表明获取锁成功,执行业务代码
- 修改失败,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
- 如果state!=0,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
AQS获取锁过程
首先调用tryAcquire
去修state
的状态值,成功就获取当前锁;失败则加入当前等待队列中,然后挂起线程。
tryAcquire
在AQS
的源码中tryAcquire
是一空实现,需要它的子类去实现这个空方法。因为在AQS中虽然公平锁
和非公平锁
的都是基于一个CLH去实现,但是在获取锁的过程中略有不同。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
公平锁FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {//获取当前线程final Thread current = Thread.currentThread();int c = getState();//获取同步器的状态if (c == 0) {//当前没有线程获取到锁//首先判断祖宗节点的线程是否当前线程一样if (!hasQueuedPredecessors() &&//更改state的状态值为非0compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果锁持有者的线程是当前线程,则可放行,锁的重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
/**
* 判断祖宗节点的线程是否当前线程一样
* 傀儡节点的下个节点
*/
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;//头节点的下个节点所持有的线程是否与当前线程相同return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//通过CAS更改state的状态值if (compareAndSetState(0, acquires)) {//把当前线程设置为锁的持有者setExclusiveOwnerThread(current);return true;}}//如果锁持有者的线程是当前线程,则可放行,锁的重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
对比后发现,公平锁先判断是否有老祖宗节点,如果有则返回false
;如果当前线程对应的node就是老祖宗节点,则直接去修改state状态,把state改为非0。
addWaiter
获取锁成功的线程去执行业务逻辑了,获取锁失败的线程则会在队列中排队等候,每个等候的线程也都不安分的。
private Node addWaiter(Node mode) {//把当前线程封装为一个Node节点Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//加入到队列的尾部enq(node);return node;
}
- 把当前线程封装为一个Node节点
- 当第一次执行这个方法时,由于head和tail都还没有赋值,则pred指向的tail也是空,所以直接直到
enq(node)
- 当pred指向的tail不为空时,则通过CAS的方式加入到尾部,如果成功直接返回;如果失败,则进入
enq(node)
通过自旋的方式加入。
//通过自旋的方式将节点加入到节点的尾部
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
为了操作链表的方便,一般都要在链表的头前加入一个傀儡节点,AQS的链表也不例外。
先创建一个傀儡节点,并把head、tail均指向它,然后再把node节点加入到尾部后面,移动tail的指向。
acquireQueued
当节点成功加入到链表的尾部后,等待被唤醒,然后通过自旋的方式去获取锁
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//当前节点的前置节点final Node p = node.predecessor();//如果前置节点是傀儡节点(head指向傀儡节点),则再次尝试去获取锁if (p == head && tryAcquire(arg)) {//获取成功后,则移除之前的傀儡节点,head指向当前node,setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//获取锁失败后,设置node节点的状态,并挂起当前节点if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
- 获取node节点的前置节点,如果前置节点是head,则再次尝试去获取锁
- 设置当前node节点的前置节点状态为-1(表示后续节点正在等待状态,默认是0),然后通过自旋的后会进行到
parkAndCheckInterrupt
挂起当前节点 LockSupport.park(this)
执行完事,当前线程会一直阻塞到这个地方- 当前唤醒时再次从1开始执行
AQS释放锁过程
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
主要是恢复state的值、重置锁持有都线程,然后唤醒挂起的线程。
protected final boolean tryRelease(int releases) {int c = getState() - releases;//当前线程与锁持有者线程不一样会报错if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {//重入的次数为0时,则当前线程已经没有重入了,可以清空锁的持有者free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
恢复state状态的值,如果重入次数为0时,则清空锁的持有都为null
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//唤醒下个node对应的线程LockSupport.unpark(s.thread);}
设置head指向的node节点的watiStatus的状态值,然后找到下个节点对应的线程并唤醒。