一、CAS 与原子类
1. CAS
CAS(compare and swap),是一条 cpu 指令,其含义为:CAS(M, A, B); M 表示内存,A 和 B 分别表示一个寄存器;如果 M 的值和 A 的值相同,则把 M 和 B 的值交换,如果不同,则无事发生; 因为单条 cpu 指令本身就是原子的,因此可以基于 CAS 指令,不进行加锁,来编写线程安全代码;
CAS 指令操作经过操作系统,JVM 的层层封装,最后 Java 标准库,提供了一些工具类,其中最主要工具类就是 原子类,由于原子类内部用的是 CAS 实现,所以性能要比加锁实现高很多;在 java.util.concurrent.atomic 包下;
2. AtomicInteger
其中常用的类 AtomicInteger:该类是对 int 的 CAS 实现,该类的常用方法如下:
AtomicInteger(int initialValue): 构造方法,创建一个值为 initialValue 的 AtomicInteger 对象;
count.getAndIncrement(): 相当于 count++,先返回 count,并将 count + 1;
count.incrementAndGet(): 相当于 ++count,将 count + 1,再返回 count;
count.getAndAdd(int delta): 先返回 count,再将 count + delta;
count.addAndGet(int delta): 先将 count + delta,再返回 count;
public static void main(String[] args) {AtomicInteger count = new AtomicInteger(0);System.out.println(count.getAndAdd(2));System.out.println(count.addAndGet(2));}
在多线程情况下使用原子类的变量,不会出现线程安全问题,例如:
public class Demo6 {private static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {for (int i = 0; i < 20000; i++) {count.getAndIncrement();}});Thread t2 = new Thread(() -> {for (int i = 0; i < 20000; i++) {count.getAndIncrement();}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}
3. ABA 问题
以下是对 AtomicInteger 中 getAndIncrement 方法的 CAS 操作的伪代码实现,
public int getAndIncrement() {int oldValue = value;while ( CAS(value, oldValue, oldValue+1) != true) {oldValue = value;}return oldValue;}
CAS 操作的实现
假设两个线程同时调用 getAndIncrement 方法:
1)两个线程都读取 value 的值到 oldValue中 (oldValue 是栈上的⼀个局部变量,每个线程有自己的栈)
2)线程 1 先执行 CAS 操作,由于 oldValue 和 value 的值相同,会直接对 value 赋值;由于 CAS是直接读写内存的,并且 CAS 的读内存,比较,写内存操作是⼀条硬件指令,是原子的,故此时线程 2 无法穿插;
3)当线程 2 再执行 CAS 操作,第⼀次 CAS 的时候发现 oldValue 和 value 不相等,就不能进行赋值,需要进入循环,在循环里重新读取新的 value 的值赋给 oldValue;
综上,CAS 通过 "值没有发生改变" 来作为 "没有其他线程穿插执行的" 的判定依据;这样就存在了一个问题;即
ABA 问题:
假设存在两个线程 t1 和 t2,有⼀个共享变量 num,初始值为 A,线程 t1 想使⽤ CAS 把 num 值从 A 改成 Z,就需要进行这两个操作:1)先读取 num 的值,并记录到 oldNum 变量中,2)使用 CAS 判定当前 num 的值是否为 A,如果为 A,就修改成 Z;但是在 t1 执行这两个操作之间,t2 线程可能把 num 的值从 A 改成了 B,又从 B 改成了 A;此时 t1 线程无法区分当前这个变量始终是 A,还是经历了⼀个变化过程,这就是 ABA 问题;
ABA 引发的 bug:假设 巧巧 有 200 块钱存款,巧巧 想从 ATM 中取 100 块钱,取款机创建了两个线程,并发的来执行 -50 操作;于是
1)存款 200,线程1 获取到当前存款值为 200,期望更新为 100;线程2 获取到当前存款值为200,期望更新为 100;
2)线程1 执行扣款成功,存款被改成 100,线程2 阻塞等待中;
正常的过程:
3)轮到线程2 执行时,发现当前存款为 100,和之前读到的 200 不相同,执行失败;
发生异常的过程:
3)此时 巧巧 的朋友给她转账了 100 元,此时账户金额又变为 200;
4)轮到线程2 执行时,发现当前存款为 200,和之前读到的 200 相同,再次执行扣款操作;
ABA 问题的解决方案:
给要修改的值引入版本号,在 CAS 比较当前值和旧值的同时,也要比较版本号是否符合预期;
二、信号量
信号量,用来表示 "可用资源的个数",本质上就是⼀个计数器,锁就是可用资源个数为 1 的信号量,加锁(申请资源)对应 P 操作,解锁(释放资源)对应 V 操作;
Java 标准库提供了 Semaphore 类对信号量进行实现,Semaphore 的 P V 操作中的加减计数器操作都是原子的,可以在多线程环境下直接使用;
常用方法:
Semaphore(int permits):构造方法:创建一个可用资源为 permits 的信号量对象;
acquire():申请资源;
release():释放资源;
可以借助信号量实现类似于锁的效果,代码示例:
public class Demo7 {private static int count = 0;public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(1);Thread t1 = new Thread(() -> {for (int i = 0; i < 20000; i++) {try {semaphore.acquire();} catch (InterruptedException e) {throw new RuntimeException(e);}count++;semaphore.release();}});Thread t2 = new Thread(() -> {for (int i = 0; i < 20000; i++) {try {semaphore.acquire();} catch (InterruptedException e) {throw new RuntimeException(e);}count++;semaphore.release();}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}
三、ReentrantLock 类
可重入互斥锁,和 synchronized 定位类似,都是用来实现互斥效果,保证线程安全;
该类常用的方法:
lock():加锁,如果获取不到锁就死等待;
trylock(超时时间):加锁,如果获取不到锁,等待⼀定的时间之后就放弃加锁;
unlock():解锁;
ReentrantLock 和 synchronized 的区别:
1)synchronized 是⼀个关键字,是 JVM 内部实现的,ReentrantLock 是 Java 标准库的⼀个类,在 JVM 外实现的
2)synchronized 使用时不需要手动释放锁,而ReentrantLock 在使用时需要通过 unlock 手动释放锁,使用起来更灵活,但是也容易忘记释放锁(最好通过 try finally 的方式加锁并释放);
3)synchronized 在申请锁失败时会死等待,ReentrantLock 可以通过 trylock 的方式等待⼀段时间就放弃;
4)synchronized 是非公平锁,ReentrantLock 默认是非公平锁,但可以通过构造方法传入一个 true 开启公平锁模式;
5)synchronized 是通过 Object 类的 wait / notify 实现等待唤醒,每次唤醒的是⼀个随机等待的线程,ReentrantLock 搭配 Condition 接口实现等待唤醒,可以更精确控制唤醒某个指定的线程;
四、CountDownLatch 类
该类可以判定多线程任务是否全部都执行完成;
常用方法:
CountDownLatch(int count):构造方法,count 表示任务的数量;
await():调用该方法的线程会阻塞,等待其他线程全部执行完任务之后,该线程才会继续执行;
countDown():用于告诉 countDownLatch 对象,当前任务执行完毕;
代码示例:
public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {int id = i;Thread t = new Thread(() -> {Random random = new Random();int time = random.nextInt(6) * 300;System.out.println("线程 " + id + "开始执行");try {Thread.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("线程 " + id + "执行结束");countDownLatch.countDown();});t.start();}countDownLatch.await();System.out.println("所有线程执行完毕");}
以上代码创建了一个可以包含 5 个任务的 CountDownLatch 对象,并创建了 5 个线程,每个线程执行完都会通知该 CountDownLatch 对象,并在主线程中等待所有线程执行完毕;
五、线程安全的集合类
1. Vector,Stack,Hashtable
这三个集合类本身就是线程安全的,其内部都是通过对类中的整个方法加上 synchronized 实现的,但是效率都太低了;
2. 通过 Collections 工具类提供的方法;
synchronizedList(List<T> list):将指定的 list 变为线程安全的并返回;
synchronizedSet(Set<T> set):将指定的 set 变为线程安全的并返回;
synchronizedMap(Map<K,V> map):将指定的 map 对象变为线程安全的并返回;
3. 使用 CopyOnWrite 容器
CopyOnWriteArrayList():构造方法,返回一个 CopyOnWriteArrayList 对象;
CopyOnWriteArraySet():构造方法,返回一个 CopyOnWriteArraySet 对象;
CopyOnWrite 容器是写时复制的容器,当我们往⼀个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 copy,复制出⼀个新的容器,然后往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器;
例如两个线程使用同一个 CopyOnWriteArrayList 对象,两个线程可同时读,但如果有一个线程要修改,就把该对象复制一个副本,对副本进行修改,同时,不影响另一个线程继续读原来的数据,在修改完后,让原来的对象的引用指向修改后的副本;
这样做的好处是我们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素, 所以 CopyOnWrite 容器是⼀种读写分离的思想;
优点:在读多写少的场景下,性能很高,不需要加锁竞争
缺点:1. 占用内存较多,不适合存储大量数据;
2. 新写的数据不能被第⼀时间读取到;
4. BlockingQueue
多线程环境下使用队列,可以借助 BlockingQueue 接口下的实现子类;
1)ArrayBlockingQueue 基于数组实现的阻塞队列;
2)LinkedBlockingQueue 基于链表实现的阻塞队列;
3)PriorityBlockingQueue 基于堆实现的带优先级的阻塞队列;
4)TransferQueue 最多只包含⼀个元素的阻塞队列;
5. ConcurrentHashMap
多线程环境下使用哈希表可以使用 Hashtable(但是效率很慢)或者 ConcurrentHashMap;
因为 Hashtable 只是简单的把关键方法加上了 synchronized 关键字,这相当于直接针对 Hashtable 对象本身加锁;此时如果多线程访问同⼀个 Hashtable 就会直接造成锁冲突,size 属性也是通过 synchronized 来控制同步,也是比较慢的,并且⼀旦触发扩容,就由该线程完成整个扩容过程,这个过程会涉及到大量的元素拷贝,效率会非常低;
而 ConcurrentHashMap 相比于 Hashtable 的优点如下:
1)对读操作没有加锁,但针对读操作使用了 volatile 保证从内存读取结果),只对写操作进行加锁,加锁的方式仍然是使用 synchronized,但不是锁住整个对象,而是 "锁住每个桶" (用每个链表的头结点作为锁对象),只有两个线程访问的是同一个链表上的数据时才会发生锁冲突,这就大大的降低了锁冲突的概率;
2)充分利用 CAS 特性,比如 size 属性通过 CAS 来更新,避免出现重量级锁的情况;
3)在扩容时,每个来操作 ConcurrentHashMap 的线程,都会参与搬家的过程,每个线程都负责搬运一小部分元素,搬完最后⼀个元素再把旧的数组删掉,并且在这个搬运期间,插入只会往新数组增加,而查找需要同时查新数组和旧的数组,保证了数据的准确性;