文章目录
- 悲观锁 和 乐观锁
- 1.基于CAS实现乐观锁
- 2.自旋锁
- 2.1.不可重入自旋锁
- 2.2.可重入自旋锁
- 2.3.CLH自旋锁
悲观锁 和 乐观锁
Java中的synchronized就是悲观锁的一个实现,悲观锁可以确保无论哪个线程持有锁,都能独占式的访问临界区代码,虽然悲观锁的实现比较简单,但是还是会存在不少问题。
悲观锁总是假设会发生最坏的情况,每次线程去读取数据的时候,也会上锁。这样其他线程在读取数据的时候也会被阻塞,直到它拿到锁,传统的关系型数据库就用到了很多的悲观锁,如行锁
,表锁
,读锁
,写锁
。
悲观锁会存在以下几个问题:
- 在多线程环境下, 加锁和释放锁都会导致线程
上下文的切换
以及调度延时
,会引发一系列性能问题。 - 一个线程持有锁的时候,会导致其他抢锁线程都被
临时挂起
。 - 如果一个
线程优先级高的线程
等待一个优先级低的线程
释放锁,就会导致线程优先级倒置
,从而引发性能风险。
解决以上悲观锁的方式,就是使用乐观锁去替代 悲观锁。乐观锁其实时一种思想,在使用乐观锁的时候,每次线程
都去读取
的数据的时候都认为其他线程不会进行修改
,所以不会上锁
,仅仅在更新
的时候判断一下其他线程有没有去更新这个数据
。
数据库操作中的带版本号的数据更新,JUC原子类中都使用了乐观锁的方式来提高性能。
这里针对于悲观锁,我们就就不在举例说明了,感兴趣的话可以去学习一下看一下之前的synchronized章节。
1.基于CAS实现乐观锁
乐观锁的实现步骤主要就两个
(1)冲突监测
(2)数据更新
乐观锁时一种比较典型的CAS原子操作,JUC强大的高并发性能就是建立在CAS原子操作上的,CAS操作中包含
三个操作数
(1)需要操作的内存位置(V)
(2)进行比较的预期原值(A)
(3)拟写入的新值(B)
如果
内存 V的位置的值
和预期原值 A
比较一致
,那么CPU会自动将该位置的值替换为新的值 B
,否则
CPU不做任何操作
下面我们通过一个案例来了解一下CAS中的乐观锁,在Java中,乐观锁的一种常见实现方式是借助于java.util.concurrent.atomic
包下的原子类,比如AtomicInteger
。下面我们通过一个简单的银行账户转账的例子来展示如何使用CAS(Compare-And-Swap)操作实现乐观锁。
/*** CAS乐观锁的一个实现*/
public class OptimismLockDemo {private final Logger logger = LoggerFactory.getLogger(OptimismLockDemo.class);@Test@DisplayName("测试JUC的CAS操作")public void testOptimismLock() {// 初始余额设置100Bank bank = new Bank(100);// 线程1存钱 100Thread t1 = new Thread(() -> {if (bank.updateCount(100)) {logger.error("存钱100成功!");} else {logger.error("取钱成功!");}}, "t1");// 线程2 取钱 100Thread t2 = new Thread(() -> {if (bank.updateCount(-100)) {logger.error("取钱100成功!");} else {logger.error("取钱失败!");}}, "t2");// 启动t1 和 t2 线程t1.start();t2.start();// 等待全部线程执行完毕try {t2.join();t1.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}static class Bank {// 定义余额private AtomicInteger balance;Bank(int initBalance) {this.balance = new AtomicInteger(initBalance);}public boolean updateCount(int money) {// 获取当前余额int currentMoney = balance.get();// 计算出预期的结果int newMoney = currentMoney + money;// 如果是转出且余额不足,直接返回false,否则尝试更新余额if (money < 0 && newMoney < 0) {return false;}// 尝试更新余额,这里就是CAS操作的核心// compareAndSet比较并交换,如果当前余额仍为currentMoney 则更新为newMoney,否则就告知更新失败return balance.compareAndSet(currentMoney, newMoney);}}
}
在这个例子中,我们创建了一个初始余额为100的账户,然后启动了两个线程分别执行转出100元和转入100元的操作。由于转账方法基于CAS实现,因此在并发环境下能够确保转账的原子性和正确性,避免了传统锁机制可能导致的性能瓶颈。
2.自旋锁
在实际情况中,一个成功的数据更新操作可能需要多次执行CAS(比较并交换)操作,这就是所谓的CAS自旋。通过反复尝试,直至更新成功,这样的机制无需锁定资源,实现了多线程环境下变量状态的高效协同,我们称之为“无锁同步”或“非阻塞同步”。这种方式,正是乐观锁的核心思想之一,它体现了在并发编程领域追求高性能与低冲突的“乐观”策略。
2.1.不可重入自旋锁
自旋锁(SpinLock)的基本含义就是:当一个线程在获取锁 的时候,如果锁已经被其他线程获取,那么该调用线程就一致那里循环监测锁是否已经被释放,一直到获取那个锁之后才会退出循环。
package com.hrfan.thread.lock.type.spin;import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自旋锁* 不可重入锁*/
public class SpinLockDemo {private static final Logger log = LoggerFactory.getLogger(SpinLockDemo.class);@Test@DisplayName("测试不可重入自旋锁")public void testSpinLock() {SpinLock spinLock = new SpinLock();// 这里为了模拟多线程的一个并发性能 我们使用线程池来进行测试ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 3; i++) {executorService.submit(() -> {spinLock.lock();try {// 模拟执行操作log.error("抢锁成功!执行操作");} finally {spinLock.unlock();}});}executorService.shutdown();}static class SpinLock implements Lock {/*** 当前锁的拥有者*/private AtomicReference<Thread> owner = new AtomicReference<>();@Overridepublic void lock() {// 抢占锁// 获取当前线程Thread thread = Thread.currentThread();// 开始抢占锁 (不断cas 直到owner的值为null 才更新为当前线程)while (!owner.compareAndSet(null, thread)) {log.error("抢锁失败!让出剩余CPU时间片!");// 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试Thread.yield();}}@Overridepublic void unlock() {// 通过代码 我们可以发现,SpinLock是不支持重入的,在一个线程获取锁没有释放之前,它不可能再次获得锁。// 释放锁Thread thread = Thread.currentThread();// 只有拥有者才能够释放锁if (thread == owner.get()) {log.error("释放锁成功!");// 这里设置为拥有者为空,不需要在使用compareAndSet() 因为上面已经判断owner.set(null);}}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic Condition newCondition() {return null;}}
}
2.2.可重入自旋锁
为了实现可重入自旋锁,这里引入一个计数器,用来记录一个线程获取锁的次数。
package com.hrfan.thread.lock.type.spin;import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 可重入自旋锁*/
public class ReentrantSpinLockDemo {ReentrantSpinLock lock = new ReentrantSpinLock();@Test@DisplayName("测试可重入锁")public void test() throws InterruptedException {new Thread(() -> {lock.lock();try {log.error("开始执行任务! 重入次数{}", lock.getCount());reentrantSpinLock();} finally {lock.unlock();}}, "t1").start();Thread.sleep(1000);}public void reentrantSpinLock() {lock.lock();try {log.error("开始执行重入方法! 重入次数{}", lock.getCount());} finally {lock.unlock();}}private static final Logger log = LoggerFactory.getLogger(ReentrantSpinLockDemo.class);static class ReentrantSpinLock implements Lock {/*** 当前锁的拥有者* 使用拥有者的Thread作为同步状态,而不是使用一个简单的整数作为同步状态*/private AtomicReference<Thread> owner = new AtomicReference<>();/*** 记录一个线程同步获取锁的状态*/private int count = 0;@Overridepublic void lock() {// 抢占锁Thread thread = Thread.currentThread();// 如果是重入 增加重入次数返回if (thread == owner.get()) {count++;return;}// 如果不是重入 那么进行自旋操作while (!owner.compareAndSet(null, thread)) {log.error("抢锁失败!让出剩余CPU时间片!");// 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试Thread.yield();}}@Overridepublic void unlock() {// 只有拥有者才能释放锁Thread thread = Thread.currentThread();if (thread == owner.get()) {// 如果发现count的次数不是 0减少重入次数 并返回if (count > 0) {count--;} else {// 直接将拥有者设置为空owner.set(null);}}}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic Condition newCondition() {return null;}public int getCount() {return count;}}
}
自旋锁的特点:线程在获取锁的时候,如果锁被其他线程持有,当前线程循环等待,直到获取锁。线程抢锁期间线程的状态不会改变
,一直时运行状态,在操作系统层面线程处于用户态。
自旋锁的问题:在争用激烈的场景下,如果某个线程持有的锁的时间太长,就会导致其他自旋线程的CPU资源耗尽
,另外,如果大量的线程进行空自旋,还可能导致硬件层面的总线风暴
2.3.CLH自旋锁
前面提到了CAS自旋
可能引发的一些性能问题
,尤其是它可能导致CPU层面的总线争用(总线风暴)
。面对这一挑战,Java并发包(JUC)中的轻量级锁如何有效规避这一问题呢?其解决方案在于利用队列机制对试图获取锁的线程进行有序排列
,从而大幅度减少了CAS操作的频次,从根本上减轻了对CPU和总线的压力。
具体而言,CLH锁
作为一种典型的基于队列
(常采用单向链表形式)实现的自旋锁机制
,为这一问题提供了高效的解答。在CLH锁的模型中,任何一个请求加锁的线程首先会尝试通过CAS操作将自己的信息节点添加到队列的末端
。一旦成功入队
,该线程随后仅需在其直接前驱节点
上执行相对简单的自旋
等待操作,直至前驱释放锁资源。
这一设计精妙之处在于,CLH锁仅在节点初次尝试加入队列时涉及一次CAS操作
。一旦入队完成
,后续的自旋过程无需进一步的CAS介入
,仅需执行标准的自旋逻辑即可。因此,在高度竞争的并发场景下,CLH锁能够显著削减CAS操作的总量,有效避开了可能引发的总线风暴现象。
在Java并发工具包(JUC)中,显式锁的实现底层依赖于AbstractQueuedSynchronizer(AQS),这一框架实质上是对CLH锁原理的一种扩展与变体应用,进一步强化了锁的管理和线程调度能力,展现了高级的并发控制策略。
上面提到的简单自旋 其实就是
普通自旋是指线程在未能立即获取锁时,不进行复杂的操作如CAS(比较并交换)而是执行简单的循环检查某个条件是否满足(比如前驱节点是否已经释放了锁)。
下面我们通过一个案例来学习一下CLH自旋锁
package com.hrfan.thread.lock.type.spin;import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.sound.sampled.FloatControl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** CLH版本自旋锁*/
public class CLHSpinLockDemo {private static final Logger log = LoggerFactory.getLogger(CLHSpinLockDemo.class);public static int count = 0;@Test@DisplayName("测试CLH自旋锁")public void test() {long startTime = System.currentTimeMillis();// 创建CLH自旋锁CLHSpinLock lock = new CLHSpinLock();// ReentrantLock lock = new ReentrantLock();// 线程数量int threads = 10;// 每次执行的次数int turns = 10000;// 通过线程池来创建线程ExecutorService executorService = Executors.newFixedThreadPool(threads);// 创建计时器CountDownLatch latch = new CountDownLatch(threads);for (int i = 0; i < threads; i++) {executorService.submit(() -> {for (int j = 0; j < turns; j++) {// 创建锁lock.lock();try {count++;} finally {lock.unlock();}}// 更新计时器latch.countDown();});}// 等待全部线程执行完毕try {latch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}long endTime = System.currentTimeMillis();log.error("-------------线程执行结束,最终结果为:{}", count);log.error("耗时:{}", endTime - startTime);}static class Node {// 当前线程正在抢占锁,或者已经占有锁 true// 当前线程已经释放锁 下一个线程可以占有锁了 falsevolatile boolean locked;// 前驱节点 需要监听其lock字段Node preNode;public Node(boolean locked, Node preNode) {this.locked = locked;this.preNode = preNode;}// 空节点public static final Node EMPTY = new Node(false, null);public boolean isLocked() {return locked;}public void setLocked(boolean locked) {this.locked = locked;}public Node getPreNode() {return preNode;}public void setPreNode(Node preNode) {this.preNode = preNode;}}static class CLHSpinLock implements Lock {// 创建当前节点的本地变量private static ThreadLocal<Node> currentNodeLocal = new ThreadLocal<Node>();// CLH队列的尾指针,使用AtomicReference,方法CAS操作AtomicReference<Node> tail = new AtomicReference<>(null);public CLHSpinLock() {// 设置尾部节点tail.getAndSet(Node.EMPTY);}@Overridepublic void lock() {// 加锁操作// 将节点添加到等待队列的尾部Node curNode = new Node(true, null);Node preNode = tail.get();// 通过CAS自旋 将当前节点插入到队列的尾部while (!tail.compareAndSet(preNode, curNode)) {preNode = tail.get();}// 设置前驱节点curNode.setPreNode(preNode);// 自旋监听前驱节点的locked变量,直到其值为false,如果前驱节点为true说明上一个线程还没释放锁while (curNode.getPreNode().isLocked()) {// 抢锁失败 让出cpu时间片Thread.yield();}// 到这里说明已经抢到了锁// log.error("已经抢锁成功!");// 将当前节点缓存到线程本地变量中 释放锁的时候需要使用currentNodeLocal.set(curNode);}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic void unlock() {// 释放锁// 获取当前线程的threadLocalNode node = currentNodeLocal.get();// 将locked标志改为falsenode.setLocked(false);// 将前驱节点设置为null 断开引用方便垃圾回收node.setPreNode(null);// 释放当前缓存中的线程信息currentNodeLocal.set(null);}@Overridepublic Condition newCondition() {return null;}}
}