在Java并发: 面临的挑战那一篇中我们提到锁和同步是实现并发安全(可见性/原子性)的方法之一。这一章我们来讲讲Java中的锁和同步的各种工具,包括:
- LockSupport
- AbstractQueuedSynchronizer
- Java内置的锁实现
1. LockSupport
LockSupport是基于Unsafe的park/unpark实现的,用来支持线程的挂起和唤醒。
1.1 工作原理
可以理解为线程上有一个0/1标志位,park/unpark基于这个标志位工作的,使用这个模型我们能比较容易理解它的工作模式
- park()调用,检查标志位,标志位=0挂起当前线程,直到标志位被置1,或被中断/超时;如果标志位=1,将标志位置0,从park方法返回,执行后续代码
- unpark()调用,作用是将标志位置1
unpark()可以在park()之前被调用,已经被unpark()调用过的线程,调用park()时标志位=1,会直接返回而不阻塞。工具方法,sleep休眠指定毫秒数,println打印小时时间戳。
Thread t = new Thread(() -> {println("before sleep");sleep(2000);println("after sleep, going to park");LockSupport.park();println("after park");
});
t.start();
println("before unpark");
LockSupport.unpark(t);
println("after unpark");
t.join();
我们在线程t启动后立刻进行了unpark,而此时线程t应该还在sleep中,sleep结束后的park调用是瞬时返回的
关于unpark还有两个点是需要特别注意的
- 线程Thread t在t.start()调用之前,调用LockSupport.unpark(t)不会做标志位置位,相当于是无效调用
- 对同一个线程t连续两次调用LockSupport.unpark(t),标志位仍然只是置1,只能唤醒一个LockSupport.park()调用
1.2 虚假唤醒
LockSupport.park()的唤醒可能是因为调用了LockSupport.unpark(),也可能是因为线程中断、park超时,一般的做法是在检查park条件时做一个循环。我们来看个常见的示例
public void lock() {while (condition) {LockSupport.park(this);}}
即使park()是因为中断而退出的,程序也能重新进入条件校验,重新挂起,从而避免虚假唤醒导致问题。想想锁和条件wait的写法,是不是和这个如出一辙呢?
1.3 应用案例
LockSupport的文档上提供了一个最简单的锁的案例,FIFOMutex,按调用顺序依次把加锁的机会给每一个调用者,代码如下
class FIFOMutex {private final AtomicBoolean locked = new AtomicBoolean(false);private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();public void lock() {boolean wasInterrupted = false;// 将想要加锁的线程进队列waiters.add(Thread.currentThread());// 出队列的第一个线程外,全部挂起;第一个线程,尝试加锁,CAS设置locked=truewhile (waiters.peek() != Thread.currentThread() || !locked.compareAndSet(false, true)) {LockSupport.park(this);if (Thread.interrupted()) // 如果线程被中断了,用wasInterrupted保留中断的状态wasInterrupted = true;}waiters.remove(); // 加锁成功的线程从队列移除if (wasInterrupted)Thread.currentThread().interrupt();}public void unlock() {locked.set(false); // 释放锁LockSupport.unpark(waiters.peek()); // 恢复等待锁的第一个线程}static {// Reduce the risk of "lost unpark" due to classloadingClass<?> ensureLoaded = LockSupport.class;}
}
在3. 使用Unsafe里我们有写过一个CrashIntegerID在无锁的情况下生成自增ID,会导致ID重复,限制我们用这个自定义的FIFOMutex进行竞态条件保护,修改后代码如下
public class CrashIntegerID implements ID {private int id;private FIFOMutex mutex;public CrashIntegerID(FIFOMutex lock, int start) {this.id = start;this.mutex = lock;}public int incrementAndGet() {mutex.lock();try {return id++;} finally {mutex.unlock();}}
}
将控制台输出用shell命令统计,可以发现生成10w次后,最大ID是10_0000,ID没有重复的了,说明我们FIFOMutext是生效的。
randy@Randy:~$ cat num | egrep -v '^$' | sort -n | tail -599996
99997
99998
99999
100000
randy@Randy:~$ cat num | egrep -v '^$' | sort -n | uniq -d
2.AbstractQueuedSynchronized
前面我们通过LockSupport实现了一个简单的独占锁FIFOMutex,但是功能比较简易。Java内部通过了一个类似的实现,只需要覆写少数方法就能创建一个功能强大的锁,AbstractQueueSynchronizer
类似于FIFOMutex,AQS也维护了一个内部状态state,将等待锁的线程通过一个CLH队列保存,额外提供ConditionObject对象,支持基于条件的等待还唤醒,同时它还支持共享锁。JDK内部大量的锁和同步器都是基于AQS实现的,比如ReentrantLock、Semaphore等等。
2.1 如何使用
要想基于AQS实现同步器和锁,只需通过AQS提供的getState()、setState(int)、compareAndSetState(int,int)覆写AQS中的5个方法。根据先要实现的锁不同state有不同的含义、不同的值,假设要实现一个非可重入锁,我们可以假定state=0时锁已经被其他线程持有,state=1表示锁限制没有被持有;假设要实现一个类似Semaphore的同步器,state就用来表示可用的信号量。
方法 | 说明 |
boolean tryAcquire(int n) | 申请n个独占资源,返回true表示申请成功,false表示申请失败 |
boolean tryRelease(int n) | 释放n给独占资源,返回true表示释放成功,false表示释放失败 |
int tryAcquireShared(int n) | 申请n个共享资源,返回true表示申请成功,false表示申请失败 |
boolean tryReleaseShared(int n) | 释放n给共享资源,返回true表示释放成功,false表示释放失败 |
boolean isHeldExclusively() | 根据state判断是否独占锁,如果是独占式的,锁持有期间AQS不会调度锁的等待队列的节点来尝试加锁 |
要让AQS正常且高效的工作,覆写这5个方法必须是线程安全的,且不应该有长时间的阻塞。此外AQS还继承了AbstractOwnableSynchronizer,支持在同步器上继续当前持有锁的线程,这样我们能做线程的监控和分析工具能查看,方便定位问题。
2.2 源码解析
锁的使用中核心的逻辑就4个,锁的申请和释放,条件的等待和唤醒,接下来我们重点看一下这4段的逻辑实现。为了方便理解,对源码做过编辑,核心逻辑是接近的。
1. 申请锁
首先是锁的申请,AQS是通过acquire(n)方法申请锁,调用后会一直初始当前线程,除非加锁成功。acquire的第一层逻辑很简单,尝试通过tryAcquire申请资源,申请成功直接就算加锁成功
public final void acquire(int arg) {if (!tryAcquire(arg)) {acquire(null, arg, false, false, false, 0L);}
}
如果申请失败,调用acquire方法,进入一个无限循环,循环的代码略长,根据代码的目的,我把它定义为6个操作,分别是
- 操作1,申请锁的当前节点不是等待队列的队首,清理CLH等待队列中已经放弃(取消)的节点
- 操作2,如果是等待队列对手或没有前置节点,尝试加锁
- 操作3,如果node是null创建节点
- 操作4,将node加入到CLH等待队列
- 操作5,如果是等待队列的队首,还有自旋次数可以用,进行一次自旋
- 操作6,自旋失败,升级使用LockSupport挂起线程
我们来看一下acquire(int arg)调用的acquire方法内部的执行过程
- 一开始node和pred都是null,会先执行操作2,如果加锁成功直接返回,否则继续运行
- 加锁失败的话,执行操作3,创建node节点,进入下一轮循环
- 这个时候node!=null,但是pred依然是null,再次执行操作2,加锁成功直接返回,否则继续运行
- 加锁失败的话,执行操作4,将node加入到CLH等待队列,进入下一轮循环
- 进入操作1,判断在等待队列中的位置
- 第1个节点,执行操作2
- 第2个节点,自旋并进入下一轮循环
- 多次尝试后,确实无法加锁的,进入操作6,将线程挂起
在有的多线程编程的文章和书籍中,将这个执行过程描述为锁升级,把自旋锁定义为玄而又玄的算法,其实所谓的自旋只是让CPU执行一个空指令,看是不是能在几个指令周期后能够成功加锁,从而避免因为线程的挂起(park/unpark)导致的线程上下文切换。所谓的锁升级只是从一开始直接尝试加锁,失败后尝试自旋,仍然不能成功才进入等待队列的过程。
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {Thread current = Thread.currentThread();byte spins = 0, postSpins = 0; // retries upon unpark of first threadboolean interrupted = false, first = false;Node pred = null; // predecessor of node when enqueuedfor (;;) {// 操作1: 如果node不是第一个节点,有前置节点,前置节点不是head节点,等待前置节点if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {if (pred.status < 0) {cleanQueue(); // 如果前置节点是取消状态的,清除前置节点continue;} else if (pred.prev == null) {Thread.onSpinWait(); // 如果队列中只有一个前置节点,尝试自旋等待continue;}}// 操作2: 如果是第一个节点,或没有前置节点,尝试加锁if (first || pred == null) {boolean acquired;try {if (shared)acquired = (tryAcquireShared(arg) >= 0);elseacquired = tryAcquire(arg);} catch (Throwable ex) {cancelAcquire(node, interrupted, false);throw ex;}if (acquired) {if (first) { // 如果第一个节点加锁成功,删除waiter对线程的引用,让head执行第一个节点node.prev = null;head = node;pred.next = null;node.waiter = null;if (shared)signalNextIfShared(node);if (interrupted)current.interrupt();}return 1;}}// 操作3: 如果节点为null,先创建节点if (node == null) { // allocate; retry before enqueueif (shared)node = new SharedNode();elsenode = new ExclusiveNode();} // 操作4: 将Node放入到CLH的等待队列else if (pred == null) { // try to enqueuenode.waiter = current;Node t = tail;node.setPrevRelaxed(t); // avoid unnecessary fenceif (t == null)tryInitializeHead();else if (!casTail(t, node))node.setPrevRelaxed(null); // back outelset.next = node;} // 操作5: 第一个节点,且自旋次数大于0,尝试自旋else if (first && spins != 0) {--spins; // reduce unfairness on rewaitsThread.onSpinWait();} else if (node.status == 0) {node.status = WAITING; // enable signal and recheck} // 操作6: 自旋失败,使用LockSupport挂起线程else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible)break;}}return cancelAcquire(node, interrupted, interruptible);
}
2. 释放锁
相比申请锁的过程,释放就极其的简单了,直接调用tryRelease释放资源,释放重构后通过siganalNext通知等待队列,执行LockSupport.unpark唤醒线程。
public final boolean release(int arg) {if (tryRelease(arg)) {signalNext(head);return true;}return false;
}
3. 条件等待
AQS通过ConditionObject提供条件等待的支持,当我们调用Condition.await()时,程序经历了4步操作
- 操作1: 释放await关联的锁对象
- 操作2: 挂起线程
- 操作3: 修改节点、线程状态
- 操作4: 重新加锁
之前我们有提到过,一个持有锁的方法调用,只有在方法执行结束、方法执行异常、或者调用锁相关的条件等待时才会释放锁。这个操作从源码层面告诉我们为什么条件等待会释放锁。
public final void await() throws InterruptedException {ConditionNode node = new ConditionNode();// 操作1: 释放锁int savedState = enableWait(node);LockSupport.setCurrentBlocker(this); // for back-compatibility...while (!canReacquire(node)) {...if ((node.status & COND) != 0) { // 操作2: 阻塞线程if (rejected)node.block(); // 内部调用的还是LockSupport.parkelseForkJoinPool.managedBlock(node);} else {Thread.onSpinWait(); // awoke while enqueuing}}// 操作3: 执行到这里,说明线程已经被唤醒LockSupport.setCurrentBlocker(null);node.clearStatus();// 操作4: 重新加锁acquire(node, savedState, false, false, false, 0L);if (interrupted) {if (cancelled) {unlinkCancelledWaiters(node);throw new InterruptedException();}Thread.currentThread().interrupt();}
}private int enableWait(ConditionNode node) {if (isHeldExclusively()) {node.waiter = Thread.currentThread();...int savedState = getState(); // condition对象上会保存关联的锁的资源if (release(savedState)) // await时,会释放锁return savedState;}node.status = CANCELLED; // lock not held or inconsistentthrow new IllegalMonitorStateException();
}
2.3 应用案例
如果用AQS重写1.3中的案例FIFOMutex会比原来简单的多,我们来看一下重写后的代码
public class AQSFIFOMutex {private Sync sync;public AQSFIFOMutex() {sync = new Sync();}public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}private static class Sync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int n) {assert getState() == 0;return compareAndSetState(0, 1);}@Overrideprotected boolean tryRelease(int n) {assert getState() == 1;return compareAndSetState(1, 0);}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}}
}
3. Java自带的锁实现
到现在我们已经大概了解锁的实现原理,后续的章节我们来看看JDK内置的锁实现类,有什么特点,要如何使用。
3.1 ReentrantLock
首先要看的是ReentranLock,ReentrantLock是一把可重入锁,它是基于AbstractQueuedSynchronizer实现的。如果一个线程已经持有了锁,再次调用申请锁的时候,这个调用不会被阻塞。
1. 接口定义
核心方法定义见下表
方法 | 说明 |
void lock() | 尝试加锁,加锁成功则返回,否则阻塞等待 |
void lockInterruptibly() | 同lock()方法,但是响应中断,在lockInterruptibly()执行期间,如果线程被中断,这个方法抛出InterruptedException |
boolean tryLock() | 尝试加锁但不阻塞,成功返回true,失败返回false |
boolean tryLock(long timeout, TimeUnit unit) | 尝试加锁,设置超时时间,如果给定时间内没加锁成功返回false,否则返回true |
void unlock() | 释放锁 |
2. 使用案例
ReentrantLock有两种典型的使用模式,阻塞和非阻塞,不管那种方式都应该把unlock放到finally中以保证unlock会被调用。
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {// 业务代码
} finally {lock.unlock();
}
如果使用tryLock代码应该这样写
if (lock.tryLock(2000, TimeUnit.MILLISECONDS)) {try {// 业务代码} finally {lock.unlock();}
}
3.2 ReentrantReadWriteLock
ReentrantReadWriteLock相比ReentrantLock的做了增强,支持读写锁,实现原理是将AQS的state分成了2部分,高16位用于保存共享锁,低16位用于保存独占锁,以这个逻辑实现AQS的tryAcquire、tryAcquireShared。我们来看一个案例,假设有两个线程,DoRead负责读数据,DoWrite负责写数据,我们现在想模拟的是两类场景
- writeLock被持有的时,所有的readLock无法加锁成功
- readLock可以被两个线程同时持有
为了做到这两点可观测,我们定义一个DoWrite,持有writeLock后休眠5s,启动DoWrite后,等1s在启动DoRead,为了让DoWrite先执行并先拿到写锁。
public static class DoWrite implements Runnable {private ReentrantReadWriteLock.WriteLock writeLock;public DoWrite(ReentrantReadWriteLock.WriteLock writeLock) {this.writeLock = writeLock;}public void run() {println("before write lock");writeLock.lock();try {println("under write lock , before sleep");sleep(5000);println("under write lock , after sleep");} finally {writeLock.unlock();}println("after write lock");}
}
以下是读锁的代码,以及测试启动的代码
public static class DoRead implements Runnable {private ReentrantReadWriteLock.ReadLock readLock;private int identity;public DoRead(int identity, ReentrantReadWriteLock.ReadLock readLock) {this.readLock = readLock;this.identity = identity;}public void run() {println("before read lock , identity: " + identity);readLock.lock();try {println("under read lock, before sleep , identity: " + identity);sleep(3000);println("under read lock, after sleep , identity: " + identity);} finally {readLock.unlock();}println("after read lock , identity: " + identity);}
}
// 测试代码
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
Thread tw = new Thread(new DoWrite(writeLock));
tw.start();
sleep(1000);
Thread tr1 = new Thread(new DoRead(1, readLock));
tr1.start();
Thread tr2 = new Thread(new DoRead(2, readLock));
tr2.start();tw.join();
tr1.join();
tr2.join();
我们来分析一下输入的日志,看看程序是按什么顺序执行的
3.3 StampedLock
JDK 8开始提供StampedLock,它支持3中锁模式,比较特别的是它不是可重入锁,因此在某个线程拿到锁之后,不能在这个线程内部再次申请锁
- 写锁writeLock,只在读写锁都没有被持有的情况下才能申请
- 读锁readLock,只在没有线程持有写锁时才能申请
- 乐观读tryOptimisticRead,读取锁的state状态,假设操作期间不会发生写锁
StampedLock的实现思路借鉴了有序读写锁的算法(Ordered RW locks),感兴趣的话可以查看对应的算法描述: Design, verification and applications of a new read-write lock algorithm | Proceedings of the twenty-fourth annual ACM symposium on Parallelism in algorithms and architectures。
按简化模型来理解的话,调用tryOptimisticRead时会获取stamp作为版本号,建立本地数据的快照,再验证版本号,如果版本号未变更则任务数据快照是有效的。我们来看一下下使用流程
- 获取stamp版本后,用的是state的值
- 建立业务数据快照
- 使用Unsafe.loadFence()建立内存屏障,保证进入第4步之前,业务数据快照已经读取完成
- 验证第1步读取的stamp版本号,验证通过说明stamp未被修改,任意的写锁会导致stamp被修改,stamp未修改说明期间没有申请过写锁,因此数据未被修改
- 如果验证通过,升级为读锁,再次执行第2步重新建立数据快照
- 释放读锁
- 使用数据快照,执行业务逻辑
通过这个执行步骤,我们可以知道tryOptimisticRead能提升性能的前提是大部分情况下validate(stamp)会成功,即业务是读多写少的情况。 业务数据快照只是基于内存屏障实现的,执行期间并没有锁,所以只能保证快照是某一时刻的数据,但不能保证是当前最新的数据。
下面我们举个例子来解释一下StampedLock怎么使用,假设我们有一个Statistic类,用来统计数字的个数、总和,然后提供平均值
public class Statistic {private final StampedLock lock = new StampedLock();private int count;private int total;public void newNum(int num) {long stamp = lock.writeLock(); // 写锁try {count++;total += num;} finally {lock.unlock(stamp);}}public double avg() {long stamp = lock.tryOptimisticRead(); // 乐观读int tempCount = count, tempTotal = total; // 快照数据if (!lock.validate(stamp)) {stamp = lock.readLock(); // 读锁try {tempCount = count;tempTotal = total;} finally {lock.unlock(stamp);}}return tempTotal * 1.0 / tempCount; // 使用快照数据做业务计算}
}