一、java8原子类LongAdder
在Java并发编程(2)中有关于AtomicInteger的介绍. AtomicInteger通过CAS非阻塞的原子操作提升了并发性能. 但随着线程数量增大对共享资源的竞争提升,大量线程竞争失败会进入到自旋中,消耗了cpu资源.
JDK8中新增了原子类LongAdder, 弥补AtomicLong的上述缺点. AtomicInteger的缺陷在于共享资源只有一个,LongAdder的思路是将共享资源复制多份,让多个资源去被竞争.在使用的时候再把多个共享资源的数据整合计算得到真实的数据.
从原码具体看下实现
LongAdder longAdder = new LongAdder();
for (int i = 0; i < 10000; i++) {longAdder.add(1);
}
LongAdder的继承关系
public class LongAdder extends Striped64 implements Serializable
构造函数
public LongAdder() {
}
add方法
public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}
}
Cell[] as这里的cells是父类定义的,cells可以理解成上边说的共享资源复制了多份.保存在一个数组中
transient volatile Cell[] cells;
Cell的定义
- 使用Contended修饰,避免伪共享
- 其余代码和上一章介绍的atomicInteger内存基本一致
@sun.misc.Contended static final class Cell {//内部维护了long类型的valuevolatile long value;//构造函数Cell(long x) { value = x; }//CAS操作final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;//变量内存地址较当前对象的地址偏移量private static final long valueOffset;//初始化valueOffsetstatic {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}
}
继续看add方法:
- 如果cells为空, 就执行casBase
- base也是父类定义的,是基值变量. 一开始并非不高的情况下不会创建多个副本cells. 只会对基础值base做操作.
- casBase即熟悉的cas原子非阻塞操作
if ((as = cells) != null || !casBase(b = base, b + x)) {
final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
- uncontended:表示运算操作是否成功
- m是cells的数组大小
- a = as[getProbe() & m]是获取当前线程可操作的cell
- 如果获取到了cell 就进行cas操作, 操作结果可能为false cas可能自旋几次会失败
- 失败了或者没有获取到cell执行longAccumulate
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);
longAccumulate的内容特别多,这里会对cells数组初始化和扩容
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {//for循环前主要获取h的值。h表示当前线程的hash值 对应在cells下标 int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();//true表示没有竞争wasUncontended = true;}//是否需要扩容boolean collide = false; // True if last slot nonempty//CAS的for循环 for (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) {//当前线程对应的cess是否为空if ((a = as[(n - 1) & h]) == null) {//cellsBusy是标识当前是否有线程在对cells在做初始化或者扩容.if (cellsBusy == 0) { // Try to attach new Cell创建一个CellCell r = new Cell(x); // Optimistically create//cellsBusy标识当前没有线程在对cells在做初始化或者扩容. 并将 cellsBusy标识置为1if (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {//将新建的cell加进数组rs[j] = r;created = true;}} finally {cellsBusy = 0;}//创建成功了跳出循环if (created)break;//失败了就是当前位置已经有了cell继续循环continue; // Slot is now non-empty}}//获取cellsBusy标识失败collide = false;}//上面位置已经有了cellelse if (!wasUncontended) // CAS already known to fail//是否存在竞争标志设为truewasUncontended = true; // Continue after rehash//未发生锁竞争 尝试将值累加在a上 else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))//尝试成功 break;else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless stale//扩容Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}//初始化cellselse if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}//初始化成功if (init)break;}//当`Cell`数组竞争激烈时尝试在`base`上进行累加else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))//尝试成功 break; // Fall back on using base}
}
二、并发包中的并发List- CopyOnWriteArrayList
先看下常用的ArrayList在多线程情况下会发生什么
List<Integer> list = new ArrayList<>();
Thread thread = new Thread(new Runnable() {@Overridepublic void run() {for (int i =0 ; i<100; i++){list.add(i);}}
});Thread thread1 = new Thread(() -> {for (int i =100 ; i<200; i++){list.add(i);}
});
thread.start();
thread1.start();
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10at java.util.ArrayList.indexOf(ArrayList.java:321)at java.util.ArrayList.contains(ArrayList.java:304)at com.dxm.test.juc.ListTest.main(ListTest.java:40)
报了数组越界异常. ArrayList是可以自动扩容的,为什么会报越界?
下面是原码
transient Object[] elementData; /
public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true;
}
private void ensureExplicitCapacity(int minCapacity) {modCount++;// overflow-conscious codeif (minCapacity - elementData.length > 0)grow(minCapacity);
}
transient:简单说下elementData为什么用elementData修饰. transient作用是序列化和反序列化时忽略elementData. 主要因为容量问题,elementData可能存在空的元素.Arraylist想要屏蔽接口实现的序列化方法 自己实现自定义的序列化方法.
再看下add方法 add后数组元素个数大于当前elementData.length时会进行grow扩容.
当两个线程同时走到这行ensureCapacityInternal(size + 1); 代码时 并且刚好还剩一个位置时添加一个元素, 只有一个线程会成功执行 elementData[size++] = e; 另一个线程就会报数组越界了.
最后看下CopyOnWriteArrayList是如何保证线程安全的
CopyOnWriteArrayList内部有一把ReentrantLock锁
public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {private static final long serialVersionUID = 8673264195747942595L;/** The lock protecting all mutators */final transient ReentrantLock lock = new ReentrantLock();
add方法。
- 首先获取锁
- 将数组copy一份
- 修改完再将引用指向新数组
其他修改 删除方法实现基本都一致, 这里之所以复制了一份数组是因为读操作没有锁,防止读的同时将原数组的某个元素修改了. CopyOnWriteArrayList的读操作是弱一致性的.
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}
}