Java阻塞队列

什么阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

四组API

image-20230731214338293

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

Java中7种阻塞队列

image-20230801093001063

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

DelayQueue:一个使用优先级队列实现的无界阻塞队列。

SynchronousQueue:一个不存储元素的阻塞队列。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

1.ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。

默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列为了保证公平性,通常会降低吞吐量。

下面是公平阻塞队列:

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000, true);public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();
}

访问者的公平性是使用可重入锁实现的。

2.LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

3.PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

4.DelayQueue *

DelayQueue是一个支持延时获取元素的无界阻塞队列队列使用PriorityQueue来实现队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

image-20230801093608736

可以将DelayQueue运用在以下应用场景:

缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

(1)如何实现Delayed接口

DelayQueue队列的元素必须实现Delayed接口。

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

参考ScheduledThreadPoolExecutor里ScheduledFutureTask类的实现,一共有三步。

第一步:在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序

       private static final AtomicLong sequencer = new AtomicLong();ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}

第二步:实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒

        public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}

延迟时间参数的单位是纳秒,自己设计的时候最好使用纳秒,实现getDelay()方法时可以指定任意单位,一旦以秒或分作为单位,而延时时间又精确不到纳秒会出现误差。

第三步:实现compareTo方法来指定元素的顺序。

下述代码让延时时间最长的放在队列的末尾:

        public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}

(2)如何实现延时阻塞队列

当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

take方法:

long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)return q.poll();
else if (leader != null)available.await();
else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}

变量leader是一个等待获取队列头部元素的线程。

如果leader不等于空,表示已经有线程在等待获取队列的头元素。使用await()方法让当前线程等待信号。

如果leader等于空,则把当前线程设置成leader,并使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。

5.SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。

它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。构造设置为true,则等待的线程会采用先进先出的顺序访问队列。

image-20230731225333708

队列本身并不存储任何元素,非常适合传递性场景(负责把生产者线程处理的数据直接传递给消费者线程)。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

6.LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。

如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

transfer方法的关键代码:

// ...
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行试图把存放当前元素的s节点作为tail节点。

第二行让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。

(2)tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。 如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回

带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

7.LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。

双向队列多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

阻塞队列的实现原理

如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

以ArrayBlockingQueue为例,ArrayBlockingQueue使用了Condition来实现(await、signal方法):

// 队列两种状态,空或者满 两个Condition对象 等待通知
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {// ... notEmpty = lock.newCondition();notFull =  lock.newCondition();}
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}
}
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();
}

调用Condition#await方法,会将同步队列中的头节点获取锁的线程(释放锁)转移到等待队列中。

当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过LockSupport.park(this)来实现。

        public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;// 此时当前节点已经不在同步队列中,while死循环while (!isOnSyncQueue(node)) {// 阻塞当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}

继续进入源码,发现调用setBlocker先保存一下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。

    public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);UNSAFE.park(false, 0L);setBlocker(t, null);}

unsafe.park是个native方法:

public native void park(boolean isAbsolute, long time);

park这个方法会阻塞当前线程,只有以下4种情况中的一种发生时,该方法才会返回。

  1. 与park对应的unpark执行或已经执行时。“已经执行”是指unpark先执行,然后再执行park的情况。
  2. 线程被中断时。
  3. 等待完time参数指定的毫秒数时。
  4. 异常现象发生时,这个异常现象没有任何原因。

JVM是如何实现park方法:park在不同的操作系统中使用不同的方式实现,在Linux下使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的os::PlatformEvent::park方法:

