Queue —— JUC 的豪华队列组件

目录

  • 引言
  • 一、Queue 的继承关系
    • 1.1 Queue 定义基础操作
    • 1.2 AbstractQueue 为子类减负
    • 1.3 BlockingQueue 阻塞式Queue
    • 1.4 Deque 两头进出
  • 二、Queue 的重要实现
  • 三、BlockingQueue 的实现原理
  • 四、Queue 在生产者消费者模式中的应用
  • 五、Queue 在线程池中的应用
  • 六、ConcurrentLinkedQueue
  • 总结

引言

Queue 是Collection 接口下的另一个重要接口。
常常作为生产者/消费者模式线程池任务队列等基础组件的形式存在。
JUC中提供了丰富的Queue组件,如 ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、SynchronousQueue 等等。
学习 Queue 家族对于理解生产者消费者模式、并发、线程池等至关重要。
本篇博客总结 Queue 家族的接口、类,和一些重要的实现,及其部分底层逻辑。

一、Queue 的继承关系

Queue 接口下面几个重要的抽象有 AbstractQueue、BlockingQueue、Deque:
在这里插入图片描述

1.1 Queue 定义基础操作

Queue 接口定义了 6 个基本方法:

失败抛异常返回特殊值
添加add()offer()
移除remove()poll()
查看element()peek()

1.2 AbstractQueue 为子类减负

AbstractQueue 实现了 Queue 中的 add()、remove()、element(),为它们添加了失败时抛异常的逻辑:
在这里插入图片描述
同时它还实现了 Collection 接口中的一些公共方法,如 clear()、addAll() 等。
大多数常用 Queue 都实现了这个抽象类。
它的作用是为子类填补常规集合所需要的添加、删除操作的逻辑,让子类更加专注于某种特性的实现,为子类减负。 由于这些方法是集合通用API,因此提升到抽象类中来实现,
在实际使用子类Queue的时候,这些方法往往不常使用。

1.3 BlockingQueue 阻塞式Queue

BlockingQueue 毫不意外的继承了 Queue 接口。
并在 Queue 的基础之上,扩展了两个新的阻塞式方法:

void put(E e);E take();

又是一对添加和取出的组合方法,这两个方法要求子类必须以阻塞的方式放入和取出元素,所谓阻塞式行为就是由于队列中的容量限制等因素,导致在队列已满或为空时,无法再继续添加或取出的时候,线程必须等待不返回,直到条件满足完成操作。

举个例子
去饭店吃午饭,但是厨师没有做好的饭菜(队列为空),你可以选择离开去别的饭店(非阻塞直接返回),也可以选择留下来,等待厨师把饭菜做好,吃完再走(阻塞直到成功)。

在 Queue 中,offer() 和 poll() 都是非阻塞式的方法,put() 和 take() 是阻塞式的方法,我们也可以理解为 put() 就是阻塞式的 offer() ,take() 就是阻塞式的 poll()。

1.4 Deque 两头进出

经典的队列结构是 FIFO,即先进先出,一般都是“尾进头出”。在一些常规的 Queue 组件的源码中也经常看到会 setTail 这样的操作。
为了满足更丰富的应用场景,Queue 家族引入了 Deque 抽象。
Deque 意为 “双端队列”,可以在一端进也可以在这端出:
在这里插入图片描述
值得一提的是,对于经典数据结构中的 Stack 栈结构,Deque 也提供了相应的方法:

void push(E e);E pop();

二、Queue 的重要实现

Queue 的重要实现有以下这些:

特点实现描述
面向并发ConcurrentLinkedQueue基于CAS的并发线程安全的队列容器
阻塞式ArrayBlockingQueue、LinkedBlockingQueue、DelayQueueLinkedBQ 虽然是用链表实现的,但依然是有界队列,最大值为 Integer.maxValue,DelayQueue 可以在任务提交后延迟执行
双端队列LinkedList从名字完全看不出是队列的双端队列
交换队列TransferQueue(阻塞式)TransferQueue是一个接口,但它只有一个实现:LinkedTransferQueue
同步队列SynchronousQueue(阻塞式)不存储元素的队列,手递手传递元素

这些队列往往都会继承 AbstractQueue,同时又兼具其他特性,如 阻塞、延迟 等等。

三、BlockingQueue 的实现原理

