文章目录
- 一、CopyOnWriteArrayList 源码
- 1.1. 概述
- 1.2. 思想
- 1.3. 源码
- ① 数据结构
- ② 初始化
- ③ 添加元素
- ④ 获取元素
- ⑤ 删除元素
- 二、ArrayBlockingQueue 源码
- 2.1. 概述
- 2.2. 思想
- 2.3. 源码
- ① 数据结构
- ② 初始化
- ③ 阻塞式获取和新增元素
- ④ 非阻塞式获取和新增元素
- ⑤ 指定超时时间内阻塞式获取和新增元素
- 三、DelayQueue 源码
- 3.1. 概述
- 3.2. 源码
- ① 数据结构
- ② 初始化
- ③ 添加元素
- ④ 阻塞式获取元素
- ⑤ 非阻塞式获取元素
- 四、ConcurrentHashMap 源码
- 4.1. 概述
- 4.2. JDK1.7 版本
- ① 数据结构
- ② 初始化
- ③ 添加元素
- ④ 扩容:rehash
- ⑤ 获取元素
- ⑥ 删除元素
- ⑦ size 方法
- ⑧ isEmpty 方法
- 4.3. JDK1.8 版本
- ① 数据结构
- ② 初始化
- ③ 添加元素
- ④ 获取元素
一、CopyOnWriteArrayList 源码
1.1. 概述
在 JDK1.5 之前,如果想要使用并发安全的 List
只能选择 Vector
。而 Vector
是一种老旧的集合,已经被淘汰。Vector
对于增删改查等方法基本都加了 同步(synchronized),这种方式虽然能够保证同步,但这相当于对整个 Vector
加上了一把大锁,使得每个方法执行的时候都要去获得锁,导致性能非常低下。
在JDK1.5 引入了 JUC
包,其中唯一线程安全 List
实现就是 CopyOnWriteArrayList
。
对于大部分业务场景来说,读取操作往往是远大于写入操作的。由于读取操作不会对原有数据进行修改,因此,对于每次读取都进行加锁其实是一种资源浪费。相比之下,我们应该允许多个线程同时访问 List
的内部数据,毕竟对于读取操作来说是安全的。
这种思路与 ReentrantReadWriteLock
读写锁的设计思路非常类似,即读读不互斥、读写互斥、写写互斥。CopyOnWriteArrayList
更进一步地实现了这一思路。为了将读操作性能发挥到极致,CopyOnWriteArrayList
中的读取操作是完全无需加锁的。更厉害的是,写入操作也不会阻塞读取操作,只有写写才会互斥。这样一来,读操作的性能就可以大幅度提升。
1.2. 思想
CopyOnWriteArrayList
线程安全的核心在于其采用了写时复制(Copy-On-Write)的策略。
那什么是写实复制策略呢?
看一下维基百科对于其的介绍:
写入时复制(英语:Copy-On-Write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果存在多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其它调用者所见到的最初的资源仍然保持不变。这过程对其它的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。
以 CopyOnWriteArrayList
为例:当需要修改(add
、set
、remove
等操作)CopyOnWriteArrayList
的内容时,不会直接修改原数组,而是会先创建底层数组的副本,对副本数组进行修改,修改完之后再将修改后的数组赋值回去,这样就可以保证写操作不会影响读操作了。
这里可以看出,写时复制机制非常适合读多写少的并发场景,能够极大地提高系统的并发性能。
不过,写时复制机制并不是银弹,依然存在一些缺点,如:
- 内存占用:每次写操作都需要复制一份原始数据,会占用额外的内存空间,在数据量比较大的情况下,可能会导致内存资源不足。
- 写操作开销:每一次写操作都需要复制一份原始数据,然后再进行修改和替换,所以写操作的开销相对较大,在写入比较频繁的场景下,性能可能会受到影响。
- 数据一致性问题:修改操作不会立即反馈到最终结果中,还需要等待复制完成,这可能会导致一定的数据一致性问题。
- …
1.3. 源码
CopyOnWriteArrayList 实现了 List
、RandomAccess
、Cloneable
、Serializable
接口。
public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {...
}
-
List
: 表明它是一个列表,支持添加、删除、查找等操作,并且可以通过下标进行访问。 -
RandomAccess
:这是一个标志接口,表明实现这个接口的List
集合是支持快速随机访问的。 -
Cloneable
:表明它具有拷贝能力,可以进行深拷贝或浅拷贝操作。 -
Serializable
: 表明它可以进行序列化操作,也就是可以将对象转换为字节流进行持久化存储或网络传输,非常方便。
① 数据结构
底层数据结构
/** 保护锁 */
final transient ReentrantLock lock = new ReentrantLock();/** 存储数组,只能通过getArray和setArray方法访问 */
private transient volatile Object[] array;
没有 size
属性,我们可以猜想,CopyOnWriteArrayList 并没有给未添加元素预留一定空间,数组大小恰好能容纳所有元素。
事实也恰好如此。
public int size() {return getArray().length;
}
② 初始化
构造函数
/** * 默认无参构造函数*/
public CopyOnWriteArrayList() {setArray(new Object[0]);
}/*** 按照集合的迭代器返回的顺序创建的构造函数*/
public CopyOnWriteArrayList(Collection<? extends E> c) {Object[] elements;if (c.getClass() == CopyOnWriteArrayList.class)elements = ((CopyOnWriteArrayList<?>)c).getArray();else {elements = c.toArray();if (c.getClass() != ArrayList.class)elements = Arrays.copyOf(elements, elements.length, Object[].class);}setArray(elements);
}/*** 创建一个包含指定数组的副本的列表*/
public CopyOnWriteArrayList(E[] toCopyIn) {setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}
③ 添加元素
CopyOnWriteArrayList 的 add()
方法有三个版本:
add(E e)
:在尾部插入元素。add(int index, E element)
:在指定位置插入元素。addIfAbsent(E e)
:如果指定元素不存在,那么添加该元素。如果成功添加元素则返回 true。
下面来看一下相关实现。
add(E e) 方法
/*** 添加元素到尾部*/
public boolean add(E e) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取元素的数组Object[] elements = getArray();// 获取原数组长度int len = elements.length;// 创建一个长度 +1 的新数组,并将原来数组的元素复制给新数组Object[] newElements = Arrays.copyOf(elements, len + 1);// 元素放入新数组末尾newElements[len] = e;// array 指向新数组setArray(newElements);return true;} finally {// 释放锁lock.unlock();}
}
通过以上代码,我们可以知道:
- 方法内部通过
ReentrantLock
加锁,保证了同步,避免了多线程写时会复制多个副本。锁被final
修饰保证了锁的内存地址不会被修改。 - 每次写操作都需要通过
Array.copyOf
复制底层数组,时间复杂度为 O(n),且会占用额外的内存空间。因此,CopyOnWriteArrayList
适用于读多写少的场景,在写操作不频繁且内存资源充足的情况下,可以提升系统的性能表现。
上一篇文章 Java 集合框架 已经简单介绍过 Arrays.copyOf 方法,底层调用了系统级别的拷贝指令,因此在实际应用中这个方法的性能表现比较优秀,但也需要注意控制复制的数据量,避免出现内存占用过高的情况。
add(int index, E element) 方法
/*** 在指定位置插入元素*/
public void add(int index, E element) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取元素数组Object[] elements = getArray();// 获取原数组长度int len = elements.length;// 判断下标合法性if (index > len || index < 0)throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);// 声明新数组Object[] newElements;// 计算出需要后移的元素个数int numMoved = len - index;if (numMoved == 0) // 不需要移动元素,表明是末尾插入newElements = Arrays.copyOf(elements, len + 1);else { // 需移动元素// 为新数组申请内存newElements = new Object[len + 1];// 将原数组下标前元素拷贝至新数组(包括下标)System.arraycopy(elements, 0, newElements, 0, index);// 将原数组下标后元素拷贝至新数组后。System.arraycopy(elements, index, newElements, index + 1,numMoved);}// 指定位置插入元素newElements[index] = element;// array 指向新数组setArray(newElements);} finally {// 释放锁lock.unlock();}
}
addIfAbsent(E e) 方法
/*** 添加元素(元素不存在)*/
public boolean addIfAbsent(E e) {// 获取元素数组Object[] snapshot = getArray();return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :addIfAbsent(e, snapshot);
}/*** 在指定数组中指定范围,查找指定元素的下标*/
private static int indexOf(Object o, Object[] elements,int index, int fence) {if (o == null) {for (int i = index; i < fence; i++)if (elements[i] == null)return i;} else {for (int i = index; i < fence; i++)if (o.equals(elements[i]))return i;}return -1;
}
由于方法比较简单,这里就不进行过多概述。
④ 获取元素
通过 COW 策略,我们知道在读取操作时,数据不会发生修改,不需要进行同步控制和锁操作,就可以保证数据安全性。因此 CopyOnWriteArrayList 的读取操作只是基于内部数组,并未实现修改。
/** * 获取指定下标元素*/
public E get(int index) {return get(getArray(), index);
}/*** 获取元素数组*/
final Object[] getArray() {return array;
}/*** 获取指定数组中指定下标的元素*/
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {return (E) a[index];
}
通过以上代码,我们可以知道,获取元素方法并未对下标进行合法校验,因此当下标越界非法时,将抛出越界异常情况。
由于 get
方法是弱一致的,因此在某些情况下可能读到旧的元素值。
⑤ 删除元素
CopyOnWriteArrayList 删除元素相关的方法一共有 4 个:
remove(int index)
:移除此列表中指定位置上的元素。remove(Object o)
:移除此列表中首次出现的指定元素,如果不存在则返回 false。removeAll(Collection<?> c)
:从此列表中移除指定集合中包含的所有元素。clear()
:移除此列表中的所有元素。
remove(int index) 方法
/*** 移除此列表中指定位置上的元素*/
public E remove(int index) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取元素数组Object[] elements = getArray();// 获取数组长度int len = elements.length;// 获取下标处元素E oldValue = get(elements, index);// 计算移动元素个数int numMoved = len - index - 1;if (numMoved == 0) // 不需移动元素setArray(Arrays.copyOf(elements, len - 1));else { // 需移动元素Object[] newElements = new Object[len - 1];System.arraycopy(elements, 0, newElements, 0, index);System.arraycopy(elements, index + 1, newElements, index,numMoved);setArray(newElements);}// 返回指定下标处元素return oldValue;} finally {// 释放锁lock.unlock();}
}
remove(Object o) 方法
/*** 移除此列表中首次出现的指定元素,如果不存在则返回 false*/
public boolean remove(Object o) {Object[] snapshot = getArray();int index = indexOf(o, snapshot, 0, snapshot.length);return (index < 0) ? false : remove(o, snapshot, index);
}/*** */
private boolean remove(Object o, Object[] snapshot, int index) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取元素数组Object[] current = getArray();// 获取数组长度int len = current.length;if (snapshot != current) findIndex: { // 快照不等于现数组// 检查指定元素是否存在于下标前int prefix = Math.min(index, len);for (int i = 0; i < prefix; i++) {if (current[i] != snapshot[i] && eq(o, current[i])) {// 更新下标index = i;break findIndex;}}if (index >= len) // 下标越界return false;if (current[index] == o) // 下标未移动break findIndex;index = indexOf(o, current, index, len);if (index < 0) // 元素不存在return false;}// 移除操作Object[] newElements = new Object[len - 1];System.arraycopy(current, 0, newElements, 0, index);System.arraycopy(current, index + 1,newElements, index,len - index - 1);setArray(newElements);return true;} finally {// 释放锁lock.unlock();}
}
removeAll(Collection<?> c) 方法
/** * 从此列表中移除指定集合中包含的所有元素*/
public boolean removeAll(Collection<?> c) {if (c == null) throw new NullPointerException();final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取元素数组Object[] elements = getArray();// 获取数组长度int len = elements.length;if (len != 0) { // 数组非空,存在元素// 新数组长度int newlen = 0;Object[] temp = new Object[len];for (int i = 0; i < len; ++i) {Object element = elements[i];if (!c.contains(element)) // 当前下标处元素不需删除temp[newlen++] = element;}if (newlen != len) { // 新数组长度不等于原数组长度setArray(Arrays.copyOf(temp, newlen));return true;}}return false;} finally {// 释放锁lock.unlock();}
}
clear() 方法
/*** 移除此列表中的所有元素*/
public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {setArray(new Object[0]);} finally {lock.unlock();}
}
二、ArrayBlockingQueue 源码
2.1. 概述
为了解决高并发场景下多线程之间数据共享的问题,JDK1.5 出现了 ArrayBlockingQueue
和 LinkedBlockingQueue
,它们是带有生产者-消费者模式实现的并发容器。其中, ArrayBlockingQueue
是有界队列,即添加的元素达到上限之后,再次添加就会被阻塞或抛出异常。而 LinkedBlockingQueue
则由链表构成的队列,正是因为链表的特性,所以 LinkedBlockingQueue
在添加元素上并不会像 ArrayBlockingQueue
那样有着较多的约束,所以 LinkedBlockingQueue
设置队列是否有界是可选的(有界指的是队列的大小,默认为 Integer.MAX_VALUE
)。
随着 Java 的不断发展,JDK 后续的几个版本又对阻塞队列进行了不少的更新和完善:
- JDK1.6 版本:增加
SynchronousQueue
,一个不存储元素的阻塞队列。 - JDK1.7 版本:增加
TransferQueue
,一个支持更多操作的阻塞队列。 - JDK1.8 版本:增加
DelayQueue
,一个支持延迟获取元素的阻塞队列。
2.2. 思想
阻塞队列就是典型的生产者-消费者模型,它可以做到以下几点:
- 当阻塞队列数据为空时,所有的消费者线程就会被阻塞,等待队列非空。
- 当生产者往队列中填充数据后,队列就会通知消费者队列非空,消费者此时就可以进来消费。
- 当阻塞队列因为消费者消费过慢或生产者存放元素过快导致队列填满,无法容纳新元素时,生产者就会被阻塞,等待队列非满时继续存放元素。
- 当消费者从队列中消费一个元素之后,队列就会通知生产者队列非满,生产者可以继续填充数据。
总结下来就是:阻塞队列就是基于非空和非满两个条件实现生产者和消费者之间的交互,尽管这些交互流程和等待通知的机制实现非常复杂,但阻塞队列的细节被屏蔽,我们只需要调用相关 API 即可实现多线程之间的生产和消费。
这使得阻塞队列在多线程开发中有着广泛的应用,最常见的例子就是我们的线程池,从源码中我们就能看出当核心线程无法及时处理任务时,这些任务都会扔到 workQueue
中。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// ...}
2.3. 源码
ArrayBlockingQueue 继承了 AbstractQueue
类,实现了 BlockingQueue
、Serializable
接口。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {...
}
BlockingQueue
:表明一个阻塞队列,实现了阻塞队列那些常见的操作行为。Serializable
:表明它可以进行序列化操作,也就是可以将对象转换为字节流进行持久化存储或网络传输,非常方便。
① 数据结构
底层数据结构
// 存储数组
final Object[] items;// 队首下标
int takeIndex;// 插入下标
int putIndex;// 队列元素个数
int count;// 主锁
final ReentrantLock lock;// 非空条件
private final Condition notEmpty;// 非满条件
private final Condition notFull;// 迭代器共享状态
transient Itrs itrs = null;
② 初始化
构造函数
/*** 指定队列初始容量,默认非公平锁*/
public ArrayBlockingQueue(int capacity) {this(capacity, false);
}/*** 指定队列初始容量及锁的公平性*/
public ArrayBlockingQueue(int capacity, boolean fair) {// 容量合法性判断if (capacity <= 0)throw new IllegalArgumentException();// 初始化数组this.items = new Object[capacity];// 创建阻塞队列流程控制锁lock = new ReentrantLock(fair);// 用 lock 锁创建两个条件控制队列生产和消费notEmpty = lock.newCondition();notFull = lock.newCondition();
}/*** 指定初始容量及锁的公平性,并插入元素集合*/
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {// 初始化容量及锁的公平性this(capacity, fair);final ReentrantLock lock = this.lock;// 加锁lock.lock(); try {int i = 0;// 插入指定集合中元素 try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}// 记录队列容量count = i;// 更新下一次插入下标putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}
}
关于锁的公平性,指的是对于锁的争抢是否随机。
③ 阻塞式获取和新增元素
ArrayBlockingQueue 阻塞式获取和新增元素对应的就是生产者-消费者模型,虽然它也支持非阻塞式获取和新增元素,但一般不会使用。
阻塞式获取和新增元素的方法为:
put(E e)
:将元素插入队列中,如果队列已满,则该方法会一直阻塞,直到队列有空间可用或线程被中断。take()
:获取并移除队列头部的元素,如果队列为空,则该方法会一直阻塞,直到队列非空或者线程被中断。
下面来看一下方法的具体实现。
put(E e) 方法
/*** 队列尾部插入指定元素,等待队列已满时可用的空间。*/
public void put(E e) throws InterruptedException {// 确保插入元素不为 nullcheckNotNull(e);// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lockInterruptibly();try {// 判断队列是否满while (count == items.length) // 队列已满// 等待notFull.await();// 元素入队enqueue(e);} finally {// 释放锁lock.unlock();}
}/** * 检查元素非空*/
private static void checkNotNull(Object v) {if (v == null)throw new NullPointerException();
}/*** 元素入队*/
private void enqueue(E x) {// 获取队列数组final Object[] items = this.items;// 插入元素items[putIndex] = x;// 更新 putindexif (++putIndex == items.length)putIndex = 0;// 队列长度加一count++;// 通知队列非空,获取元素的阻塞线程可以工作notEmpty.signal();
}
take() 方法
/*** 获取元素*/
public E take() throws InterruptedException {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lockInterruptibly();try {// 判断队列是否空while (count == 0) // 队列已空// 等待notEmpty.await();// 出队并返回元素return dequeue();} finally {// 释放锁lock.unlock();}
}/** * 出队*/
private E dequeue() {// 获取队列数组final Object[] items = this.items;@SuppressWarnings("unchecked")// 取出队首元素E x = (E) items[takeIndex];// 取出位置置空items[takeIndex] = null;// 更新队首下标if (++takeIndex == items.length)takeIndex = 0;// 队列长度减一count--;// 判断是否存在活跃迭代器if (itrs != null)// 更新迭代器状态itrs.elementDequeued();// 通知队列非满notFull.signal();return x;
}
④ 非阻塞式获取和新增元素
非阻塞式获取和新增元素的方法为:
offer(E e)
:将元素插入队尾。如果队列已满,返回 false,不会等待并阻塞线程。poll()
:获取并移除队首元素,如果队列为空,则直接返回 null,不会等待阻塞线程。add(E e)
:将元素插入队尾,如果队列已满,则抛出IllegalStateException
异常,底层基于offer(E e)
方法。remove()
:获取并移除队首元素,如果队列为空,则抛出NoSuchElementException
异常,底层基于poll()
方法。peek()
:获取但不移除队首元素,如果队列为空,则直接返回 null,不会等待阻塞线程。
这些方法实现与阻塞式方法实现差不多,唯一的区别就是失败不阻塞线程,会抛出异常或直接返回。
offer(E e) 方法
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}
}
poll() 方法
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}
}
add(E e) 方法
public boolean add(E e) {return super.add(e);
}// java.util.AbstractQueue
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
remove() 方法
// java.util.AbstractQueue
public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();
}
peek() 方法
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}
}final E itemAt(int i) {return (E) items[i];
}
⑤ 指定超时时间内阻塞式获取和新增元素
在 offer(E e)
和 poll()
非阻塞式获取和新增元素的基础上,设计者提供了带有等待时间的 offer(E e, long timeout, TimeUnit unit)
和 poll(long timeout, TimeUnit unit)
,用于在指定的超时时间内阻塞式地获取和新增元素。
下面来看一下具体实现。
offer(E e, long timeout, TimeUnit unit) 方法
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 队列满,循环等待while (count == items.length) {if (nanos <= 0) // 时间到了队列还满,则直接返回 falsereturn false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}
}
poll(long timeout, TimeUnit unit) 方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 队列为空,循环等待while (count == 0) {if (nanos <= 0) // 时间到了,队列还为空,则直接返回 nullreturn null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}
}
有细心的盆友会发现,ArrayBlockingQueue 加锁采用 lock.lockInterruptibly()
方法,而不是 lock.lock()
,这是为了响应中断,如果在等待获取锁的过程中被打断则该方法会抛出 InterruptedException
异常,及时退出。
三、DelayQueue 源码
3.1. 概述
DelayQueue 是 JUC 包为我们提供的延迟队列,用于实现延时任务。它是 BlockingQueue
的一种,底层是一个基于 PriorityQueue
实现的一个无界队列,是线程安全的。
虽然 DelayQueue 是在 JDK1.5 的 JUC 包引入,但版本仅仅支持延迟功能,未解决线程安全问题。在 JDK1.6 中解决线程安全问题,并在后续版本对实现方式和可靠性进行进一步优化。
3.2. 源码
DelayQueue 继承了 AbsractQueue
类,实现了 BlockingQueue
接口。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {...
}
BlockingQueue
:表明一个阻塞队列,实现了阻塞队列那些常见的操作行为。
① 数据结构
底层数据结构
// 保护锁
private final transient ReentrantLock lock = new ReentrantLock();
// 存储集合(按照到期时间进行升序排序)
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 指向准备执行优先级最高的线程
private Thread leader = null;
// 实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
② 初始化
构造函数
/*** 默认无参构造器*/
public DelayQueue() {}/** * 插入集合元素*/
public DelayQueue(Collection<? extends E> c) {this.addAll(c);
}
由于所有成员变量在类加载时都已经初始完成了,所以构造函数就比较简单。
不了解类加载机制,可以看另一篇文章:Java 类加载机制
③ 添加元素
offer 方法
public boolean offer(E e) {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 添加至优先队列q.offer(e);// 查看队首元素是否为插入元素if (q.peek() == e) { // 插入元素优先级最高// leader 置为空,调用获取元素方法而阻塞线程执行任务leader = null;available.signal();}return true;} finally {// 释放锁lock.unlock();}
}
add 方法、put 方法
public boolean add(E e) {return offer(e);
}public void put(E e) {offer(e);
}
④ 阻塞式获取元素
take 方法
public E take() throws InterruptedException {// 获取锁final ReentrantLock lock = this.lock;// 加锁lock.lockInterruptibly();try {for (;;) {// 查看队首元素E first = q.peek();// 判断队首是否空if (first == null) // 队首元素为空// 将当前线程放入 ConditionObject 的等待队列中available.await();else {// 获取过期时间long delay = first.getDelay(NANOSECONDS);// 判断是否已过期if (delay <= 0) // 已过期// 返回return q.poll();// 未过期// 释放引用first = null; // don't retain ref while waiting// 判断是否有线程等待if (leader != null) // 存在线程等待// 当前线程进入等待available.await();else { // 不存在线程等待// 将当前线程变为 leaderThread thisThread = Thread.currentThread();leader = thisThread;try {// 进入有限期等待available.awaitNanos(delay);} finally {// 等待任务到期时,释放 leader,进入下一次循环任务 returnif (leader == thisThread)leader = null;}}}}} finally {// 当 leader 为空,队列中存在任务,唤醒等待线程if (leader == null && q.peek() != null)available.signal();// 释放锁lock.unlock();}
}
⑤ 非阻塞式获取元素
poll 方法
public E poll() {// 获取锁final ReentrantLock lock = this.lock;// 释放锁lock.lock();try {// 查看队首元素E first = q.peek();// 判断队首是否为空 或 队首任务是否过期if (first == null || first.getDelay(NANOSECONDS) > 0) // 队首为空 或 队首未过期return null;elsereturn q.poll();} finally {// 释放锁lock.unlock();}
}
四、ConcurrentHashMap 源码
4.1. 概述
由于 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap()
方法来包装我们的 HashMap。但这是通过使用一个全局锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。
所以就有了 HashMap 的线程安全版本 – ConcurrentHashMap 的诞生。
在 JDK1.7 时,ConcurrentHashMap 对整个桶数组进行分割分段(Segment,分段锁),每把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。
到了 JDK1.8 时,ConcurrentHashMap 已经摒弃了 Segment 的概念,而是直接使用 Node 数组 + 链表 + 红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。
下面分别对其实现进行解析学习。
4.2. JDK1.7 版本
① 数据结构
JDK1.7 中,整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表 “部分” 或 “一段” 的意思,而每一个 Segment 是一个类似于 HashMap 的结构,所以 HashMap 内部可以进行扩容,但 Segment 的个数一旦初始化就不能改变,默认 Segment 的个数为 16,所以理论上,最多同时支持 16 个线程并发写。
底层数据结构
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的并发等级(下面会叙述作用)
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment数量
static final int MAX_SEGMENTS = 1 << 16; static final int RETRIES_BEFORE_LOCK = 2;/*** Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组* 每个Segment相当于一个子Hash表*/
final Segment<K,V>[] segments;/*** segmentMask和segmentShift主要是为了定位段*/
final int segmentMask;final int segmentShift;
Segment 结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {//volatile,这使得能够读取到最新的 table值而不需要同步transient volatile HashEntry<K,V>[] table;//count用来统计该段数据的个数transient int count;//modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变transient int modCount;//threashold用来表示需要进行rehash的界限值transient int threshold;//loadFactor表示负载因子。final float loadFactor;Segment(float lf, int threshold, HashEntry<K,V>[] tab) {this.loadFactor = lf;this.threshold = threshold;this.table = tab;}...
}
HashEntry 结构
static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next;
}
② 初始化
/*** 默认无参构造器*/
public ConcurrentHashMap() {this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {// 参数校验if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();// 校验并发级别大小,大于 1 << 16,重置为 65536if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;// 计算并行级别,由于要保持并行级别为 2 的 n 次方while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// 记录段偏移量this.segmentShift = 32 - sshift;// 记录段掩码this.segmentMask = ssize - 1;// 设置容量if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;// c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;//Segment 中的类似于 HashMap 的容量至少是2或者2的倍数while (cap < c)cap <<= 1;// 创建 Segment 数组,设置 segments[0]Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];// 数组中写入 segment[0]UNSAFE.putOrderedObject(ss, SBASE, s0);this.segments = ss;
}
③ 添加元素
注意:以下成员变量均按照默认值
put 方法
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);// hash 值高 4 位与 (1111) 做与运算int j = (hash >>> segmentShift) & segmentMask;// 由于只初始化了 segment[0],可能 segment[j] 需要初始化if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // 初始化 segment[j]s = ensureSegment(j);// 向 segment[j] 中插入元素return s.put(key, hash, value, false);
}// Segment 内部方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 在往该 segment 写入前,需要先获取该 segment 的独占锁HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {// 这个是 segment 内部的数组HashEntry<K,V>[] tab = table;// 再利用 hash 值,求应该放置的数组下标int index = (tab.length - 1) & hash;// first 是数组该位置处的链表的表头HashEntry<K,V> first = entryAt(tab, index);// 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {// 覆盖旧值e.value = value;++modCount;}break;}// 继续顺着链表走e = e.next;}else {// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;// 如果超过了该 segment 的阈值,这个 segment 需要扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node); // 扩容后面也会具体分析else// 没有达到阈值,将 node 放到数组 tab 的 index 位置,// 其实就是将新的节点设置成原链表的表头setEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {// 解锁unlock();}return oldValue;
}/*** 初始化槽*/
private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {// 这里看到为什么之前要初始化 segment[0] 了,// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]// 为什么要用“当前”,因为 segment[0] 可能早就扩容过了Segment<K,V> proto = ss[0];int cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);// 初始化 segment[k] 内部的数组HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // 再次检查一遍该槽是否被其他线程初始化了。Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);// 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;
}
/** * 获取写入锁*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating node// 循环获取锁while (!tryLock()) {HashEntry<K,V> f; // to recheck first belowif (retries < 0) {if (e == null) {if (node == null) // speculatively create node// 进到这里说明数组该位置的链表是空的,没有任何元素// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置node = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))retries = 0;else// 顺着链表往下走e = e.next;}// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁// lock() 是阻塞方法,直到获取锁后返回else if (++retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) == 0 &&// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;
}
④ 扩容:rehash
由于 segment 数组不能扩容,因此扩容是 segment 数组某个位置内部的数组 HashEntry<K, V>[] 进行扩容,容量为原来的 2 倍。
触发扩容的地方为 put 元素时,元素的插入导致元素个数超过阈值,因此触发扩容。由于插入元素时,已经获取了独占锁,因此这个方法不需要考虑并发问题。
rehash 方法
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 2 倍int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);// 创建新数组HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’int sizeMask = newCapacity - 1;// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置for (int i = 0; i < oldCapacity ; i++) {// e 是链表的第一个元素HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;// 计算应该放置在新数组中的位置,// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19int idx = e.hash & sizeMask;if (next == null) // 该位置处只有一个元素,那比较好办newTable[idx] = e;else { // Reuse consecutive sequence at same slot// e 是链表表头HashEntry<K,V> lastRun = e;// idx 是当前链表的头节点 e 的新位置int lastIdx = idx;// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置newTable[lastIdx] = lastRun;// 下面的操作是处理 lastRun 之前的节点,// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;
}
大家可能对最后两个 for 循环存在疑惑,这里第一个 for 循环是为了寻找下一个 lastRun 节点(节点后面的所有 next 节点的新位置都素一样的),然后将这些作为一个链表赋值到新位置。第二个 for 循环是为了把剩余的元素通过头插法插入到指定位置链表。
据统计,如果使用默认的阈值,大约只有1/6 的节点需要克隆。因此可以采用这样的方式提高效率。
⑤ 获取元素
get 方法
public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;// 计算 hash 值int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// 根据 hash 找到对应的 segmentif ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {// 找到segment 内部数组相应位置的链表,遍历for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;
}
相较于 put 方法,get 方法就较为简单了,
- 只需要计算 hash 值,找到 segment 数组中位置。
- 再根据 hash ,找到数组中位置。
- 对链表进行遍历查找。
⑥ 删除元素
remove(E e) 方法
public V remove(Object key) {// 计算 hash 值int hash = hash(key);// 定位到 SegmentSegment<K,V> s = segmentForHash(hash);// 删除元素return s == null ? null : s.remove(key, hash, null);
}
remove(Object key, Object value) 方法
public boolean remove(Object key, Object value) {int hash = hash(key);Segment<K,V> s;return value != null && (s = segmentForHash(hash)) != null &&s.remove(key, hash, value) != null;
}
下面来看一下真正删除元素的代码实现:
final V remove(Object key, int hash, Object value) {//获取同步锁if (!tryLock())scanAndLock(key, hash);V oldValue = null;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;// 获取 HashEntry 链表HashEntry<K,V> e = entryAt(tab, index);//遍历链表用来保存当前链表节点的前一个节点HashEntry<K,V> pred = null;while (e != null) {K k;HashEntry<K,V> next = e.next;//找到key对应的键值对if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {V v = e.value;//键值对的值与传入的value相等if (value == null || value == v || value.equals(v)) {//当前元素为头节点,把当前元素的下一个节点设为头节点if (pred == null)setEntryAt(tab, index, next);//不是头节点,把当前链表节点的前一个节点的next指向当前节点的下一个节点elsepred.setNext(next);++modCount;--count;oldValue = v;}break;}pred = e;e = next;}} finally {// 释放锁unlock();}return oldValue;
}// 方法实现与 scanAndLockForPut 类似
private void scanAndLock(Object key, int hash) {// similar to but simpler than scanAndLockForPutHashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;int retries = -1;while (!tryLock()) {HashEntry<K,V> f;if (retries < 0) {if (e == null || key.equals(e.key))retries = 0;elsee = e.next;}else if (++retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f;retries = -1;}}
}
⑦ size 方法
size 方法这里需要说明一下其代码实现,由于并发情况下,元素个数可能不停变化,也不能直接将所有 Segment 上锁,为了解决这一问题,ConcurrentHashMap 采用折中的方法,先采用乐观的方式(认为在统计过程中没有其它线程修改 Segment 结构),它将无锁遍历两次进行统计,如果上一次遍历结果与这一次遍历结果相同就返回统计结果;如果不相同,那就上锁进行同步。
public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {//达到RETRIES_BEFORE_LOCK,也就是两次if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);//遍历计算segment的modCount和count的和if (seg != null) {sum += seg.modCount;int c = seg.count;//是否溢出int范围if (c < 0 || (size += c) < 0)overflow = true;}}//last是上一次的sum值,相等跳出循环if (sum == last)break;last = sum;}} finally {//解锁if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;
}
⑧ isEmpty 方法
isEmpty 方法也没有采用同步锁的方式,采用乐观的方式来判空。
public boolean isEmpty() {//累计segment的modCount值long sum = 0L;final Segment<K,V>[] segments = this.segments;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {if (seg.count != 0)return false;sum += seg.modCount;}}//再次检查if (sum != 0L) { // recheck unless no modificationsfor (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {if (seg.count != 0)return false;sum -= seg.modCount;}}if (sum != 0L)return false;}return true;
}
4.3. JDK1.8 版本
① 数据结构
JDK1.8 中,ConcurrentHashMap 与 HashMap 基本一样,不过它需要保证线程安全性。
底层数据结构
// 现有表
transient volatile Node<K,V>[] table;
// 更新后表
private transient volatile Node<K,V>[] nextTable;private transient volatile long baseCount;
/*** 表状态* * -1:正在初始化* * 小于 -1:正在被 -sizeCtrl-1 线程扩容* * 大于 0:未初始化,表示初始化的容量;初始化后,下一次扩容的容量-*/
private transient volatile int sizeCtl;private transient volatile int transferIndex;private transient volatile int cellsBusy;private transient volatile CounterCell[] counterCells;// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
② 初始化
构造函数
/** * 默认无参构造器*/
public ConcurrentHashMap() {
}/*** 指定初始容量*/
public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();// 这里是对 [1.5 * initCapacity] 向上取 2 的 n 次方 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;
}/*** 指定初始容量及负载因子*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, 1);
}/** * 指定初始容量、负载因子及并发线程数*/
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}/** * 指定数x最小的 2 的 N 次方*/
private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
③ 添加元素
put 方法
public V put(K key, V value) {return putVal(key, value, false);
}
/*** 插入元素实现*/
final V putVal(K key, V value, boolean onlyIfAbsent) {// 键值对判空if (key == null || value == null) throw new NullPointerException();// 高十六位哈希法计算 hash 值int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// Node 数组判空if (tab == null || (n = tab.length) == 0) // 数组未初始化// 初始化数组tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 指定 hash 位头节点为空// CAS 放入,不加锁,成功就 break 跳出if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED) // 正在进行扩容tab = helpTransfer(tab, f);else { // 此时 f 为该 hash 位头节点,且不为空V oldVal = null;// 对头节点加锁synchronized (f) {if (tabAt(tab, i) == f) {// 头节点 hash 值判断if (fh >= 0) { // 为链表binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 寻找 hash、key 相同的节点,判断是否值覆盖,之后 breakif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 链表末尾判断Node<K,V> pred = e;if ((e = e.next) == null) { // 到了链表末尾// 新值插入链表末尾pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) { // 为红黑树Node<K,V> p;binCount = 2;// 树节点插入元素if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) { // 节点已存在oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 判断是否尝试转为红黑树if (binCount >= TREEIFY_THRESHOLD)// 尝试转为红黑树,若数组长度小于 64,会继续数组扩容,而不转换treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hashstatic final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}·/** * 初始化 Node 数组*/
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 其它线程执行 CAS 成功,正在进行初始化if ((sc = sizeCtl) < 0)// 让出 CPUThread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // sizeCtl 设置为 -1,代表抢到锁try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;
}/** * 链表转换为红黑树*/
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// 判断数组长度与 64 关系if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 数组长度小于 64tryPresize(n << 1); // 数组扩容else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 头节点不为空且,且不在扩容状态// 头节点加锁synchronized (b) {// 进行转换if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}
}/** * 数组扩容*/
private final void tryPresize(int size) { // 此时 size 已经翻过倍了// 如果小于最大容量一半,设置为 size 的 1.5 倍再 +1,再向上取最近 2 的 n 次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 是否初始化if (tab == null || (n = tab.length) == 0) { // 未初始化n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY) // 未到下一次扩容容量或已到达最大容量break;else if (tab == table) {// 计算扩容标志int rs = resizeStamp(n);if (sc < 0) { // 表示有线程正在扩容Node<K,V>[] nt;// 出现以下情况不帮助扩容迁移if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 用 CAS 将 sizeCtl+1,然后进行真正扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)) // 没有线程在扩容。通过原子操作尝试扩容transfer(tab, null);}}
}
④ 获取元素
get 方法
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 计算 hash 值int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 指定位置元素存在,头节点 hash 值相同if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))// key hash 值相等,key 值相同,直接返回元素 valuereturn e.val;}else if (eh < 0)// 头节点 hash 值小于 0,说明正在扩容或者红黑树,find 查找return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {// 链表遍历查找if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}