- 前言
- 一、Queue接口的定义
- 二、AbstractQueue实现Queue的基本操作
- 1.AbstractQueue源码注释解析
- 2.方法add、remove、element、clear、addAll的实现原理
- 三、BlockingQueue接口定义解析
- 1.入列操作
- 2.出列操作
- 3.其他操作
- 四、LinkedBlockingQueue源码解析
- 1.LinkedBlockingQueue初步介绍
- 2.链表节点Node介绍
- 3.LinkedBlockingQueue基本属性介绍
- (3.1).capacity队列总容量
- (3.2).count队列节点计数器
- (3.3).head队列头结点
- (3.4).last尾部节点
- (3.5).入队锁putLock、notFull
- (3.6).出队锁takeLock、notEmpty
- 4.LinkedBlockingQueue核心方法源码解析
- (4.1).signalNotEmpty唤醒在notEmpty条件上等待的线程
- (4.2).signalNotFull唤醒在notFull条件上等待的线程
- (4.3).fullyLock锁住入列、出列操作
- (4.4).fullyUnlock解锁入列、出列操作
- (4.5).LinkedBlockingQueue构造函数
- (4.6).enqueue入列函数
- (4.7).dequeue出列函数
- (4.8).size函数统计当前队列节点个数
- (4.9).remainingCapacity函数计算当前队列剩余空间容量
- (5.0).阻塞入列put函数
- (5.1).入列offer函数
- (5.3).阻塞出列take函数
- (5.4).出列poll函数
- (5.5).检索peek函数
This class provides skeletal(原始) implementations of some {@link Queue} operations. The implementations in this class are appropriate when the base implementation does not allow null elements. Methods {@link #add add}, {@link #remove remove}, and {@link #element element} are based on {@link #offer offer}, {@link#poll poll}, and {@link #peek peek}, respectively(分别的), but throw exceptions instead of indicating failure via false or null returns.
/*** Inserts the specified element into this queue if it is possible to do so* immediately without violating capacity restrictions, returning* <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt>* if no space is currently available.* (当为到达队列容器限制时,插入指定的元素应该马上返回true表示成功,* 如果已经到达队列容器上线,那么抛出IllegalStateException)* 在offer之上再加了一层判断而已* */public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
/*** Retrieves and removes the head of this queue(检索并删除队列头元素). * This method differs* from {@link #poll poll} only in that it throws an exception if this* queue is empty.(不同于poll方法在于当队列为空时,抛出异常)** <p>This implementation returns the result of <tt>poll</tt>* unless the queue is empty.** @return the head of this queue* @throws NoSuchElementException if this queue is empty*/public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}
/*** Retrieves, but does not remove, the head of this queue(检索队列头元素但是不会移除该元素). This method* differs from {@link #peek peek} only in that it throws an exception if* this queue is empty.(该方法不同于peek方法在于当队列为空的情况下抛出异常)** <p>This implementation returns the result of <tt>peek</tt>* unless the queue is empty.** @return the head of this queue* @throws NoSuchElementException if this queue is empty*/public E element() {E x = peek();if (x != null)return x;elsethrow new NoSuchElementException();}
/*** Removes all of the elements from this queue.* The queue will be empty after this call returns.** <p>This implementation repeatedly invokes {@link #poll poll} until it* returns <tt>null</tt>.*/public void clear() {while (poll() != null);}
public boolean addAll(Collection<? extends E> c) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();boolean modified = false;for (E e : c)if (add(e))modified = true;return modified;}
)和阻塞超时方法(Times out
/*** Inserts the specified element into this queue, waiting if necessary* for space to become available.(当队列容量可用时插入此元素。)** @param e the element to add* @throws InterruptedException if interrupted while waiting* @throws ClassCastException if the class of the specified element* prevents it from being added to this queue* @throws NullPointerException if the specified element is null* @throws IllegalArgumentException if some property of the specified* element prevents it from being added to this queue*/void put(E e) throws InterruptedException;
除了put方法,还增加了一个带有阻塞超时的offer方法(offer(E e, long timeout, TimeUnit unit)
/*** 在指定等待时间内将指定元素插入队列* Inserts the specified element into this queue, waiting up to the* specified wait time if necessary for space to become available.** @param e the element to add* @param timeout how long to wait before giving up, in units of* {@code unit} 多久时间以后放弃插入* @param unit a {@code TimeUnit} determining how to interpret the* {@code timeout} parameter 时间单位* @return {@code true} if successful, or {@code false} if* the specified waiting time elapses before space is available* @throws InterruptedException if interrupted while waiting* @throws ClassCastException if the class of the specified element* prevents it from being added to this queue* @throws NullPointerException if the specified element is null 插入元素为null,则报异常* @throws IllegalArgumentException if some property of the specified* element prevents it from being added to this queue*/boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
/*** Retrieves and removes the head of this queue, waiting if necessary* until an element becomes available.** @return the head of this queue* @throws InterruptedException if interrupted while waiting*/E take() throws InterruptedException;
/*** Retrieves and removes the head of this queue, waiting up to the* specified wait time if necessary for an element to become available.** @param timeout how long to wait before giving up, in units of* {@code unit}* @param unit a {@code TimeUnit} determining how to interpret the* {@code timeout} parameter* @return the head of this queue, or {@code null} if the* specified waiting time elapses before an element is available* @throws InterruptedException if interrupted while waiting*/E poll(long timeout, TimeUnit unit)throws InterruptedException;
/*** Returns the number of additional elements that this queue can ideally* (in the absence of memory or resource constraints) accept without* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic* limit.** <p>Note that you <em>cannot</em> always tell if an attempt to insert* an element will succeed by inspecting {@code remainingCapacity}* because it may be the case that another thread is about to* insert or remove an element.** @return the remaining capacity*/int remainingCapacity();
/*** Returns {@code true} if this queue contains the specified element.* More formally, returns {@code true} if and only if this queue contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o object to be checked for containment in this queue* @return {@code true} if this queue contains the specified element* @throws ClassCastException if the class of the specified element* is incompatible with this queue* (<a href="../Collection.html#optional-restrictions">optional</a>)* @throws NullPointerException if the specified element is null* (<a href="../Collection.html#optional-restrictions">optional</a>)*/public boolean contains(Object o);
/*** Removes all available elements from this queue and adds them* to the given collection. This operation may be more* efficient than repeatedly polling this queue. A failure* encountered while attempting to add elements to* collection {@code c} may result in elements being in neither,* either or both collections when the associated exception is* thrown. Attempts to drain a queue to itself result in* {@code IllegalArgumentException}. Further, the behavior of* this operation is undefined if the specified collection is* modified while the operation is in progress.** @param c the collection to transfer elements into* @return the number of elements transferred* @throws UnsupportedOperationException if addition of elements* is not supported by the specified collection* @throws ClassCastException if the class of an element of this queue* prevents it from being added to the specified collection* @throws NullPointerException if the specified collection is null* @throws IllegalArgumentException if the specified collection is this* queue, or some property of an element of this queue prevents* it from being added to the specified collection*/int drainTo(Collection<? super E> c);
/* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on* linked nodes.* This queue orders elements FIFO (first-in-first-out).此队列按FIFO(先进先出)的顺序* The <em>head</em> of the queue is that element that has been on the* queue the longest time.头元素肯定是队列中存在时间最久的* The <em>tail</em> of the queue is that element that has been on the* queue the shortest time. New elements* are inserted at the tail of the queue, and the queue retrieval* operations obtain elements at the head of the queue.检索操作时获取队列头元素* Linked queues typically have higher throughput than array-based queues but* less predictable performance in most concurrent applications.** <p>The optional capacity bound constructor argument serves as a* way to prevent excessive queue expansion. The capacity, if unspecified,* is equal to {@link Integer#MAX_VALUE}. Linked nodes are* dynamically created upon each insertion unless this would bring the* queue above capacity.** <p>This class and its iterator implement all of the* <em>optional</em> methods of the {@link Collection} and {@link* Iterator} interfaces.*/
/*** Linked list node class*/static class Node<E> {E item;/*** One of:* - the real successor(后续) Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node) 如果是null,表示没有后续节点,该节点为最后一个节点元素*/Node<E> next;Node(E x) {item = x;}}
/*** The capacity bound, or Integer.MAX_VALUE if none 队列容量*/private final int capacity;/*** Creates a {@code LinkedBlockingQueue} with a capacity of* {@link Integer#MAX_VALUE}. 初始化LinkedBlockingQueue对象*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}/*** Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.** @param capacity the capacity of this queue* @throws IllegalArgumentException if {@code capacity} is not greater* than zero* 设置capacity为Integer.MAX_VALUE*/public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}
/*** Current number of elements 当前队列元素数量*/private final AtomicInteger count = new AtomicInteger();
/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;
/*** Lock held by put, offer, etc*/private final ReentrantLock putLock = new ReentrantLock();/*** Wait queue for waiting puts*/private final Condition notFull = putLock.newCondition();
/*** Lock held by take, poll, etc*/private final ReentrantLock takeLock = new ReentrantLock();/*** Wait queue for waiting takes*/private final Condition notEmpty = takeLock.newCondition();
signalNotEmpty根据名字就可以猜测是唤醒某个等待线程,not empty意味着队列不为空,如果队列不为空时,那么就可以做出列操作。那么这里的signalNotEmpty方法就是唤醒某个等待进行出列操作的线程。也就是某个线程调用了take或者poll方法。可能由于队列为空,导致线程阻塞休眠,而当队列不为空时,则调用该方法唤醒线程,进行出列操作。
/*** Signals a waiting take. Called only from put/offer (which do not* otherwise ordinarily lock takeLock.take操作信息将被唤醒,但是这个唤醒操作由put/offer两个操作来触发)*/private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}
如果你以及理解了signalNotEmpty方法的原理,那么signalNotFull就变得相当简单。signal是唤醒的意思,not full则是不处于饱和状态。该方法用于当队列不是满队列的情况时,唤醒等待入列的某个线程。put或者offer用于将元素插入队列。但是当队列满了的情况下,线程调用put或者offer将会被阻塞休眠,直到队列不处于满状态,将元素入列。由此可见,什么时候队列会从满队列变为不满状态,那肯定是有出列操作(take或者poll)时,才会将满队列变得空闲。那么显而易见,signalNotFull则是在进行出列操作时进行调用,以此唤醒入列线程。
/*** Signals a waiting put. Called only from take/poll.唤醒put操作,这个操作由take/poll进行触发*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}
/*** Locks to prevent both puts and takes(同时锁住put和takes操作).*/void fullyLock() {putLock.lock();takeLock.lock();}
/*** Unlocks to allow both puts and takes.(解锁允许put和take操作)*/void fullyUnlock() {takeLock.unlock();putLock.unlock();}
LinkedBlockingQueue源码提供了三个默认的构造函数,LinkedBlockingQueue()和LinkedBlockingQueue(int capacity)以及LinkedBlockingQueue(Collection<? extends E> c)三个构造函数,使用不带参数的构造函数时,默认的队列容量为Integer.MAX_VALUE(2147483647),当然在构建LinkedBlockingQueue时我们也可以自定义队列容量。当初始化队列容量的同时,也分别给head节点和last节点初始化值。
第三个构造参数可以指定集合加入队列LinkedBlockingQueue(Collection<? extends E> c),默认调用有参构造函数初始化队列容量,使用putLock.lock加锁,循环集合插入队列,并记录当前队列有效节点数量,操作完成后 putLock.unlock解锁以便于后续操作。
/*** Creates a {@code LinkedBlockingQueue} with a capacity of* {@link Integer#MAX_VALUE}, initially containing the elements of the* given collection,* added in traversal order of the collection's iterator.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any* of its elements are null*/public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);//调用有参构造函数初始化队列final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibility(此处不会出现竞争关系,但是加锁也是必要的)try {int n = 0;for (E e : c) { //循环集合if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e)); //初始化一个node并插入队列++n;}count.set(n);//记录当前队列节点数量} finally {putLock.unlock();}}
last = last.next = node; 节点入列操作
this.last = node;
通过分析我们已经将last = last.next = node; 分成两步完成整个入列操作,但是有一个疑问在于,我们只看到对last的处理,对head的处理并未在enqueue函数中有所体现,而且出列时是从head节点进行操作的。让我们再次回到构造函数查看源码:
你会发现,在初始化LinkedBlockingQueue时,初始化化了一个数据域为null的节点,并且该节点同时指向last和head,也就是说在初始化完成LinkedBlockingQueue时,last==head是成立的。那么在第一次调用enqueue函数了,last = last.next = node;,就变成了:
head.next=node; 头节点指针域指向node
介绍中我们知道head并不存储数据,它的下一个节点才是我们正真使用的节点。出队操作时,先得到头节点(head)的下一个节点first节点,将当前头节点的next指针域指向自己,代码中说是help gc,大概意思就是帮助头节点更好的被回收。然后将first作为头节点head,并将head节点的数据域(元素数据)拿出,然后将head数据域置为null并将刚刚拿出的元素数据返回。
/*** Returns the number of additional elements that this queue can ideally* (in the absence of memory or resource constraints) accept without* blocking. This is always equal to the initial capacity of this queue* less the current {@code size} of this queue.** <p>Note that you <em>cannot</em> always tell if an attempt to insert* an element will succeed by inspecting {@code remainingCapacity}* because it may be the case that another thread is about to* insert or remove an element.*//*** 返回队列剩余容量** @return*/public int remainingCapacity() {return capacity - count.get();}
/*** Inserts the specified element at the tail of this queue, waiting if* necessary for space to become available.等待队列由可用量时将元素插入队列** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();/**获取锁,如果有多个线程同时尝试使用lockInterruptibly获取锁,没有获取锁的线程,可用使用interrupt终止获取锁等待**/try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {notFull.await();//当前队列已经到达最大容量,notFull睡眠,此时,不允许进行插入操作,等到take或者poll操作时,将其唤醒(signalNotFull方法)}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();//唤醒take}
Node<E> node = new Node<E>(e);
3.如果获取到入队锁,那么判断队列是否是满队列,当前队列节点数量是否等于队列最大容量count.get() == capacity,如果是满队列,那么不满足插入条件,线程将进入休眠状态,等待有出列操作时(take,poll)调用notFull.signal唤醒线程。
/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {notFull.await();//当前队列已经到达最大容量,notFull睡眠,此时,不允许进行插入操作,等到take或者poll操作时,将其唤醒(signalNotFull方法)}
enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();
if (c == 0)
offer函数在LinkedBlockingQueue中有两个具体的实现,一个是带有阻塞超时效果的offer(E e, long timeout, TimeUnit unit) 另一个是带有阻塞效果的offer(E e)。
(1).offer(E e) 的实现于put函数逻辑大体一致,只是在构建Node对象之前,优先判断队列是否已满,如果已满则直接返回false表示插入失败。不同于put函数,offer判断是否满队列的逻辑在构建Node节点之前,因此当满队列时,offer不会出现线程阻塞效果,而是直接返回false,而put函数则会一直等待直到队列空闲,将节点插入队列。
/*** Inserts the specified element at the tail of this queue if it is* possible to do so immediately without exceeding the queue's capacity,* returning {@code true} upon success and {@code false} if this queue* is full.* When using a capacity-restricted queue, this method is generally* preferable to method {@link BlockingQueue#add add}, which can fail to* insert an element only by throwing an exception.** @throws NullPointerException if the specified element is null*/public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity) return false;//满队列直接返回失败int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}
(2).offer(E e, long timeout, TimeUnit unit) 可以指定在线程等待插入时间,如果超过指定时间,则返回false表示插入失败。
unit.toNanos(timeout) 会将指定超时时间转化为毫秒处理,count.get() == capacity如果成立,则表示当前队列已经处于满队列的状态,则线程将调用awaitNanos方法进入睡眠状态。awaitNanos方法在await方法的基础上,增加了超时跳出的机制,如果睡眠时间超过nanos 毫秒,则自动唤醒睡眠线程,此时返回的nanos 值为小于等于0。唤醒线程再次判断当前队列是否为满队,如果count.get() == capacity依然成立,则返回false。如果不成立则跳出while循环进行插入操作。另一种情况则是线程由take或者poll 函数调用 notFull.signal(); 唤醒,这种被动唤醒的方式,notFull.awaitNanos(nanos) 返回的值肯定大于等于0,由于调用了take或者poll 函数,进行了出列操作,则count.get() == capacity 并不成立,则线程将跳出循环,进行插入操作。
/*** Inserts the specified element at the tail of this queue, waiting if* necessary up to the specified wait time for space to become available.** @return {@code true} if successful, or {@code false} if* the specified waiting time elapses before space is available* @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}
如果你掌握刚刚讲诉的put函数和offer函数的实现逻辑,那么take和poll函数的底层实现就变得简单明了。take函数实现逻辑则是先获取入队锁,如果获取失败则阻塞,获取成功则判断队列是否为空队列,如果count.get() == 0成立,则表示当前队列为空队列,则线程调用notEmpty.await() 进入休眠状态,直到其他线程调用put或者poll函数,将新的节点插入队列后,调用notEmpty.signal方法唤醒该线程,告知该线程当前队列不为空队列,可以进行出列操作。计数器count将未自减的值赋值给变量c,当前c == capacity成立时,则表示在未出列时,队列处于满列状态,可能存在请求入列操作的休眠线程,当出列完成后,队列处于未满状态,则通过调用signalNotFull方法唤醒休眠的入列线程。
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity) //c的值是count未自减的值,如果未自减是时满队列,则自减后处于非满状态,则应该唤醒休眠的入列线程。signalNotFull();return x;}
public E poll() {final AtomicInteger count = this.count;if (count.get() == 0) //判断是否为空队列,是则直接返回nullreturn null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}