目录
1. LockSupport 类
2. 如何设计一把独占锁?
3. 管程 — Java同步的设计思想
3.1 MESA模型
为什么条件队列的线程需要移到同步队列再唤醒运行?
4. AQS原理分析
4.1 什么是AQS
4.2 AQS核心结构
AQS内部维护属性volatile int state
4.3 AQS定义两种队列
同步等待队列
条件等待队列
5. ReentrantLock源码分析
源码阅读过程中要关注的问题
ReentrantLock源码分析
1. LockSupport 类
LockSupport 类是 Java 并发包提供的工具类,用于线程的阻塞和唤醒。它通过许可证(permit)的方式来控制线程的阻塞和唤醒,每个线程都有一个许可证。park() 方法用于消耗许可证,如果线程有可用的许可证,则消耗许可证后立即返回,否则线程进入阻塞状态。unpark(Thread thread) 方法用于释放许可证,唤醒指定线程。每个线程最多只能有一个许可证,重复调用 unpark() 方法不会增加许可证的数量。
当调用LockSupport.park时,表示当前线程将会阻塞,直至获得许可,当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,用于唤醒此线程。
public class ParkAndUnparkDemo {public static void main(String[] args) {ParkAndUnparkThread myThread = new ParkAndUnparkThread(Thread.currentThread());myThread.start();System.out.println("before park");// 获取许可LockSupport.park();System.out.println("after park");}
}class ParkAndUnparkThread extends Thread {private Object object;public ParkAndUnparkThread(Object object) {this.object = object;}public void run() {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("before unpark");// 释放许可LockSupport.unpark((Thread) object);System.out.println("after unpark");}
}//运行结果
before park
before unpark
after unpark
after park
2. 如何设计一把独占锁?
- 设置状态state:是否加锁(用0,1表示),可以使用CAS实现多线程加锁
- 加锁操作:加锁操作是获取锁的过程,需要确保在多线程环境下只有一个线程能够成功获取锁。加锁操作通常使用原子性的操作,如 CAS(Compare-and-Swap),来更新锁的状态。
- 解锁操作:解锁操作是释放锁的过程,需要将锁的状态设置为未获取状态,以便其他线程可以获取锁。解锁操作也要确保在多线程环境下保持线程安全。
- 支持可重入:可重入是指允许同一个线程多次获取同一把锁而不会被阻塞。设计独占锁时,可以添加一个计数器来记录当前线程已获取锁的次数,在解锁时递减计数器,直到计数器为零时才真正释放锁。
- 线程阻塞:当锁已被其他线程获取时,需要使当前线程进入阻塞状态,等待锁的释放。可以使用等待/通知机制或者使用 Java 提供的锁同步器(如AbstractQueuedSynchronizer)来实现线程的阻塞和唤醒。
- 等待队列:竞争锁失败的线程数据 Node(Thread) 队列
- 等待唤醒某个线程:可以使用LockSupport.park/unpark
共性的逻辑:定义个抽象类
入口等待队列和条件等待队列 入队出队操作
修改cas的操作
等待唤醒机制:
synchronizd + object.wait()/object.notify()/object.notifyAll
ReentrantLock + condition.await()/conditon.signal/signalAll
/*
*类说明:自定义独占锁
*/
public class BubbleLock extends AbstractQueuedSynchronizer{@Overrideprotected boolean tryAcquire(int unused) {// CAS 加锁,将 state 从 0 设置为 1if (compareAndSetState(0, 1)) {// 设置当前线程为独占锁的拥有者线程setExclusiveOwnerThread(Thread.currentThread());return true; // 成功获取锁,返回 true}return false; // 获取锁失败,返回 false}@Overrideprotected boolean tryRelease(int unused) {// 释放锁,将独占锁的拥有者线程设置为 null,并将 state 设置为 0 表示锁已释放setExclusiveOwnerThread(null);setState(0);return true; // 成功释放锁,返回 true}// 加锁操作,调用 acquire(1) 方法实际获取锁public void lock() {acquire(1);//只允许一个线程获取锁,这次是自定义独占锁}// 尝试加锁操作,调用 tryAcquire(1) 方法尝试获取锁public boolean tryLock() {return tryAcquire(1);}// 释放锁操作,调用 release(1) 方法实际释放锁public void unlock() {release(1);}// 检查当前锁是否被线程持有public boolean isLocked() {return isHeldExclusively();}
}
public class SyncDemo {private static int counter = 0;private final static BubbleLock lock = new BubbleLock();public static void increment() {lock.lock();try {counter++;}finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {for (int i = 0; i < 5000; i++) {increment();}}, "t1");Thread t2 = new Thread(() -> {for (int i = 0; i < 5000; i++) {increment();}}, "t2");t1.start();t2.start();t1.join();t2.join();System.out.println(counter);}
}//运行结果
10000
3. 管程 — Java同步的设计思想
管程:指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。
互斥:同一时刻只允许一个线程访问共享资源。
同步:线程之间如何通信、协作。
3.1 MESA模型
在管程的发展史上,先后出现过三种不同的管程模型,分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型。
管程中引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。条件变量和等待队列的作用是解决线程之间的同步问题。
为什么条件队列的线程需要移到同步队列再唤醒运行?
通常情况下,条件队列是用于保存等待条件满足的线程的队列,并不直接用于唤醒和执行线程。
- 为了共用一套唤醒逻辑,提高代码复用。
- 避免竞争条件:如果直接从条件队列唤醒线程,多个线程可能会同时被唤醒并竞争执行。这可能导致竞争条件和数据不一致的问题。确保条件满足:条件队列是用于保存等待某个条件满足的线程,直接唤醒线程可能会导致条件不满足的情况下线程继续执行。
4. AQS原理分析
4.1 什么是AQS
java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队 列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称 AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。 JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的
- 一般是通过一个内部类Sync继承 AQS
- 将同步器所有调用都映射到Sync对应的方法
AQS具备的特性:
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
4.2 AQS核心结构
AQS内部维护属性volatile int state
- state表示资源的可用状态
State三种访问方式:
- getState()
- setState()
- compareAndSetState()
定义了两种资源访问方式:
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
4.3 AQS定义两种队列
同步等待队列: 主要用于维护获取锁失败时入队的线程。
条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。
AQS 定义了5个队列中节点状态:
- 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:
- 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
- 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
- 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
- 调用await方法阻塞线程;
- 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
5. ReentrantLock源码分析
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
ReentrantLock基本使用方式:
public class ReentrantLockTest {private final ReentrantLock lock = new ReentrantLock();// ...public void doSomething() {lock.lock(); // block until condition holdstry {// ... method body} finally {lock.unlock();}}
}
源码阅读过程中要关注的问题
1.公平和非公平锁,可重入锁是如何实现的
2.设计的精髓:并发场景下入队和出队操作是如何设计的
- 线程竞争锁失败入队阻塞逻辑实现
- 释放锁的线程唤醒阻塞线程出队竞争锁的逻辑实现
ReentrantLock源码分析
下面以模拟抢票作为例子进行分析
/*
*类说明:模拟抢票场景
*/
public class ReentrantLockDemo {private final ReentrantLock lock = new ReentrantLock();//默认非公平private static int tickets = 8; // 总票数public void buyTicket() {lock.lock(); // 获取锁try {if (tickets > 0) { // 还有票 读try {Thread.sleep(10); // 休眠10ms} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票"); //写//buyTicket();//共享锁} else {System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");}} finally {lock.unlock(); // 释放锁}}public static void main(String[] args) {ReentrantLockDemo ticketSystem = new ReentrantLockDemo();for (int i = 1; i <= 10; i++) {Thread thread = new Thread(() -> {ticketSystem.buyTicket(); // 抢票}, "线程" + i);// 启动线程thread.start();}try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("剩余票数:" + tickets);}
}//运行结果
线程1购买了第8张票
线程2购买了第7张票
线程3购买了第6张票
线程8购买了第5张票
线程5购买了第4张票
线程6购买了第3张票
线程7购买了第2张票
线程4购买了第1张票
票已经卖完了,线程9抢票失败
票已经卖完了,线程10抢票失败
剩余票数:0
CAS操作将 state 的值从 0 设置为 1。其中0表示锁是未被占用的状态,1表示锁已经被某个线程占用。
setExclusiveOwnerThread(Thread.currentThread());将当前线程设置为独占锁的拥有者线程,如果 CAS 操作失败,表示锁已被其他线程占用,则当前线程无法直接获取锁。
在这种情况下,当前线程会以独占模式获取acquire(1),忽略中断。 通过调用至少一次tryAcquire(int)实现,成功返回。
获取锁返回
未获取到锁的线程继续持续以下逻辑
通过非公平方式
为当前线程和给定模式创建队列节点(如果尾节点tail=null,需要初始化队列,设置前后prev,next指针)
把线程节点加入到队列中,设置头尾head,tail节点
返回前一个节点
在阻塞前再次通过非公平方式tryAcquire(arg)获取锁
shouldParkAfterFailedAcquire(p, node),阻塞入队前准备,设置waitStatus=-1
如果前一节点是head头节点,这里还可以在阻塞前再次通过非公平方式tryAcquire(arg)获取锁
获取锁失败,调用LockSupport.park(this);从RUNNING->WAIT
释放锁
以独占模式释放锁, 如果tryRelease(int)返回true,则通过解除阻塞一个或多个线程来实现。 该方法可以用于实现方法Lock.unlock()
int c = getState() - releases;
tryRelease 方法会根据传入的 releases 参数计算新的锁状态 c。releases 表示要释放的锁的数量,对于 ReentrantLock 来说,通常是1。
判断计算后的新状态 c 是否等于0,即判断锁是否已完全释放。如果 c == 0,表示锁已完全释放,此时将 exclusiveOwnerThread 字段置为 null,表示当前锁没有拥有者。并且将 free 设置为 true,表示成功释放了锁。
setState(c) ,设置state值,state==0表示锁已释放
调用unparkSuccessor(h);方法
compareAndSetWaitStatus(node, ws, 0)=> cas操作把当前线程waitStatus从-1更新为0,为出队列做准备
LockSupport.unpark(s.thread)=> 唤醒在阻塞的线程 =>此时下一个线程已经被唤醒WAIT->RUNNING
final Node p = node.predecessor();获取前一个节点
tryAcquire(arg),使用非公平方式nonfairTryAcquire
如果CAS 操作成功,表示当前线程成功获取了锁,将当前线程设置为独占锁的拥有者线程,并返回 true。
setHead(node);设置当前节点为头节点,把prev,next置空方便GC回收
以上过程的流程图: