文章目录
- 概述
- BlockingQueue
- ArrayBlockingQueue
- 数据存储相关属性
- 阻塞特性相关属性
- 主要方法
- LinkedBlockingQueue
- LinkedBlockingQueue 主要属性
- LinkedBlockingQueue 设计思想
- ConcurrentLinkedQueue
- PriorityBlockingQueue
- PriorityBlockingQueue 主要属性
- PriorityBlockingQueue 设计思想
- SynchronousQueue
- LinkedBlockingDeque
- DelayQueue
- DelayQueue 主要属性
- DelayQueue 主要设计思想
- CopyOnWrite
- CopyOnWriteArrayList
- CopyOnWriteArrayList 的主要属性
- CopyOnWriteArrayList 主要设计思想
- CopyOnWriteArraySet
概述
常用的并发集合类,主要有 ArrayBlockingQueue
,ConcurrentLinkedQueue
,LinkedBlockingQueue
,PriorityBlockingQueue
,DelayQueue
,SynchronousQueue
,LinkedBlockingDeque
,以及 COW
系列的集合,如 CopyOnWriteArrayList
,CopyOnWriteArraySet
等
BlockingQueue
BlockingQueue
描述了一个阻塞队列应该有的行为,例如:
- 阻塞读方法:
take()
- 阻塞写方法:
put()
阻塞的读写方法的函数签名上都会抛出 InterruptedException
,具体的阻塞逻辑由实现类自行实现。
ArrayBlockingQueue
是一个基于数组实现的阻塞队列,创建对象时必须指定容量
数据存储相关属性
内置了一个循环队列用于存储数据,可以更高效地利用内存空间
/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;
阻塞特性相关属性
内置一个 ReentrantLock
锁对象,一个队列是否为空的 Condition
对象,以及一个队列是否满的 Condition
对象
/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;
其中,notEmpty
条件和 notFull
条件都是由 lock
对象创建出来的
主要方法
- 构造方法中必须传入容量,可以通过
public ArrayBlockingQueue(int capacity, boolean fair)
中的fair
参数来指定公平/非公平模式fair
参数最终会用于构造ReentrantLock
锁对象
put
方法会在队列满的时候((队尾下标+1
)%队列长度=队头下标))调用notEmpty
条件进行等待,等队列再次有空位的时候(有元素出队时将会被唤醒)将元素放至队尾take
会在队列为空的时候(队尾下标=队头下标)调用notFull
条件进行等待,当队列不为空时,将队头的元素返回,并删除队头元素
LinkedBlockingQueue
LinkedBlockingQueue
是一个基于单向链表实现的阻塞队列,其容量为:
- 如果在构造函数中指定了容量,那么容量就是队列长度的最大边界限制
- 如果没有在构造函数中指定容量,容量将会默认设置成
Integer.MAX_VALUE
,近似无界
所以,LinkedBlockingQueue
在不传入容量的情况下,最大容量会被设置为 Integer.MAX_VALUE
LinkedBlockingQueue 主要属性
/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** Current number of elements */private final AtomicInteger count = new AtomicInteger();/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;/*** Tail of linked list.* Invariant: last.next == null*/private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue
底层的数据结构是单向链表- 内置一个读取锁,
ReentrantLock takeLock
,以及由这把锁创建出来的非空条件Condition notEmpty
- 内置一个写入锁,
ReentrantLock putLock
,以及由这把锁创建出来的非空条件Condition notFull
LinkedBlockingQueue 设计思想
- 执行
take()
方法的时候,会从队列的头部取出元素,所以必须要获取读取锁takLock
,从而确保同时只有一个线程可以操作链表的头部- 当执行
take()
方法的时候如果队列为空,那么当前线程将会调用notEmpty.await()
方法进行阻塞,一直到有元素放入事件发生(如调用put()
方法)
- 当执行
- 执行
put()
方法的时候,会从队列的尾部添加元素,所以必须要获取到写入锁putLock
,从而确保同时只有一个线程可以操作链表的尾部- 当执行
put()
方法的时候如果队列满了,那么当前线程将会调用notFull.await()
方法进行阻塞,一直到有元素取出事件发生(如调用take()
方法)
- 当执行
- 当执行
remove()
、contains()
等方法时,将会同时获取读、写两把锁,在方法结束后同时释放两把锁- 这样做的目的是因为
remove()
等方法不能确定在链表的某个位置操作(读取或者新增、删除)元素,所以需要同时对队头和队尾都加锁
- 这样做的目的是因为
总的来说,LinkedBlockingQueue
就是通过控制链表同一方向的操作,来实现线程安全和读写条件阻塞;当不能确定操作元素的位置时,将进行双重加锁
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一个基于单向链表实现的无界线程安全队列,基于 CAS
+ 自旋来保证线程安全
PriorityBlockingQueue
PriorityBlockingQueue
是一个基于优先级的无界阻塞队列,底层的数据结构是堆。
PriorityBlockingQueue 主要属性
/*** Priority queue represented as a balanced binary heap: the two* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The* priority queue is ordered by comparator, or by the elements'* natural ordering, if comparator is null: For each node n in the* heap and each descendant d of n, n <= d. The element with the* lowest value is in queue[0], assuming the queue is nonempty.*/private transient Object[] queue;/*** The number of elements in the priority queue.*/private transient int size;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/*** Spinlock for allocation, acquired via CAS.*/private transient volatile int allocationSpinLock;
PriorityBlockingQueue
底层有一个可扩容的对象数组Object[] queue
,结构是堆- 内置一个
ReentrantLock
独占锁,以及由这个锁构造出来的Condition notEmpty
条件 - 一个
volatile
关键字修饰的整型变量volatile int allocationSpinLock
PriorityBlockingQueue 设计思想
- 所有的存取方法都需要获取锁后才能执行,包括
size()
方法 - 当数组的长度大于等于容量时,需要进行扩容操作
- 扩容之前先要释放锁
- 首先要尝试使用
CAS
尝试把allocationSpinLock
变量的值修改为1
- 当前容量小于
64
时,增长为原来的2
倍 +2
;否则增长为原来的1.5
倍 - 还原
allocationSpinLock
的值 - 扩容完成之后,再次获取锁
- 插入元素时,会按照指定的
Comparator
的比较方法进行插入;否则将会使用元素自身的Comparable.compareTo()
方法进行比较 - 调用
take()
方法时如果队列为空时,当前线程将会调用notEmpty.await()
进行阻塞,直到有新的元素入队后被唤醒 - 出队的元素始终是堆顶元素
SynchronousQueue
一个不存储元素的阻塞队列,每一个入队操作必须对应一个出队操作,每一个出队操作必须对应一个入队操作。
LinkedBlockingDeque
LinkedBlockingQueue
是一个双端阻塞队列,底层数据结构是双向链表。
- 内置一个
ReentrantLock
独占锁,以及由这个锁构造出来的Condition notEmpty
(队列为空,等待放入) 和Condition notFull
(队列已满,等待取出)条件 - 当队列为空时调用阻塞取出相关方法(
take()
)时,将会调用notEmpty.await()
方法进行阻塞,直到有元素被放入到队列中 - 当队列已满时调用阻塞存入相关方法(
put()
)时,将会调用notFull.await()
方法进行阻塞,直到有元素被取出 - 可以从队列的两端(头部/尾部)进行元素操作(存入/取出)元素,意味着可以使用
LinkeBlockingQueue
自由实现栈或者队列
DelayQueue
DelayQueue
是一个无界延时阻塞队列。底层的数据结构是优先级队列 PriorityQueue
DelayQueue 主要属性
private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/private final Condition available = lock.newCondition();
- 内置一个
ReentrantLock
独占锁,以及一个由锁对象创建出来的条件对象Condition available
- 底层数据结构是
PriorityQueue
,是由堆来实现的 - 有一个
Thread leader
属性,代表的是当前第一个尝试等待获取队头元素的线程
DelayQueue 主要设计思想
- 所有放入到
DelayQueue
中的元素,都要实现Delayed
接口 - 当队列为空时调用出队方法,将会调用
available.await()
方法进行阻塞,直到有新的元素被添加到队列中 - 当队列不为空时调用出队方法,首先判断队头元素的剩余过期时间是否小于等于
0
- 如果队头元素的剩余过期时间小于等于
0
,那么说明元素已经到期,直接把这个元素移除并返回 - 如果队头元素的剩余过期时间(
long delay
)大于0
,将会判断当前leader
是否为空- 如果为空,则把自己设置为
leader
,并且调用available.awaitNanos(long delay)
方法,将自己阻塞long delay
时间 - 如果不为空,说明已经有其他线程在等待获取元素了,那么将调用
available.await()
方法进行无限期等待
- 如果为空,则把自己设置为
- 在阻塞了
long delay
时间后,重新将leader
置为null
,并且执行上面的逻辑
- 如果队头元素的剩余过期时间小于等于
- 所有存取方法调用前都需要获取锁
CopyOnWrite
CopyOnWrite
,顾名思义,就是写时复制的意思。
主要的设计思想就是在修改数据的时候,并不直接在原数据上进行修改,而是先拷贝一份原数据的副本,后续对于数据的修改操作在拷贝出来的副本上进行,最后将副本替换掉原数据。
这样做的好处就是,在读取数据的时候,是不用采取任何并发控制手段的,直接访问源数据就行,所以写时复制适用于读多写少的场景。
CopyOnWriteArrayList
CopyOnWriteArrayList
是一个并发安全的动态数组,底层数据结构是数组。
CopyOnWriteArrayList 的主要属性
/** The lock protecting all mutators */final transient ReentrantLock lock = new ReentrantLock();/** The array, accessed only via getArray/setArray. */private transient volatile Object[] array;
- 内置一个
ReentrantLock
独占锁 - 一个用
volatile
关键字修饰的对象数组array
CopyOnWriteArrayList 主要设计思想
- 在所有的修改方法执行之前,首先要获取锁
- 在使用元素添加方法时,每次会拷贝出一个长度
+1
的新数组,同时把要添加的元素放入到新数组的最后一个位置上,最后把旧数组替换成新数组 - 在使用元素移除方法时,每次会拷贝出一个长度
-1
的新数组,最后根据要删除的元素位置,进行数组拷贝,最后把旧数组替换成新数组 - 所有读取方法都没有采取任何同步措施,包括获取迭代器
- 获取迭代器的方法,并没有像很多文章中说的对当前数组进行了一个拷贝,而是直接把当前数组的引用传递给了迭代器
- 之所以不用对当前数组进行拷贝,是因为每次的修改方法都会创建出一个新数组,所以
CopyOnWriteArrayList
迭代器根本不会,也没有必要去拷贝当前数组
CopyOnWriteArraySet
CopyOnWriteArraySet
是一个并发安全的无重复集合,底层是基于 CopyOnWriteArrayList
来实现的