【JUC】原子操作类及LongAddr源码分析

文章目录

    • 1. 十八罗汉
    • 2. 原子类再分类
      • 2.1 基本类型原子类
      • 2.2 数组类型原子类
      • 2.3 引用类型原子类
      • 2.4 对象的属性修改原子类
      • 2.5 原子操作增强类
    • 3. 代码演示及性能比较:
    • 4. LongAddr原理
    • 5. LongAddr源码分析
      • 5.1 add()
      • 5.2 longAccumulate()
      • 5.3 sum()
    • 6. 小总结
      • 6.1 AtomicLong
      • 6.2 LongAdder

1. 十八罗汉

底层使用Unsafe类的CAS方法,而无需使用synchronized等重量锁使操作变得线程安全

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicIntegerArray
  4. AtomicIntegerFieldUpdater
  5. AtomicLong
  6. AtomicLongArray
  7. AtomicLongFieldUpdater
  8. AtomicMarkableReference
  9. AtomicReference
  10. AtomicReferenceArray
  11. AtomicReferenceFieldUpdater
  12. AtomicStampedReference
  13. DoubleAccumulator
  14. DoubleAdder
  15. LongAccumulator
  16. LongAdder
  17. Striped64,LongAdder是Striped64的子类
  18. Number,Striped64,AtomicInteger等是Number的子类

2. 原子类再分类

2.1 基本类型原子类

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicLong

基本使用

public class Temp {static int num = 0;static AtomicInteger atomicNum = new AtomicInteger();public static void main(String[] args) throws InterruptedException {int size = 50;CountDownLatch count = new CountDownLatch(size);for (int i = 0; i < size; i++) {new Thread(() -> {try {for (int j = 0; j < 1000; j++) {num++;atomicNum.getAndIncrement();}} finally {count.countDown();}}).start();}count.await();System.out.println(num);System.out.println(atomicNum.get());}
}

输出

39224
50000

2.2 数组类型原子类

如何理解?类比数组即可

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray

2.3 引用类型原子类

  1. AtomicReference
  2. AtomicStampedReference
    • 用版本号解决CAS的ABA问题,可以统计出修改过几次
  3. AtomicMarkableReference
    • 用状态戳解决CAS的ABA问题,可以标记出是否修改过

2.4 对象的属性修改原子类

  1. AtomicIntegerFieldUpdater:原子更新对象中int类型字段的值
  2. AtomicLongFieldUpdater:原子更新对象中Long类型字段的值
  3. AtomicReferenceFieldUpdater:原子更新对象中引用类型字段的值

目的:以一种线程安全的方式操作非线程安全对象内的某些字段

要求:

  • 更新的对象属性必须使用public volatile修饰
  • 使用静态放法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
/*** 需求:多线程环境下初始化资源类,要求只能初始化一次*/
public class Temp {public volatile Boolean isInit = Boolean.FALSE;private static final AtomicReferenceFieldUpdater<Temp, Boolean> ATOMIC_REFERENCE_FIELD_UPDATER =AtomicReferenceFieldUpdater.newUpdater(Temp.class, Boolean.class, "isInit");public void init(Temp temp) {if (ATOMIC_REFERENCE_FIELD_UPDATER.compareAndSet(temp, Boolean.FALSE, Boolean.TRUE)) {System.out.println(Thread.currentThread().getName()+":init start");try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {}System.out.println(Thread.currentThread().getName()+":init finished");} else {System.out.println(Thread.currentThread().getName()+":已有线程正在初始化");}}public static void main(String[] args) throws InterruptedException {int size = 5;CountDownLatch count = new CountDownLatch(size);Temp temp = new Temp();for (int i = 0; i < size; i++) {new Thread(() -> {try {temp.init(temp);} finally {count.countDown();}}).start();}count.await();}
}

2.5 原子操作增强类

  1. DoubleAccumulator:一个或多个变量共同维护使用的函数更新的运行double值
  2. DoubleAdder:一个或多个变量共同维持最初的零和double总和
  3. LongAccumulator:一个或多个变量共同维护使用的函数更新的运行long值
  4. LongAdder:一个或多个变量共同维持最初的零和long总和

volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。

说明: 如果是 count++操作,使用如下类实现: AtomicInteger count = new AtomicInteger0;count.addAndGet(1);如果是JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好( 减少乐观锁的重试次数 );同时,使用LongAdder的空间代价更大,故越是高并发越推荐使用LongAdder 。

LongAdder只能用来计算加法,且从零开始计算;

LongAccumulator提供了自定义函数的操作且可以传入初始值。

3. 代码演示及性能比较:

/*** 需求:点赞数统计,50个线程,每个点赞1千万次*/
public class Temp {long i = 0;public synchronized void synchronizedIncrement() {i++;}AtomicLong atomicLong = new AtomicLong();public void atomicLongIncrement() {atomicLong.getAndIncrement();}LongAdder longAdder = new LongAdder();public void longAdderIncrement() {longAdder.increment();}LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);public void longAccumulatorIncrement() {longAdder.increment();}public static void main(String[] args) throws InterruptedException {int threadNum = 50;int times = 10000000;process(threadNum, times, 1);process(threadNum, times, 2);process(threadNum, times, 3);process(threadNum, times, 4);}public static void process(int threadNum, int times, int type) throws InterruptedException {long startTime = System.currentTimeMillis();CountDownLatch countDownLatch = new CountDownLatch(threadNum);Temp temp = new Temp();for (int i = 0; i < threadNum; i++) {new Thread(() -> {try {for (int j = 0; j < times; j++) {switch (type) {case 1: temp.synchronizedIncrement();break;case 2: temp.atomicLongIncrement();break;case 3: temp.longAdderIncrement();break;case 4: temp.longAccumulatorIncrement();break;default: throw new RuntimeException("不存在的类型");}}} finally {countDownLatch.countDown();}}).start();}countDownLatch.await();long endTime = System.currentTimeMillis();System.out.println("costTime:" + (endTime - startTime) + " 毫秒," + typeStr(type));}public static String typeStr(int type) {switch (type) {case 1: return "synchronizedIncrement";case 2: return "atomicLongIncrement";case 3: return "longAdderIncrement";case 4: return "longAccumulatorIncrement";default: throw new RuntimeException("不存在的类型");}}
}

执行结果:

costTime:22854 毫秒,synchronizedIncrement
costTime:4888 毫秒,atomicLongIncrement
costTime:215 毫秒,longAdderIncrement
costTime:214 毫秒,longAccumulatorIncrement

4. LongAddr原理

LongAddr继承了Striped64,LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,
从而降级更新热点

Striped64比较重要的成员变量

    /** Number of CPUS, to place bound on table size *  CPU数量,即cells数组的最大长度*/static final int NCPU = Runtime.getRuntime().availableProcessors();/*** Table of cells. When non-null, size is a power of 2.* cells数组,长度为2的幂,2,4,8,16...,方便以后位运算*/transient volatile Cell[] cells;/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization races. Updated via CAS.* 基础value值,当并发较低时,只累加该值,主要用于没有竞争的情况,通过CAS更新*/transient volatile long base;/*** Spinlock (locked via CAS) used when resizing and/or creating Cells.* 创建或扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁*/transient volatile int cellsBusy;

在这里插入图片描述

5. LongAddr源码分析

小总结:LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行换作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程换作完毕,将数组cells的所有值和base都加起来作为最终结果

5.1 add()

public void add(long x) {// as是Striped64中的cells数组属性// b是Striped64中的base属性// v是当前线程hash到Cell中存储的值// m是cells的长度-1,hash时作为掩码使用// a是当前线程hash到的CellCell[] as; long b, v; int m; Cell a;// 首次首线程(as = cells) != null一定是false,此时走casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中// 		条件1: cells不为空// 		条件2: cas操作base失败,说明其他线程先一步修改了base,出现竞争if ((as = cells) != null || !casBase(b = base, b + x)) {// true无竞争,false表示竞争激烈,多个线程hash到同一个cell,可能要扩容boolean uncontended = true;// 条件1:cells为空// 条件2:应该不会出现// 条件3:当前线程所在cell为空,说明当前线程还没有更新过cell,应初始化一个cell// 条件4:更新当前线程所在cell失败,说明竞争很激烈,多个线程hash到了同一个cell,应扩容if (as == null || (m = as.length - 1) < 0 ||// getProbe()方法返回的是线程中的threadLocalRandomProbe字段// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}
}

5.2 longAccumulate()

    /*** Handles cases of updates involving initialization, resizing,* creating new Cells, and/or contention. See above for* explanation. This method suffers the usual non-modularity* problems of optimistic retry code, relying on rechecked sets of* reads.** @param x the value * @param fn the update function, or null for add (this convention* avoids the need for an extra field or function in LongAdder). * @param wasUncontended false if CAS failed before call * x: 需要增加的值,一般默认都是1* fn: 默认传递的是null* wasUncontended: 竞争标识,如果是false标识有竞争。只有cells初始化之后,并且当前线程CAS修改失败才会是false*/final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {// h:存储线程的probe值int h;// 如果getProbe()方法返回0,说明随机数未初始化if ((h = getProbe()) == 0) { // 这个操作相当于给当前线程生成一个非0的hash值// 使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化ThreadLocalRandom.current(); // force initialization// 重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为trueh = getProbe();// 重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,故设置wasUncontended竞争状态为truewasUncontended = true;}// 如果hash取模映射得到的cell单元不是null,则为true,此值也可以看做是扩容意向boolean collide = false;                // True if last slot nonemptyfor (;;) {// 自旋:按CASE 2,3,1看代码Cell[] as; Cell a; int n; long v;// CASE 1:cells已经被初始化了if ((as = cells) != null && (n = as.length) > 0) {// CASE 1.1 :当前线程hash到的cell还未初始化,则需要进行初始化处理,初始化的值为x即默认的1if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {       // Try to attach new CellCell r = new Cell(x);   // Optimistically create// 双端检查并尝试抢锁if (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try {               // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue;           // Slot is now non-empty}}collide = false;}// CASE 1.2:竞争激烈,允许重新计算hash值else if (!wasUncontended)       // CAS already known to failwasUncontended = true;      // Continue after rehash// CASE 1.3:当前线程hash到已初始化的槽位,使用cas进行更新,更新成功则跳出循环else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// CASE 1.4:如果cells的长度已经大于等于CPU核数,就不要再扩容了,继续hash;cells != as说明已有线程正在扩容else if (n >= NCPU || cells != as)collide = false;            // At max size or stale// CASE 1.5:将扩容意向置为true,如果再hash仍然不满足上诉条件便扩容else if (!collide)collide = true;// CASE 1.6:尝试扩容,将老数组复制到新数组上else if (cellsBusy == 0 && casCellsBusy()) {try {// 当前的cells数组和最先赋值的as是同一个,代表没有被其他线程扩容过if (cells == as) {      // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue;                   // Retry with expanded table}// 允许重置当前线程的hash值h = advanceProbe(h);}// CASE 2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组;cellsBusy=0表示无锁状态,cellsBusy=1为持锁状态else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// boolean init = false;try {                           // Initialize tableif (cells == as) {// 与条件中cells == as形成双重检查Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x); // 按照当前线程hash到数组中的位置并创建其对应的Cellcells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}// CASE 3:cells正在进行初始化,则尝试直接在基数base上进行累加操作;兜底操作,当新线程进来,而数组正在初始化时,则用base进行累加else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;                          // Fall back on using base}}

5.3 sum()

    /*** Returns the current sum.  The returned value is <em>NOT</em> an* atomic snapshot; invocation in the absence of concurrent* updates returns an accurate result, but concurrent updates that* occur while the sum is being calculated might not be* incorporated.** @return the sum*/public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}

sum执行时,并没有限制对base和cells的更新。所以LongAdder不是强一致性的,它是最终一致性的

6. 小总结

6.1 AtomicLong

原理:CAS+自旋

场景:低并发下的全局计算,AtomicLong能保证并发情况下计数的准确性,内部使用CAS来解决并发安全问题