void os::PlatformEvent::park() {int v ;for (;;) {v = _Event ;if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;}guarantee (v >= 0, "invariant") ;if (v == 0) {// Do this the hard way by blocking ...int status = pthread_mutex_lock(_mutex);assert_status(status == 0, status, "mutex_lock");guarantee (_nParked == 0, "invariant") ;++ _nParked ;while (_Event < 0) {// !!!    status = pthread_cond_wait(_cond, _mutex);// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...// Treat this the same as if the wait was interruptedif (status == ETIME) { status = EINTR; }assert_status(status == 0 || status == EINTR, status, "cond_wait");}-- _nParked ;// In theory we could move the ST of 0 into _Event past the unlock(),// but then we'd need a MEMBAR after the ST._Event = 0 ;status = pthread_mutex_unlock(_mutex);assert_status(status == 0, status, "mutex_unlock");}guarantee (_Event >= 0, "invariant") ;}}

pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。方法接收两个参数:一个共享变量 _cond,一个互斥量 _mutex。而unpark方法在Linux下是使用pthread_cond_signal实现的。park方法在Windows下则是使用WaitForSingleObject实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/17183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue项目开发常用工具类

防止重复造轮子&#xff0c;将经常用的函数进行记录&#xff0c;也参考网上的并一起进行记录&#xff0c;后续会持续更新常用到的函数工具类方法&#x1f609;&#x1f609; /** 验证手机号是否合格* true--说明合格*/ export function isPhone(phoneStr) {let myreg /^[1][3,…

艺人商务代言:避雷策略与成功合作之道

避免在艺人商务代言中遇到风险&#xff0c;是每个企业和艺人都应该高度重视的问题。代言活动是一种有效的市场营销手段&#xff0c;可以为企业带来广泛的曝光和销售增长&#xff0c;同时也能让艺人获得额外的收入和更高的知名度。然而&#xff0c;不慎选择错误的代言合作可能带…

Vue3--->组合式API与Pinia

目录 使用create-vue搭建 1、使用create-vue创建项目 2、项目目录和关键文件 组合式API 1、组合式API - setup选项 2、组合式API - reactive和ref函数 3、组合式API - computed 4、组合式API - watch 1、基础使用 - 侦听单个数据 2、基础使用 - 侦听多个数据 3、immediate&…

基于 FFmpeg 的跨平台视频播放器简明教程(七):使用多线程解码视频和音频

系列文章目录 基于 FFmpeg 的跨平台视频播放器简明教程&#xff08;一&#xff09;&#xff1a;FFMPEG Conan 环境集成基于 FFmpeg 的跨平台视频播放器简明教程&#xff08;二&#xff09;&#xff1a;基础知识和解封装&#xff08;demux&#xff09;基于 FFmpeg 的跨平台视频…

leetcode每日一题Day2——344. 反转字符串

✨博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;算法修炼之练气篇&#xff08;C\C版&#xff09; &#x1f353;专栏&#xff1a;算法修炼之筑基篇&#xff08;C\C版&#xff09; &#x1f433;专栏&#xff1a;算法修炼之练气篇&#xff08;Python版&#xff09; …

【面试题】与通义千问的芯片前端设计模拟面试归纳

这里是尼德兰的喵芯片设计相关文章,欢迎您的访问! 如果文章对您有所帮助,期待您的点赞收藏! 让我们一起为芯片前端全栈工程师而努力! 前言 两个小时,与chatGPT进行了一场数字IC前端设计岗的面试_尼德兰的喵的博客-CSDN博客 和GPT-3.5的回答可以对比品尝,味道更好。 模…

Jenkins pipeline 脚本语言学习支持

1 引言 Groovy是用于Java虚拟机的一种敏捷的动态语言&#xff0c;它是一种成熟的面向对象编程语言&#xff0c;既可以用于面向对象编程&#xff0c;又可以用作纯粹的脚本语言。 使用该种语言不必编写过多的代码&#xff0c;同时又具有闭包和动态语言中的其他特性。 Groovy是一…

用Python写了一个下载网站所有内容的软件,可见即可下

目录标题 前言环境介绍:代码实战获取数据获取视频采集弹幕采集评论 GUI部分尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 今天我们分享一个用Python写下载视频弹幕评论的代码。 顺便把这些写成GUI&#xff0c;把这些功能放到一起让朋友用起来更方便~ 环境介绍: py…

tinkerCAD案例:29. 摇头娃娃

Research Your Favorite Bobblehead 摇头娃娃 Project Overview: 项目概况&#xff1a; Design and create your favorite Minecraft 3D bobble head. All you need is a computer, 3D printer, spring and your creativity to your favorite Minecraft character in the for…

dreamStudio试用教程【AI绘画】

文章目录 dreamStudio 简介打开官网如下邮箱登录即可切换随机提示词新用户的试用次数目前只有25张图像&#x1f4d9; 预祝各位 前途似锦、可摘星辰 dreamStudio 简介 https://github.com/Stability-AI/StableStudio StabilityAI在官网上重磅宣布——旗下的文生图应用DreamStu…

智能提词器有哪些?了解一下这款提词工具

智能提词器有哪些&#xff1f;使用智能提词器可以帮助你更好地准备和交付演讲、报告或其他提词场合。它可以提高你的效率&#xff0c;节省你的时间&#xff0c;并让你更加自信地与听众沟通。另外&#xff0c;智能提词器还可以提供一些有用的功能&#xff0c;如语音识别、智能建…

Spring Boot实践三 --数据库

一&#xff0c;使用JdbcTemplate访问MySQL数据库 1&#xff0c;确认本地已正确安装mysql 按【winr】快捷键打开运行&#xff1b;输入services.msc&#xff0c;点击【确定】&#xff1b;在打开的服务列表中查找mysql服务&#xff0c;如果没有mysql服务&#xff0c;说明本机没有…

迁移学习、微调、计算机视觉理论(第十一次组会ppt)

@TOC 数据增广 迁移学习 微调 目标检测和边界框 区域卷积神经网络R—CNN

IDEA开启并配置services窗口

前言&#xff1a; 一般一个spring cloud项目中大大小小存在几个十几个module编写具体的微服务项目。此时&#xff0c;如果要调试测需要依次启动各个项目比较麻烦。 方法一&#xff1a; 默认第一次打开项目的时候&#xff0c;idea会提示是否增加这个选项卡&#xff0c;如果你没…

《golang设计模式》第一部分·创建型模式-03-建造者模式(Builder)

文章目录 1. 概念1.1 角色1.2 类图 2. 代码示例2.1 设计2.2 代码2.3 类图 1. 概念 1.1 角色 Builder&#xff08;抽象建造者&#xff09;&#xff1a;给出一个抽象接口&#xff0c;以规范产品对象的各个组成成分的建造。ConcreteBuilder&#xff08;具体建造者&#xff09;&a…

NOsql之MongoDB入门分享

目录 一、MongoDB简介 1、概念理解 2、yum安装部署 3、二进制安装部署 4、配置文件解析 二、MongoDB基本管理 1、登录操作 2、管理命令 3、用户管理 一、MongoDB简介 1、概念理解 关系型数据库&#xff08;RDBMS:Relational Database Management System) MySql、Ora…

STM32-风速传感器(ADC)

目录 0 说明 1 传感器介绍 2 代码说明 2.1 ADC.c 2.2 adc.h 2.3 main.c 0 说明 本篇文章主要是说明怎么使用STM32单片机读取风速传感器采集到的数据&#xff0c;读取方式是ADC&#xff0c;并且附带着STM32所需要的全部代码&#xff0c;所使用的风速传感器如下图所示。 附&am…

IDEA的基础使用——【初识IDEA】

IDEA的基础使用——【初识IDEA】 文章目录 IDEA简介前言官网 IDEA的下载与安装选择下载路径勾选自己需要的其余按默认选项进行即可 目录简介安装目录简介 运行Hello WorldIDEA快捷键常用模板模板一&#xff1a;psvm&#xff08;main&#xff09;模板二&#xff1a;模板三&#…

PHP-Mysql好运图书管理系统--【白嫖项目】

强撸项目系列总目录在000集 PHP要怎么学–【思维导图知识范围】 文章目录 本系列校训本项目使用技术 首页必要的项目知识ThinkPHP的MVCThinkTemplateThinkPHP 6和ThinkPHP 5 phpStudy 设置导数据库前台展示页面后台的管理界面数据库表结构项目目录如图&#xff1a;代码部分&a…

【c语言初级】c++基础

文章目录 1. C关键字2. 命名空间2.1 命名空间定义2.2 命名空间使用 3. C输入&输出4. 缺省参数4.1 缺省参数概念4.2 缺省参数分类 5. 函数重载5.2 C函数重载的原理--名字修饰采用C语言编译器编译后结果 1. C关键字 C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想…