文章目录
- 初识读写锁
- ReentrantReadWriteLock类结构
- 注意事项
- ReentrantReadWriteLock源码分析
- 读写状态的设计
- HoldCounter 计数器
- 读锁的获取
- 读锁的释放
- 写锁的获取
- 写锁的释放
- 锁降级
初识读写锁
Java中的锁——ReentrantLock
和synchronized
都是排它锁,意味着这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,在写线程访问的时候其他的读线程和写线程都会被阻塞。读写锁维护一对锁(读锁和写锁),通过锁的分离,使得并发性提高。
关于读写锁的基本使用:在不使用读写锁的时候,一般情况下我们需要使用synchronized
搭配等待通知机制完成并发控制(写操作开始的时候,所有晚于写操作的读操作都会进入等待状态),只有写操作完成并通知后才会将等待的线程唤醒继续执行。
如果改用读写锁实现,只需要在读操作的时候获取读锁,写操作的时候获取写锁。当写锁被获取到的时候,后续操作(读写)都会被阻塞,只有在写锁释放之后才会执行后续操作。并发包中对ReadWriteLock
接口的实现类是ReentrantReadWriteLock
,这个实现类具有下面三个特点。
-
具有与
ReentrantLock
类似的公平锁和非公平锁的实现:默认的支持非公平锁,对于二者而言,非公平锁的吞吐量优于公平锁; -
支持重入:读线程获取读锁之后能够再次获取读锁,写线程获取写锁之后能再次获取写锁,也可以获取读锁;
-
锁能降级:遵循获取写锁、获取读锁再释放写锁的顺序,即写锁能够降级为读锁。
ReentrantReadWriteLock类结构
ReentrantReadWriteLock
是可重入的读写锁实现类。在它内部,维护了一对相关的锁,一个用于读操作,另一个用于写操作。只要没有 写线程,读锁可以由多个 读线程 同时持有。也就是说,写锁是独占的,读锁是共享的。
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 公平锁或非公平锁
final Sync sync;
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:持有读锁的情况下去获取写锁,会导致获取永久等待
- 重入时支持降级: 持有写锁的情况下可以去获取读锁
ReentrantReadWriteLock源码分析
读写状态的设计
设计的精髓:用一个变量如何维护多种状态
在 ReentrantLock
中,使用 AQS
中Node
结点 int
类型的 state
来表示同步状态,表示锁被一个线程重复获取的次数。
但是,读写锁 ReentrantReadWriteLock
内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用 “按位切割使用” 的方式来维护这个变量,将其切分为两部分:高16位表示读,低16位表示写。
当state
值不等于0的时候,如果写状态(state & 0x0000FFFF)
等于0的话,读状态是大于0的,表示读锁被获取;如果写状态不等于0的话,读锁没有被获取。这个特点也在源码中实现。
exclusiveCount(int c)
方法,获得持有写状态的锁的次数。
sharedCount(int c)
方法,获得持有读状态的锁的线程数量。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;/** 返回读锁数量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁数量 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
不同于写锁,读锁可以同时被多个线程持有。而每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter
计数器
HoldCounter 计数器
- 读锁的内在机制其实就是一个共享锁。
- 一次共享锁的操作就相当于对
HoldCounter
计数器的操作。 - 获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。
- 只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
读锁的获取
实现共享式同步组件的同步语义需要通过重写AQS
的tryAcquireShared
方法和tryReleaseShared方法。
// 读锁加锁操作
public final void acquireShared(int arg) {// tryAcquireShared,尝试获取锁资源,获取到返回1,没获取到返回-1if (tryAcquireShared(arg) < 0)// doAcquireShared 前面没拿到锁,这边需要排队~doAcquireShared(arg);
}// tryAcquireShared方法
protected final int tryAcquireShared(int unused) {// 获取当前线程Thread current = Thread.currentThread();// 拿到stateint c = getState();// 拿写锁标识,如果 !=0,代表有写锁if (exclusiveCount(c) != 0 &&// 如果持有写锁的不是当前线程,排队去!getExclusiveOwnerThread() != current)// 排队!return -1;// 没有写锁!// 获取读锁信息int r = sharedCount(c);// 公平锁: 有人排队,返回true,直接拜拜,没人排队,返回false// 非公平锁:正常的逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功// 这就会出现问题,写操作无法在读锁的情况抢占资源,导致写线程饥饿,一致阻塞…………// 非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回falseif (!readerShouldBlock() &&// 查看读锁是否已经达到了最大限制r < MAX_COUNT &&// 以CAS的方式,对state的高16位+1compareAndSetState(c, c + SHARED_UNIT)) {// 拿到锁资源成功!!!if (r == 0) {// 第一个拿到锁资源的线程,用first存储firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {// 我是锁重入,我就是第一个拿到读锁的线程,直接对firstReaderHoldCount++记录重入的次数firstReaderHoldCount++;} else {// 不是第一个拿到锁资源的// 先拿到cachedHoldCounter,最后一个线程的重入次数HoldCounter rh = cachedHoldCounter;// rh == null: 我是第二个拿到读锁的!// 或者发现之前有最后一个来的,但不是我,将我设置为最后一个。if (rh == null || rh.tid != getThreadId(current))// 获取自己的重入次数,并赋值给cachedHoldCountercachedHoldCounter = rh = readHolds.get();// 之前拿过,现在如果为0,赋值给TLelse if (rh.count == 0)readHolds.set(rh);// 重入次数+1,// 第一个:可能是第一次拿// 第二个:可能是重入操作rh.count++;}return 1;}return fullTryAcquireShared(current);
}// 通过tryAcquireShared没拿到锁资源,也没返回-1,就走这
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {// 拿stateint c = getState();// 现在有互斥锁,不是自己,拜拜!if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// 公平:有排队的,进入逻辑。 没排队的,过!// 非公平:head的next是写不,是,进入逻辑。 如果不是,过!} else if (readerShouldBlock()) {// 这里代码特别乱,因为这里的代码为了处理JDK1.5的内存泄漏问题,修改过~// 这个逻辑里不会让你拿到锁,做被阻塞前的准备if (firstReader == current) {// 什么都不做} else {if (rh == null) {// 获取最后一个拿到读锁资源的rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {// 拿到我自己的记录重入次数的。rh = readHolds.get();// 如果我的次数是0,绝对不是重入操作!if (rh.count == 0)// 将我的TL中的值移除掉,不移除会造成内存泄漏readHolds.remove();}}// 如果我的次数是0,绝对不是重入操作!if (rh.count == 0)// 返回-1,等待阻塞吧!return -1;}}// 超过读锁的最大值了没?if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 到这,就CAS竞争锁资源if (compareAndSetState(c, c + SHARED_UNIT)) {// 跟tryAcquireShared一模一样if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; }return 1;}}
}
- 读锁共享,读读不互斥
- 读锁可重入,每个获取读锁的线程都会记录对应的重入数
- 读写互斥,锁降级场景除外
- 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
readerShouldBlock
读锁是否阻塞实现取决公平与非公平的策略
读锁的释放
获取到读锁,执行完临界区后,要记得释放读锁(如果重入多次要释放对应的次数),不然会阻塞其他线程的写操作。
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();//如果当前线程是第一个获取读锁的线程if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--; //重入次数减1} else { //不是第一个获取读锁的线程HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count; //重入次数减1}for (;;) { //cas更新同步状态int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}
}
写锁的获取
public final void acquire(int arg) {// 尝试获取锁资源(看一下,能否以CAS的方式将state 从0 ~ 1,改成功,拿锁成功)// 成功走人// 不成功执行下面方法if (!tryAcquire(arg) &&// addWaiter:将当前没按到锁资源的,封装成Node,排到AQS里// acquireQueued:当前排队的能否竞争锁资源,不能挂起线程阻塞acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}// 读写锁的写锁,获取流程
protected final boolean tryAcquire(int acquires) {// 拿到当前线程Thread current = Thread.currentThread();// 拿到stateint c = getState();// 拿到了写锁的低16位标识wint w = exclusiveCount(c);// c != 0:要么有读操作拿着锁,要么有写操作拿着锁if (c != 0) {// 如果w == 0,代表没有写锁,拿不到!拜拜!// 如果w != 0,代表有写锁,看一下拿占用写锁是不是当前线程,如果不是,拿不到!拜拜!if (w == 0 || current != getExclusiveOwnerThread())return false;// 到这,说明肯定是写锁,并且是当前线程持有// 判断对低位 + 1,是否会超过MAX_COUNT,超过抛Errorif (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 如果没超过锁重入次数, + 1,返回true,拿到锁资源。setState(c + acquires);return true;}// 到这,说明c == 0// 读写锁也分为公平锁和非公平锁// 公平:看下排队不,排队就不抢了// 走hasQueuedPredecessors方法,有排队的返回true,没排队的返回false// 非公平:直接抢!// 方法实现直接返回falseif (writerShouldBlock() ||// 以CAS的方式,将state从0修改为 1!compareAndSetState(c, c + acquires))// 要么不让抢,要么CAS操作失败,返回falsereturn false;// 将当前持有互斥锁的线程,设置为自己setExclusiveOwnerThread(current);return true;
}
- 写锁是一个支持重进入的排它锁。
- 如果当前线程已经获取了写锁,则增加写状态。
- 如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者 该线程不是已经获取写锁的线程, 则当前线程进入等待状态。
- 写锁的获取是通过重写
AQS
中的tryAcquire
方法实现的。
写锁的释放
写锁释放通过重写AQS
的tryRelease
方法实现
// 写锁的释放锁
public final boolean release(int arg) {// 只有tryRealse是读写锁重新实现的方法,其他的和ReentrantLock一致if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}// 读写锁的真正释放
protected final boolean tryRelease(int releases) {// 判断释放锁的线程是不是持有锁的线程if (!isHeldExclusively())// 不是抛异常throw new IllegalMonitorStateException();// 对state - 1int nextc = getState() - releases;// 拿着next从获取低16位的值,判断是否为0boolean free = exclusiveCount(nextc) == 0;// 返回trueif (free)// 将持有互斥锁的线程信息置位nullsetExclusiveOwnerThread(null);// 将-1之后的nextc复制给statesetState(nextc);return free;
}
锁降级
锁降级是指将持有写锁的线程获取读锁后再释放写锁的过程。这样可以在保持数据一致性的同时,允许其他线程同时进行读操作,提高并发性能。
Java中的ReentrantReadWriteLock类支持锁降级。按照锁降级的规则,持有写锁的线程可以首先获取读锁,然后再释放写锁,以将写锁降级为读锁,不支持锁升级。
例如,以下是一个示例代码,演示了锁降级的过程:
import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReentrantReadWriteLockTest {private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private int data = 0;public void processWriteAndReadData() {lock.writeLock().lock(); // 获取写锁try {System.out.println(Thread.currentThread().getName() + "线程获取写锁");Thread.sleep(3000);// 执行数据处理操作data++;System.out.println(Thread.currentThread().getName() +"线程数据处理完成,当前值为: " + data);// 锁降级:获取读锁lock.readLock().lock();System.out.println(Thread.currentThread().getName() + "线程获取读锁");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.writeLock().unlock(); // 释放写锁System.out.println(Thread.currentThread().getName() + "线程释放写锁");}try {Thread.sleep(3000);// 执行读操作System.out.println(Thread.currentThread().getName() + "线程读取数据: " + data);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.readLock().unlock(); // 释放读锁System.out.println(Thread.currentThread().getName() + "线程释放读锁");}}public void processReadData() {lock.readLock().lock();try {System.out.println(Thread.currentThread().getName() + "线程获取读锁");System.out.println(Thread.currentThread().getName() + "线程读取数据,当前值为: " + data);} finally {lock.readLock().unlock();System.out.println(Thread.currentThread().getName() + "线程释放读锁");}}public static void main(String[] args) {ReentrantReadWriteLockTest reentrantReadWriteLockTest = new ReentrantReadWriteLockTest();new Thread(() -> {reentrantReadWriteLockTest.processWriteAndReadData();}, "t1").start();try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}new Thread(() -> {reentrantReadWriteLockTest.processReadData();}, "t2").start();}
}
以下是输出结果:
t1线程获取写锁
t1线程数据处理完成,当前值为: 1
t1线程获取读锁
t1线程释放写锁
t2线程获取读锁
t2线程读取数据,当前值为: 1
t2线程释放读锁
t1线程读取数据: 1
t1线程释放读锁
可以发现,通过锁降级,我们能够在保持数据一致性的同时,允许其他线程进行读操作,提高了并发性能。这是读写锁的一个重要特性。
- 个人公众号
- 个人小游戏