本文已收录于:https://github.com/danmuking/all-in-one(持续更新)
前言
哈喽,大家好,我是 DanMu。这篇文章想和大家聊聊 ConcurrentHashMap 相关的知识点。严格来说,ConcurrentHashMap 属于java.lang.current
,并属于通常意义上的容器,但是考虑到面试官非常喜欢打出 HashMap 和 ConcurrentHashMap 这套丝滑小连招,因此我决定将它们放在一起,来加深大家的印象,发车!
数据结构
与 HashMap 一样,ConcurrentHashMap 的底层结构在JDK1.8的时候出现过一次比较大的变化,这篇文章将以JDK1.8的数据结构为主,对于1.8前的结构,大家只需要大致了解即可(JDK版本都已经更新到21了,让我看看哪个活化石面试官还问老版的数据结构)
JDK1.8之前
JDK1.8
在JDK1.8之前,ConcurrentHashMap 采用 **Segment 数组 + HashEntry 数组 + 链表 **的结构,因此其最大并发数会收到 Segment 数量的限制。在 JDK 1.8中,对 ConcurrentHashMap 主要进行了两点改进:
- 细化锁的粒度,从针对 Segment 加锁修改为针对 每个数组元素单独加锁,
- 与 HashMap 一样,将 链表 的结构修改为 链表/红黑树 的结构,提高了搜索链表的效率
对ConcurrentHashMap的数据结构有了一个基础的认识之后,接下来就准备深入底层,彻彻底底的搞懂 ConcurrentHashMap 的实现原理。
ConcurrentHashMap 的源码相对于 HashMap 复杂了很多,希望大家能静下心来慢慢看,我会尽力帮助大家简单的理解源码中的思想。
核心源码详解
重要的成员变量
老规矩,还是先来看看 ConcurrentHashMap 中维护了哪些重要的成员属性:
// ConcurrentHashMap 中存放元素的最大值
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认的初始化大小
private static final int DEFAULT_CAPACITY = 16;
// 从链表转换为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 从红黑树转换为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 开启红黑树转换的数组阈值
// 只有同时满足 TREEIFY_THRESHOLD 和 MIN_TREEIFY_CAPACITY 才会转换为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
// 接下来几个都是标志位,用来表示扩容过程中的状态,在后面会详细解释
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
// 实际存放元素的数组
transient volatile Node<K,V>[] table;
// 在扩容时会用到的数组
private transient volatile Node<K,V>[] nextTable;
/*** 用来控制表初始化和扩容的* 默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,先有个印象* -1 说明正在初始化-N 说明有 N-1 个线程正在进行扩容0 默认状态,表明table没有初始化>0 表示 table 扩容的阈值,如果 table 已经初始化。*/
private transient volatile int sizeCtl;
// 扩容时使用,用来表示扩容进行到哪里
private transient volatile int transferIndex;
初始化
接下来,来看看 ConcurrentHashMap 在初始化的时候做了什么
// 默认的初始化函数
// 可以看到默认的初始化函数啥都没干,所以实际分配空间也是推迟到第一次添加元素的时候
// 这里也是采用的懒加载
public ConcurrentHashMap() {
}
// 指定初始化容量的初始化函数
public ConcurrentHashMap(int initialCapacity) {// 这边先处理了一下两个边界情况if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :// tableSizeFor 的作用是找到最接近 initialCapacity 的2的次幂// 在扩容的时候也是扩容为原容量的2倍// 因此 ConcurrentHashMap 的容量永远是2的幂次tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;
}// 因为 ConcurrentHashMap 使用了懒加载,因此实际分配空间将会推迟到第一次添加函数时
// 这里为了内容的连贯性,将它们放在一起
// initTable 初始化空间
private final Node<K,V>[] initTable() {// 用来存放 table 成员变量Node<K,V>[] tab; // 用来存放 sizeCtl 成员变量int sc;// 判断空间初始化了没有// 注意:这里可能有多个线程想进行初始化,因此这里用了一个循环// 只有 cas 成功的线程才能进行初始化// 其他 cas 失败的线程会进入自旋,找到 ConcurrentHashMap 初始化完成,然后退出。while ((tab = table) == null || tab.length == 0) {// cas 失败的线程会走到这个条件// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。// 失败的线程释放 CPU,或者在 while 里自旋if ((sc = sizeCtl) < 0)// 让出 CPU 使用权Thread.yield(); // lost initialization race; just spin// cas 修改 sc 的值为 -1,只有一个线程能够成功,然后开始初始化else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// 上面说了,如果是带容量的初始化函数会将容量暂存在 sc 里// 所以如果 sc > 0,就是走带容量的流程,否则就是默认初始化容量int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 给成员变量赋值table = tab = nt;// 给 sc 赋值,现在 sc 存放的是扩容阈值// sc为0.75数组大小sc = n - (n >>> 2);}} finally {// 赋值给对应的成员变量sizeCtl = sc;}break;}}return tab;
}
添加元素
下面,来看看如何向 ConcurrentHashMap 中添加元素。
要向 ConcurrentHashMap 中添加元素可分为这几个步骤:
- 计算 key 对应的 hash 值,根据 hash 定位到对应的数组下标
- 先快速判断一下对应下标是否为空,如果为空,直接使用 cas 将元素写入
- 否则的话就需要遍历对应数组位置的链表/红黑树,为保证遍历过程中数据不发生变化,需要先给对应数组位置加锁。
- 如果在遍历过程中遇到相同的 key 就直接覆盖,否则就添加到尾部。
大致理解一下流程应该能帮助大家更好的理解源码,接下来直接上硬菜:
// 向 ConcurrentHashMap 中添加元素
public V put(K key, V value) {return putVal(key, value, false);
}
// 添加元素中实际调用的函数
final V putVal(K key, V value, boolean onlyIfAbsent) {// 边界条件判断,ConcurrentHashMap 是不允许添加 null 的if (key == null || value == null) throw new NullPointerException();// 计算 hashint hash = spread(key.hashCode());// 用于记录相应链表的长度int binCount = 0;// 先把table的值存到tab里// 这里加了一个循环,和上面初始化时候的循环作用类似for (Node<K,V>[] tab = table;;) {// f 用来存放 key 对应数组位置的第一个节点Node<K,V> f; int n, i, fh;// 如果数组"空",进行数组初始化if (tab == null || (n = tab.length) == 0)// 上面已经说过了tab = initTable();// 找该 hash 值对应的数组下标,得到第一个节点 felse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果数组该位置为空,// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// hash 居然可以等于 MOVED,这个可以先放着,涉及到后面扩容的知识else if ((fh = f.hash) == MOVED)// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了tab = helpTransfer(tab, f);// 到这里就是说,hash 对应的数组位置已经有值了,需要遍历链表/红黑树else { V oldVal = null;// 对该数组位置加锁// 接下来就是遍历了// 如果有相同的 key 就覆盖,否则添加到尾部synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) { // 头节点的 hash 值大于 0,说明是链表// 用于累加,记录链表的长度binCount = 1;// 遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 到了链表的最末端,将这个新值放到链表的最后面Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 红黑树的遍历,这里可以不用看了,很复杂,面试也不大可能会考else if (f instanceof TreeBin) { // 红黑树Node<K,V> p;binCount = 2;// 调用红黑树的插值方法插入新节点if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8if (binCount >= TREEIFY_THRESHOLD)// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树// 具体源码我们就不看了,扩容部分后面说treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}
获取元素
获取元素的逻辑和添加元素差不多,这里就不在赘述了:
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 计算 hash 值int h = spread(key.hashCode());// 根据 hash 值确定节点位置if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 如果搜索到的节点 key 与传入的 key 相同且不为 null,直接返回这个节点 if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// 这边涉及到一些后面扩容的知识,可以先不看,等看完后面扩容的知识在来看就懂了// 如果 eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了// 所以这个 find 方法是树节点的 find 方法或者是 fwd 节点的 find 方法else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 否则遍历链表 找到对应的值并返回while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}// 这是 ForwardingNode 节点的 find 方法
Node<K,V> find(int h, Object k) {// 注意这里的nextTable 是扩容时传进来的outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; int n;// 没找到直接返回 nullif (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;// 自旋for (;;) {int eh; K ek;// 找到了就返回节点if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;// 同样要判断是树节点还是 ForwardingNode 节点if (eh < 0) {// ForwardingNode 节点就继续往里找if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}else// 树节点 就调用数节点的 find 方法return e.find(h, k);}// 没找到就返回n ullif ((e = e.next) == null)return null;}}
}
扩容
在 ConcurrentHashMap 中还有一个非常重要的知识点,就是 ConcurrentHashMap 的扩容,与 HashMap 的库容不同,由于 ConcurrentHashMap 需要再扩容的过程中不仅要有高性能,还要保证线程的安全,因此 ConcurrentHashMap 的扩容过程可以说是相当的复杂,希望大家能够耐下心来慢慢慢看,我也会尽力帮大家理解 ConcurrentHashMap 的扩容思想。
总体来说,ConcurrentHashMap 的**核心思想就是:分而治之。**在扩容过程中,ConcurrentHashMap 将数组划分成若干个不重合的任务包,并将任务包分配给多个线程,来达到提高性能的同时保证线程安全的目的。
但是体现在源码中,可不像说起来这么简单。来,上硬菜:
// tryPresize 会在将链表转换为红黑树的 treeifyBin 中调用
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
// 也就是说,如果当前 ConcurrentHashMap 的数组长度为8,那么 size 就是 16
private final void tryPresize(int size) {// c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。// 注意,c 不表示扩容后的数组大小int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2); // 0.75 * n}} finally {sizeCtl = sc;}}}// 这个分支负责处理一些边界情况else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 扩容戳,为一个负值// 高16为为扩容标记// 低16为为扩容线程数+1,从2开始是因为1已经作为初始化标记// 这个函数会在后面详细分析int rs = resizeStamp(n);// 这个分支表示当前已经开始扩容,用来帮助扩容if (sc < 0) {Node<K,V>[] nt;// 这个分支用来处理边界,只要满足一个条件就直接退出,不帮助扩容// RESIZE_STAMP_SHIFT 表示左移16位,用来比较扩容标记是否相同// 这个条件有问题,不会走到if ((sc >>> RESIZE_STAMP_SHIFT) != rs || // 最后一个扩容进程在执行最后一个任务sc == rs + 1 ||// 帮助扩容的线程数达到上线sc == rs + MAX_RESIZERS || // 新数组还没有初始化完成(nt = nextTable) == null ||// 所有的任务包都已经分发完transferIndex <= 0)break;// cas 将 sc+1 表示扩容进程数+1,然后开始帮助扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 第一个开始扩容的线程,cas 将 sc 赋值为 rs+2 开始,开始扩容else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}
从上面可以看到,具体实现扩容的逻辑都通过调用 transfer 来实现,接下来就来看看 transfer 函数:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// n 是原数组的长度int n = tab.length, stride;// 前面说了,扩容的核心思想就分而治之,将数组划分为多个任务包,stride就是每个任务包的大小// stride 在单核下直接等于 n,就等于只有一个任务包// 多核模式下为 (n>>>3)/NCPU,最小值是 16// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 如果 nextTab 为 null,先进行一次初始化// 前面我们说了,外围会保证第一个发起扩容的线程调用此方法时,参数 nextTab 为 null// 之后帮助扩容的线程调用此方法时,nextTab 不会为 nullif (nextTab == null) {try {// 容量翻倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// nextTable 是 ConcurrentHashMap 中的属性nextTable = nextTab;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置transferIndex = n;}int nextn = nextTab.length;// ForwardingNode 翻译过来就是正在被迁移的 Node// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了// 所以它其实相当于是一个标志。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了// 初始化为 trueboolean advance = true;// 迁移工作是否全部完成boolean finishing = false; // to ensure sweep before committing nextTab/** 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看* */// i 是位置索引,bound 是边界,注意是从后往前// 这里可以理解为初始化,并不是要使用的值for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// advance 为 true 表示可以进行下一个位置的迁移了while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;// 将 transferIndex 值赋给 nextIndex// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 第一次会走这个分支,将 transferIndex 赋值为 nextIndex-stride// 第一个进程就负责处理 nextIndex-stride 到 nextIndex 这一个任务包// 任务包会被从后往前处理else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前bound = nextBound;i = nextIndex - 1;// 刚开始当前任务肯定没完成advance = false;}}// 所有任务完成if (i < 0 || // 不应该发生i >= n || // 不应该发生i + n >= nextn) {int sc;// 扩容完成if (finishing) {// 所有的迁移操作已经完成nextTable = null;// 将新的 nextTab 赋值给 table 属性,完成迁移table = nextTab;// 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍sizeCtl = (n << 1) - (n >>> 1);return;}// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 任务结束,方法退出if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了finishing = advance = true;i = n; // recheck before commit}}// 下面几个分支就是进行迁移的代码了,比较简单// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 头节点的 hash 大于 0,说明是链表的 Node 节点if (fh >= 0) {// 需要将链表一分为二,// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的// lastRun 之前的节点需要进行克隆,然后分到两个链表中int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 其中的一个链表放在新数组的位置 isetTabAt(nextTab, i, ln);// 另一个链表放在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}// 红黑树可以不看else if (f instanceof TreeBin) {// 红黑树的迁移TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 如果一分为二后,节点数小于等于6,那么将红黑树转换回链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 将 ln 放置在新数组的位置 isetTabAt(nextTab, i, ln);// 将 hn 放置在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}}}}}
}
恭喜你,看到这里你已经对于扩容有了相当深入的理解!扩容的代码确实相当的硬核,需要大家有一点耐心,慢慢理解。
答疑
在源码中还有一些用一句两句话很难解释清楚的问题,就在这部分进行更详细的解读
扩容戳有什么用
先来填个前面留下的坑
resizeStamp 会生成一个扩容戳,要理解
要理解扩容戳,必须从二进制的角度来分析。resizeStamp 方法就一句话:
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
其中 RESIZE_STAMP_BITS
是一个默认值 16。这里面关键就是 Integer.numberOfLeadingZeros(n) 这个方法,这个方法源码就不贴出来了,实际上这个方法就是做一件事,那就是获取当前数据转成二进制后的最高位的 1 前的 0 的个数。这句话有点拗口,举个例子:
以 16 为准,16 转成二进制是10000
,最高非 0 位是在第 5 位,因为 int 类型是 32 位,所以他前面还有 27 位,而且都是 0,那么这个方法得到的结果就是 27(1 的前面还有 27 个 0)。
然后1 << (RESIZE_STAMP_BITS - 1)
在当前版本就是1<<15
,也就是得到一个二进制数1000 0000 0000 0000
,这里也是要做一件事,把这个 1 移动到第 16 位。最后这两个数通过|
操作一定得到的结果就是第 16 位是 1,因为 int 是 32 位,最多也就是 32 个 0,而且因为 n 的默认大小是 16(ConcurrentHashMap 默认大小),所以实际上最多也就是 27(11011)个 0,也就是说这个数最高位的 1 也只是在第五位,执行|
运算最多也就是影响低 5 位的结果。扩容戳在被使用的时候会先左移 16 位,然后赋值给 sc,所以这边计算出的扩容戳,其实是高 16 位。上面说的高 16 位代表当前扩容的标记,低 16 位代表了扩容的线程数的扩容戳实际上指的是 rs 赋值后的 sizeCtl。
注意:这里之所以要保证第 16 位为 1,是为了保证 sizeCtl 变量为负数,扩容戳在被使用时会先左移 16 位,这样最高位永远为 1 保证 sizeCtl 变量为负数。
首次扩容为什么计数要 +2 而不是 +1
我们上面已经说了扩容戳的作用:
- 高 16 位代表当前扩容的标记
- 低 16 位代表了扩容的线程数。
知道了这两个条件就好理解了,因为 rs 最终是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,所以应该 +1,但是这里是 +2,原因是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以如果直接 +1 是会和标志位发生冲突。为了避开标志位,所以初始化第一次记录扩容线程数的时候才需要 +2。
点关注,不迷路
好了,以上就是这篇文章的全部内容了,如果你能看到这里,非常感谢你的支持!
如果你觉得这篇文章写的还不错, 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!
白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !
最后推荐我的IM项目DiTing(https://github.com/danmuking/DiTing-Go),致力于成为一个初学者友好、易于上手的 IM 解决方案,希望能给你的学习、面试带来一点帮助,如果人才你喜欢,给个Star⭐叭!
参考资料
助力面试之ConcurrentHashMap面试灵魂拷问,你能扛多久
JUC集合: ConcurrentHashMap详解