- 1. 十八罗汉
- 2. 原子类再分类
- 2.1 基本类型原子类
- 2.2 数组类型原子类
- 2.3 引用类型原子类
- 2.4 对象的属性修改原子类
- 2.5 原子操作增强类
- 3. 代码演示及性能比较:
- 4. LongAddr原理
- 5. LongAddr源码分析
- 5.1 add()
- 5.2 longAccumulate()
- 5.3 sum()
- 6. 小总结
- 6.1 AtomicLong
- 6.2 LongAdder
public class Temp {static int num = 0;static AtomicInteger atomicNum = new AtomicInteger();public static void main(String[] args) throws InterruptedException {int size = 50;CountDownLatch count = new CountDownLatch(size);for (int i = 0; i < size; i++) {new Thread(() -> {try {for (int j = 0; j < 1000; j++) {num++;atomicNum.getAndIncrement();}} finally {count.countDown();}}).start();}count.await();System.out.println(num);System.out.println(atomicNum.get());}
- 用版本号解决CAS的ABA问题,可以统计出修改过几次
- 用状态戳解决CAS的ABA问题,可以标记出是否修改过
- 更新的对象属性必须使用public volatile修饰
- 使用静态放法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
/*** 需求:多线程环境下初始化资源类,要求只能初始化一次*/
public class Temp {public volatile Boolean isInit = Boolean.FALSE;private static final AtomicReferenceFieldUpdater<Temp, Boolean> ATOMIC_REFERENCE_FIELD_UPDATER =AtomicReferenceFieldUpdater.newUpdater(Temp.class, Boolean.class, "isInit");public void init(Temp temp) {if (ATOMIC_REFERENCE_FIELD_UPDATER.compareAndSet(temp, Boolean.FALSE, Boolean.TRUE)) {System.out.println(Thread.currentThread().getName()+":init start");try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {}System.out.println(Thread.currentThread().getName()+":init finished");} else {System.out.println(Thread.currentThread().getName()+":已有线程正在初始化");}}public static void main(String[] args) throws InterruptedException {int size = 5;CountDownLatch count = new CountDownLatch(size);Temp temp = new Temp();for (int i = 0; i < size; i++) {new Thread(() -> {try {temp.init(temp);} finally {count.countDown();}}).start();}count.await();}
volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
说明: 如果是 count++操作,使用如下类实现: AtomicInteger count = new AtomicInteger0;count.addAndGet(1);如果是JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好( 减少乐观锁的重试次数 );同时,使用LongAdder的空间代价更大,故越是高并发越推荐使用LongAdder 。
/*** 需求:点赞数统计,50个线程,每个点赞1千万次*/
public class Temp {long i = 0;public synchronized void synchronizedIncrement() {i++;}AtomicLong atomicLong = new AtomicLong();public void atomicLongIncrement() {atomicLong.getAndIncrement();}LongAdder longAdder = new LongAdder();public void longAdderIncrement() {longAdder.increment();}LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);public void longAccumulatorIncrement() {longAdder.increment();}public static void main(String[] args) throws InterruptedException {int threadNum = 50;int times = 10000000;process(threadNum, times, 1);process(threadNum, times, 2);process(threadNum, times, 3);process(threadNum, times, 4);}public static void process(int threadNum, int times, int type) throws InterruptedException {long startTime = System.currentTimeMillis();CountDownLatch countDownLatch = new CountDownLatch(threadNum);Temp temp = new Temp();for (int i = 0; i < threadNum; i++) {new Thread(() -> {try {for (int j = 0; j < times; j++) {switch (type) {case 1: temp.synchronizedIncrement();break;case 2: temp.atomicLongIncrement();break;case 3: temp.longAdderIncrement();break;case 4: temp.longAccumulatorIncrement();break;default: throw new RuntimeException("不存在的类型");}}} finally {countDownLatch.countDown();}}).start();}countDownLatch.await();long endTime = System.currentTimeMillis();System.out.println("costTime:" + (endTime - startTime) + " 毫秒," + typeStr(type));}public static String typeStr(int type) {switch (type) {case 1: return "synchronizedIncrement";case 2: return "atomicLongIncrement";case 3: return "longAdderIncrement";case 4: return "longAccumulatorIncrement";default: throw new RuntimeException("不存在的类型");}}
costTime:22854 毫秒,synchronizedIncrement
costTime:4888 毫秒,atomicLongIncrement
costTime:215 毫秒,longAdderIncrement
costTime:214 毫秒,longAccumulatorIncrement
/** Number of CPUS, to place bound on table size * CPU数量,即cells数组的最大长度*/static final int NCPU = Runtime.getRuntime().availableProcessors();/*** Table of cells. When non-null, size is a power of 2.* cells数组,长度为2的幂,2,4,8,16...,方便以后位运算*/transient volatile Cell[] cells;/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization races. Updated via CAS.* 基础value值,当并发较低时,只累加该值,主要用于没有竞争的情况,通过CAS更新*/transient volatile long base;/*** Spinlock (locked via CAS) used when resizing and/or creating Cells.* 创建或扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁*/transient volatile int cellsBusy;
public void add(long x) {// as是Striped64中的cells数组属性// b是Striped64中的base属性// v是当前线程hash到Cell中存储的值// m是cells的长度-1,hash时作为掩码使用// a是当前线程hash到的CellCell[] as; long b, v; int m; Cell a;// 首次首线程(as = cells) != null一定是false,此时走casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中// 条件1: cells不为空// 条件2: cas操作base失败,说明其他线程先一步修改了base,出现竞争if ((as = cells) != null || !casBase(b = base, b + x)) {// true无竞争,false表示竞争激烈,多个线程hash到同一个cell,可能要扩容boolean uncontended = true;// 条件1:cells为空// 条件2:应该不会出现// 条件3:当前线程所在cell为空,说明当前线程还没有更新过cell,应初始化一个cell// 条件4:更新当前线程所在cell失败,说明竞争很激烈,多个线程hash到了同一个cell,应扩容if (as == null || (m = as.length - 1) < 0 ||// getProbe()方法返回的是线程中的threadLocalRandomProbe字段// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}
/*** Handles cases of updates involving initialization, resizing,* creating new Cells, and/or contention. See above for* explanation. This method suffers the usual non-modularity* problems of optimistic retry code, relying on rechecked sets of* reads.** @param x the value * @param fn the update function, or null for add (this convention* avoids the need for an extra field or function in LongAdder). * @param wasUncontended false if CAS failed before call * x: 需要增加的值,一般默认都是1* fn: 默认传递的是null* wasUncontended: 竞争标识,如果是false标识有竞争。只有cells初始化之后,并且当前线程CAS修改失败才会是false*/final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {// h:存储线程的probe值int h;// 如果getProbe()方法返回0,说明随机数未初始化if ((h = getProbe()) == 0) { // 这个操作相当于给当前线程生成一个非0的hash值// 使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化ThreadLocalRandom.current(); // force initialization// 重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为trueh = getProbe();// 重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,故设置wasUncontended竞争状态为truewasUncontended = true;}// 如果hash取模映射得到的cell单元不是null,则为true,此值也可以看做是扩容意向boolean collide = false; // True if last slot nonemptyfor (;;) {// 自旋:按CASE 2,3,1看代码Cell[] as; Cell a; int n; long v;// CASE 1:cells已经被初始化了if ((as = cells) != null && (n = as.length) > 0) {// CASE 1.1 :当前线程hash到的cell还未初始化,则需要进行初始化处理,初始化的值为x即默认的1if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically create// 双端检查并尝试抢锁if (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}// CASE 1.2:竞争激烈,允许重新计算hash值else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash// CASE 1.3:当前线程hash到已初始化的槽位,使用cas进行更新,更新成功则跳出循环else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// CASE 1.4:如果cells的长度已经大于等于CPU核数,就不要再扩容了,继续hash;cells != as说明已有线程正在扩容else if (n >= NCPU || cells != as)collide = false; // At max size or stale// CASE 1.5:将扩容意向置为true,如果再hash仍然不满足上诉条件便扩容else if (!collide)collide = true;// CASE 1.6:尝试扩容,将老数组复制到新数组上else if (cellsBusy == 0 && casCellsBusy()) {try {// 当前的cells数组和最先赋值的as是同一个,代表没有被其他线程扩容过if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}// 允许重置当前线程的hash值h = advanceProbe(h);}// CASE 2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组;cellsBusy=0表示无锁状态,cellsBusy=1为持锁状态else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// boolean init = false;try { // Initialize tableif (cells == as) {// 与条件中cells == as形成双重检查Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x); // 按照当前线程hash到数组中的位置并创建其对应的Cellcells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}// CASE 3:cells正在进行初始化,则尝试直接在基数base上进行累加操作;兜底操作,当新线程进来,而数组正在初始化时,则用base进行累加else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}
/*** Returns the current sum. The returned value is <em>NOT</em> an* atomic snapshot; invocation in the absence of concurrent* updates returns an accurate result, but concurrent updates that* occur while the sum is being calculated might not be* incorporated.** @return the sum*/public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
