原文地址:http://benjaminwhx.com/2018/05/11/%E3%80%90%E7%BB%86%E8%B0%88Java%E5%B9%B6%E5%8F%91%E3%80%91%E8%B0%88%E8%B0%88LinkedBlockingQueue/
在集合框架里,想必大家都用过ArrayList和LinkedList,也经常在面试中问到他们之间的区别。ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。
LinkedBlockingQueue实现了BlockingQueue接口,这里放一张类的继承关系图(图片来自之前的文章:说说队列Queue)
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为Integer.MAX_VALUE
,也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
2、源码分析
2.1 属性
/*** 节点类,用于存储数据*/ static class Node<E> {E item;Node<E> next;Node(E x) { item = x; } }/** 阻塞队列的大小,默认为Integer.MAX_VALUE */ private final int capacity;/** 当前阻塞队列中的元素个数 */ private final AtomicInteger count = new AtomicInteger();/*** 阻塞队列的头结点*/ transient Node<E> head;/*** 阻塞队列的尾节点*/ private transient Node<E> last;/** 获取并移除元素时使用的锁,如take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */ private final Condition notEmpty = takeLock.newCondition();/** 添加元素时使用的锁如 put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */ private final Condition notFull = putLock.newCondition();
从上面的属性我们知道,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
这里如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。
另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。
构造函数
public LinkedBlockingQueue() {// 默认大小为Integer.MAX_VALUEthis(Integer.MAX_VALUE); }public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); }public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock();try {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();} }
默认的构造函数和最后一个构造函数创建的队列大小都为Integer.MAX_VALUE,只有第二个构造函数用户可以指定队列的大小。第二个构造函数最后初始化了last和head节点,让它们都指向了一个元素为null的节点。
方法
同样,LinkedBlockingQueue也有着和ArrayBlockingQueue一样的方法,我们先来看看入队列的方法。
2.3.1、入队方法
LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)。
put(E e)
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 获取锁中断 putLock.lockInterruptibly();try {//判断队列是否已满,如果已满阻塞等待while (count.get() == capacity) {notFull.await();}// 把node放入队列中 enqueue(node);c = count.getAndIncrement();// 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 如果队列中有一条数据,唤醒消费线程进行消费if (c == 0)signalNotEmpty(); }
小结put方法来看,它总共做了以下情况的考虑:
- 队列已满,阻塞等待。
- 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。
offer(E e)
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {// 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,// 如果有,唤醒下一个添加线程进行添加操作。if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0; }
可以看到offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 等待超时时间nanos,超时时间到了返回falsewhile (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true; }
该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待,这里为什么要用while循环?因为awaitNanos方法是可中断的,为了防止在等待过程中线程被中断,这里使用while循环进行等待过程中中断的处理,继续等待剩下需等待的时间。