凡是支持阻塞操作的队列,都继承自 BlockingQueue,是线程安全的队列容器。
在 ArrayBlockingQueue 和 LinkedBlockingQueue 中,都用到了显式锁 Lock 和条件队列 Condition。
在《Java 并发编程实战》一书中的 “14.3 显式的 Condition 对象” 中也举例了类似的有界缓存代码。
Condition 的原理是将基于不同的前提条件的依赖性操作区分在多个条件等待队列中,基于不同条件等待和唤醒

以 ArrayBlockingQueue 为例,当执行 put 或 take 的时候,首先需要拿到主锁 lock。
紧接着就是校验前置条件:count == items.length 或 count == 0,如果条件不满足,就执行相应 Condition.await。
如果条件满足,那么执行入队或出队方法——enqueue(E) 或 dequeue:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 监控所有访问的主锁 */final ReentrantLock lock;/** 等待 take 操作的 Condition */private final Condition notEmpty;/** 等待 put 操作的 Condition */private final Condition notFull;/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.* 插入一个指定元素到队尾,如果队列满,则等待可用空间 */public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}/*** 从队首提取一个元素,如果队列空,则等待队列放入元素*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}/*** Inserts element at current put position, advances, and signals.* 插入一个元素到当前的 putIndex,然后指针进一,执行唤醒* Call only when holding lock.* 仅当持锁才可调用*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}/*** Extracts element at current take position, advances, and signals.* 从当前 takeIndex 位置提取一个元素,指针进一,执行唤醒* Call only when holding lock.* 仅当持锁才可调用*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
}

四、Queue 在生产者消费者模式中的应用

在没有阻塞队列的情况下,如果想要实现生产者-消费者,必须自行处理生产者的状态切换和消费者的状态切换。

而 BlockingQueue 的实现中已经集成了 Condition 和 Lock 等一系列生产者-消费者模式所必须的关键元素,让代码变得异常简洁。
不需要考虑生产者或消费者的线程状态,只要调用了阻塞队列的 put 和 take,BlockingQueue 会自动为我们维护线程状态的切换

以下代码是使用 ArrayBlockingQueue(5) 实现的生产者消费者模式。

producer 线程负责生产任务,consumer 线程负责消费任务,队列中只能容纳 5 个元素。stateMonitor 线程用于监控任务队列的满、空情况,并打印 producer 和 consumer 线程的状态。

通过调整 producer 和 consumer 线程的工作快慢程度,可以很清晰的感受到 BlockingQueue 的强大!

提示:由于程序中为了模拟生产或消费动作需要一定时间,使用了 sleep ,“动作更快”的线程会打印出 TIMED_WAITING ,它并不是 BlockingQueue 处理成的线程状态,直接忽略即可。

/*** 使用 BlockingQueue 实现生产者消费者模式** @author 圣斗士Morty* @data 2021/6/27 13:29*/
public class T08_ProducerConsumerMode {/* 调换producer和consumer线程的快慢程度,观察执行结果*/static int slow = 5, fast = 2;public static void main(String[] args) {// 可容纳 5 个任务的有界阻塞队列BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(5);// 生产者线程Thread producer = new Thread(() -> {for (int i = 0; ; i++) {try {TimeUnit.SECONDS.sleep(fast);System.out.println("正在生产'任务" + i + "'...");taskQueue.put("任务" + i);} catch (InterruptedException e) {e.printStackTrace();}}});// 消费者线程Thread concumer = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(slow);String task = taskQueue.take();System.out.println("正在消费'" + task + "'...");} catch (InterruptedException e) {e.printStackTrace();}}});// 状态监控线程Thread stateMonitor = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(1);if (taskQueue.isEmpty())System.out.println("消费者状态:" + concumer.getState());if (taskQueue.size() == 5)System.out.println("生产者状态:" + producer.getState());} catch (InterruptedException e) {e.printStackTrace();}}});stateMonitor.start();producer.start();concumer.start();}
}

五、Queue 在线程池中的应用

这里简单带一下队列在线程池中的应用。
juc 中提供了四种常用的线程池实现:

Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newSingleThreadExecutor();
Executors.newScheduledThreadPool(10);

Executors 是类似Collections的工具方法,以上四个方法返回一个 ExecutorService 对象,实际上是它的一个子类 ThreadPoolExecutor 和 ScheduledExecutorService。
以 cachedThreadPool 和 fixedThreadPool 为例:

