简介
JUC的核心是AQS,大部分锁都是基于AQS扩展出来的,这里先结合可重入锁和AQS,做一个讲解,其它的锁的实现方式也几乎类似
ReentrantLock和AQS
AQS的基本结构
AQS,AbstractQueuedSynchronizer,抽象队列同步器,JUC中的基础组件,基于AQS,JUC实现了多种锁和同步工具。
AQS在设计模式是采用了模板方法设计模式,要想基于AQS实现一个同步工具,需要继承AQS,同时实现所有protected权限的方法,这些方法定义了如何获取锁(独占锁、共享锁),AQS负责整体流程的编排,同时维护阻塞队列、线程的阻塞和唤醒。
AQS用于实现依赖单个原子值去表示状态的同步器
1、AQS的继承体系:
// AQS:AQS继承了AbstractOwnableSynchronizer,并且AQS本身是一个抽象类
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable { }// AQS的父类,AbstractOwnableSynchronizer,只有一个成员变量,用于存放持有独占锁的线程。
// 一个线程可以独占的同步器。本类为创建可能包含所有权概念的锁和相关同步器提供了基础
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {protected AbstractOwnableSynchronizer() { }// 持有排他锁的线程private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}
2、AQS的体系结构
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {// protected权限的构造方法,只允许子类继承和调用protected AbstractQueuedSynchronizer() { }// 定义了队列中的节点,节点中封装了线程对象和指向前后节点的指针。AQS中没有队列实例,// 而是通过包装线程为Node类,用前、后节点指针来实现一个虚拟的双向队列。static final class Node {/* 两个常量,定义了锁的模式 */// 锁是共享模式static final Node SHARED = new Node();// 锁是独占模式static final Node EXCLUSIVE = null;Node nextWaiter; // 锁的模式,为null,独占模式,SHARED,共享模式;同时,Condition使用它构建条件队列// 节点间组成一个双向链表volatile Node prev;volatile Node next;// 节点中封装的线程volatile Thread thread;}// 双向链表的头结点private transient volatile Node head;// 双向链表的尾结点private transient volatile Node tail;// 锁的状态,这个状态由子类来操作private volatile int state;/* 模板方法:交给子类实现的方法 */// 尝试获取独占锁protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}// 尝试释放独占锁protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}// 尝试获取共享锁protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}// 尝试释放共享锁protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}// 判断当前锁是否是独占模式protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}/* 父类中负责流程编排 */// 获取独占锁,先调用交给子类实现的tryAcquire方法,如果获取锁失败,进入队列,然后等待public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}// 释放独占锁,先调用交给子类实现的tryRelease方法,如果释放成功,当前节点移出队列,然后唤醒队列中的第一个节点public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// 获取共享锁public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}// 释放共享锁public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
}
从AQS的基本结构中可以看到:
- AQS的队列,实际是一个双向链表。
- AQS在设计上使用了模板方法模式,父类中定义好流程,然后再定义好交给子类实现的模板方法。在当前案例中,交给子类实现的是获取锁、释放锁的方式,父类来维护阻塞队列、线程的阻塞和唤醒。通过这种方式,子类指定锁的获取和释放的方式,子类可以实现多种类型的锁,比如公平锁、非公平锁、读写锁、并发度控制(信号量)等。
AQS中提供了两套模式:独占模式和共享模式
- 独占模式:exclusive, 同一时间只有一个线程能拿到锁执行
- 共享模式:shared, 同一时间有多个线程可以拿到锁协同工作
子类可以选择独占模式、也可以选择共享模式,例如,读写锁中,读锁就是共享模式,写锁就是独占模式,可重入锁也是独占模式。
ReentrantLock的基本结构
1、ReentrantLock的继承体系:ReentrantLock实现了Lock接口,并且支持序列化
public class ReentrantLock implements Lock, java.io.Serializable {
Lock接口:定义了一个锁应该具备的功能
public interface Lock {/* 获取锁 */// 获取锁void lock();// 可打断地获取锁void lockInterruptibly() throws InterruptedException;// 不阻塞地获取锁,如果获取不到,立刻返回boolean tryLock();// 获取锁,指定超时时长boolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 释放锁void unlock();// 获取条件变量,条件变量是指调用wait方法、notify方法的锁对象,ReentrantLock可以实现在多个条件变量上等待和唤醒Condition newCondition();
}
2、ReentrantLock的类结构
public class ReentrantLock implements Lock, java.io.Serializable {// 同步器:抽象静态内部类,定义了获取锁、释放锁的功能。同步器继承了AQS,AQS是juc的核心。abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;// 获取锁的功能,交给子类扩展abstract void lock();// 释放锁protected final boolean tryRelease(int releases) { } // 这里暂时不关心它的实现}// 同步器的实例private final Sync sync;// 非公平锁static final class NonfairSync extends Sync { }// 公平锁static final class FairSync extends Sync { }// 构造方法public ReentrantLock() {sync = new NonfairSync(); // 默认使用非公平锁}// 获取锁public void lock() {sync.lock();}// 释放锁public void unlock() {sync.release(1);}
}
总结:
- ReentrantLock内部定义了一个同步器,并且它的所有功能实际上都是委托给同步器来完成。
- 同步器把获取锁的方法定义为抽象方法,同时实现了释放锁的方法,在同步器的基础上,扩展出了两个子类,公平锁、非公平锁,公平锁和非公平锁只是获取锁的方式不同,其它都相同。ReentrantLock默认使用非公平锁
- 同步器继承了AQS
ReentrantLock是如何获取锁的?
ReentrantLock内部实现了公平锁和非公平锁两种获取锁的模式,默认使用非公平锁,这里分别讲解它们的工作机制。
非公平锁是如何获取锁的?
源码:
1、整体流程
// 这是同步器的子类NonfairSync中获取锁的方法
final void lock() {// 获取锁:尝试使用cas算法改变状态变量,把它的值从0改为1,改变成功,表示获取到锁if (compareAndSetState(0, 1))// 获取到锁后,把当前线程设置为独占线程setExclusiveOwnerThread(Thread.currentThread());else// 没有获取到锁:调用AQS中的方法,进入阻塞队列acquire(1);
}
这个方法中调用的方法都是来自AQS。从方法的实现步骤来看,非公平锁在获取锁时,先尝试直接获取锁,获取不到,再进入阻塞队列,符合之前提到的非公平锁的原理。
2、AQS中acquire方法:它在AQS中定义了获取独占锁的流程
// 整体流程
public final void acquire(int arg) {// 尝试获取锁:首先调用交给子类实现的的tryAcquire方法,尝试获取锁,如果获取成功,直接进入同步代码块,if (!tryAcquire(arg) && // 如果没有获取到锁,进入阻塞队列:如果获取锁失败,线程进入队列,阻塞,入队之前还会尝试再次获取锁,// 成功则不入队acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 为线程打上中断状态,因为acquireQueued方法中会清除线程的中断标识,所以这里需要重新为线程打上中断标识selfInterrupt();
}
2.1、交给子类实现的tryAcquire方法:尝试获取锁
// 定义在NonfairSync中
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}// 定义在Sync中
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 表示无锁// 尝试获取锁,获取锁就是通过cas算法改变state变量的状态,改变成功,就是获取到锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 如果获取锁的线程是当前线程,重入int nextc = c + acquires; // 状态加1if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc); // 更新状态return true;}return false;
}
2.2、获取锁失败后进入阻塞队列的逻辑:
// 1、addWaiter方法:创建节点并且把节点加入到队列中
private Node addWaiter(Node mode) { // 这里的参数mode表示节点的模式,这里是独占模式// 将当前线程封装成一个node节点Node node = new Node(Thread.currentThread(), mode);/* 下面是操作双向链表的代码,将新节点加入到队列的尾部,队列中的头结点是一个虚拟节点 */Node pred = tail;if (pred != null) { // 如果当前队列中有节点node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node); // 当前队列中没有节点,那么需要创建虚拟头结点,然后新节点入队return node;
}// 节点入队的逻辑
private Node enq(final Node node) {for (;;) {// 自旋// 获取当前队列中的尾节点Node t = tail;if (t == null) { // Must initialize // 初始化一个空节点 new Node(),作为队列中的哨兵节点if (compareAndSetHead(new Node())) // cas操作,设置头节点为空节点tail = head;// 初始化空节点后,因为此时队列中只有一个节点,所以head和tail都指向这一个节点} else {// 第二次循环,将节点挂载到空节点上node.prev = t;// 将用户传入的node节点设置为尾节点if (compareAndSetTail(t, node)) {t.next = node;return t; // 返回前一个节点}}}
}
2.3、线程进入阻塞的逻辑:
// 2、acquireQueued,线程进入阻塞
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) { // 自旋// 获取当前节点的上一个节点final Node p = node.predecessor();// 判断前一个节点是不是头节点(队列中使用一个虚拟节点作为头结点,所以当前节点的前一个节点是头结点,// 那么当前节点就是事实上的头结点),如果是,尝试抢占锁资源if (p == head && tryAcquire(arg)) {// 抢到锁资源后,这里可以理解为当前节点出队,将当前节点设置为队列的虚拟头结点,// 所以线程获取到锁之后,就会被移出阻塞队列setHead(node);p.next = null; // help GC,将原先的头结点清除failed = false;return interrupted;}// 抢锁失败if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该阻塞当前线程parkAndCheckInterrupt()) // 调用LockSupport中的park方法,阻塞当前线程interrupted = true;}} finally {if (failed)cancelAcquire(node); // 发生异常,当前节点出队}
}// 判断是否应该阻塞当前线程:shouldParkAfterFailedAcquire
// 第一个参数是当前节点的上一个节点,第二个参数是当前节点,
// 总结:如果上一个节点的状态是SIGNAL,那么当前节点应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 这个方法中需要判断节点的状态。表示状态的常量:// static final int CANCELLED = 1;:表示当前节点的线程已经被取消// static final int SIGNAL = -1;:表示当前节点的后继节点的线程正在等待唤醒// static final int CONDITION = -2;:节点在条件队列中,节点线程等待在Condition上,// 不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列// 转移到同步队列中,然后开始尝试对同步状态的获取// static final int PROPAGATE = -3;:表示下一个共享状态应该被无条件传播int ws = pred.waitStatus;// 判断上一个节点的状态if (ws == Node.SIGNAL)return true; // 如果前驱节点状态为SIGNAL,当前节点需要被阻塞if (ws > 0) { // 如果前驱节点的状态是CANCELLED,需要把CANCELLED状态的节点移除出队列do {node.prev = pred = pred.prev; // 前一个节点前移,同时当前节点指向前前一个节点} while (pred.waitStatus > 0);pred.next = node; // 将当前节点连接到新的前驱节点} else {// 将前驱节点状态设置为SIGNAL,当下一次执行这个方法时,// 因为ws == SIGNAL状态成立,所以下一次会执行返回truecompareAndSetWaitStatus(pred, ws, Node.SIGNAL); }return false;
}// parkAndCheckInterrupt:阻塞当前线程
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); // 这个方法会清除中断标记
}
2.4、发生异常,节点出队的方法:
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessorsNode pred = node.prev;while (pred.waitStatus > 0) // 如果前一个节点的状态是CANCELLED,需要移除前一个节点,重新构建队列node.prev = pred = pred.prev; // 前一个节点前移,同时当前节点指向前前一个节点Node predNext = pred.next; // 循环过后前面的节点的下一个节点node.waitStatus = Node.CANCELLED;// 如果当前节点是尾结点,将尾结点更新为前一个节点if (node == tail && compareAndSetTail(node, pred)) {// 同时前一个节点的next指针指向nullcompareAndSetNext(pred, predNext, null);} else {// 如果后序节点需要被唤醒int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {// 进入if分支,证明:前驱节点不是头节点、前驱节点的状态是SIGNAL或可以被设置为SIGNAL、// 前驱节点仍然有线程关联,移除当前节点即可Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next); // 从队列中移除当前节点} else {// 进入else分支,证明当前节点的前驱节点无法正常处理后续节点的唤醒逻辑,因此在这里唤醒后续节点unparkSuccessor(node); // 唤醒后续节点}node.next = node; // help GC}
}// 唤醒后续节点
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 遍历,从尾结点开始,找到当前节点之后第一个状态不为CANCELLED的节点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 唤醒该节点
}
总结:用一张流程图来总结代码中的流程
1、lock方法:尝试获取锁的流程
2、acquire方法的流程
公平锁是如何获取锁的?
源码:
// FairSync:这是同步器的子类FairSync中获取锁的方法
final void lock() {// 直接进入阻塞队列,也符合公平锁的原理,acquire中会调用tryAcquire方法,// 公平锁和非公平锁都分别实现了这个方法,定制自己获取锁的逻辑acquire(1);
}// FairSync 公平锁中获取锁的流程
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 如果无锁if (!hasQueuedPredecessors() && // 等待队列中没有节点compareAndSetState(0, acquires)) { // 获取锁成功setExclusiveOwnerThread(current); // 设置当前线程为拥有独占锁的线程return true;}}else if (current == getExclusiveOwnerThread()) { // 锁的可重入逻辑int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
公平锁不像非公平锁那样,先尝试获取锁,而是按照流程,判断阻塞队列中有没有节点,如果有,进入阻塞队列
ReentrantLock是如何释放锁的?
公平锁和非公平锁释放锁的原理是一样的,这里统一讲解。
调用AQS中的release方法,来完成释放锁的逻辑
public final boolean release(int arg) {// tryRelease,交给子类实现的模板方法,执行释放锁的逻辑,如果返回true,表示锁释放成功if (tryRelease(arg)) {// 处理等待队列,唤醒后继节点Node h = head;if (h != null && h.waitStatus != 0)// 唤醒后继节点,这里是唤醒队列中的第一个节点unparkSuccessor(h); // 这个方法在前面有提到return true;}return false;
}// ReentrantLock中释放锁的逻辑,同样的,改变状态,将独占锁标识改为null
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 改变状态,这里包含可重入的逻辑if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断,如果当前线程没有持有锁throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {// 确认是释放锁,将独占锁的标识置为null,此前它是当前线程free = true;setExclusiveOwnerThread(null);}setState(c); // 设置状态return free;
}
Condition 源码
1、ReentrantLock中获取Condition的方法:
// ReentrantLock中的方法:
public Condition newCondition() {return sync.newCondition();
}
// 同步器中的方法:
final ConditionObject newCondition() {return new ConditionObject();
}
可以看到,最终是获取了一个ConditionObject的实例
2、ConditionObject:它被定义在AQS中,实现了Condition接口。
Condition接口:条件对象,条件对象从Object类中提取出了wait、notify、notifyAll方法的功能,使得一个锁对象上可以支持多个等待队列。
public interface Condition {// 使当前线程进入等待状态,直到被唤醒或调用interrupt方法void await() throws InterruptedException;// 等待,直到被唤醒void awaitUninterruptibly();// 等待,并且指定时长long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;// 唤醒单个线程void signal();// 唤醒全部线程void signalAll();
}
2、ConditionObject的整体结构:它是AQS的成员内部类
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/* 维护一个等待队列 */// 头节点private transient Node firstWaiter;// 尾节点private transient Node lastWaiter;
}
3、阻塞逻辑的实现:
// await方法:线程入队,然后调用park方法,进入等待状态
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 向条件队列添加一个节点Node node = addConditionWaiter();int savedState = fullyRelease(node); // 完全释放当前线程持有的锁int interruptMode = 0;// 判断当前节点是否在同步队列while (!isOnSyncQueue(node)) {// 如果不在,进入阻塞状态LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 唤醒后,没有进入上面的while,证明节点在同步队列中,尝试重新获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // 判断节点是否仍在条件队列中unlinkCancelledWaiters(); // 清理队列中所有被取消的节点,确保队列的正确性if (interruptMode != 0)reportInterruptAfterWait(interruptMode); // 处理中断逻辑,被外部中断后,是抛异常还是继续进行
}// 清理队列中失效的节点:这里实际上是处理单向链表的逻辑,等待队列中,使用nextWaiter指向下一个节点
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) { // 从头结点开始遍历Node next = t.nextWaiter; // 下一个节点if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t; // 用作下一次循环时记录前一个节点t = next; // 头结点后移}
}
3.1、添加线程到条件队列,条件队列是一个单向链表
private Node addConditionWaiter() {Node t = lastWaiter;// 这里先判断尾结点的状态,如果它不是CONDITION状态if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters(); // 清理队列中失效的节点t = lastWaiter;}// 节点入队Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
3.2、释放锁的逻辑
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) { // 释放锁failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED; // 如果释放失败,将节点状态改为CANCELLED}
}
4、唤醒逻辑的实现:
// signal方法:唤醒线程
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first); // 唤醒队列中的第一个节点,方法中会调用LockSupport.unpark方法
}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null) // 头结点后移lastWaiter = null; // 如果等待队列中只有一个节点,尾结点置为空first.nextWaiter = null; // 原先的头结点出队} while (!transferForSignal(first) && // 唤醒头结点所在的线程,如果唤醒成功,退出循环(first = firstWaiter) != null);
}// 唤醒头结点所在的线程
final boolean transferForSignal(Node node) {// 状态转换,由CONDITION变为初始状态if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); // 节点加入同步队列,这里返回的是当前节点的前一个节点int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前一个节点的状态无法被转换为SIGNALLockSupport.unpark(node.thread); // 手动唤醒当前节点return true;
}
总结:Condition的工作机制
- Condition的实现类ConditionObject,是AQS的内部类,所以它会持有AQS的引用,在当前案例中,AQS是通过子类ReentrantLock实例化的,所以Condition实际上持有ReentrantLock的实例。成员内部类的实例依赖于外部类的实例,通过ReentrantLock创建ConditionObject的实例。
- Condition内部会维护一个自己的等待队列,它又持有ReentrantLock的实例,所以它可以操作ReentrantLock的同步队列
- 用户通过Condition实例调用await方法,condition会把用户线程加入到自己的条件队列中,然后阻塞
- 用户通过Condition实例调用signal方法,condition会把用户线程从自己的条件队列中移除,然后加入到ReentrantLock的同步队列中,然后唤醒用户线程
- 用户线程被唤醒后,判断自己是不是在同步队列中,如果在,抢锁,如果不在,继续阻塞。抢到锁之后,会额外判断,如果当前线程还在条件队列中,会清理条件队列中失效的节点
总结
这里介绍了可重入锁和AQS是如何配合在一起工作的,它们的设计模式,哪些功能被定义在可重入锁中,哪些功能被定义在AQS中,后面介绍的几个工具也是这么实现的,模式基本相同。
ReentrantReadWriteLock 源码
读写锁,读锁是共享锁,写锁是独占锁,和之前的ReentrantLock类似的一点,ReentrantLock本身可以被理解为是一个写锁
ReentrantReadWriteLock的基本结构
1、ReentrantReadWriteLock的继承体系:ReentrantReadWriteLock实现了读写锁(ReadWriteLock)接口,并且支持序列化
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {
ReadWriteLock接口:定义了读锁和写锁
public interface ReadWriteLock {// 读锁Lock readLock();// 写锁Lock writeLock();
}
2、ReentrantReadWriteLock的类结构
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {// 同步器abstract static class Sync extends AbstractQueuedSynchronizer {/* 同步器中定义的抽象方法 */// 判断读是否应该阻塞abstract boolean readerShouldBlock();// 判断写是否应该阻塞abstract boolean writerShouldBlock();}// 非公平锁static final class NonfairSync extends Sync {final boolean writerShouldBlock() {return false; // 非公平锁,写锁可以竞争}final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}// 公平锁static final class FairSync extends Sync {final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}}// 读锁:内部操作的是共享锁public static class ReadLock implements Lock, java.io.Serializable {// 同步器,锁的内部,所有的功能都是委托同步器实现的private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() { // 获取锁sync.acquireShared(1);}public void unlock() { // 释放锁sync.releaseShared(1);}}// 写锁,内部操作的是独占锁public static class WriteLock implements Lock, java.io.Serializable {// 同步器private final Sync sync;public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}}// 构造方法,读锁和写锁默认都是非公平锁public ReentrantReadWriteLock() {this(false);}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}
}
ReentrantReadWriteLock和ReentrantLock类似,只是它的内部扩展出了读锁和写锁,读锁和写锁依赖的是同一个同步器,读锁是共享锁,写锁是排它锁。
读写锁内置的同步器
读写锁需要在读锁和写锁之间同步,这些功能都依赖同步器
同步器的结构:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 6317671515068378041L;/* 锁的状态:用state的高16位作为读锁的数量,低16位表示写锁的数量 */static final int SHARED_SHIFT = 16; // 计算常量static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 读锁的次数,状态字段无符号右位移16位static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 写锁的次数,状态字段取后16位static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }// 静态内部类:封装了线程id和该线程持有读锁的次数static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread()); // 线程id,在创建实例时初始化}private transient HoldCounter cachedHoldCounter;// 静态内部类:继承了ThreadLocal,用于记录每个线程持有读锁的次数static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}private transient ThreadLocalHoldCounter readHolds;// 第一个持有读锁的线程private transient Thread firstReader = null;// 第一个持有读锁的线程持有几次读锁,这里是可重入锁的逻辑private transient int firstReaderHoldCount;Sync() {readHolds = new ThreadLocalHoldCounter();setState(getState()); // ensures visibility of readHolds}
}
获取读锁的逻辑
读锁直接调用了AQS中获取共享锁的逻辑,并且在某些步骤做了自己的定制,这就是模板方法设计模式。
源码:
1、基本步骤
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) // 尝试获取共享锁doAcquireShared(arg); // 获取失败,进入阻塞队列
}
2、尝试获取共享锁
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 获取 state 的值int c = getState();// 判断是否有线程获取了写锁并且不是当前线程,exclusiveCount(c),计算写锁的数量if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)return -1; // 如果有线程获取了写锁,直接返回// 计算读锁的数量int r = sharedCount(c);if (!readerShouldBlock() && // 读是否应阻塞,这个实现在子类中r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) { // cas算法修改状态,修改成功,表示获取到读锁/* 下面这一大段逻辑都是用来记录当前线程获取了几次共享锁 */if (r == 0) {// 表示是第一个获取读锁的线程firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果是当前获取读锁的线程重入,再次获取锁firstReaderHoldCount++;} else {// 如果不是第一个获取读锁的线程来获取读锁,使用cachedHoldCounter和readHolds。// cachedHoldCounter记录线程id和该线程对于读锁的持有次数,readHolds将该数据// 存储到ThreadLocal中HoldCounter rh = cachedHoldCounter;// 这个if else是处理readHolds和cachedHoldCounter之间的关系if (rh == null || rh.tid != getThreadId(current)){// 这里的get方法,会调用初始化数据的方法initialValue() // 参考ThreadLocalcachedHoldCounter = rh = readHolds.get();} else if (rh.count == 0)readHolds.set(rh);rh.count++; // 线程id在创建实例时初始化,这里记录该线程对于读锁的持有次数}return 1; // 表示获取锁成功}// 需要被阻塞获取读锁失败,那么需要进入下面完整版的获取锁的过程return fullTryAcquireShared(current);
}// 读是否需要被阻塞:这个逻辑定义在同步器中。具体实现是,如果队列中的第一个节点是一个独占模式,读线程需要被阻塞,否则不需要。
// 因为即使当前没有写线程持有锁,队列中也可能会有写请求,并且队列中的写请求也应该被优先处理,在非公平锁中,读线
// 程和写线程的请求顺序可能不严格按照先进先出处理,但写线程的请求仍然需要被优先处理。
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() && // 队列中的头结点是独占模式s.thread != null;
}
获取锁失败后,需要调用完整版的获取锁的流程:
// 完整版的获取锁的过程:用于在快速路径失败后,提供一种更全面的尝试获取共享锁的机制。
// 它处理了快速路径未能处理的复杂情况,例如锁降级、读锁数量达到上限、线程是否需要阻塞等。
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) { // 自旋int c = getState();// 判断是否有线程获取了写锁并且不是当前线程if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1; // 如果有,直接返回} else if (readerShouldBlock()) { // 判断读线程是否应该被阻塞if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove(); // 移除当前线程持有锁的记录}}if (rh.count == 0)return -1;}}// 如果当前线程已经持有写锁,继续尝试获取读锁(锁降级),if (sharedCount(c) == MAX_COUNT) // 检查读锁数量是否达到上限throw new Error("Maximum lock count exceeded");// 尝试获取共享锁if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}
}
3、获取锁失败后,进入阻塞队列
private void doAcquireShared(int arg) {// 向队列尾部添加一个节点final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 如果当前节点是头结点,再次尝试获取锁int r = tryAcquireShared(arg);if (r >= 0) {// 获取锁成功,头结点出队并且唤醒后续节点setHeadAndPropagate(node, r); p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node); // 处理异常情况}
}private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node); // 将当前节点设置为一个虚拟头结点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 如果下一个节点是共享模式或者null,唤醒后续节点Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}
释放读锁的逻辑
整体流程:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试释放共享锁doReleaseShared(); // 释放共享锁return true;}return false;
}
1、tryReleaseShared:负责更新状态
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 更新当前线程持有共享锁的次数if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}// 更新state变量for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
2、doReleaseShared:维护阻塞队列
private void doReleaseShared() {for (;;) { // 无限循环,直到释放操作完成Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 尝试将状态从SIGNAL改为0continue; // 如果CAS失败,重新循环检查unparkSuccessor(h); // 唤醒头节点的后继节点}// 如果头节点的状态为0,说明当前没有线程需要被唤醒,但需要确保释放操作能够继续传播。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changed// 在每次循环结束时,检查头节点是否发生变化。如果头节点没有变化,说明当前的释放操作已经完成,可以退出循环。break; }
}
获取和释放写锁的逻辑
和ReentrantLock类似,只是操作state字段的方式不同。
获取写锁的逻辑:
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) { // 如果锁已经被占用// (Note: if c != 0 and w == 0 then shared count != 0),如果c != 0 && w == 0,// 证明共享锁不为0,有线程持有共享锁if (w == 0 || current != getExclusiveOwnerThread()) // 不是当前线程获取的写锁return false; // 退出// 如果当前线程已经持有写锁,检查是否超过最大次数if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 重入setState(c + acquires);return true;}// 如果锁未被占用(c == 0)if (writerShouldBlock() || // 如果写线程应该阻塞!compareAndSetState(c, c + acquires)) // 或者 CAS 更新状态失败return false;setExclusiveOwnerThread(current);return true;
}
释放写锁的逻辑:
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;
}
总结
读写锁使用同一个状态字段、同一个阻塞队列,内部基于同一个同步器,彼此之间互相影响,只是获取锁的方式不同。
Semaphore 源码
Semaphore底层是基于共享锁,也可以理解为读锁,加锁和释放锁,操作的都是读锁。它允许多个线程同时执行同步代码块,但是它又会限制线程数量,从而达到控制并发量的目的
Semaphore的基本结构:
public class Semaphore implements java.io.Serializable {// 同步器的实例private final Sync sync;// 同步器的定义abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;// 这里重点关注构造方法,它调用AQS中的setState方法,state字段表示锁的状态,如果是共享锁,// state的值0是到n,如果是排它锁,state的值是0到1,0表示无锁,1表示获取到锁。值为n,表示// 允许n个线程同时获取共享锁Sync(int permits) {setState(permits);} }// 构造方法,参数premits代表许可证数量,也就是同时允许多少个线程进行并发操作public Semaphore(int permits) {sync = new NonfairSync(permits); // 默认使用非公平锁}// 非公平锁static final class NonfairSync extends Sync {}// 公平锁static final class FairSync extends Sync { }// Semaphore方法将获取锁和释放锁的功能都委托给同步器,同步器又调用了AQS中的方法,所以核心实现在AQSpublic void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void release() {sync.releaseShared(1);}}
和ReentrantLock类似,Semaphore内部定义了同步器Sync,同步器继承了AQS,基于同步器扩展出了公平锁、非公平锁,Semaphore默认使用非公平锁。
原理:Semaphore将状态设置为n,acquire方法,n - 1,代表获取到锁、release方法, n + 1,代表释放锁
CountDownLatch 源码
CountDownLatch的内部是一个共享锁,同样的,它的内部定义了同步器,但是没有根据同步器扩展出非公平锁、公平锁
public class CountDownLatch {// 同步器,继承了AQSprivate static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 构造方法Sync(int count) {setState(count); // 调用了AQS中的setState方法}}// 同步器实例private final Sync sync;// 构造方法,参数count,指定了调用countDown方法的次数,调用够指定次数的countDown方法后,// await方法才会结束阻塞public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}// 等待,直到倒计时锁内部值为0public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// 倒计时锁减1,如果减1后值为0,释放等待的线程public void countDown() {sync.releaseShared(1);}
}
和Semaphore类似,将state设置为n,调用countDown方法, n - 1,调用await方法,判断 n 是否等于 0 ,如果是,取消阻塞
Q&A
读锁、写锁,公平锁、非公平锁,是怎么配合在一起工作的?
读锁、写锁,公平锁、非公平锁,是从不同的角度描写了一个锁的特性,读锁可以是公平锁、也可以是非公平锁,它们是一个实体的两个属性
线程什么时候从阻塞队列中移出?
获取到锁之后,所以线程释放锁资源时只需要唤醒队列中的第一个节点即可
锁超时怎么实现?tryLock方法
进入阻塞状态时加上超时时长。
源码:
tryAcquireNanos:尝试获取锁,在用户指定的时长过后,如果没有获取锁,结束阻塞,是ReentrantLock中tryLock等方法的基础
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 判断打断状态if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) || // 尝试获取锁doAcquireNanos(arg, nanosTimeout); // 获取锁,或者阻塞
}// doAcquireNanos:真正执行获取锁或阻塞的逻辑
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;// 结束阻塞的时间,当前时间加上用户传入的时间final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor();// 如果上一个节点是头节点,尝试获取锁,成功后直接返回,不进入阻塞if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}// 计算出应该阻塞的时长nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;// 判断是否应该阻塞if (shouldParkAfterFailedAcquire(p, node) &&// 这是一项优化,如果时长大于自旋阈值才进行阻塞,否则进行自旋,// 因为如果阻塞时间特别短,相较于自旋,阻塞比较耗费性能nanosTimeout > spinForTimeoutThreshold)// 阻塞LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
可打断怎么实现?tryInterruptibly方法
AQS中会判断当前线程是否调用了interrupt方法,可打断的情况下,如果调用了interrupt方法,抛异常,同时用户需要处理这个异常,不可打断的情况下,也就是通过lock()方法获取锁时,即使检测到interrupt方法被调用,也会继续向下执行。