缺陷:高并发后性能急剧下降,N个线程CAS操作修改线程的值,每次只有一个成功,其它N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,很浪费CPU资源

6.2 LongAdder

原理:CAS+Base+Cell数组分散,空间换时间并分散了热点数据

场景:高并发下的全局计算

缺陷:sum求和时还有计算线程修改结果的话,最后的结果不够准确

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106331.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解Kafka分区副本机制

1. Kafka集群 Kafka 使用 Zookeeper 来维护集群成员 (brokers) 的信息。每个 broker 都有一个唯一标识 broker.id&#xff0c;用于标识自己在集群中的身份&#xff0c;可以在配置文件 server.properties 中进行配置&#xff0c;或者由程序自动生成。下面是 Kafka brokers 集群自…

上机实验二 设计单循环链表 西安石油大学数据结构

实验名称:设计单循环链表 (1&#xff09;实验目的:掌握线性表的链式存储结构;掌握单循环链表及其基本操作的实现。 (2&#xff09;主要内容:实现单循环链表的初始化、求数据元素个数、插入、删除、取数据元素等操作;用插入法建立带头结点的单循环链表;设计一个测试主函数验证…

hadoop组成

在hadoop1.x时代,Hadoop中的MapReduce同时处理业务逻辑运算和资源调度,耦合性较大; 在hadoop2.x时代,新增了yarn,主要负责资源的调度,MapReduce仅负责运算; 在hadoop3.x时代,在组成上没有变化;

2023-10-14 LeetCode每日一题(只出现一次的数字)

2023-10-14每日一题 一、题目编号 136. 只出现一次的数字二、题目链接 点击跳转到题目位置 三、题目描述 给你一个 非空 整数数组 nums &#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时…

支付宝开放平台第三方代小程序开发,消息服务推送通知总结

大家好&#xff0c;我是小悟 关于支付宝开放平台第三方代小程序开发的消息服务推送通知&#xff0c;是开放平台代小程序实现业务的重要功能。 消息服务推送通知&#xff0c;支持商家两种通讯协议来接收消息&#xff0c;分别为websocket 长连接和http。 关于websocket方式&am…

苍穹外卖(八) 使用WebSocket协议完成来单提醒及客户催单功能

WebSocket介绍 WebSocket 是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工通信(双向传输)——浏览器和服务器只需要完成一次握手&#xff0c;两者之间就可以创建持久性的连接&#xff0c; 并进行双向数据传输。 HTTP协议和WebSocket协议对比&#xff1a; HTTP…

ChatGPT角色扮演教程,Prompt词分享

使用指南 1、可直复制使用 2、可以前往已经添加好Prompt预设的AI系统测试使用 https://ai.idcyli.comhttps://ai.idcyli.com 雅思写作考官 我希望你假定自己是雅思写作考官&#xff0c;根据雅思评判标准&#xff0c;按我给你的雅思考题和对应答案给我评分&#xff0c;并且按…

DirectX绘制流水线

使用DirectX可以让在Windows平台上运行的游戏或多媒体程序获得更高的执行效率&#xff0c;掌握DirectX的基本概念和技术是虚拟现实技术、计算机仿真和3D游戏程序开发的基础。 DirectX概述 DirectX是微软的一个多媒体应用编程接口(API)工具包&#xff0c;用于为Windows操作系统…

EfficientDet: Scalable and Efficient Object Detection

CVPR2020 V7 Mon, 27 Jul 2020 引用量&#xff1a;243 机构&#xff1a;Google 贡献&#xff1a;1>提出了多尺度融合网络BiFPN 2>对backbone、feature network、box/class prediction network and resolution进行复合放缩&#xff0c;有着不同的…

在 VSCode 中使用 PlantUML

最近&#xff0c;因为工作需要绘制一些逻辑图&#xff0c;我自己现在使用的是 PlantUML 或者 mermaid&#xff0c;相比之下前者更加强大。不过它的环境也麻烦一些&#xff0c;mermaid 在一些软件上已经内置了。但是 PlantUML 一般需要自己本地安装或者使用远程服务器&#xff0…

Unity中Shader不同灯光类型的支持与区分

文章目录 前言一、在开始之前做一些准备1、在上一篇文章的场景基础上&#xff0c;增加一个Unity默认的球体作为对照组2、创建一个点光源&#xff0c;用来看点光源的影响 对 Unity默认的Shader效果 和 我们实现的Shader效果 之间的不同 二、点光源的适配把上一篇文章中 ForwardB…

R语言的计量经济学实践技术应用

计量经济学通常使用较小样本&#xff0c;但这种区别日渐模糊&#xff0c;机器学习在经济学领域、特别是经济学与其它学科的交叉领域表现日益突出&#xff0c;R语言是用于统计建模的主流计算机语言&#xff0c;在本次培训中&#xff0c;我们将从实际应用出发&#xff0c;重点从数…

【arm实验2】按键中断事件控制实验

设置按键中断&#xff0c;按键1按下&#xff0c;LED亮&#xff0c;再次按下&#xff0c;灭 按键2按下&#xff0c;蜂鸣器叫&#xff0c;再次按下&#xff0c;停 按键3按下&#xff0c;风扇转&#xff0c;再次按下&#xff0c;停 主函数&#xff1a; linuxlinux:~/study/08-c$…

【基础篇】四、本地部署Flink

文章目录 1、本地独立部署会话模式的Flink2、本地独立部署会话模式的Flink集群3、向Flink集群提交作业4、Standalone方式部署单作业模式5、Standalone方式部署应用模式的Flink Flink的常见三种部署方式&#xff1a; 独立部署&#xff08;Standalone部署&#xff09;基于K8S部署…

使用Tortoisegit界面拉起master主分支以副分支以及命令行操作

文章目录 1、Gui操作界面2、命令行操作3、合并分支到master分支上面 1、Gui操作界面 "小乌龟"通常指的是Git的图形用户界面&#xff08;GUI&#xff09;工具&#xff0c;其中比较常见的是TortoiseGit。下面是使用TortoiseGit来拉取&#xff08;checkout&#xff09;一…

数据结构(一)—— 数据结构简介

文章目录 一、基本概念和术语&#xff1f;1.1、数据1.2、数据元素1.3、数据项&#xff08;属性、字段&#xff09;1.4、数据对象1.5、数据结构 二、逻辑结构和物理结构&#xff08;存储结构&#xff09;2.1、逻辑结构1、定义2、分类&#xff08;线性结构和非线性结构&#xff0…

Netty 入门 — 亘古不变的Hello World

这篇文章我们正式开始学习 Netty&#xff0c;在入门之前我们还是需要了解什么是 Netty。 什么是 Netty 为什么很多人都推崇 Java boy 去研究 Netty&#xff1f;Netty 这么高大上&#xff0c;它到底是何方神圣&#xff1f; 用官方的话说&#xff1a;Netty 是一款异步的、基于事…

vue绑定style和class 对象写法

适用于&#xff1a;要绑定多个样式&#xff0c;个数确定&#xff0c;名字也确定&#xff0c;但不确定用不用。 绑定 class 样式【对象写法】&#xff1a; .box{width: 100px;height: 100px; } .aqua{background-color: aqua; } .border{border: 20px solid red; } .radius{bor…

【动态库】Ubuntu 添加动态库的搜索路径

在运行程序时&#xff0c;经常遇到下面这种动态库加载失败的情况&#xff0c;这时往往是系统在动态库的搜索路径下没有找到对应的库文件导致的。 目录 一、使用 LD_LIBRARY_PATH 二、修改 /etc/ld.so.conf 一、使用 LD_LIBRARY_PATH 环境变量 LD_LIBRARY_PATH是动态库的搜索…

Jenkins集成newman

一、Docker环境准备 二、Jenkins环境准备 三、登录Jenkins 安装NodeJs插件 四、Jenkins全局工具配置Nodejs 五、创建Jenkins自由风格项目 构建步骤1&#xff1a;选择Execute NodeJS script构建步骤2&#xff1a;选择执行shell脚本 六、将postman相关的脚本、环境变量数据、全局…