    public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

它们直接返回固定构造结果的 ThreadPoolExecutor,构造器如下:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}

ThreadPoolExecutor 的构造器如上所示,它有 7 个参数:

1、corePoolSize :核心线程数,随线程池永远存活的线程数量
2、maximumPoolSize:最大线程数
3、keepAliveTime:空闲时存活时间
4、TimeUnit :存活时间单位,3、4两个参数共同决定非核心线程可以空闲多久回收给操作系统
5、workQueue:一个 BlockingQueue类型的任务队列。
6、threadFactory:线程工厂,定义线程的创建方式
7、RejectedExecutionHandler:拒绝策略,当线程池已满时应该执行怎样的拒绝策略

cachedThreadPool 使用 SynchronousQueue 作为任务队列,SynchronousQueue是手递手式的队列,即放入的元素都必须直接交给消费者。
通过这样一个队列,任何任务提交到 cachedThreadPool 之后,都会立即被消费,如果任务足够多,可能会打到 Integer.MAX_VALUE 的级别,所以在实际开发中,一定要清楚的知道业务中的任务规模才能很好的使用。
fixedThreadPool 使用 LinkedBlockingQueue 作为任务队列的实现,这个线程池能够维持一个固定任务数的线程池。但是为什么不使用 ArrayBlockingQueue 呢?

六、ConcurrentLinkedQueue

ConcurrentLinkedQueue 是针对并发场景下提供的一种线程安全的队列容器,继承 AbstractQueue,同时实现了 Queue。
线程安全的非阻塞队列,没有支持阻塞的 put 和 take 方法。
内部维护了 Node 链表结构。
offer()、poll()等方法都采用了CAS的方式,保证了线程安全性,同时也兼顾了性能:

    public boolean offer(E e) {// 非空校验checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 节点,同时令 p 指向它,然后循环CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next节点Node<E> q = p.next;// 拿到 tail 的瞬间可能会有其他线程设置了新的 tail,需要判断 tail 后面是否为 nullif (q == null) {// 如果 q 为 null,说明 p 确实是 tail,此时将新的节点 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的节点if (p != t) // CAS 设置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// We have fallen off list.  If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable.  Else the new tail is a better bet.p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.p = (p != t && t != (t = tail)) ? t : q;}}
    public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}else if ((q = p.next) == null) {updateHead(h, p);return null;}else if (p == q)continue restartFromHead;elsep = q;}}}

总结

了解 Queue 的类图结构非常关键,重点记忆 BlockingQueue 的结构和扩展的两个方法 put() 、take(),这是阻塞式API的关键:

失败抛异常返回特殊值阻塞
添加add()offer()put()
取出remove()poll()take()
查看element()peek()

以上表格是Queue api中的重点,需要熟记。
常见的实现队列:

特点实现描述
面向并发ConcurrentLinkedQueue基于CAS的并发线程安全的队列容器
阻塞式ArrayBlockingQueue、LinkedBlockingQueue、DelayQueueLinkedBQ 虽然是用链表实现的,但依然是有界队列,最大值为 Integer.maxValue,DelayQueue 可以在任务提交后延迟执行
双端队列LinkedList从名字完全看不出是队列的双端队列
交换队列TransferQueue(阻塞式)TransferQueue是一个接口,但它只有一个实现:LinkedTransferQueue
同步队列SynchronousQueue(阻塞式)不存储元素的队列,手递手传递元素

常见的四种线程池:

线程池任务队列
Executors.newCachedThreadPool();SynchronousQueue
Executors.newFixedThreadPool(10);LinkedBlockingQueue
Executors.newSingleThreadExecutor();LinkedBlockingQueue
Executors.newScheduledThreadPool(10);DelayedWorkQueue

