文章目录
- 一:AQS简介
- 二:了解AQS 上锁和释放锁的原理
- 1:前言
- 2:上锁(非公平锁)
- (1):我们从main主函数中点进去
- (2):从lock进入
- (3):找到非公平锁中的lock
- (4):查看acquire()方法
- (5):查看tryAcquire(arg)方法
- a:前言介绍
- b:进入ReentrantLock类中的nonfairTryAcquire方法
- (6):addWaiter(Node.EXCLUSIVE)
- a:前言
- b:源码走一波
- (7): acquireQueued(Node, int)
- a:acquireQueued()
- b: shouldParkAfterFailedAcquire(pred, Node)
- c:parkAndCheckInterrupt()
- d:小结 acquireQueued(),总结下该函数的具体流程:
- (8):总结
- 3:公平锁
- 4:释放锁
- (1):从main函数进入
- (2):在进入Release(ReentrantLock的类中)
- (3):进入AQS源码中
- (4): unparkSuccessor(h)
- a:源码解析:
- b:为啥处于等待结点时候是逆序
- (5):小结
一:AQS简介
- AQS全称为AbstractQueuedSynchronizer,它是一个一个抽象类
- 在AQS中有几个属性和一个双向队列(CLH队列)
//头节点
private transient volatile Node head;
//尾节点
private transient volatile Node tail;
//状态值
private volatile int state;
- AQS是一个基类,在JUC并发包下,其实现类有ReentrantLock,CountDownLatch,…
- 图示例
二:了解AQS 上锁和释放锁的原理
1:前言
我们是通过AQS的实现类ReentrantLock来进行解析其上锁 和 释放锁的原理,但是ReetrantLock锁是有公平锁和非公平锁,以及可重入锁的分类的 我们先看非公平锁
2:上锁(非公平锁)
(1):我们从main主函数中点进去
public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();lock.lock();//点进去lock看源码}
(2):从lock进入
public void lock() {sync.lock();}
- 解释:我们这里从lock直接点进去看源码的话,其实是进了 sync的,该sync类中是定义了lock这个方法的,并且syn 内部类是继承自AbstractQueuedSynchronizer类,那我们的非公平锁类和公平锁类是继承自 sync 这个类的,并且实现其lock方法 。
(3):找到非公平锁中的lock
final void lock() {//以CAS的方式尝试将 state 状态从 0改为1,如果返回 true 则说明上锁成功,否则该资源有线程占用修改失败if (compareAndSetState(0, 1))//证明当前线程拿到了锁资源setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
(4):查看acquire()方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
- tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
- 如果尝试获取资源失败的话 也就是 tryAcquire(arg) = false 而 !tryAcquire(arg) =true ,那么接下来就将该线程封装成一个结点Node将其追加到AQS的队列后面中。
(5):查看tryAcquire(arg)方法
a:前言介绍
- 我们点击tryAcquire()的话,我们是直接进入到的是 AQS 基类的 tryAcquire(),这里并没有给出什么具体的实现;但是其实现类
ReentrantLock中 是有其实现的,所以我们需要在ReentrantLock中寻找tryAcquire()的方法
b:进入ReentrantLock类中的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {//获取到当前的线程final Thread current = Thread.currentThread();//获取到AQS的state值int c = getState();//c == 0 证明此时无锁,我们可以通过CAS对某个资源进行上锁if (c == 0) {if (compareAndSetState(0, acquires)) {//证明当前线程拿到了资源setExclusiveOwnerThread(current);return true;}}//锁重入了, 就是我们同一个线程对一个资源上了多次锁else if (current == getExclusiveOwnerThread()) {//将State + 1int nextc = c + acquires;//当c是最大值的时候 再+1的话,那么在二进制中 符号位就会从1到0,那么的话整个值就为负数if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");//重新对AQS中的state值进行赋值setState(nextc);return true;}return false;}
(6):addWaiter(Node.EXCLUSIVE)
a:前言
如果尝试获取资源失败的话,那么我们将该线程封装成Node结点,并将其放到AQS队列尾部
b:源码走一波
private Node addWaiter(Node mode) {//这里就是将当前线程封装成一个结点,mode有两种:EXCLUSIVE(独占)和SHARED(共享),在reentrantLock中,mode 是独占锁Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure//获取到AQS队列的尾部结点Node pred = tail;//如果队列不空的话if (pred != null) {//让新的结点指向AQS尾部结点(双向队列)node.prev = pred;//因为AQS队列中tail一直指向的是最后一个结点,所以我们通过CAS操作将 tail指向最后一个结点if (compareAndSetTail(pred, node)) {//让倒数第二个结点指向倒数第一个结点pred.next = node;return node;}}//如果队列为空的话 或者是CAS操作失败的话,enq(node);return node;}
如果队列为空的话 或者是CAS操作失败的话,则进入enq()函数;
private Node enq(final Node node) {//死循环,CAS通过自旋的方式一定将结点成功的插入到队列尾部,for (;;) {//获取队列的尾部结点 Node t = tail;if (t == null) { // Must initialize 初始化完成后 tail和head都指向该node结点if (compareAndSetHead(new Node()))tail = head;} else { //else 就是上个函数CAS操作失败了,或者是初始化的时候,发生线程上下文切换,那么的话我们执行下列代码node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
(7): acquireQueued(Node, int)
a:acquireQueued()
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了,进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了(跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回)
1 final boolean acquireQueued(final Node node, int arg) {2 boolean failed = true;//标记是否成功拿到资源3 try {4 boolean interrupted = false;//标记等待过程中是否被中断过5 6 //又是一个“自旋”!7 for (;;) {8 final Node p = node.predecessor();//拿到前驱9 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
10 if (p == head && tryAcquire(arg)) {
11 setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
12 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
13 failed = false; // 成功获取资源
14 return interrupted;//返回等待过程中是否被中断过
15 }
16
17 //这里需要保证上一个结点是的-1 (SIGNAL = -1 这个-1代表上一个锁是被唤醒的状态), 当是-1的时候返回true
18 if (shouldParkAfterFailedAcquire(p, node) &&
19 parkAndCheckInterrupt())//基于unsafe类的park()方法,阻塞线程,直到被unpark()唤醒。
20 interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
21 }
22 } finally {
23 if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
24 cancelAcquire(node);
25 }
26 }
b: shouldParkAfterFailedAcquire(pred, Node)
此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧。(如果前驱节点pred的状态不是-1的话,那么我们的Node就往前进寻找,直到 找到一个前驱结点是-1的,然后将Node放到其后面)
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {2 int ws = pred.waitStatus;//拿到前驱的状态3 if (ws == Node.SIGNAL)//Node.SIGNAL == -14 //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了5 return true;6 if (ws > 0) {7 /*8 * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。9 * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
10 */
11 do {
12 node.prev = pred = pred.prev;
13 } while (pred.waitStatus > 0);
14 pred.next = node;
15 } else {
16 //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
18 }
19 return false;
20 }
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
c:parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。
d:小结 acquireQueued(),总结下该函数的具体流程:
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,然后获取该锁资源;如果没拿到,继续流程1,继续等待。
(8):总结
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
3:公平锁
从源码中我们可以观察出,公平锁是在上锁的时候,并没有一上来就先去尝试获取资源,而是直接进入 acquire(1)
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}}
4:释放锁
(1):从main函数进入
ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();//从这里进入
(2):在进入Release(ReentrantLock的类中)
public void unlock() {sync.release(1);}
(3):进入AQS源码中
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;//找到头节点if (h != null && h.waitStatus != 0)unparkSuccessor(h);//唤醒下一个线程 return true;}return false;}
(4): unparkSuccessor(h)
a:源码解析:
private void unparkSuccessor(Node node) {int ws = node.waitStatus;//获取到线程的状态, 小于0 说明数处于等待状态,大于0说明说明该结点已经放弃等待if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//0状态就是已经完成的状态//得到头结点的后继结点head.nextNode s = node.next;//如果这个后继结点为空或者状态⼤于0//通过前⾯的定义我们知道 ⼤于0只有⼀种可能 就是这个结点已被取消//那么我们就从重新再找个处于等待状态的结点去唤醒,我们找的顺序是从尾结点开始。if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
b:为啥处于等待结点时候是逆序
- 由于并发问题,addWaiter()入队操作和cancelAcquire()取消排队操作都会造成next链的不一致,而prev链是强一致的,所以这时从后往前找是最安全的。
(5):小结
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了==(即state=0)==,它会唤醒等待队列里的其他线程来获取资源。