ReentrantLock
用途:锁是用来解决线程安全问题的
重入锁-> 互斥锁
- 满足线程的互斥性
- 意味着同一个时刻,只允许一个线程进入到加锁的代码中。多线程环境下,满足线程的顺序访问
锁的设计猜想
- 一定会涉及到锁的抢占,需要有一个标记来实现互斥。假设全局变量(0,1)
- 抢占到了锁,怎么处理(不需要处理)
- 没抢占到锁,怎么处理
- 需要等待(让处于排队中的线程,如果没有抢占到锁,则直接先阻塞->释放CPU资源)
- 如何让线程等待
- wait/notify(线程通信的机制,无法指定唤醒某个线程)
- LockSupport.park/unpark(阻塞一个指定的线程,唤醒一个指定的线程)
- Condition
- 需要排队(允许有N个线程被阻塞,此时线程处于活跃状态)
- 通过一个数据结构,把这N个排队的线程存储起来
- 如何让线程等待
- 抢占到锁的释放过程,如何处理
- LockSupport.park唤醒处于队列中的指定线程
- 锁抢占的公平性(是否允许插队)
- 公平
- 非公平
下图为锁猜想的整个过程
公平锁和非公平锁
锁抢占的公平性(是否允许插队)主要体现在 释放锁和抢占锁的临界区
先看对象的创建 重入锁默认的构造方法是非公平锁。那么公平锁和非公平锁有什么区别呢?
进入AQS队列中的都是采用的公平锁
公平锁
参数1代表抢占一把锁
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
public final boolean hasQueuedPredecessors() { Node t = tail;Node h = head;Node s; // 头尾指向一个节点,链表为空,返回falsereturn h != t &&// 头尾之间有节点,判断头节点的下一个是不是空// 不是空进入最后的判断,第二个节点的线程是否是本线程,不是返回 true,表示当前节点有前驱节点((s = h.next) == null || s.thread != Thread.currentThread());
}
分析: tryacquire方法是尝试着去抢占一把锁。tryacquire方法中1.先获得当前线程,判断当前的锁状态,为0则为无锁状态,2.若为无锁状态,再先检查AQS队列中是否有前驱节点,没有(false)才去竞争,有的话,就去老老实实的排队。3.如果当前为有锁状态,则会判断当前线程是否和抢占到锁的线程是同一个,如果是则对state进行加1操作,表示重入
非公平锁
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
分析:1.非公平锁在抢占锁之前就会进行插队,先进行一次CAS操作,如果成功了,则直接抢占到锁,如果失败了,则会重新去抢占锁2.在抢占锁与公平锁的区别主要体现在 非公平锁无需再判断AQS队列,而是直接进行锁的抢占。3.如果已有线程占有锁的情况下,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 加入队列并自旋等待
区别: 由上面代码可以看出,公平锁就是规规矩矩的按队列的顺序去抢占锁。而非公平锁在一开始就先去插队抢占锁 compareAndSetState(0,1)
加入对流并自旋等待
// AbstractQueuedSynchronizer#addWaiter,返回当前线程的 node 节点
private Node addWaiter(Node mode) {// 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 快速入队,如果 tail 不为 null,说明存在阻塞队列if (pred != null) {// 将当前节点的前驱节点指向 尾节点node.prev = pred;// 通过 cas 将 Node 对象加入 AQS 队列,成为尾节点,【尾插法】if (compareAndSetTail(pred, node)) {pred.next = node;// 双向链表return node;}}// 初始时队列为空,或者 CAS 失败进入这里enq(node);return node;
}
// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {// 自旋入队,必须入队成功才结束循环for (;;) {Node t = tail;// 说明当前锁被占用,且当前线程可能是【第一个获取锁失败】的线程,【还没有建立队列】if (t == null) {// 设置一个【哑元节点】,头尾指针都指向该节点if (compareAndSetHead(new Node()))tail = head;} else {// 自旋到这,普通入队方式,首先赋值尾节点的前驱节点【尾插法】node.prev = t;// 【在设置完尾节点后,才更新的原始尾节点的后继节点,所以此时从前往后遍历会丢失尾节点】if (compareAndSetTail(t, node)) {//【此时 t.next = null,并且这里已经 CAS 结束,线程并不是安全的】t.next = node;return t; // 返回当前 node 的前驱节点}}}
}
addWaiter主要作用是把当前线程通过尾插法插入到队列中。
线程加入队列后,就是要想办法阻塞线程,不让它执行了,看acquireQueued中的实现
final boolean acquireQueued(final Node node, int arg) {// true 表示当前线程抢占锁失败,false 表示成功boolean failed = true;try {// 中断标记,表示当前线程是否被中断boolean interrupted = false;for (;;) {// 获得当前线程节点的前驱节点final Node p = node.predecessor();// 前驱节点是 head, FIFO 队列的特性表示轮到当前线程可以去获取锁if (p == head && tryAcquire(arg)) {// 获取成功, 设置当前线程自己的 node 为 headsetHead(node);p.next = null; // help GC// 表示抢占锁成功failed = false;// 返回当前线程是否被中断return interrupted;}// 判断是否应当 park,返回 false 后需要新一轮的循环,返回 true 进入条件二阻塞线程if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 条件二返回结果是当前线程是否被打断,没有被打断返回 false 不进入这里的逻辑// 【就算被打断了,也会继续循环,并不会返回】interrupted = true;}} finally {// 【可打断模式下才会进入该逻辑】if (failed)cancelAcquire(node);}
}
acquireQueued会在一个自旋中不断尝试获得锁,失败后进入park阻塞
如果当前线程是再head节点后,也就是第一个节点,又会直接多一次机会tryAcquire尝试获取锁,如果还是被占用,会返回失败。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 表示前置节点是个可以唤醒当前节点的节点,返回 trueif (ws == Node.SIGNAL)return true;// 前置节点的状态处于取消状态,需要【删除前面所有取消的节点】, 返回到外层循环重试if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);// 获取到非取消的节点,连接上当前节点pred.next = node;// 默认情况下 node 的 waitStatus 是 0,进入这里的逻辑} else {// 【设置上一个节点状态为 Node.SIGNAL】,返回外层循环重试compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// 返回不应该 park,再次尝试一次retur
- shouldParkAfterFailedAcquire发现前驱节点等待状态是-1, 返回true,表示需要阻塞。
- shouldParkAfterFailedAcquire发现前驱节点等待状态大于0,说明是无效节点,会进行清理。
- shouldParkAfterFailedAcquire发现前驱节点等待状态等于0,将前驱 node 的 waitStatus 改为 -1,返回 false。
private final boolean parkAndCheckInterrupt() {// 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// 判断当前线程是否被打断,清除打断标记return Thread.interrupted();
}
通过不断自旋尝试获取锁,最终前驱节点的等待状态为-1的时候,进行阻塞当前线程。调用LockSupport.park进行阻塞
锁的释放
锁的释放主要是通过unlock方法,分为3步 1.设置锁定的线程exclusiveOwnerThread为null 2.设置state为0 无锁状态 3.唤醒队列中的第一个线程
public void unlock() {sync.release(1);
}
// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {// 尝试释放锁,tryRelease 返回 true 表示当前线程已经【完全释放锁,重入的释放了】if (tryRelease(arg)) {// 队列头节点Node h = head;// 头节点什么时候是空?没有发生锁竞争,没有竞争线程创建哑元节点// 条件成立说明阻塞队列有等待线程,需要唤醒 head 节点后面的线程if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;} return false;
}
// ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {// 减去释放的值,可能重入int c = getState() - releases;// 如果当前线程不是持有锁的线程直接报错if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// 是否已经完全释放锁boolean free = false;// 支持锁重入, 只有 state 减为 0, 才完全释放锁成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 当前线程就是持有锁线程,所以可以直接更新锁,不需要使用 CASsetState(c);return free;
}
private void unparkSuccessor(Node node) {// 当前节点的状态int ws = node.waitStatus; if (ws < 0) // 【尝试重置状态为 0】,因为当前节点要完成对后续节点的唤醒任务了,不需要 -1 了compareAndSetWaitStatus(node, ws, 0); // 找到需要 unpark 的节点,当前节点的下一个 Node s = node.next; // 已取消的节点不能唤醒,需要找到距离头节点最近的非取消的节点if (s == null || s.waitStatus > 0) {s = null;// AQS 队列【从后至前】找需要 unpark 的节点,直到 t == 当前的 node 为止,找不到就不唤醒了for (Node t = tail; t != null && t != node; t = t.prev)// 说明当前线程状态需要被唤醒if (t.waitStatus <= 0)// 置换引用s = t;}// 【找到合适的可以被唤醒的 node,则唤醒线程】if (s != null)LockSupport.unpark(s.thread);
}
总结
lock锁主要分为两部分锁的抢占和锁的释放
抢占锁:是在释放锁和抢占锁的临界区 区分公平锁和非公平锁,非公平锁是在当前线程释放锁的瞬间刚来进来一个新的线程,则无需排队可以直接抢占锁,如果没有抢占到,则加入到AQS队列中进行阻塞等待。而公平锁的体现是加入到AQS队列中的阻塞线程,当释放锁后,只能按照先后顺序去抢占锁。
释放锁:释放锁的过程就是把独占锁的线程设为null,状态state恢复为无锁状态,并且去AQS队列中唤醒第一个处于等待的线程节点。