ConcurrentLinkedQueue 是非阻塞并发队列,它的 CAS操作,offer 为例:

    public boolean offer(E e) {// 非空校验checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 节点,同时令 p 指向它,然后循环CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next节点Node<E> q = p.next;// 拿到 tail 的瞬间可能会有其他线程设置了新的 tail,需要判断 tail 后面是否为 nullif (q == null) {// 如果 q 为 null,说明 p 确实是 tail,此时将新的节点 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的节点if (p != t) // CAS 设置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}// ... 其他逻辑}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/558651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

daad转换器实验数据_箔芯片电阻在高温应用A/D转换器中的应用

工业/应用领域高温&#xff1a;地震数据采集系统、石油勘探监测、高精度检测仪产品采用&#xff1a;V5X5 Bulk Metal (R) Foil芯片电阻案例介绍TX424是一个完整的4通道24位模数转换器&#xff0c;采用40脚封装。该设计采用最先进设计方案&#xff0c;两个双通道24位调节器和一个…

excel分段排序_学会这个神操作,报表填报不再五花八门,效率远超Excel

在报表工作人员的的日常工作中&#xff0c;常常要面临统计混乱的终端用户输入的问题。由于无法准确限制用户的输入内容&#xff0c;所以在最终进行数据统计时&#xff0c;常常会出现数据不合法的情况。为此需要花费大量的人力和时间核对校验数据。举个简单的例子&#xff0c;某…

IDEA——必备插件指南

目录一、Free-Mybatis-Plugin二、Lombok三、jclasslib Bytecode Viewer一、Free-Mybatis-Plugin 二、Lombok 三、jclasslib Bytecode Viewer 学习 class 文件的必备插件。 使用简单&#xff0c;安装后可以在菜单 View 中看到 show bytecode with jclasslib&#xff1a; 效果…

jitter 如何优化网络_如何做好关键词优化网络?

越来越多的传统企业开始建立自己的网站&#xff0c;进而不断的推广自己的产品。为了能够让自己的企业网站出现在搜索引擎的首页&#xff0c;现在最常用的手段就是竞价排名和关键词优化网络。往往很多企业会选择关键词优化网络这种方式来推广自己的网站&#xff0c;对于新手seoe…

python学生名片系统_Python入门教程完整版400集(懂中文就能学会)快来带走

如何入门Python&#xff1f;权威Python大型400集视频&#xff0c;学了Python可以做什么&#xff1f;小编今天给大家分享一套高老师的python400集视频教程&#xff0c;里面包含入门进阶&#xff0c;源码&#xff0c;实战项目等等&#xff0c;&#xff0c;不管你是正在学习中&…

JVM——详解类加载过程

导航一、过程概述二、Loading2.1 类加载器2.2 双亲委派机制2.3 类在内存中的结构三、Linking四、Initializing一、过程概述 java 源文件编译后会生成一个 .class文件存储在硬盘上。 在程序运行时&#xff0c;会将用到的类文件加载到 JVM 内存中。从磁盘到内存的过程总共分为三…

下载 Java 学习的权威文档

JVMS 和 JLS 文档的下载 快速直达&#xff1a; https://docs.oracle.com/javase/8/ --> Java Language and Virtual Machine Specifications jvm specification 和 java language specification 是Java 学习的两个最权威的文档。如果你用的是 Java 8&#xff0c;就可以去下载…

iso图像测试卡_4700万像素 五轴防抖 徕卡正式发布SL2无反相机

出自蜂鸟网-器材频道&#xff0c;原文链接&#xff1a;https://m.fengniao.com/document/5358989.html徕卡于今日正式发布SL2相机&#xff0c;搭载4700万像素CMOS感光元件、通过感光元件移位实现光学图像稳定的五轴防抖技术、全新徕卡物距探测式自动对焦技术以及576万像素分辨率…

JVM——对象的创建与内存布局

导航一、对象的创建过程二、对象的内存布局2.1 内存布局2.2 计算对象的内存大小三、对象的定位3.1 句柄池3.2 直接指针四、对象的分配过程一、对象的创建过程 对象&#xff0c;又叫实例&#xff0c;是 OOP 的最常用角色。 如何创建一个对象&#xff1f;一般都是使用 new 关键…

JVM垃圾收集器——G1

导航引言一、G1 介绍1.1 适用场景1.2 设计初衷1.3 关注焦点1.4 工作模式1.5 堆的逻辑结构1.6 主要收集目标1.7 停顿预测模型1.8 拷贝和压缩1.9 与 CMS 和 Parallel 收集器的比较1.10 固定停顿目标二、堆的逻辑分区2.1 region2.2 CSet2.3 RSet2.4 Card Table三、G1 的工作原理3.…

的mvc_简述PHP网站开发的MVC模式

为了提高开发时候的代码重用和开发速度&#xff0c;php使用了mvc的模式&#xff0c;主要是对代码的功能进行了分类&#xff0c;M&#xff1a;model主要是对数据库进行操作&#xff0c;v&#xff1a;view主要是前端html文件操作&#xff0c;c&#xff1a;controller主要是编写基…

CAP 原则与 BASE 理论

导航引言一、CAP 原则1.1 Consistency 一致性1.2 Available 可用性1.3 Partition tolerance 分区容错性1.4 CAP 的矛盾1.5 CAP 的组合场景二、BASE 理论2.1 基本可用2.2 软状态2.3 最终一致性2.3.1 因果一致性2.3.2 读自身所写2.3.3 会话一致性2.3.4 单调读一致性2.3.5 单调写一…

java 教室借用管理系统_[内附完整源码和文档] 基于JAVA语言的学生选课信息管理系统...

摘 要本系统运用Java面向对象的方法设计而成。近年来&#xff0c;学生选课系统越来越在高校学生群体中得到普及&#xff0c;其所承担的功能也变得越来越丰富&#xff0c;所起到的作用也变得越来越重要&#xff0c;在被学校学生重视的同时&#xff0c;也意味着它的功能要更加完善…

jMeter 模拟 web 高并发请求

导航一、jmeter 简介与下载二、接口压测设置三、实战演示一、jmeter 简介与下载 Apache JMeter是Apache组织开发的基于Java的压力测试工具。 最初被设计用于Web应用测试&#xff0c;但后来扩展到其他测试领域。JMeter 可以用于对服务器、网络或对象模拟巨大的负载&#xff0c…

实施文档_建设工程监理全套资料范本,Word文档附百份案例表格,超实用

建设工程监理全套资料范本&#xff0c;Word文档附百份案例表格&#xff0c;超实用在日常工作中&#xff0c;监理人员不仅需要经常跑腿儿检查&#xff0c;同时还需要提交许许多多的资料存档&#xff0c;甚至可能需要熬夜码字。今天整理的监理资料范本&#xff0c;既能让监理人员…

微服务架构 —— 服务雪崩与容错方案

导航一、什么是服务雪崩二、雪崩效应的三个核心原因三、容错四、业界常见容错思路五、常见容错组件一、什么是服务雪崩 服务雪崩 指的是微服务架构中&#xff0c;微服务各节点之间由于网络通信异常或微服务自身故障等问题&#xff0c;导致请求堆积、任务堆积&#xff0c;消耗和…

手游方舟怎么输入代码_明日方舟再次登顶失败,为了不发十连奖励,鹰角实力控分?...

在明日方舟新版本活动“孤岛风云”正式上线后&#xff0c;关于干员的强度和游戏剧情的讨论也在最近多了起来。尤其是在一周年卫星干员山落地&#xff0c;并且人气干员塞雷娅背后的故事揭晓之后&#xff0c;明日方舟的热度也在玩家圈子中迅速的攀升&#xff0c;成为近期话题量十…

Spring Cloud Alibaba —— Sentinel 入门

导航一、什么是Sentinel1.1 Sentinel 的优点二、整合 Sentinel 演示三、Sentinel控制台与微服务通信的原理四、Sentinel 流控演示一、什么是Sentinel Sentinel 是阿里开源的用于提供微服务架构容错方案的组件。它以流量作为切入点&#xff0c;从流量控制、熔断降级、系统负载保…

Spring Cloud Alibaba —— Sentinel 详细使用

导航引言一、Sentinel的两个基本概念二、流控规则2.1 基本选项2.2 高级选项三、熔断(降级)规则四、热点规则五、授权规则&#xff08;了解&#xff09;六、系统规则&#xff08;了解&#xff09;七、自定义异常返回八、SentinelResource九、Sentinel 规则持久化&#xff08;待补…

扫地机器人单扫和双扫_小米扫拖机器人体验:再见了,拖把君

小米在2016年首次推出了扫地机器人&#xff0c;凭借产品力和性价比&#xff0c;可以说为中国家庭的智能清洁概念普及&#xff0c;立下一功。不过&#xff0c;近两年因为一直没有推出扫拖一体产品&#xff0c;急得民间高手都开始自己动手给米家扫地机改造拖地功能了&#xff0c;…