1. ConcurrentHashMap
为什么要使用ConcurrentHashmap
在多线程的情况下,使用HashMap是线程不安全的。另外可以使用Hashtable,其是线程安全的,但是Hashtable的运行效率很低,之所以效率低下主要是因为其实现使用了synchronized关键字对put等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,从而使得其表现的效率低下。所以最终就诞生了ConcurrentHashMap.
锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么 当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并 发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。
2. ConcurrentHashMap-JDK1.7
ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Seqment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可以实现多线程put操作。接下来分析JDK1.7版本中ConcurrentHashMap的实现原理。
2.1. 数据结构
ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:
通过这样的结构,在同步的过程中,如果是对不同的segment中的数据进行操作,就不涉及到“竞争”关系,只对该元素所在的segment加锁即可,这样在最理想的情况下,ConcurrentHashMap就可以最高支持和segment数量的写操作,因此就提升了并发能力。
2.1.1. segment数据结构
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry<K,V>[] table; final float loadFactor; }
详细解释一下Segment里面的成员变量的意义:
count:Segment中元素的数量modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容table:链表数组,数组中的每一个元素代表了一个链表的头部loadFactor:负载因子,用于确定threshold
count用来统计该段数据的个数,它是volatile变量,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了 Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的 table值而不需要同步。loadFactor表示负载因子。
2.1.2. HashEntry
Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
2.2. 初始化
在java7中,初始化ConcurretnHashMap主要根据三个参数,他们分别是
initialCapacity
:初始容量。实际操作的时候需要平均分给每个segment。loadFactor
:负载因子,决定了哈希表的扩容阈值。实对每个segment内部扩容使用的,并不是给segment扩容的。concurrencyLevel
:并发级别,表示希望支持的最大线程并发数(也就是segment的个数)。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 1. 校验参数是否合法if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 并发级别不能超过最大分段数 MAX_SEGMENTSif (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 2. 计算 Segment 数量(ssize 为 2 的幂次方,方便位运算定位)int sshift = 0; // 用于记录位移次数int ssize = 1; // 初始 Segment 数量while (ssize < concurrencyLevel) { ++sshift; // 每次位移,表示 ssize 向左移一位,2 倍扩展ssize <<= 1; // ssize <<= 1 相当于 ssize = ssize * 2}// 记录 Segment 的位移偏移量和掩码值segmentShift = 32 - sshift; // 用于定位键的 SegmentsegmentMask = ssize - 1; // 掩码,保证索引不会越界this.segments = Segment.newArray(ssize); // 创建 Segment 数组// 3. 如果初始容量大于最大值,则设为最大值if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 4. 计算每个 Segment 的初始容量int c = initialCapacity / ssize; // 平均分配给每个 Segment 的容量if (c * ssize < initialCapacity) ++c; // 如果不能整除,需要多分配一点// 5. 计算每个 Segment 容量的最小 2 的幂次方int cap = 1; while (cap < c) cap <<= 1; // 找到大于等于 c 的最小 2 的幂次方// 6. 初始化每个 Segment,设置其容量和负载因子for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
这个初始化方法在使用无参初始化的时候会默认被调用,创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 的空散列映射表。
static final int DEFAULT_INITIAL_CAPACITY= 16; /** * 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与* table 数组长度的比值* 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时,* 将触发 再散列* 在构造函数中没有指定这个参数时,使用本参数*/ static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数* 在构造函数中没有指定这个参数时,使用本参数*/ static final int DEFAULT_CONCURRENCY_LEVEL= 16;原文链接:https://blog.csdn.net/dingjianmin/article/details/79776646
/** * 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) * 的空散列映射表。*/ public ConcurrentHashMap() { // 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
2.3. 定位segment
前面介绍了concurrenthashmap通过segment分段,并对segment加锁,来提升并发能力。那么在插入数据的时候肯定要先定位到segment。在java7的源码中,ConcurrentHashMap会首先使用变种hash算法对元素的哈市Code进行再散列。进行再散列的目的是减少哈希冲突,使元素能够均匀地分布在不同的segment上,从而提升容器的存取效率。
/*** Applies a supplemental hash function to a given hashCode, which* defends against poor quality hash functions. This is critical* because ConcurrentHashMap uses power-of-two length hash tables,* that otherwise encounter collisions for hashCodes that do not* differ in lower bits. Note: Null keys always map to hash 0.*/
static int hash(int h) {// 扰动函数:将 hashCode 的高 16 位和低 16 位进行异或运算h += (h << 15) ^ 0xffffcd7d;h ^= (h >>> 10);h += (h << 3);h ^= (h >>> 6);h += (h << 2) + (h << 14);return h ^ (h >>> 16);
}
hash函数的输入是对象原始的hashCode,直接来自于key.hashCode();
返回扰动后的哈希值,这个值会被用来定位到具体地segment。定位具体的segment所使用的函数是segmentFor(int hash)
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
2.4. ConcurrentHashMap的操作
2.4.1. get操作
ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:
1 public V get(Object key) { 2 int hash = hash(key.hashCode()); 3 return segmentFor(hash).get(key, hash); 4 }
第二行,对hash值进行了二次hash,之所以要进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。
看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数.
get操作的高效之处在于整个get过程不需要加锁。我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHash-Map的get操作是如何做到不加锁不出问题的呢?原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前 Segement大小的count字段和用于存储值的HashEntry的value。
transient volatile int count;
volatile Vvalue;
2.4.2. put操作
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个 步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位 置,然后将其放在HashEntry数组里。
是否需要扩容:在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容 之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
如何扩容:在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
//首先,根据 key 计算出对应的 hash 值:public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值throw new NullPointerException(); int hash = hash(key.hashCode()); // 计算键对应的散列码// 根据散列码找到对应的 Segment return segmentFor(hash).put(key, hash, value, false); }//根据 hash 值找到对应的 Segment:/** * 使用 key 的散列码来得到 segments 数组中对应的 Segment */ final Segment<K,V> segmentFor(int hash) { // 将散列值右移 segmentShift 个位,并在高位填充 0 // 然后把得到的值与 segmentMask 相“与”// 从而得到 hash 值对应的 segments 数组的下标值// 最后根据下标值返回散列码对应的 Segment 对象return segments[(hash >>> segmentShift) & segmentMask]; }//在这个 Segment 中执行具体的 put 操作:V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 如果超过再散列的阈值rehash(); // 执行再散列,table 数组的长度将扩充一倍HashEntry<K,V>[] tab = table; // 把散列码值与 table 数组的长度减 1 的值相“与”// 得到该散列码对应的 table 数组的下标值int index = hash & (tab.length - 1); // 找到散列码对应的具体的那个桶HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 如果键 / 值对以经存在oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 设置 value 值} else { // 键 / 值对不存在 oldValue = null; ++modCount; // 要添加新节点到链表中,所以 modCont 要加 1 // 创建新节点,并添加到链表的头部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 写 count 变量} return oldValue; } finally { unlock(); // 解锁} `}
3. ConcurrentHashMap-JDK1.8
并发编程在高并发场景下是一个绕不开的话题,ConcurrentHashMap 是 Java 提供的一种线程安全的哈希表实现。与 HashMap 不同,ConcurrentHashMap 能够在多线程环境下保证数据的安全访问。在 JDK1.8 之前,它通过分段锁(Segment)来实现高效并发控制,而在 JDK1.8 中,ConcurrentHashMap 进行了重大改进,摒弃了分段锁,采用更高效的数据结构和锁机制来提升并发性能。
3.1. 简介
不再使用 Segment 分段锁,而是基于Node+ CAS(Compare-And-Swap) + synchronized 的组合进行并发控制。锁粒度更细:在单个桶(Node)上进行同步操作。使用 数组 + 链表 + 红黑树 作为底层数据结构,当链表长度超过 8 时(并且达到一定的容量要求--node数组超过64,否则不会树化,而是进行扩容),链表会转化为红黑树,以提升查询性能。 Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升。
数据结构
底层数据结构和java8的hashMap是一样的
3.2. 源码分析
3.2.1. Node节点
static class Node<K,V> {final int hash; // 哈希值final K key; // 键volatile V val; // 值volatile Node<K,V> next; // 下一个节点
}
每个 Node 里面是 key-value 的形式,并且把 value 用 volatile 修饰,以便保证可见性,同时内部还有
一个指向下一个节点的 next 指针。
3.2.2. put方法
public V put(K key, V value) {return putVal(key, value, false);
}/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {// key 和 value 不能为空if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {// f = 目标位置元素Node<K,V> f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值if (tab == null || (n = tab.length) == 0)// 数组桶为空,初始化数组桶(自旋+CAS)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;// 使用 synchronized 加锁加入节点synchronized (f) {if (tabAt(tab, i) == f) {// 说明是链表if (fh >= 0) {binCount = 1;// 循环加入新的或者覆盖节点for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {// 红黑树Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}
- 根据 key 计算出 hashcode 。
- 判断是否需要进行初始化。
- 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
- 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
- 如果都不满足,则利用 synchronized 锁写入数据。
- 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法,在 treeifyBin 中会首先判断当前数组长度 ≥64 时才会将链表转换为红黑树。
3.2.3. get
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// key 所在的 hash 位置int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 如果指定位置元素存在,头结点hash值相同if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))// key hash 值相等,key值相同,直接返回元素 valuereturn e.val;}else if (eh < 0)// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {// 是链表,遍历查找if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}
总结一下 get 过程:
- 根据 hash 值计算位置。
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value.
- 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之。
- 如果是链表,遍历查找之。