简介
ConcurrentHashMap是线程安全的HashMap实现,这里主要研究JDK8后的ConcurrentHashMap,下面是ConcurrentHashMap的简单结构:
ConcurrentHashMap基于HashMap的基本逻辑,通过CAS + synchronized 来保证并发安全性。ConcurrentHashMap使用的数组及数组的每个节点都为volatile类型,通过CAS进行更新删除操作,而所有的链表操作都需要通过synchronized锁定链表的头节点,然后进行操作。
/*** The array of bins. Lazily initialized upon first insertion.* Size is always a power of two. Accessed directly by iterators.*/transient volatile Node<K,V>[] table;/*** Key-value entry. This class is never exported out as a* user-mutable Map.Entry (i.e., one supporting setValue; see* MapEntry below), but can be used for read-only traversals used* in bulk tasks. Subclasses of Node with a negative hash field* are special, and contain null keys and values (but are never* exported). Otherwise, keys and vals are never null.*/static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;}
核心方法
1.put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
我们可以看到put是通过一个死循环来实现,在循环逻辑内:
- 首先检查核心的Node<K,V>[] table是否已经初始化,如果没有初始化,则利用CAS将sizeCtl的值置为-1进行初始化。
- 通过CAS查询key相应的槽位是否为 null,若为null直接通过CAS将键值对放入槽位。
- 如果相应的槽位已经有节点,并且其hash值为-1,则表示正在进行扩容,则当前线程帮忙进行扩容。
- 否则通过synchronized锁住槽位内的节点即链表的头结点,然后遍历链表,寻找是否有hash值及key值相同的节点,若有则将value设置进去,否者创建新的节点加入链表。
- 通过addCount函数更新ConcurrentHashMap键值对的数量。
2.get方法
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;}
get方法实现的逻辑比较简单:
- 利用key通过cas的方式获取其对应槽位的节点,若该节点就是想要查询的节点,那就直接返回value。
- 如果槽位内节点的hash值小于0则说明正在进行扩容,则通过ForwardingNode的find函数去新的数组nextTable中进行查找。
- 以上都不符合的话,就直接遍历节点,匹配就返回,否则最后就返回null。
3.扩容方法
/*** Helps transfer if a resize is in progress.*/final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);break;}}return nextTab;}return table;}/*** Moves and/or copies the nodes in each bin to new table. See* above for explanation.*/private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
扩容的逻辑比较复杂,如果扩容时有多个线程,那么每个线程都可以通过helpTransfer函数帮忙进行扩容:
- 首先新建一个两倍长度的数组nextTable。
- 初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位节点后当做占位节点,表示该槽位已经处理过了。
- 通过for循环处理每个槽位中的链表元素,处理完后在这个槽位内通过CAS插入初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了。
- 如果某个槽位已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个槽位内的节点。
4.计数方法
/*** {@inheritDoc}*/public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}/*** A padded cell for distributing counts. Adapted from LongAdder* and Striped64. See their internal docs for explanation.*/@sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value = x; }}final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
代码里的变量baseCount用于在无竞争环境下记录元素的个数,每当插入元素或删除元素时都会利用CAS更新键值对个数。
当有线程竞争时,会使用CounterCell数组来计数,每个ConuterCell都是一个独立的计数单元。线程可以通过ThreadLocalRandom.getProbe() & m找到属于它的CounterCell进行计数。这种方法能够降低线程的竞争,相比所有线程对一个共享变量不停进行CAS操作性能上要好很多。这里的CounterCell数组初始容量为2,最大容量是机器的CPU数。
注意这里有个@sun.misc.Contended,这个注解用于解决伪共享问题。所谓伪共享,就是在同一缓存行cache line(CPU缓存的基本单位)中连续存储了多个变量,当其中一个变量被修改时,会导致其他变量也失效,会降低计算机cache的缓存命中率并且导致内存总线流量大增。