cs,表示 Cell 数组的引用;b,表示获取的 base 值,类似于 AtomicLong 中全局变量的 value 值,在没有竞争的情况下数据直接累加到 base 上,或者扩容时,也需要将数据写入到 base 上;v,表示期望值;m,表示 Cell 数组的长度;a,表示 当前线程命中的 cell 表格。
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;// 首次首线程 ((cs = cells) != null) 一定是false;此时走 casBase(long cmp, long val) 方法,当且仅当 cas 失败的时候,才会走到 if 中。if ((cs = cells) != null || !casBase(b = base, b + x)) {// getProbe(); 的返回的是线程中 threadLocalRandomProbe 字段,它是通过一个随机数字段,对于一个特定的线程这个值是固定的(除非刻意修改它)int index = getProbe();// true,无竞争;false,表示竞争激烈,多个线程 hash 到同一个 Cell,可能需要扩容boolean uncontended = true;// 条件一:Cell[] 为 null// 条件二:一般不会出现// 条件三:当前线程所在的 cell 为空,说明当前线程还没有更新过 Cell,应初始化一个 Cell// 条件四:更新当前线程所在的 cell 失败,说明现在竞争很激烈,多个线程 hash 到了同一个 Cell,应扩容if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[index & m]) == null ||!(uncontended = c.cas(v = c.value, v + x)))// longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended, int index)longAccumulate(x, null, uncontended, index); // x,一般默认都是1;fn,默认是 null,uncontended,如果是第四个条件,那就是false。}
}
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended, int index) {if (index == 0) {// 如果 index 是0,那就强制为当前线程初始化ThreadLocalRandom.current(); // force initialization// getProbe(); 获取当前线程的 hash 值index = getProbe();wasUncontended = true; // true,表示无竞争,重新计算了当前线程的 hash 值后认为此次不算是一次竞争,因为都还没有进行初始化,肯定还不存在竞争激烈}// collide 表示扩容意向,false 表示一定不会扩容, true 表示可能会扩容for (boolean collide = false;;) { // True if last slot nonemptyCell[] cs; Cell c; int n; long v;// 第一种情况:Cell[] 数组已经被初始化了if ((cs = cells) != null && (n = cs.length) > 0) {// 当前前程的 hash 值运算后映射得到的 Cell 单元为 null,说明该 cell 没有被使用过if ((c = cs[(n - 1) & index]) == null) {// cellsBusy,初始化 cells,或者扩容 cells 需要获取锁。0,表示无锁状态。1,表示其他线程已经持有了锁。if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically create// casCellsBusy(),通过 cas 操作修改 cellsBusy 的值,cas 成功代表获取锁,返回 true// 尝试加锁,成功后 cellsBusy = 1if (cellsBusy == 0 && casCellsBusy()) {try { // Recheck under lockCell[] rs; int m, j;// 在有锁的情况下再检测一遍之前的判断if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & index] == null) {rs[j] = r; // 将 Cell 添加到 Cell[] 数组中break;}} finally {cellsBusy = 0;}continue; // Slot is now non-empty}}collide = false;}// 这个分支,表示我竞争失败了,将 wasUncontended = true; 执行 index = advanceProbe(index); 重新竞争else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (c.cas(v = c.value,(fn == null) ? v + x : fn.applyAsLong(v, x)))break;// NCPU,当前计算机 CPU 的个数,Cell 数组扩容时会用到else if (n >= NCPU || cells != cs) // 如果 n = cs.length 大于了 CPU 的数量或者 cells 和我们的 cs 不是同一个,那就别扩容了collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {// 当前的 cells 数组和最先赋值的cs是同一个,代表没有被其他线程扩容过if (cells == cs) // Expand table unless stalecells = Arrays.copyOf(cs, n << 1); // 按位左移1位,相当于扩大为原来大小的两倍,扩容后再将原先的数组元素拷贝到新数组中} finally {cellsBusy = 0; // 释放锁}collide = false;continue; // Retry with expanded table}// advanceProbe(index); 重置当前线程的 hash 值index = advanceProbe(index);}// 第二种情况:Cell[] 数组还没有进行初始化,尝试对它加锁,并首次 Cell[] 数组初始化else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {try { // Initialize tableif (cells == cs) {Cell[] rs = new Cell[2];rs[index & 1] = new Cell(x);cells = rs;break;}} finally {cellsBusy = 0;}}// 第三种情况:Cell[] 数组正在进行初始化的时候(第二步还没有完成,其他线程来了),则尝试直接在基数 base 上进行累加操作,兜底的操作// Fall back on using baseelse if (casBase(v = base,(fn == null) ? v + x : fn.applyAsLong(v, x)))break;}
}
static final int getProbe() {return (int) THREAD_PROBE.get(Thread.currentThread());}
看下里面的 THREAD_PROBE,在 jdk8 中用的是 UNSAFE 类,这里是 MethodHandles.lookup()。
MethodHandles.Lookup l = MethodHandles.lookup();
THREAD_PROBE = l.findVarHandle(Thread.class, "threadLocalRandomProbe", int.class);
左移(<<)规则:符号位不变,高位溢出截断,低位补零。比如 -1 << 2 = -4 (为方便讲解,图示的补码为-1)
位运算规则:
Java数值运算过程中都是先将十进制转换为二进制然后再进行运算,再把二进制数据转换为十进制展现给用户。二进制运算规则如下:
1、对于有符号的而言,最高位为符号位,0表示正数,1表示负数。
2、正数的原码,反码和补码都一样,三码合一。
3、负数的反码:符号位保持不限,其他位取反,负数的补码:补码 + 1。
4、0的反码和补码都是0。
下面以 -1 为例子展示原码、反码和补码的转换关系(以int数据类型为例,int类型在Java中占4字节):