目录
ConcurrentHashMap 一定是线程安全的吗
CAS 机制的注意事项
使用java 并行流 ,您要留意了
ConcurrentHashMap 在JDK1.8中ConcurrentHashMap 内部使用的是数组加链表加红黑树的结构,通过CAS+volatile或synchronized的方式来保证线程安全的,这些原理已毋庸置疑,一言不合上代码.
1. 模拟2个线程累计,通过ConcurrentHashMap 储存累计的结果。
/*** @description: ConcurrentHashMap 真的安全吗* @author: ppx* @date: 2023/8/17 14:11* @version: 1.0*/
public class TestMap {private static ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();private static String key = "hello";/*** @description: 测试2个线程 执行计算* @param:* @return: void* @author: ppx* @date: 2023/8/17 16:43*/private static void testRun() {ExecutorService executor = new ThreadPoolExecutor(2, 5,2L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());for (int i = 0; i < 2; i++) {executor.submit(() -> {for (int j = 0; j < 5; j++) {// 第一步读取int value = concurrentHashMap.getOrDefault(key, 0);// 第二步+1value++;// 第三补+ 回写mapconcurrentHashMap.put(key, value);}});}executor.shutdown();// 直到线程执行完成while(!executor.isTerminated()){}System.out.println("执行结果:" + concurrentHashMap.get(key));}public static void main(String[] args) {testRun();}
}
2.出乎意料执行多次输出不同的结果:
3. 分析原理:ConcurrentHashMap 本身是线程安全的,但for 里面的获map取值、加加操作及回写map 这三步是非原子性。要保证操作的安全性,这三步实现原子性即可。
优化后代码:
private static void testRun() {ExecutorService executor = new ThreadPoolExecutor(2, 5,2L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());for (int i = 0; i < 2; i++) {executor.submit(() -> {for (int j = 0; j < 5; j++) {synchronized (TestMap.class) {int value = concurrentHashMap.getOrDefault(key, 0);value ++;concurrentHashMap.put(key, value);}}});}executor.shutdown();while (!executor.isTerminated()) {}System.out.println("执行结果:" + concurrentHashMap.get(key));}
CAS 机制的注意事项
某线程把数据A更新了B,随后又从B更新成A,恰好此时另一线程读取该数据,发现数据的值还是A没有变化,误认为还是原来的A,但此时A的一些属性或状态已经发生过变化。
CAS操作中将判断“V的值是否仍然为A?”,如果是的话将执行更新操作,在某些CAS操作中,如果V的值首先由A变为B,再由B变为A,那么CAS仍然将会操作成功。
ABA问题:
线程A 的操作,cas中的值由1变成99,再由99变成1,此次线程B 发现AtomicInteger 的值还是1,于是更新到50,产生ABA的问题。
private static AtomicInteger atomicInteger = new AtomicInteger(1);public static void main(String[] args) {Thread threadA = new Thread(() -> {atomicInteger.compareAndSet(1, 99);atomicInteger.compareAndSet(99, 1);System.out.println("线程A进行CAS后的值:"+atomicInteger.get());try {Thread.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}, "线程A");Thread threadB = new Thread(() -> {try{atomicInteger.compareAndSet(1, 50);System.out.println("线程B进行CAS后的值:"+atomicInteger.get());}catch (Exception e) {e.printStackTrace();}}, "线程B");threadA.start();try {threadA.join();} catch (InterruptedException e) {e.printStackTrace();}threadB.start();}
基于AtomicStampedReference类实现
AtomicStampedReference内部增加了版本号的概念,只有期待的值与版本号分别匹配后,才满足条件,更新最新的值。
案例:
线程 A 进行CAS 操作更新时,发布版本已发生变动,CAS更新 失败。线程B 进行CAS 操作更新时,匹配对应的版本,期待值,更新成功。
public static void main(String[] args) {new Thread(() -> {// 让线程B 获取最新版本号,成功 执行更新try {Thread.sleep(11);} catch (InterruptedException e) {e.printStackTrace();}int stamp = atomicStampedReference.getStamp();System.out.println(Thread.currentThread().getName() + ", 当前版本号为:" + stamp);boolean firstCasFlag = atomicStampedReference.compareAndSet(100, 99, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);System.out.println("当前版本号:"+atomicStampedReference.getStamp()+", 线程A进行CAS后的值:" + atomicInteger.get() + ",第1次操作是否修改成功: " + firstCasFlag);}, "线程A").start();new Thread(() -> {try {int stamp = atomicStampedReference.getStamp();System.out.println(Thread.currentThread().getName() + ", 版本号为:" + atomicStampedReference.getStamp());boolean flag = atomicStampedReference.compareAndSet(100, 888, stamp, atomicStampedReference.getStamp() + 1);System.out.println("线程B进行CAS后的值:" + atomicStampedReference.getReference() + ", 此次操作是否修改成功: " + flag);} catch (Exception e) {e.printStackTrace();}}, "线程B").start();}
执行结果:
线程B, 版本号为:1
线程B进行CAS后的值:888, 此次操作是否修改成功: true
线程A, 当前版本号为:2
当前版本号:2, 线程A进行CAS后的值:1,第1次操作是否修改成功: false