目录
- 引言
- 一、Queue 的继承关系
- 1.1 Queue 定义基础操作
- 1.2 AbstractQueue 为子类减负
- 1.3 BlockingQueue 阻塞式Queue
- 1.4 Deque 两头进出
- 二、Queue 的重要实现
- 三、BlockingQueue 的实现原理
- 四、Queue 在生产者消费者模式中的应用
- 五、Queue 在线程池中的应用
- 六、ConcurrentLinkedQueue
- 总结
引言
Queue 是Collection 接口下的另一个重要接口。
常常作为生产者/消费者模式、线程池任务队列等基础组件的形式存在。
JUC中提供了丰富的Queue组件,如 ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、SynchronousQueue 等等。
学习 Queue 家族对于理解生产者消费者模式、并发、线程池等至关重要。
本篇博客总结 Queue 家族的接口、类,和一些重要的实现,及其部分底层逻辑。
一、Queue 的继承关系
Queue 接口下面几个重要的抽象有 AbstractQueue、BlockingQueue、Deque:
1.1 Queue 定义基础操作
Queue 接口定义了 6 个基本方法:
失败抛异常 | 返回特殊值 | |
---|---|---|
添加 | add() | offer() |
移除 | remove() | poll() |
查看 | element() | peek() |
1.2 AbstractQueue 为子类减负
AbstractQueue 实现了 Queue 中的 add()、remove()、element(),为它们添加了失败时抛异常的逻辑:
同时它还实现了 Collection 接口中的一些公共方法,如 clear()、addAll() 等。
大多数常用 Queue 都实现了这个抽象类。
它的作用是为子类填补常规集合所需要的添加、删除操作的逻辑,让子类更加专注于某种特性的实现,为子类减负。 由于这些方法是集合通用API,因此提升到抽象类中来实现,
在实际使用子类Queue的时候,这些方法往往不常使用。
1.3 BlockingQueue 阻塞式Queue
BlockingQueue 毫不意外的继承了 Queue 接口。
并在 Queue 的基础之上,扩展了两个新的阻塞式方法:
void put(E e);E take();
又是一对添加和取出的组合方法,这两个方法要求子类必须以阻塞的方式放入和取出元素,所谓阻塞式行为就是由于队列中的容量限制等因素,导致在队列已满或为空时,无法再继续添加或取出的时候,线程必须等待不返回,直到条件满足完成操作。
举个例子:
去饭店吃午饭,但是厨师没有做好的饭菜(队列为空),你可以选择离开去别的饭店(非阻塞直接返回),也可以选择留下来,等待厨师把饭菜做好,吃完再走(阻塞直到成功)。
在 Queue 中,offer() 和 poll() 都是非阻塞式的方法,put() 和 take() 是阻塞式的方法,我们也可以理解为 put() 就是阻塞式的 offer() ,take() 就是阻塞式的 poll()。
1.4 Deque 两头进出
经典的队列结构是 FIFO,即先进先出,一般都是“尾进头出”。在一些常规的 Queue 组件的源码中也经常看到会 setTail 这样的操作。
为了满足更丰富的应用场景,Queue 家族引入了 Deque 抽象。
Deque 意为 “双端队列”,可以在一端进也可以在这端出:
值得一提的是,对于经典数据结构中的 Stack 栈结构,Deque 也提供了相应的方法:
void push(E e);E pop();
二、Queue 的重要实现
Queue 的重要实现有以下这些:
特点 | 实现 | 描述 |
---|---|---|
面向并发 | ConcurrentLinkedQueue | 基于CAS的并发线程安全的队列容器 |
阻塞式 | ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue | LinkedBQ 虽然是用链表实现的,但依然是有界队列,最大值为 Integer.maxValue,DelayQueue 可以在任务提交后延迟执行 |
双端队列 | LinkedList | 从名字完全看不出是队列的双端队列 |
交换队列 | TransferQueue(阻塞式) | TransferQueue是一个接口,但它只有一个实现:LinkedTransferQueue |
同步队列 | SynchronousQueue(阻塞式) | 不存储元素的队列,手递手传递元素 |
这些队列往往都会继承 AbstractQueue,同时又兼具其他特性,如 阻塞、延迟 等等。
三、BlockingQueue 的实现原理
凡是支持阻塞操作的队列,都继承自 BlockingQueue,是线程安全的队列容器。
在 ArrayBlockingQueue 和 LinkedBlockingQueue 中,都用到了显式锁 Lock 和条件队列 Condition。
在《Java 并发编程实战》一书中的 “14.3 显式的 Condition 对象” 中也举例了类似的有界缓存代码。
Condition 的原理是将基于不同的前提条件的依赖性操作区分在多个条件等待队列中,基于不同条件等待和唤醒。
以 ArrayBlockingQueue 为例,当执行 put 或 take 的时候,首先需要拿到主锁 lock。
紧接着就是校验前置条件:count == items.length 或 count == 0,如果条件不满足,就执行相应 Condition.await。
如果条件满足,那么执行入队或出队方法——enqueue(E) 或 dequeue:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 监控所有访问的主锁 */final ReentrantLock lock;/** 等待 take 操作的 Condition */private final Condition notEmpty;/** 等待 put 操作的 Condition */private final Condition notFull;/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.* 插入一个指定元素到队尾,如果队列满,则等待可用空间 */public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}/*** 从队首提取一个元素,如果队列空,则等待队列放入元素*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}/*** Inserts element at current put position, advances, and signals.* 插入一个元素到当前的 putIndex,然后指针进一,执行唤醒* Call only when holding lock.* 仅当持锁才可调用*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}/*** Extracts element at current take position, advances, and signals.* 从当前 takeIndex 位置提取一个元素,指针进一,执行唤醒* Call only when holding lock.* 仅当持锁才可调用*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
}
四、Queue 在生产者消费者模式中的应用
在没有阻塞队列的情况下,如果想要实现生产者-消费者,必须自行处理生产者的状态切换和消费者的状态切换。
而 BlockingQueue 的实现中已经集成了 Condition 和 Lock 等一系列生产者-消费者模式所必须的关键元素,让代码变得异常简洁。
不需要考虑生产者或消费者的线程状态,只要调用了阻塞队列的 put 和 take,BlockingQueue 会自动为我们维护线程状态的切换。
以下代码是使用 ArrayBlockingQueue(5) 实现的生产者消费者模式。
producer 线程负责生产任务,consumer 线程负责消费任务,队列中只能容纳 5 个元素。stateMonitor 线程用于监控任务队列的满、空情况,并打印 producer 和 consumer 线程的状态。
通过调整 producer 和 consumer 线程的工作快慢程度,可以很清晰的感受到 BlockingQueue 的强大!
提示:由于程序中为了模拟生产或消费动作需要一定时间,使用了 sleep ,“动作更快”的线程会打印出 TIMED_WAITING ,它并不是 BlockingQueue 处理成的线程状态,直接忽略即可。
/*** 使用 BlockingQueue 实现生产者消费者模式** @author 圣斗士Morty* @data 2021/6/27 13:29*/
public class T08_ProducerConsumerMode {/* 调换producer和consumer线程的快慢程度,观察执行结果*/static int slow = 5, fast = 2;public static void main(String[] args) {// 可容纳 5 个任务的有界阻塞队列BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(5);// 生产者线程Thread producer = new Thread(() -> {for (int i = 0; ; i++) {try {TimeUnit.SECONDS.sleep(fast);System.out.println("正在生产'任务" + i + "'...");taskQueue.put("任务" + i);} catch (InterruptedException e) {e.printStackTrace();}}});// 消费者线程Thread concumer = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(slow);String task = taskQueue.take();System.out.println("正在消费'" + task + "'...");} catch (InterruptedException e) {e.printStackTrace();}}});// 状态监控线程Thread stateMonitor = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(1);if (taskQueue.isEmpty())System.out.println("消费者状态:" + concumer.getState());if (taskQueue.size() == 5)System.out.println("生产者状态:" + producer.getState());} catch (InterruptedException e) {e.printStackTrace();}}});stateMonitor.start();producer.start();concumer.start();}
}
五、Queue 在线程池中的应用
这里简单带一下队列在线程池中的应用。
juc 中提供了四种常用的线程池实现:
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newSingleThreadExecutor();
Executors.newScheduledThreadPool(10);
Executors 是类似Collections的工具方法,以上四个方法返回一个 ExecutorService 对象,实际上是它的一个子类 ThreadPoolExecutor 和 ScheduledExecutorService。
以 cachedThreadPool 和 fixedThreadPool 为例:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
它们直接返回固定构造结果的 ThreadPoolExecutor,构造器如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
ThreadPoolExecutor 的构造器如上所示,它有 7 个参数:
1、corePoolSize :核心线程数,随线程池永远存活的线程数量
2、maximumPoolSize:最大线程数
3、keepAliveTime:空闲时存活时间
4、TimeUnit :存活时间单位,3、4两个参数共同决定非核心线程可以空闲多久回收给操作系统
5、workQueue:一个 BlockingQueue类型的任务队列。
6、threadFactory:线程工厂,定义线程的创建方式
7、RejectedExecutionHandler:拒绝策略,当线程池已满时应该执行怎样的拒绝策略
cachedThreadPool 使用 SynchronousQueue 作为任务队列,SynchronousQueue是手递手式的队列,即放入的元素都必须直接交给消费者。
通过这样一个队列,任何任务提交到 cachedThreadPool 之后,都会立即被消费,如果任务足够多,可能会打到 Integer.MAX_VALUE 的级别,所以在实际开发中,一定要清楚的知道业务中的任务规模才能很好的使用。
fixedThreadPool 使用 LinkedBlockingQueue 作为任务队列的实现,这个线程池能够维持一个固定任务数的线程池。但是为什么不使用 ArrayBlockingQueue 呢?
六、ConcurrentLinkedQueue
ConcurrentLinkedQueue 是针对并发场景下提供的一种线程安全的队列容器,继承 AbstractQueue,同时实现了 Queue。
线程安全的非阻塞队列,没有支持阻塞的 put 和 take 方法。
内部维护了 Node 链表结构。
offer()、poll()等方法都采用了CAS的方式,保证了线程安全性,同时也兼顾了性能:
public boolean offer(E e) {// 非空校验checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 节点,同时令 p 指向它,然后循环CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next节点Node<E> q = p.next;// 拿到 tail 的瞬间可能会有其他线程设置了新的 tail,需要判断 tail 后面是否为 nullif (q == null) {// 如果 q 为 null,说明 p 确实是 tail,此时将新的节点 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的节点if (p != t) // CAS 设置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.p = (p != t && t != (t = tail)) ? t : q;}}
public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}else if ((q = p.next) == null) {updateHead(h, p);return null;}else if (p == q)continue restartFromHead;elsep = q;}}}
总结
了解 Queue 的类图结构非常关键,重点记忆 BlockingQueue 的结构和扩展的两个方法 put() 、take(),这是阻塞式API的关键:
失败抛异常 | 返回特殊值 | 阻塞 | |
---|---|---|---|
添加 | add() | offer() | put() |
取出 | remove() | poll() | take() |
查看 | element() | peek() |
以上表格是Queue api中的重点,需要熟记。
常见的实现队列:
特点 | 实现 | 描述 |
---|---|---|
面向并发 | ConcurrentLinkedQueue | 基于CAS的并发线程安全的队列容器 |
阻塞式 | ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue | LinkedBQ 虽然是用链表实现的,但依然是有界队列,最大值为 Integer.maxValue,DelayQueue 可以在任务提交后延迟执行 |
双端队列 | LinkedList | 从名字完全看不出是队列的双端队列 |
交换队列 | TransferQueue(阻塞式) | TransferQueue是一个接口,但它只有一个实现:LinkedTransferQueue |
同步队列 | SynchronousQueue(阻塞式) | 不存储元素的队列,手递手传递元素 |
常见的四种线程池:
线程池 | 任务队列 |
---|---|
Executors.newCachedThreadPool(); | SynchronousQueue |
Executors.newFixedThreadPool(10); | LinkedBlockingQueue |
Executors.newSingleThreadExecutor(); | LinkedBlockingQueue |
Executors.newScheduledThreadPool(10); | DelayedWorkQueue |
ConcurrentLinkedQueue 是非阻塞并发队列,它的 CAS操作,offer 为例:
public boolean offer(E e) {// 非空校验checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 节点,同时令 p 指向它,然后循环CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next节点Node<E> q = p.next;// 拿到 tail 的瞬间可能会有其他线程设置了新的 tail,需要判断 tail 后面是否为 nullif (q == null) {// 如果 q 为 null,说明 p 确实是 tail,此时将新的节点 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的节点if (p != t) // CAS 设置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}// ... 其他逻辑}}