HashMap
一、基础常量以及结构
//数组默认初始容量static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16//数组容量最大值static final int MAXIMUM_CAPACITY = 1 << 30;//默认扩容因子static final float DEFAULT_LOAD_FACTOR = 0.75f;//链表长度阈值 树化条件static final int TREEIFY_THRESHOLD = 8;//树中只有6个或一下转化成链表static final int UNTREEIFY_THRESHOLD = 6;//树化的条件之一 数组长度需要达到的值static final int MIN_TREEIFY_CAPACITY = 64;// 默认的数组transient Node<K,V>[] table;//判断是否扩容的大小 因子*容量int threshold;//扩容因子final float loadFactor;
都应该知道HashMap 的结构是数组+链表,链表会在一定条件下树化变成红黑树(本节我们只追究常规操作,不深究红黑树这种数据结构),结构如下图所示
二、构造方法
public HashMap(int initialCapacity, float loadFactor) {if (initialCapacity < 0)throw new IllegalArgumentException("Illegal initial capacity: " +initialCapacity);if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;if (loadFactor <= 0 || Float.isNaN(loadFactor))throw new IllegalArgumentException("Illegal load factor: " +loadFactor);//扩容因子this.loadFactor = loadFactor;//扩容阈值this.threshold = tableSizeFor(initialCapacity);}//实现了把一个数变为最接近的2的n次方 比如:7变成8 10变成16static final int tableSizeFor(int cap) {int n = cap - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;}
我们可以看到初始化的时候传的参数是初始容量大小和扩容因子,为什么在初始化的时候并没看看数组容量的初始化,反而把数组容量赋值给了扩容阈值,阈值还没*扩容因子?为什么数组容量的初始化要做2的n次方处理?
问题一:数组容量什么时候初始化的?扩容阈值什么时候计算的?
在对象第一次Put操作的时候初始化的,有点类似懒加载的方式,只有在用时候我才申请空间资源 下面是取证,我就直接截图了,具体过程后面会说(可以看到初始化的时候把阈值给了数组长度,而阈值重新计算了一次)
问题二:为什么数组容量要设置成2的n次方?
因为HashMap是根据Key的Hash去确定在数组中的具体下标,HashMap为了减少数据碰撞(下标冲突),就需要使得数据分布更均匀,那就是取模算法 hashcode%length(数组长度),计算机直接求余数不如位运算效率高,所以源码中做了优化,使用hashcode&(length-1),hashcode%length等于hashcode&(length-1)的前提是length是2的n次幂。
三、put操作
其实我们在了解HashMap的结构后,已经猜得到添加操作需要做什么了
- 数组为空的时候,扩容初始化(正如我们上面说的一样)
- 根据要插入key的hash算出在数组中的下标(hashcode&(length-1)),看看是否已经存在,不存在则新建节点直接放入
- 如果已经存在则判断先头节点的hash、key与要插入的key、hash是否一致,一致则替换
- 不一致致需要判断这个节点是树结构还是链表结构
- 树结结构则查找树 存在则替换,不存在给树新添一个树节点
- 链表结构则遍历链 存在则替换,不存在则在尾部新添一个节点,并判断链表的长度是否达到树化条件
- 以上需要替换的节点都会被取出,新值替换旧值,并返回旧值
- 到了这说明是新添加了一个节点,需要把容量+1,并判断是否达到扩容标准,达到了还要扩容操作
源码如下:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {Node<K,V>[] tab; //创建数组Node<K,V> p; //创建新节点int n, i;//数组为空 还没初始化的话,就扩容操作 初始化一下if ((tab = table) == null || (n = tab.length) == 0) {//对数组进行初始化n = (tab = resize()).length;}//(n - 1) & hash 求数组的下标,然后从数组中取节点,判断是否已经存在if ((p = tab[i = (n - 1) & hash]) == null) {// 原来数组中不存在就新建一个节点放入tab[i] = newNode(hash, key, value, null);}else {// 用来获取已经存在的节点Node<K,V> e;K k;// hash值和key都一致则进行替换if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {e = p;}else if (p instanceof TreeNode) {//存储的节点的key的不存在,判断是否为树节点(是不是已经转化为红黑树)//如果已经是树了,那就进行树的操作e = ((TreeNode<K, V>) p).putTreeVal(this, tab, hash, key, value);}else {//头节点不同,也不是树结构,说明是链表 需要遍历链表比较for (int binCount = 0; ; ++binCount) {if ((e = p.next) == null) {//直接找到链表的尾部,直接插入p.next = newNode(hash, key, value, null);//判断链表的长度是否大于可以转化为树结构的阈值if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st//转化成树treeifyBin(tab, hash);break;}if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) {break;}p = e;}}// 存在映射的key,覆盖原值,将值返回if (e != null) {V oldValue = e.value;if (!onlyIfAbsent || oldValue == null) {e.value = value;}afterNodeAccess(e);return oldValue;}}++modCount;//hashmap的容量大于阈值if (++size > threshold) {// 扩容resize();}afterNodeInsertion(evict);return null;}
四、resize 扩容机制
HashMap的扩容与正常的数组扩容没啥区别都是要新建一个扩容后的数组再讲数据填进去:
- 以前的数组长度>0 则将长度翻一倍,阈值也翻一倍
- 如果以前数组长度≤0,阈值>0 ,则将旧的阈值给新的长度(初始化数组的情况)
- 如果以前数组长度、阈值都≤0,则新的数组将设置为默认长度以及阈值
- 上述都设置完成如果新的阈值还为0,则根据新的长度*扩容因子重新设置
- 然后就要遍历旧的数组,将旧数组数据转移到新的数组上面(转移时数据需要重新计算下标)
- 旧数据里面的节点数据如果下一个指向为空,说明只有一个节点,直接转移过去即可
- 不为空则可能是树结构或者是链表,两者都要遍历切割为高低位节点(树结构切割后还需判断长度是否≤6,满足则重新转为链表结构),然后再赋值过去(下面说为什么要切割)
final Node<K,V>[] resize() {Node<K,V>[] oldTab = table; // 旧的数组int oldCap = (oldTab == null) ? 0 : oldTab.length; //旧的数组长度int oldThr = threshold; // 旧的阈值int newCap, newThr = 0;if (oldCap > 0) {// 旧的数组长度>0的情况下// 如果原本的长度就已经是最大值了 就继续返回旧的数组,并将阈值也设置成最大值if (oldCap >= MAXIMUM_CAPACITY) {threshold = Integer.MAX_VALUE;return oldTab;}else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) {// 长度翻一倍且符合条件的情况下 阈值也翻一倍newThr = oldThr << 1; // double threshold}}else if (oldThr > 0) { // initial capacity was placed in threshold// 旧的长度<=0 且 旧的阈值 >0 就把新的长度设置成旧的阈值newCap = oldThr;}else { // zero initial threshold signifies using defaults// 如果旧的长度、阈值都为0 就重新设置新的长度和阈值为默认值newCap = DEFAULT_INITIAL_CAPACITY;newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);}// 一系列操作后 新的阈值为0 就重新根据新的长度设置阈值if (newThr == 0) {float ft = (float)newCap * loadFactor; //新的数组长度*扩容因子newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?(int)ft : Integer.MAX_VALUE);}threshold = newThr; // 扩容因子赋值替换//根据新的长度初始化新的数组@SuppressWarnings({"rawtypes","unchecked"})Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];table = newTab; //数组替换if (oldTab != null) {// 遍历旧的节点数组for (int j = 0; j < oldCap; ++j) {Node<K,V> e;if ((e = oldTab[j]) != null) {oldTab[j] = null;if (e.next == null) {// 节点下一个指向为空说明只有一个值 直接赋值即可newTab[e.hash & (newCap - 1)] = e;}else if (e instanceof TreeNode) {// 说明节点已经变成树了 进行树的拆分((TreeNode<K, V>) e).split(this, newTab, j, oldCap);}else {// 说明节点是链表//低位链表:存放在扩容之后的数组的下标位置,与当前数组下标位置一致//loHead:低位链表头节点//loTail低位链表尾节点Node<K,V> loHead = null, loTail = null;//高位链表,存放扩容之后的数组的下标位置,=原索引+扩容之前数组容量//hiHead:高位链表头节点//hiTail:高位链表尾节点Node<K,V> hiHead = null, hiTail = null;Node<K,V> next;do {next = e.next;//高位为0,放在低位链表if ((e.hash & oldCap) == 0) {if (loTail == null)loHead = e;elseloTail.next = e;loTail = e;}//高位为1,放在高位链表else {if (hiTail == null)hiHead = e;elsehiTail.next = e;hiTail = e;}} while ((e = next) != null);//低位链表已成,将头节点loHead指向在原位if (loTail != null) {loTail.next = null;newTab[j] = loHead;}//高位链表已成,将头节点指向新索引if (hiTail != null) {hiTail.next = null;newTab[j + oldCap] = hiHead;}}}}}return newTab;}
问题一:为什么要切割分为高低位节点,直接转过去不好吗?
因为数组长度越小发生数组下标冲突的几率越大,所以对于已经冲突形成链表或树结构的下标数据需要重新根据新的数组长度计算数组下标值,再转移到新数组;(这也是为什么数组长度≥64的时候才可以树化,这之前都是选择扩容)
如下图所示:
两个数据在扩容前是下标冲突的,下标都是5,在扩容后:高位为1的数据,下标已经变成了21,正好是之前的下标+原数组长度,这也是为什么上述源代码里面高位链表在放到新数组里面的时候下标会加上数组长度的原因;这时候有人疑惑,你这是(hash&length-1)啊,源代码里面可是(hash&length),细心的人已经发现了两者是一样的道理,前者是计算下标,后者是判断是否高低位,下图只是为了更好的理解
这也是把长度设计成2的n次方的牛逼之处
五、treeifyBin树化
可以看到只有当数组长度≥64才会树化,没满足的情况下是扩容操作
树化的时候可以看到树化前是将节点转化成了树节点,而且树节点同时还是一个双向链表
真正的树化操作是treeify()方法(不深究,有兴趣的可以研究下红黑树结构)
源代码如下:
final void treeifyBin(Node<K,V>[] tab, int hash) {int n, index; Node<K,V> e;if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)resize();else if ((e = tab[index = (n - 1) & hash]) != null) {TreeNode<K,V> hd = null, tl = null;do {//双向链表TreeNode<K,V> p = replacementTreeNode(e, null);if (tl == null)hd = p;else {p.prev = tl;tl.next = p;}tl = p;} while ((e = e.next) != null);if ((tab[index] = hd) != null)//真正的树化操作hd.treeify(tab);}}
六、get操作
这个思路是一样的看着HashMap的结构就能猜到获取思路了:
- 判断数组是否为空,节点是否存在
- 对比节点的hash与key值是否一致,一致则返回
- 判断节点下一个指向是否为空,不为空则要判断是否节点是树结构
- 树结构则获取树节点,链表则遍历获取节点
源代码如下:
public V get(Object key) {Node<K,V> e;return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {Node<K,V>[] tab; Node<K,V> first, e; int n; K k;//判断数组是否为空、节点是否存在if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) {//头结点相同直接返回if (first.hash == hash && ((k = first.key) == key || (key != null && key.equals(k)))) {return first;}if ((e = first.next) != null) {//头结点不同 且 下一个指向不为空 if (first instanceof TreeNode) {// 树结构就获取树节点return ((TreeNode<K, V>) first).getTreeNode(hash, key);}//链表结构则遍历链表do {if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) {return e;}} while ((e = e.next) != null);}}return null;}
七、contains操作
containsKey
实际是get操作,get后判断是否为空 **存在返回true 不存在返回:false **
源代码如下:
public boolean containsKey(Object key) {return getNode(hash(key), key) != null;
}
containsValue
遍历数组同时遍历节点,存在相等的value 返回true 不存在返回 false
为什么节点的遍历不需要考虑树节点?
上面说过了树节点其实也是双向链表
源代码如下:
public boolean containsValue(Object value) {Node<K,V>[] tab; V v;if ((tab = table) != null && size > 0) {//遍历数组for (int i = 0; i < tab.length; ++i) {//遍历节点 for (Node<K,V> e = tab[i]; e != null; e = e.next) {if ((v = e.value) == value ||(value != null && value.equals(v)))return true;}}}return false;}
HashTable
一、基础常量以及结构
// 数组容器private transient Entry<?,?>[] table;// 容器容量private transient int count;// 扩容阈值private int threshold;// 扩容因子private float loadFactor;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
和HashMap不同,HashTable只有数组+链表,结构如下:
二、构造方法
public Hashtable() {this(11, 0.75f);
}public Hashtable(int initialCapacity, float loadFactor) {if (initialCapacity < 0)throw new IllegalArgumentException("Illegal Capacity: "+initialCapacity);if (loadFactor <= 0 || Float.isNaN(loadFactor))throw new IllegalArgumentException("Illegal Load: "+loadFactor);if (initialCapacity==0)initialCapacity = 1;this.loadFactor = loadFactor;table = new Entry<?,?>[initialCapacity];threshold = (int)Math.min(initialCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
}
可以看到初始化的时候就已经确定了容量大小和扩容大小,默认大小是11,默认扩容因子是0.75
三、put操作
- 根据hash获取数组下标
- 节点已经存在则遍历链表,如有相同的值则替换value ,否则 添加新节点
这个跟HashMap差不多的思路,就是没了树结构的操作,有两个特点就是不允许value为null,而且方法用synchronized 所修饰
源代码如下:
public synchronized V put(K key, V value) {if (value == null) {throw new NullPointerException();}Entry<?,?> tab[] = table;int hash = key.hashCode();// 获取hash获取数组下标 (数组长度取余)int index = (hash & 0x7FFFFFFF) % tab.length;Entry<K,V> entry = (Entry<K,V>)tab[index];// 节点不为空则遍历链表 已存在则替换value值 for(; entry != null ; entry = entry.next) {if ((entry.hash == hash) && entry.key.equals(key)) {V old = entry.value;entry.value = value;return old;}}// 不存在则添加addEntry(hash, key, value, index);return null;
}private void addEntry(int hash, K key, V value, int index) {modCount++;Entry<?,?> tab[] = table;// 添加前先判断是否需要扩容if (count >= threshold) {// 扩容rehash();// 获取扩容后数组tab = table;hash = key.hashCode();// 扩容后重新计算需要添加节点的数组下标index = (hash & 0x7FFFFFFF) % tab.length;}// 创建节点Entry<K,V> e = (Entry<K,V>) tab[index];tab[index] = new Entry<>(hash, key, value, e);count++;}
四、rehash()扩容操作
HashTable扩容很简单就是正常的数组扩容:
- 新建一个数组,容量为之前的2倍+1
- 重新计算扩容阈值
- 遍历老的数组,重新计算每个节点的数组下标并放入新的数组
源代码如下:
protected void rehash() {int oldCapacity = table.length;Entry<?,?>[] oldMap = table;// 翻倍且+1int newCapacity = (oldCapacity << 1) + 1;if (newCapacity - MAX_ARRAY_SIZE > 0) {if (oldCapacity == MAX_ARRAY_SIZE)// Keep running with MAX_ARRAY_SIZE bucketsreturn;newCapacity = MAX_ARRAY_SIZE;}Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];modCount++;// 重新计算扩容阈值threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);table = newMap;// 遍历转移数据for (int i = oldCapacity ; i-- > 0 ;) {for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {Entry<K,V> e = old;old = old.next;// 重新计算每个节点数组下标int index = (e.hash & 0x7FFFFFFF) % newCapacity;e.next = (Entry<K,V>)newMap[index];newMap[index] = e;}}}
五、get操作
根据hash取下标,遍历对比取值,主要是被synchronized修饰
源代码如下:
public synchronized V get(Object key) {Entry<?,?> tab[] = table;int hash = key.hashCode();int index = (hash & 0x7FFFFFFF) % tab.length;for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {if ((e.hash == hash) && e.key.equals(key)) {return (V)e.value;}}return null;}
六、contains 操作
containsValue
遍历数组内所有节点对比value值,被synchronized修饰
源代码如下:
public boolean containsValue(Object value) {return contains(value);}public synchronized boolean contains(Object value) {if (value == null) {throw new NullPointerException();}Entry<?,?> tab[] = table;for (int i = tab.length ; i-- > 0 ;) {for (Entry<?,?> e = tab[i] ; e != null ; e = e.next) {if (e.value.equals(value)) {return true;}}}return false;}
containsKey
遍历数组内所有节点对比key值,被synchronized修饰
源代码如下:
public synchronized boolean containsKey(Object key) {Entry<?,?> tab[] = table;int hash = key.hashCode();int index = (hash & 0x7FFFFFFF) % tab.length;for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {if ((e.hash == hash) && e.key.equals(key)) {return true;}}return false;}
总结
相比于HashMap,HashTable最大的区别就是线程安全,操作方法都被synchronized关键字所修饰,而且舍弃了树结构的优化,在容量方面也不在是2的n次方了
ConcurrentHashmap
一、基本常量和结构
/** 节点的hash值,这里有三种特殊的,正常的>0 */
static final int MOVED = -1; // 表示该节点槽位正在扩容中
static final int TREEBIN = -2; // 表示该节点是树节点
static final int RESERVED = -3; // hash for transient reservations// 默认容量大小
private static final int DEFAULT_CAPACITY = 16;
//默认扩容因子
private static final float LOAD_FACTOR = 0.75f;
//链表长度阈值 树化条件
static final int TREEIFY_THRESHOLD = 8;
//树中只有6个或一下转化成链表
static final int UNTREEIFY_THRESHOLD = 6;
//树化的条件之一 数组长度需要达到的值
static final int MIN_TREEIFY_CAPACITY = 64;
// 默认的容器数组
transient volatile Node<K,V>[] table;
// 辅助扩容时使用的数组
private transient volatile Node<K,V>[] nextTable;
//元素计数器
private transient volatile long baseCount;//表初始化和大小调整控件 有4种情况
//1.sizeCtl为0,代表数组未初始化, 且数组的初始容量为16
//2.sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
//3.sizeCtl为-1,表示数组正在进行初始化
//4.sizeCtl小于0,并且不是-1,表示数组正在扩容
private transient volatile int sizeCtl;// 扩容时使用 需要转移槽位的索引
private transient volatile int transferIndex;// 在计算元素个数时,防并发的锁(CAS ),跟下面那个东东配合
private transient volatile int cellsBusy;
// 计算元素个数时使用(防止并发,并发时每个线程都会把当前操作的槽位节点数放入里面最后累计)
// 配合baseCount 使用
private transient volatile CounterCell[] counterCells;
大部分都是常规的常量,但是要记住sizeCtl和节点的特殊hash值,这两者在下面的操作里面扮演着关键角色,从结构上来看基本和HashMap一致 数组+链表/红黑树,如下:
因ConcurrentHashmap的操作思路基本与HashMap一致,所以建议先看看HashMap!
二、构造方法
这里只列举三个常用的构造,不过也基本是全部了(容量计算就不说了)
- **无参构造:*啥也没有,注意此时*sizeCtl 默认为0
- 初始化容量大小的构造:sizeCtl为计算后的容量,注意是扩大1.5倍后计算的
- 完整的带参构造:同样的sizeCtl为计算后的容量
这时候就会有个疑问,不管怎么初始化就算了个容量?扩容因子、扩容阈值啥都没有,难道和HashMap一样在第一次添加操作的时候,在初始化数组里面完成的?那就直接去看数组的初始化!
源代码如下:
//无参构造
public ConcurrentHashMap() {}
//初始化容量大小的构造
public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0) {throw new IllegalArgumentException();}int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :// 把传参扩大了1.5倍后计算容量(变成最近的2的n次方数)tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//sizeCtl=容量this.sizeCtl = cap;
}
//完整的带参构造
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);//sizeCtl=容量this.sizeCtl = cap;
}
//计算容量的方法 往上找到最近的2的n次方数 比如:7变成8 10变成16
private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
三、initTable(容器初始化)
以CAS + 自旋的方式保证初始化的线程安全:
- sizeCtl<0 就代表有其他线程正在扩容或者初始化,所以让出CPU ,让其他线程先上
- 尝试用CAS 把sizeCtl换成-1,失败就继续自旋
- 成功了就初始化容器与扩容阈值,有意思的是扩容阈值的计算(容量-1/4),这也就意味着在构造方法里面指定的扩容因子是不生效的,始终是0.75
此时sizeCtl为扩容阈值!
源代码如下:
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;// 自旋 只要容器数组为空 就不断循环while ((tab = table) == null || tab.length == 0) {//sizeCtl,代表着初始化资源或者扩容资源的锁,必须要获取到该锁才允许进行初始化或者扩容的操作if ((sc = sizeCtl) < 0)//放弃当前cpu的使用权,让出时间片,线程计入就绪状态参与竞争Thread.yield(); //CAS 比较并尝试将sizeCtl替换成-1,如果失败则继续循环 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {//进行一次double check 防止在进入分支前,容器发生了变更if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//初始化容器Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//容量-容量/4 == 容量*3/4 == 扩容阈值(扩容因子0.75)sc = n - (n >>> 2);}} finally {// 此时sizeCtl为扩容阈值sizeCtl = sc;}break;}}return tab;}
四、put操作
整体逻辑和HashMap差不多:1.计算hash 2.是否初始化数组 3.是否直接插入 4. 是否插入链表/红黑树
但是加入了线程安全的操作保障(CAS+自旋+synchronized,数组操作全是内存的偏移量 ):
- 计算hash值后,直接死循环(自旋)
- 判断容器数组是否为空,空则初始化容器数组
- 根据hash计算数组下标【(length-1)&hash】,再结合偏移量从数组中取值
- 值为空,说明槽位还没节点,所以可以直接放入该下标对应的槽位(CAS+自旋放入)
- 值不为空,说明槽位已经被占了(下标冲突了),给该槽位加锁(槽位第一个节点),判断链表和树插入
- 链表的话就直接尾部插入,树的话就树节点的方式插入
- 完事还要判断链条长度是否需要树化
- 最后对比容器元素个数是否达到扩容阈值,是否需要扩容
源代码如下:
final V putVal(K key, V value, boolean onlyIfAbsent) {// 不允许为nullif (key == null || value == null) throw new NullPointerException();// 计算hash值(不深究)int hash = spread(key.hashCode());int binCount = 0;// 自旋 因为下面有CAS操作for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 数组为空 长度为0 就要初始化数组if (tab == null || (n = tab.length) == 0)// 初始化数组(上面说过了)tab = initTable();// 计算下标 并获取数组中的节点(tabAt就是利用偏移量*下标来获取数组里面的值)else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 下标所在的值为null说明槽位为空 所以就可以把值放进去// 创建新的节点 利用CAS的方式 放入数组 (casTabAt就是CAS内存的偏移量)// 放入成功就结束了,CAS失败就会自旋if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))break; }else if ((fh = f.hash) == MOVED)// 说明有其他线程正在扩容中 所以我们去协助扩容(之后说)tab = helpTransfer(tab, f);else {// 到这说明下标冲突了 所以要判断插入链表或者插入树V oldVal = null;// 给头节点上锁synchronized (f) {// CAS 再确认一次头节点有没有变if (tabAt(tab, i) == f) {// 节点的hash >0 说明是正常要插入链表里面(树节点的hash是-2)if (fh >= 0) {binCount = 1;// 遍历链表 把新节点插入到尾部// 这里跟hashMap一样 因为前面已经上锁了 所以是安全的for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value, null);break;}}}else if (f instanceof TreeBin) {// 这里就是树结构了 所以要插入树节点(看过HashMap的非常熟悉吧)Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 到这就插入完成啦 所以要判断链表上节点个数是否需要树化if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 链表树化treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 最后要判断容量是否到达扩容阈值 是否需要扩容// 这里和HashMap不一样 我们后面单独提出来说addCount(1L, binCount);return null;}
addCount
一个a++ 的操作搞这么麻烦干嘛?用原子类计数或者加把锁不就搞定了?😭😭😭😭😭😭😭😭好了,我已经帮你们吐槽一次了
为什么不这样做呢?原子类是采用CAS+自旋保证的计数安全,但是当竞争激烈的时候,会导致多线程频繁自旋阻塞,加锁那更加不用说了,所以呀我们来学习学习大佬是怎么做的?
📌首先请出两个主角counterCells数组和baseCount计数器,baseCount就是正常的计数器,采用CAS的方式+1,如果已经存在并发竞争关系了,那就会把值放入counterCells数组中,数组长度刚开始为2,后续扩容为2倍扩容,下标方式计算为【 线程hash&(length-1】,放入的时候值已经存在了就采用CAS使其+1,CAS失败了那就扩容并修改线程的Hash 重新放入一次,最后的size就是baseCount+counterCells数组内的所有数
该过程的伪思想图(不是addCount()流程图)如下:
一定要搞清楚上述的思想哈,直接看代码是很难懂的,搞懂上面的思想后,我们再代入到代码看:
- CounterCell[] 数组不为空就代表已经存在竞争了,CAS baseCount+1失败也代表有并发了
- ThreadLocalRandom.getProbe() 就是当前线程的hash ,根据线程hash计算下标没取到值就进入fullAddCount()方法具体操作了
- 根据线程hash取到值但是CAS失败了,存在并发也要进去fullAddCount()方法
- fullAddCount()里面最核心的在上面图里面已经说了,就不再一一过了
- 上面完事了就计算一次总计s =(baseCount+counterCells数组内的所有数)
- 然后就拿着总计去对比此时的扩容阈值sizeCtl,≥就要扩容了嘛,所以自旋直至扩容结束
- 扩容又是个蛋疼的操作,这里先记着要扩容,扩容搞懂了,自然就懂了这里的处理是干嘛的了
源代码如下:
// x就是1 ,check 就是容器数组槽位下的节点数
private final void addCount(long x, int check) {CounterCell[] as; long b, s;// CounterCell[] 数组不为空(已经存在竞争) 或者 baseCount总计累加失败// 说明之前已经存在并发的情况了if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;//标识是否有多线程竞争 true表示无并发 下面CAS失败了就是false 有并发boolean uncontended = true;//当CounterCell[] 数组为空 || 长度为0//或者当前线程对应的CounterCell[] 槽位的元素为空(为空我肯定要把值放进去嘛)//或者当前线程对应的CounterCell[] 槽位的元素不为空,但是CAS累加失败(有并发)if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 经过上面判断说明有并发,所以在这里面处理存在并发情况的的值(不多说了)// 这个就是有关放入CounterCell[] 数组的流程操作,核心的其实上面的图里面已经写了fullAddCount(x, uncontended);return;}if (check <= 1)return;// 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)s = sumCount();}// 到了这说明不管有没有并发 元素总计也已经算好啦 此时 s变量就是总计数if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// 总计数(s)>=扩容阈值(sizeCtl) 且 容器数组不为空 (说明要扩容啦)// 满足条件循环 (自旋) 直至扩容结束while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// 获取一个很大的正数 int rs = resizeStamp(n);// 注意此时 sc 就是 sizeCtl,<0说明已经有其他线程正在扩容中了if (sc < 0) {//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0){break;}//上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))// CAS 加入成功了,我们就去协助扩容transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))// 注意此时sizeCtl 已经被CAS替换成了一个负数(不为-1)// 扩容操作(下面说)transfer(tab, null);// 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)s = sumCount();}}
}
五、transfer 扩容操作
虽然扩容有点绕,但是对数据的处理思想和HashMap差不多,都需要对链表/树节点重新计算hash然后再放入新数组,但由于是线程安全又要考虑性能,所以加了一个分槽位转移的操作,老规矩先上图理解一下再看代码:
等于是把一个数组分成了几份,每个线程都处理一份(实际不是均分),如下假设每个线程都处理4个槽位,每处理一个槽位就把槽位标记成fwd 代表已处理,槽位如果有数据,就需要对里面的数据重新根据新数组长度计算一下下标值,然后放入新数组; 注意:在实际代码中,是每进入一次transfer扩容方法就分配一次处理的任务,外部死循环直至槽位全部处理完,所以如果是单线程处理,会进入多次transfer扩容方法
伪思想如下图:
理解完上面的内容后,就可以代入看下面的代码了:
- 根据CPU核心数分配每个线程进来需要处理的槽位数量:stride
- 第一个进来扩容的线程初始化一些参数:新数组(nextTable)、转移的总槽位数量(transferIndex)
- 死循环 也就是自旋,因为下面有CAS操作
- 内循环,为了给线程分配槽位以及线程遍历处理刚分配的槽位
- 每处理一个槽位需要把该槽位标记为ForwardingNode特殊节点,表示已处理
- 处理有数据的槽位时,需要把这些数据根据新数组重新计算一遍下标然后放入新数组中
- 每个线程处理完自己的任务会把sizeCtl-1,当该值与刚进入扩容的时候一致则代表扩容完成(因为进入时会+1)
- finishing为true代表扩容完成,然后把新数组赋值给容器数组,并设置新的扩容阈值sizeCtl
- 最后transferIndex为0,nextTable 为null ,外部进入transfer方法的死循环也会结束,完成扩容!
搞懂这里之后再回过头看看addCount后面的扩容判断操作是不是就一些豁然开朗了
(跳过了链表/树 的数据处理,可以看看HashMap,主要是理解高低位赋值这个思想)
源代码如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//每个线程处理槽位的最小数目,可以看出核数越高步长越小,最小16个if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 第一个进来的扩容的线程nextTab肯定为空if (nextTab == null) { try {// 2倍扩容 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// 扩容后的新数组nextTable = nextTab;// 需要转移的槽位总数量transferIndex = n;}int nextn = nextTab.length;//扩容时的特殊节点,标明此节点正在进行迁移ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;//所有的槽位是否都已迁移完成。boolean finishing = false; // 死循环 自旋 (这个i 就是待转移数组的下标索引)// 整个转移过程是从数组的尾部到头部for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;//此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点。while (advance) {int nextIndex, nextBound;// 这一步就是为了更新 数组下标索引 iif (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {// 表示所有槽位都已经处理完了i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 分配每个线程需要处理的槽位数 从头分配到尾部bound = nextBound;// 设置待转移数组的下标索引i = nextIndex - 1;advance = false;}}// 当前线程自己的活已经做完或所有线程的活都已做完if (i < 0 || i >= n || i + n >= nextn) {int sc;// 扩容结束标识if (finishing) {// 扩容结束后把扩容数组置为nullnextTable = null;// 把扩容后的数组给容器数组table = nextTab;// 设置新的扩容阈值,新容量的0.75sizeCtl = (n << 1) - (n >>> 1);return;}// 每有一个线程做完活 就把sizeCtl-1 (因为每有一个线程协助sizeCtl就会+1)if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 还记得刚扩容时sizeCtl 的值吗?// 这里就是判断sizeCtl是否与扩容前的值相等if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 不相等直接返回return;// 相等就代表扩容结束了 最后检查一遍finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)// CAS 把该索引处设置成ForwardingNode ,也就是hash是-1的代表扩容中// 因为扩容的同时,原数组还是可以put操作的,所以尽管此处为null 也要标记成fwd节点,表示已经处理了advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)// 节点hash为-1 代表已经处理过了advance = true; // already processedelse {// 到了这说明该槽位有数据要迁移了,所以先上个锁synchronized (f) {// 二次确认if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 链表处理if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//低位链表放在i处setTabAt(nextTab, i, ln);//高位链表放在i+n处setTabAt(nextTab, i + n, hn);//在原table中设置ForwardingNode节点以提示该槽位处理完成setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {// 树节点处理TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}//如果拆分后的树的节点数量已经少于6个就需要重新转化为链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;//低位放在i处setTabAt(nextTab, i, ln);//高位放在i+n处setTabAt(nextTab, i + n, hn);//在原table中设置ForwardingNode节点以提示该槽位处理完成setTabAt(tab, i, fwd);advance = true;}}}}}}
六、helpTransfer协助扩容
理解的上面的扩容后,就可以知道协助扩容其实最后调用的也是transfer()扩容方法,而且这一段和addCount()后面那一段是不是基本一致?都是进去扩容方法领取一部分槽位转移,然后自旋直至扩容结束
源代码如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 节点是fwd 节点说明正在扩容 且 nextTable数组不为空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 获取一个很大的正数int rs = resizeStamp(tab.length);//死循环自旋直至扩容结束while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 扩容transfer(tab, nextTab);break;}}return nextTab;}return table;
}
七、get操作
与其他操作相比,get操作可以说是最低调的了,并没有什么CAS或者加锁的操作,逻辑也基本很简单:
- 根据hash算出下标然后去数组查找,没找到就返回null
- 找到了,就对比key值,一致则返回
- 不一致判断hash是否<0,否则遍历链表找对应的值返回
- hash<0的节点,说明是树节点或者是fwd节点(扩容中的),树节点就调用树节点的find方法
- fwd节点就需要调用fwd节点的find的方法(下面贴出了fwd的find方法)
源代码如下:
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;//计算hash值int h = spread(key.hashCode());//根据hash值确定节点位置if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点 if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//如果eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了//所以这个find方法是树节点的find方法或者是fwd节点的find方法else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;//否则遍历链表 找到对应的值并返回while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}// 这是ForwardingNode节点的find方法
Node<K,V> find(int h, Object k) {// 注意这里的nextTable 是扩容时传进来的outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; int n;// 没找到直接返回 nullif (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;// 自旋for (;;) {int eh; K ek;// 找到了就返回节点if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;// 同样要判断是树节点还是ForwardingNode节点if (eh < 0) {//ForwardingNode节点就继续往里找if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}else// 树节点 就调用数节点的find方法return e.find(h, k);}// 没找到就返回nullif ((e = e.next) == null)return null;}}
}
总体逻辑呢和HashMap差不多,唯一的区别啊就是有可能数组正处于扩容中呢?在扩容中的数组别忘了槽位的数据转移完了就会变成ForwardingNode节点,所以我get也可能拿到fwd节点啊,怎么办呢?只能去转移后的数组里面取;
注意:转移后的数组不是全局变量nextTable,而是在扩容里面new ForwardingNode的时候传入了一个数组(别不信,截图为证,也可以回头好好看看扩容过程体会一下哈)
八、size 操作
前面在addCount()里面说过了,对一个数的累计都做了一个性能的优化,所以获取时也不像其他容器一样那么简单了,这里需要 用baseCount+counterCells数组内的所有数(搞懂addCount方法后也很简单对吧)
源代码如下:
public int size() {// 总和的计算long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {// 遍历CounterCell[]数组 把数全累加起来for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}
九、总结
好了核心操作基本都结束了,对了,还有个remove操作(这里就不贴了,理解完上面的在自己去看看吧,会发现哇原来这么简单),我觉得也没啥好总结的,综合来说使用了volatile+synchronized+CAS+自旋保证了线程安全,synchronized锁的细粒度+分槽位可协助扩容+ 计数器特殊处理 极大程度的保证了性能的提升
下面提几个问题,辅助大家巩固一下吧:
- 1.ConcurrentHashMap是怎么判断正在扩容中的?
- 2.扩容期间在未迁移到的槽位中插入数据会发生什么?
- 3.为什么get操作不需要加锁?
- 4.扩容过程中get操作受影响吗?怎么处理的?
- 5.ConcurrentHashMap在性能优化方面做了那些事?
- 6.扩容的时候同时发生了remove会有影响吗?怎么处理的?
- 7.数组变量都被volatile修饰了,按理说取值就是线程安全的,为什么在数组取值的时候还需要用内存偏移量呢?
ArrayList
一、基础常量以及结构
// 默认数组长度
private static final int DEFAULT_CAPACITY = 10;// 数组容器
transient Object[] elementData; // 长度设置成0的默认数组容器
private static final Object[] EMPTY_ELEMENTDATA = {};// 默认的数组
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};// 容器现有元素数量
private int size;
ArrayList用到的存储结构就是数组,我们看常量有两个空的数组常量,为什么会有两个?(下面再说)
二、构造方法
初始化很简单,没有什么扩容阈值,就初始化一个数组,值得注意的就是默认的初始化和参数为0的初始化不太一样,用了两个不一样的空数组,就是上面说的两个空数组常量,所以这里猜测目的就是为了区分指定初始化容量为0的容器和默认初始化的容器。(目的是什么?有什么用?下面说)
源代码如下:
public ArrayList(int initialCapacity) {if (initialCapacity > 0) {this.elementData = new Object[initialCapacity];} else if (initialCapacity == 0) {this.elementData = EMPTY_ELEMENTDATA;} else {throw new IllegalArgumentException("Illegal Capacity: "+ initialCapacity);}
}public ArrayList() {this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
三、add操作
有两个add方法
不带索引的: 1.扩容判断 2.尾部赋值
带索引的:1.下标校验 2.扩容判断 3.数组数据后移 4.指定位置赋值
源代码如下:
// 默认添加 直接数组尾部添加
public boolean add(E e) {// 扩容判断ensureCapacityInternal(size + 1); // Increments modCount!!// 赋值elementData[size++] = e;return true;
}
// 指定下标位置添加
public void add(int index, E element) {// 下标校验rangeCheckForAdd(index);// 扩容判断ensureCapacityInternal(size + 1); // Increments modCount!!// 数组数据移位System.arraycopy(elementData, index, elementData, index + 1, size - index);// 指定位置赋值elementData[index] = element;size++;
}
private void rangeCheckForAdd(int index) {if (index > size || index < 0)throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}private void ensureCapacityInternal(int minCapacity) {// 当数组是无参构造方法生成的默认数组的时候,这里会给一个默认的数组大小 10if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);}ensureExplicitCapacity(minCapacity);
}private void ensureExplicitCapacity(int minCapacity) {modCount++;// 容量大于数组长度 就扩容if (minCapacity - elementData.length > 0)grow(minCapacity);
}
问题一:两个默认的常量空数组有什么作用?
用于区分指定0容量初始化和默认初始化两种情况,无参构造初始化的时候并没有初始化默认的数组长度为10,而是在第一次add操作的时候扩容里面判断赋值的,这里的判断就是通过两个数组的对比来实现的
指定0容量初始化:在第一次add操作后,数组长度为1
ArrayList<Object> objects = new ArrayList<>(0);
objects.add(1);
默认初始化:在第一次add操作后,数组长度为10
ArrayList<Object> objects = new ArrayList<>();
objects.add(1);
问题二:指定下标赋值的时候,数组数据是怎么移位的?
是利用最底层的**System.arraycopy()方法,常见的ArrayList.toArray()**方法也是利用的这个
源代码如下:
/*** @param src 源数组* @param srcPos 源数组起始位置* @param dest 目标数组* @param destPos 目标数组的起始位置* @param length 源数组要复制的长度
*/
public static native void arraycopy(Object src, int srcPos,Object dest, int destPos,int length);
这是个数组复制的方法,简单来说就是把源数组中的一段连续的数据截取出来,复制到目标数组的指定位置中,如下面例子:
Integer[] arr1 = { 1, 2, 3, 4, 5 };
Integer[] arr2 = new Integer[5];
System.arraycopy(arr1, 2, arr2, 3, 2);
System.out.println(Arrays.toString(arr2));
// 结果输出 arr2为:
[null, null, null, 3, 4]
从源数组中找出从下标2开始连续两个数,复制到目标数组的下标3处,ArrayList里面同理从原下标开始往后所有数据都复制,再赋值到数组原下标+1的位置,就完成了整体数据往后移一位的效果了
图示只是为了更好理解,不是原理图:
四、grow()扩容操作
这里需要注意的就是每次扩容都是原来的1.5倍,会创建一个新的数组,然后再利用System.arraycopy() 将原数据复制过去
源代码如下:
private void grow(int minCapacity) {// 旧数组长度int oldCapacity = elementData.length;// 扩容1.5倍 int newCapacity = oldCapacity + (oldCapacity >> 1);if (newCapacity - minCapacity < 0)newCapacity = minCapacity;if (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);// 扩容需要将老数据转移到新数组上 原理上面说过了 elementData = Arrays.copyOf(elementData, newCapacity);
}
// 容量饱和策略 就是容量达到最大值的处理
private static int hugeCapacity(int minCapacity) {if (minCapacity < 0) // overflowthrow new OutOfMemoryError();return (minCapacity > MAX_ARRAY_SIZE) ?Integer.MAX_VALUE :MAX_ARRAY_SIZE;
}
五、get 操作
这个不多说了,数组取值 简单的一批
源代码如下:
public E get(int index) {// 下标校验 不能超过现有容量rangeCheck(index);// 根据下标数组取值return elementData(index);
}
E elementData(int index) {return (E) elementData[index];
}
六、contains操作
遍历对比元素,以后要有人问怎么优化,二分查找法丢过去
源代码如下:
public boolean contains(Object o) {// 下标存在即 true return indexOf(o) >= 0;
}//遍历取下标
public int indexOf(Object o) {if (o == null) {for (int i = 0; i < size; i++)if (elementData[i]==null)return i;} else {for (int i = 0; i < size; i++)if (o.equals(elementData[i]))return i;}return -1;
}
七、remove操作
利用System.arraycopy(),最后把尾部元素置null
源代码如下:
public E remove(int index) {rangeCheck(index);modCount++;E oldValue = elementData(index);int numMoved = size - index - 1;if (numMoved > 0)System.arraycopy(elementData, index+1, elementData, index,numMoved);elementData[--size] = null; // clear to let GC do its workreturn oldValue;}
LinkedList
一、基本结构
// 元素的个数 (链表长度)
transient int size = 0;// 头结点
transient Node<E> first;// 尾结点
transient Node<E> last;//节点类 定义如下
private static class Node<E> {E item;Node<E> next;Node<E> prev;Node(Node<E> prev, E element, Node<E> next) {this.item = element;this.next = next;this.prev = prev;}
}
从上面源代码可知,底层结构是链表,而且是一个双向链表,这也就意味着不会再有容量的限制,没了扩容操作,但自身不仅仅实现了List相关操作还实现了Deque相关操作,我们以List操作为主
二、add操作
采用尾插法,尾部为空时说明链表为空,新节点会同时成为尾节点和头节点,尾部不为空,就把尾结点指向到新节点,最后链表长度+1
源代码如下:
public boolean add(E e) {linkLast(e);return true;
}
void linkLast(E e) {final Node<E> l = last;// 新建一个节点final Node<E> newNode = new Node<>(l, e, null);// 尾部插入 新节点直接成为尾部节点last = newNode;// 判断之前链表尾部是否为空if (l == null)// 为空说明链表内还没节点 新节点直接作为头节点first = newNode;else// 不为空说明链表已经有节点了 把原尾部节点指向到下一个新节点l.next = newNode;// 元素个数+1(链表长度+1)size++;modCount++;
}
因为是list所以同样可以直接指定位置插入元素,操作也很简单,先找到原来的节点,然后把新节点放进去,改变原节点的上一个指向,总得来说原来的节点下移
源代码如下:
public void add(int index, E element) {// 索引校验 不能<0 或者 >链表长度checkPositionIndex(index);// 判断是否在尾部插入 if (index == size)// 同上尾插法linkLast(element);elselinkBefore(element, node(index));
}
// 找到索引原有位置的节点
Node<E> node(int index) {// 判断索引位置是在链表上半部还是在下半部if (index < (size >> 1)) {// 上半部从头节点开始遍历Node<E> x = first;for (int i = 0; i < index; i++)x = x.next;return x;} else {// 下半部从尾节点开始遍历Node<E> x = last;for (int i = size - 1; i > index; i--)x = x.prev;return x;}
}
void linkBefore(E e, Node<E> succ) {// 获取原节点的上一个节点final Node<E> pred = succ.prev;// 新建一个节点 原节点下移final Node<E> newNode = new Node<>(pred, e, succ);succ.prev = newNode;if (pred == null)first = newNode;elsepred.next = newNode;size++;modCount++;
}
三、get操作
就是上面说的分半然后遍历
源代码如下:
public E get(int index) {checkElementIndex(index);return node(index).item;
}
// 找到索引原有位置的节点
Node<E> node(int index) {// 判断索引位置是在链表上半部还是在下半部if (index < (size >> 1)) {// 上半部从头节点开始遍历Node<E> x = first;for (int i = 0; i < index; i++)x = x.next;return x;} else {// 下半部从尾节点开始遍历Node<E> x = last;for (int i = size - 1; i > index; i--)x = x.prev;return x;}
}
四、set操作
找到节点,然后把节点内部的值替换
源代码如下:
public E set(int index, E element) {checkElementIndex(index);//找到索引对应的节点Node<E> x = node(index);E oldVal = x.item;//把节点对应的值替换x.item = element;return oldVal;
}
五、contains操作
因为是允许存在null值的,所以遍历的时候要两种情况遍历,一种是遍历null值,一种是对比值,遍历找值对应节点的索引,没找到就返回-1 即false不存在
源代码如下:
public boolean contains(Object o) {return indexOf(o) != -1;
}
public int indexOf(Object o) {int index = 0;if (o == null) {//遍历链表 找null值for (Node<E> x = first; x != null; x = x.next) {if (x.item == null)return index;index++;}} else {//遍历链表 for (Node<E> x = first; x != null; x = x.next) {if (o.equals(x.item))return index;index++;}}return -1;
}
六、remove操作
分半查找到元素,然后修改指向
源代码如下:
public boolean remove(Object o) {if (o == null) {for (Node<E> x = first; x != null; x = x.next) {if (x.item == null) {unlink(x);return true;}}} else {for (Node<E> x = first; x != null; x = x.next) {if (o.equals(x.item)) {unlink(x);return true;}}}return false;
}E unlink(Node<E> x) {// assert x != null;final E element = x.item;final Node<E> next = x.next;final Node<E> prev = x.prev;if (prev == null) {// 上指向为空说明该节点是头节点 所以头节点要改成要移除节点的下指向节点 first = next;} else {prev.next = next;x.prev = null;}if (next == null) {// 下指向为空说明该节点是尾节点 所以尾节点要改成要移除节点的上指向节点last = prev;} else {next.prev = prev;x.next = null;}// 把值置为null 便于GCx.item = null;// 长度-1size--;modCount++;return element;
}
CopyOnWriteArrayList
一、基础常量和结构
// 锁
final transient ReentrantLock lock = new ReentrantLock();// 空数组
private transient volatile Object[] array;
可以看到底层依旧还是数组,但是没了默认大小和容量大小的变量了,而且数组容器被volatile关键字修饰,同时还多了一把锁,这同时说明了CopyOnWriteArrayList是线程安全的设计
为什么没有默认大小了呢?难道不需要判断扩容了?接着往下看
二、构造方法
public CopyOnWriteArrayList() {setArray(new Object[0]);
}
final void setArray(Object[] a) {array = a;
}
// 带参构造
public CopyOnWriteArrayList(E[] toCopyIn) {setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}
两个构造方法:
- 默认的:直接创建了个空数组然后赋值给了容器数组
- 带参的:把传参的数组数据赋值到了一个新数组上面然后把新数组给了容器数组(对Arrays.copyOf不熟的可以看看 上篇ArrayList介绍 )
为什么带参的方法要搞一个新的数组出来然后在赋值给容器数组呢?
因为数组赋值,不是值传递,传递后依旧会受原数组的影响,如下:(不清楚的了解一下值传递和地址传递)
Object[] aa=new Object[]{1,2};
Object[] aa1=aa;
System.out.println("改变前:"+aa1[0]);
aa[0]=3;
System.out.println("改变后:"+aa1[0]);//结果如下:
改变前:1
改变后:3
ps:改的是aa数组
三、add操作
默认添加
整个过程逻辑很简单:
- 加锁,获取容器数组
- 创建一个长度+1 的新数组,并把之前数组的数据复制过去
- 然后新数组尾部赋值,并把新数组重新赋值给容器数组
因为每次添加都会创建一个长度+1的新数组,所以并不需要扩容了
线程安全方面: 容器array是volatile修饰的,即set和get方法都是线程安全的,整个添加过程上了锁,所以整体是通过volatile和lock来保证的线程安全
性能方面: 可以看到舍弃了扩容操作,但每次添加都会创建个新的数组并复制数据过去,这个过程是非常耗时的,所以并不合适频繁写入的场景
源代码如下:
public boolean add(E e) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取容器数组Object[] elements = getArray();// 获取容器数组长度int len = elements.length;// 创建一个长度+1的新数组 并把之前的数据复制到新数组Object[] newElements = Arrays.copyOf(elements, len + 1);// 新数组尾部赋值newElements[len] = e;// 把新数组重新赋值给容器数组setArray(newElements);return true;} finally {lock.unlock();}
}
指定位置插入
同样也可以指定位置插入,流程跟上述差不多一致,但是有个双Copy操作:
- 加锁,获取容器数组
- 下标校验
- 如果正好是尾部插入:创建一个长度+1 的新数组,并把之前数组的数据复制过去
- 不是尾部:需要复制两次,总的来说就是下标前的数据照旧复制,下标后的数据复制后,整体位置往后移一位
- 然后新数组尾部赋值,并把新数组重新赋值给容器数组
源代码如下:
public void add(int index, E element) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取容器数组Object[] elements = getArray();// 获取容器数组长度int len = elements.length;// 下标校验if (index > len || index < 0)throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);Object[] newElements;int numMoved = len - index;if (numMoved == 0)// 如果正好是尾部 则照旧创建并复制newElements = Arrays.copyOf(elements, len + 1);else {// 不是尾部则创建一个长度+1 的新数组newElements = new Object[len + 1];// 依旧是复制 但这里复制了两次 // 0-index的元素复制一次 位置不变System.arraycopy(elements, 0, newElements, 0, index);// index-尾部的元素复制一次 整体位置都后移一位System.arraycopy(elements, index, newElements, index + 1,numMoved);}// 指定位置赋值newElements[index] = element;// 把新数组重新赋值给容器数组setArray(newElements);} finally {lock.unlock();}}
四、get操作
很简单先获取容器数组,然后根据数组下标取值就好
容器数组是volatile修饰的,所以本身get就是线程安全的,始终获取的最新值
源代码如下:
public E get(int index) {return get(getArray(), index);
}
final Object[] getArray() {return array;
}private E get(Object[] a, int index) {return (E) a[index];
}
五、set操作
这个我在ArrayList里面没有介绍,因为很简单就是替换数组下标原本的值即可
这里提一下是因为此操作也是线程安全的上了锁:
- 加锁,获取下标对应的旧值
- 对比新值和旧值,值一样则不作任何操作
- 值不一样则创建个新的数组,然后再修改下标的值,再把新数组回写
源代码如下:
public E set(int index, E element) {final ReentrantLock lock = this.lock;lock.lock();try {// 获取容器数组Object[] elements = getArray();// 获取原本的旧值E oldValue = get(elements, index);// 对比新值和旧值if (oldValue != element) {int len = elements.length;// 值不一致 则创建个新数组,把数据复制过去Object[] newElements = Arrays.copyOf(elements, len);// 修改新数组对应下标下的值newElements[index] = element;// 把新数组写回setArray(newElements);} else {// 新值旧值一样 就不做任何操作 ,把数组写回去就好了setArray(elements);}return oldValue;} finally {lock.unlock();}
}
六、remove操作
根据值remove
- 先遍历查找元素下标所在
- 然后加锁,判断下标是否有效,无效需要重新找一次下标,没找到直接返回false
- 找到了下标然后创建长度-1的新数组,双copy,然后回写到容器数组
总的逻辑不复杂,跟常规的list一样,先查再移除,但由于第一次查找的时候并没有加锁,所以第一次找到的下标到移除的过程中数组可能已经发生了修改,下标会失效,所以在真正移除的时候加锁之后又判断了一次下标的有效性
为什么不直接加锁然后在查找下标后移除呢?
当然也可以,但是加锁会阻塞其他的操作,等于是在遍历查找的时候其他操作就全被阻塞了,但是现在这样假设数组没被修改,则直接双copy移除了,相比更优,假设数组被修改,无非就是再重新遍历一次,从效率上来讲多遍历了一次,效率低了,从阻塞上来看都一样是遍历+双copy,所以综合来说这种设计侧重于使用场景
源代码如下:
public boolean remove(Object o) {// 获取容器数组Object[] snapshot = getArray();// 遍历查找元素所在的下标索引int index = indexOf(o, snapshot, 0, snapshot.length);// 根据索引移除return (index < 0) ? false : remove(o, snapshot, index);
}private boolean remove(Object o, Object[] snapshot, int index) {final ReentrantLock lock = this.lock;lock.lock();try {// 获取当前最新的容器数组Object[] current = getArray();int len = current.length;// 判断传入的数组是否与当前数组相同 如果不相同则需要判断传入index下标的有效性if (snapshot != current) findIndex: {//得到遍历范围最小值,这个范围不一定能找到元素,当元素被后移时//注意index是索引,len是数组大小。int prefix = Math.min(index, len);for (int i = 0; i < prefix; i++) {//严格的判断。只有当两个数组相同索引位置的元素不是同一个元素;//且current索引元素和参数o 是相等的时候 则重新获取赋值index 退出分支if (current[i] != snapshot[i] && eq(o, current[i])) {index = i;break findIndex;}}// 下标已经超过或等于长度 则下标已经无效了 直接返回if (index >= len)return false;// 下标依旧有效并且值相等则 退出分支if (current[index] == o)break findIndex;// 上面都不满足则重新查找一次下标index = indexOf(o, current, index, len);// 不存在了则直接返回if (index < 0)return false;}// 经过上面一顿操作 已经找到了要移除元素的下标了,所以创建个长度-1的新数组Object[] newElements = new Object[len - 1];// 双复制 与之前一样 ,index后面的元素需要往前移System.arraycopy(current, 0, newElements, 0, index);System.arraycopy(current, index + 1, newElements, index, len - index - 1);// 新数组赋值给容器数组setArray(newElements);return true;} finally {lock.unlock();}}
根据下标remove
这个和根据下标add操作有点类似:
- 判断下标是不是尾部,是尾部则直接Arrays.copyOf
- 不是尾部则双arraycopy,index后的数据要整体前移一位
源代码如下:
public E remove(int index) {final ReentrantLock lock = this.lock;lock.lock();try {// 获取容器数组Object[] elements = getArray();int len = elements.length;// 获取旧值E oldValue = get(elements, index);int numMoved = len - index - 1;if (numMoved == 0)// 如果是尾部则直接创建长度-1的数组再复制过去 setArray(Arrays.copyOf(elements, len - 1));else {// 不是尾部 则双copy ,index后的数据整体前移一位Object[] newElements = new Object[len - 1];System.arraycopy(elements, 0, newElements, 0, index);System.arraycopy(elements, index + 1, newElements, index,numMoved);setArray(newElements);}return oldValue;} finally {lock.unlock();}}
总结
从上面可以看出CopyOnWriteArrayList是线程安全的,是volatile和lock来保证的,但是也很明显的可以看出弊端就是对修改的效率不高,每个修改都涉及到copy操作,甚至还有两次copy的,而且每个修改都是在新的数组中进行的,这也应对了CopyOnWrite这个命名;就因为写的效率不高,所以这个更适合在读多写少的场景中使用