我们知道在多线程环境下,HashMap在初始化桶数组、put桶、插入链表以及树化等阶段都有线程安全问题,在jdk1.5之前我们通常用HashTable或Collections.synchronizedMap包装过的HashMap来保证线程安全,不过它们在执行任何操作时都需要锁住整个hash表,这显著的限制了并发性能,所以在jdk1.5我们的并发大神Doug Lea设计一个高性能且线程安全的集合——ConcurrentHashMap。
ps:本文默认是jdk1.8版本,如有别的版本会强调
那么ConcurrentHashMap在哪些地方做了并发控制呢?
一、put操作的并发控制
ConcurrentHashMap的工作原理与HashMap类似,同样也是在jdk1.8前后区别较大,这里只讨论保证线程安全的实现措施:
jdk1.7:使用了分段锁技术,将哈希表分成多个段,每个段拥有一个独立的锁,这样可以在多个线程访问哈希表时,只需要锁住需要操作的那个段,而不是整个哈希表,从而提高了并发性能,锁用的是ReentranLock。
jdk1.8:对1.7的实现方式进行了改进,使用节点锁的思想,即采用“cas+synchronized”的机制来保证线程安全。在1.8中,如果某个桶/段没有元素,那么使用CAS操作来添加新节点,如果有元素,则使用synchronized锁住当前桶/段,再次尝试put。这样可以避免分段锁机制下的锁粒度太大,再次提高了并发性能。
下面是jdk1.7中分段锁的实现:
static final class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K, V> next;
Node(int hash, K key, V val, Node<K, V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}//...
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {private static final long serialVersionUID = 2249069246763182397L;transient volatile HashEntry<K,V>[] table;transient int count;transient int modCount;transient int threshold:final float loadFactor;
}
我们可以看到,每个Segment都是ReentrantLock的实现,每个Segment包含一个HashEntry数组,每个HashEntry则包含一个k-v键值对。
接下来再看下jdk 1.8中“cas+synchronized”机制的代码实现:
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//扰动计算,和HashMap一样int hash = spread(key.hashCode());int binCount = 0;//进入循环for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0){//也是懒加载,如果table为null或长度为0,则进行初始化tab = initTable();}
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//当前索引位置没有元素,则通过CAS操作尝试插入新节点if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED){//说明当前索引位置有元素,且hash值为MOVED,表示正在进行扩容。//帮助迁移数据tab = helpTransfer(tab, f);} else {//ps:到这步则进行链表/红黑树的节点遍历和插入操作V oldVal = null;//加锁,确保只有一个线程操作改桶位置的链表或红黑树synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {//遍历链表,找到相同的key的节点,更新值或插入新节点binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {//将新节点插入到链表末尾(尾插法)pred.next = new Node<K,V>(hash, key,value, null);break;}}}//遍历红黑树,找到相同的key的节点,更新至或插入新节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {//插入或更新值成功,判断是否需要进行树化操作if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//累加元素个数,传入添加的数量1,以及链表节点个数,用于控制是否执行扩容操作addCount(1L, binCount);return null;
}
从上述代码可以看出,如果某个桶/段为空,那么使用CAS操作来添加新节点,如果第一个节点的hash为MOVED,表示当前段/桶正在进行扩容操作,那么就调用helpTransfer方法来协助扩容;否则,使用synchronized锁住当前段/桶,然后进行节点的添加操作。
二、初始化阶段的并发控制
先来了解一个重要的属性:
/*** 用来控制表初始化和扩容,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75* 当为负的时候,说明表正在初始化或扩张,*0:默认状态,表示数组还没有被初始化。*-1:初始化数组*-(1+n):n:表示活动的扩张线程*sizeCtl>0:记录下一次需要扩容的大小。为3/4数组最大长度,相当于扩容的阈值*/
private transient volatile int sizeCtl;
此处采用CAS操作,如果此时没有线程初始化,则去初始化,否则当前线程让出cpu时间片,等待下一次唤醒,源码如下:
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {//ps:进行初始化操作if ((sc = sizeCtl) < 0){//sizeCtl为负数,说明有线程在初始化//临时放弃cpu,让给优先级比自己高的线程或相同的线程Thread.yield(); // lost initialization race; just spin} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//尝试把SIZECTL修改为-1(尝试获取锁)//ps:获取锁成功,进行初始化操作try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//sc=n*3/4sc = n - (n >>> 2);}} finally {//初始化后,sizeCtl长度为数组长度的3/4sizeCtl = sc;}break;}}return tab;
}
三、扩容阶段的并发控制(transfer())
在扩容阶段,ConcurrentHashMap并没有一味的通过CAS或锁区限制多线程,而是通过多线程来加速扩容。
在分析之前,我们需要知道两件事:
ConcurrentHashMap通过ForwardingNode来记录当前桶是否被迁移,如果old Table[i] instanceOf ForwardingNode则说明处于i节点的桶已经被移动到newTable中了。它里面有一个nextTable变量,指向的是下一次扩容后的table。
transferIndex记录了当前扩容的桶索引,最开始为oldTable.length,它给下一个线程指定了要扩容的节点。
下面是大致的扩容流程:
通过CPU核数为每个线程计算划分任务,每个线程最少的任务是迁移16个桶。
将当前桶扩容的索引transferIndex赋值给当前线程,如果索引<=0,则说明扩容完毕,结束流程,否则
再将当前线程扩容后的索引赋值给transferIndex,比如如果transferIndex原来是32,那么赋值之后transferIndex应该变为16,这样下一个线程就可以从16开始扩容。通过CAS进行设置,不会有并发问题。
之后就可以对真正的扩容流程进行加锁操作了。
源码如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {//n:数组长度 stride:每个处理多少任务int n = tab.length, stride;//多喝处理n>>>3/核心数个任务,最少处理16个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating//ps:表示第一个线程进此方法try {//扩容为原数组1倍@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;//创建一个fwd节点,用于控制并发。当一个节点为空或已被转移后,接设置fwd节点,表示处于move状态ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//是否继续向前查找boolean advance = true;//在完成之前重新再扫描一遍数组,看看有没完成的没boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 执行数据迁移// 且重新非陪transferIndex的值,用于不停向前推进更新迁移数据while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}//数据迁移完成的后置处理。包括重新检查一遍迁移数据以及归还线程。if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}
else if ((f = tabAt(tab, i)) == null)//把数组中null的元素的hash值置为MOVED,让循环体处理下一个节点,后续辅助线程发现节点为MOVE则会直接跳过advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)//表示已有线程再处理,让循环体处理下一个节点advance = true; // already processedelse {//锁住节点,开始真正的扩容流程synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {//ps:说明是node节点int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}//前面的节点不确定高低位,所以遍历f~LastRun范围的所有节点//分别逆序存入Ln或hn链表中for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//存入之前的位置setTabAt(nextTab, i, ln);//存入改变后的位置setTabAt(nextTab, i + n, hn);//设置fwd,这样其它线程执行的时候,会跳过去setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {//ps:红黑树的处理//...省略}}}}}
}
End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。