JUC阻塞队列(四):DelayQueue

1、DelayQueue介绍

      DelayQueue 是一个延迟队列,生产者写入一个数据,这个数据具有被直接消费的延迟时间,

      让数据具有延迟的特性。

      DelayQueue底层也是基于二叉堆来实现的,DelayQueue本就是基于PriorityBQueue 实现的。

      二叉堆结构每次获取的是堆顶数据,在比较时,根据延迟时间进行比较,延迟时间剩余端的放

      在堆顶。

      由于 DelayQueue 基于 PriorityQueue  实现的,因此 DelayQueue 理论上也是一个无边界队

      列,DelayQueue 容量可以进行无限扩容。

      

2、DelayQueue核心属性

      由DelayQueue 结构可以发现,DelayQueue 存储的数据必须实现 Delayed 接口

      DelayQueue 结构如下:

            

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {//锁,阻塞队列需要使用锁来保证线程安全//只有一把锁,表示生产者和消费者使用的是同一把锁private final transient ReentrantLock lock = new ReentrantLock();//基于优先级队列 PriorityQueueprivate final PriorityQueue<E> q = new PriorityQueue<E>();/*** leader 一般用来保存等待堆顶数据的消费者线程*/private Thread leader = null;/*** 基于 PriorityQueue(基于二叉堆)实现数据存储,生产者在插入数据时是不会阻塞的,* 当前的Condition就是给消费者用的,当消费者获取数据时,当堆顶数据的延迟时间还不为* 0(即还没到执行时间点),此时消费者线程会阻塞挂起等待一会(等待的是堆顶数据),直到堆顶数据延迟时间为0(到达任务执行时间点)* 或者 生产者新插入的数据到了堆顶,此时生产者会调用Condition.signal() 方法唤醒消费者线程*/private final Condition available = lock.newCondition();public DelayQueue() {}public DelayQueue(Collection<? extends E> c) {this.addAll(c);}
}/**Delayed 继承 Comparable接口,所以 Delayed 的实现都可以进行比较操作
*/
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);//获取延迟时间,比较延迟时间
}

3、DelayQueue使用示例

      DelayQueue 常用方法也是 BlockingQueue接口中定义的那几个存储数据和获取数据的方法,

      只有一点需要注意,即 DelayQueue 保存的数据必须实现接口Delayed 

       DelayQueue 使用示例如下:

                

public class TaskDelayed implements Delayed {private String name;/** 执行时间点*/private Long time;public TaskDelayed(String name,Long time){this.name = name;this.time = System.currentTimeMillis()+time;}/*** 设置 任务TaskDelayed 什么时候可以出延迟队列DelayedQueue* 该方法返回值小于等于0时任务才会从 延迟队列DelayedQueue 中取出执行**/@Overridepublic long getDelay(TimeUnit unit) {//TimeUnit.MILLISECONDS :将时间转换为毫秒return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 比较器* 2个 TaskDelayed 任务在存储到延迟队列时的比较方式,通过time属性进行比较* 返回值:*    < 0: 按从小到大排列*    == 0 : 相等*    >0 : 从大到小排列* @param o* @return*/@Overridepublic int compareTo(Delayed o) {TaskDelayed task = (TaskDelayed) o;return (int)(this.time - task.getTime());}
}public class DelayedQueueDemo01 {public static void main(String[] args) throws InterruptedException {TaskDelayed task1 = new TaskDelayed("Tome",2000L);TaskDelayed task2 = new TaskDelayed("JieRui",4000L);TaskDelayed task3 = new TaskDelayed("zhuDy",3000L);TaskDelayed task4 = new TaskDelayed("zhanmusi",1000L);//DelayQueue 存放的数据必须实现接口DelayedDelayQueue<TaskDelayed> queue = new DelayQueue<>();//添加数据queue.add(task1);queue.offer(task2);queue.offer(task3,4, TimeUnit.SECONDS);queue.put(task4);//取数据System.out.println(queue.remove());//若堆顶数据的延迟时间还没到达,则poll()返回null,remove()会直接抛出异常System.out.println(queue.poll());System.out.println(queue.poll(5,TimeUnit.SECONDS));System.out.println(queue.take());}
}

4、DelayQueue写入流程分析

      因为 DelayQueue 底层是基于 PriorityQueue  实现的,也就是基于二叉堆实现的,所以

      DelayQueue 是一个无界的队列,存储数据的数组可以动态扩容,所以生产者不需要关注

      队列满了而阻塞的问题,因此这里只需要关注offer(E e) 方法就可以了

             offer(E e) 代码如下:

               

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {//直接调用PriorityQueue的插入方法q.offer(e);//判断数据插入后的二叉堆的堆顶元素是不是刚插入的数据,//若是,则说明当前堆顶数据可能已到达延迟时间可以进行消费,唤醒等待的消费者线程,并将当前等待的消费者线程设置为nullif (q.peek() == e) {/*** leader 赋值为null* todo 在消费者消费数据时会判断leader 是否为null*/leader = null;/*** 唤醒挂起阻塞的消费者线程,避免刚插入的数据的延迟时间出现问题* 这里可以发现消费者等待的是堆顶数据*/available.signal();}return true;} finally {lock.unlock();}}

              

5、DelayQueue取数据流程分析

      消费者取数据过程需要考虑阻塞问题:

              1)队列为空,无数据,消费者线程需要挂起等待一会

              2)堆顶数据的延迟时间还没到,此时消费者线程需要挂起等待一会

              3)当消费者A已经在等待堆顶数据,此时消费B也过来取数据,此时消费者B需要

                    挂起等待一会

5.1、remove() 

        该方法功能是取数据,取堆顶数据,若取不到数据,则直接抛出异常。

        注意:若堆顶数据的延迟时间还没到达,则取不到数据,也会抛出异常

        remove 方法如下:

              

5.2、poll()

         该方法功能是读取数据,poll()方法不会阻塞消费者,能获取数据就直接返回,否则返回null

         poll 方法代码如下:

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//查看堆顶数据E first = q.peek();/***first == null 表示堆为空,没有数据* getDelay 方法返回值大于0,表示堆顶数据还没到延迟时间,不能执行,堆顶数据无法取出*/if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}

5.3、poll(long timeout, TimeUnit unit)

         带超时时间的读取数据的方法

         poll方法代码如下:

            

/***带超时时间的取数据方法*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {//将超时时间转换为纳秒long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//加锁,可被中断,当被中断时会抛出异常,直接退出lock.lockInterruptibly();try {//自旋for (;;) {//查看堆顶数据E first = q.peek();//若堆顶数据为空,即堆无数据,则判断超时时间是否已过,若超时时间也已经过了,则返回nullif (first == null) {if (nanos <= 0)return null;else//阻塞等待,并返回剩余超时时间//等待生产者线程添加数据之后唤醒nanos = available.awaitNanos(nanos);} else {//堆有数据//获取堆顶数据的延迟时间,单位纳秒long delay = first.getDelay(NANOSECONDS);//若堆顶数据的延迟时间小于等于0,表示当前堆顶数据可以执行,立即取出if (delay <= 0)return q.poll();//若堆顶数据的延迟时间大于0(表示堆顶数据还不能执行)且超时时间已经过了,则返回nullif (nanos <= 0)return null;/*** 指定到这里,说明堆顶数据的延迟时间大于0(表示延迟时间没到,堆顶数据还不能执行)且方法超时时间还没过* 消费者需要挂起等待*/first = null; // don't retain ref while waiting//方法剩余超时时间小于堆顶数据的延迟时间,则消费者线程继续阻塞,并返回剩余超时时间/*** todo 疑问:这里为什么不直接结束,反正最终是无法获取数据的?*           因为 你不确定在剩余的超时时间nanos内,是否有新的数据插入(新插入的数据可能延迟时间很短),*           前边offer(E e)新增数据后也会唤醒等待的消费者线程*           第二个条件 leader != null,leader != null表示前边已经有*           消费者线程在挂起阻塞堆顶数据的延迟时间到期,后边的消费者线程执行到这里*           需要直接阻塞挂起,这样避免 leader 的重复赋值*/if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {//方法剩余超时时间大于堆顶数据的延迟时间,表示当前消费者可以在超时时间nanos内拿到堆顶数据,// 且当前没有消费者在等待堆顶数据//将leader 设置为 当前消费者线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//阻塞等待,并返回剩余的阻塞时间//当前消费者阻塞堆顶数据的延迟时间long timeLeft = available.awaitNanos(delay);//更新剩余的可阻塞时间,已消耗的超时时间是 delay - timeLeftnanos -= delay - timeLeft;} finally {//堆顶数据的延迟时间到了,将 leader 设置为null//这一步只有生产者和消费者自己可以做if (leader == thisThread)leader = null;}}}}} finally {//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

5.4、take()

         读取数据,允许中断,若队列为空则一直阻塞,直到队列有数据 或 被中断时异常退出

         take() 方法代码如下:     

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁,允许中断,中断异常退出lock.lockInterruptibly();try {//自旋for (;;) {//查看堆顶数据E first = q.peek();//若队列没有数据,则阻塞,由生产者唤醒 或者被中断异常退出if (first == null)available.await();else {//队列不为空//获取堆顶数据的延迟时间long delay = first.getDelay(NANOSECONDS);//表示堆顶数据可以执行,则从队列中取出数据if (delay <= 0)return q.poll();//执行到这里,表示堆顶数据的延迟时间还没到,消费者线程需要阻塞,等待堆顶数据到达其延迟时间first = null; // don't retain ref while waiting//前边有消费者线程在阻塞等待堆顶数据的延迟时间到达,则当前线程直接阻塞if (leader != null)available.await();else {//当前没有消费者线程在等待堆顶数据的延迟时间到达,则把当前消费者设置为等待堆顶数据延迟时间到达的线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//阻塞,阻塞时间是堆顶数据的延迟时间available.awaitNanos(delay);} finally {//阻塞结束,获取到堆顶数据后将 leader 设置为Nullif (leader == thisThread)leader = null;}}}}} finally {//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/877576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kubernetes Pod入门

在 Kubernetes 中&#xff0c;一个重要的概念就是 Pod(豆英)&#xff0c;Kubernetes 并不是直接管理容器他的最小管理单元叫做 Pod。的&#xff0c; 在 Docker 的应用中&#xff0c;我们把一个应用程序封装在一个镜像中&#xff0c;之后启动这个镜像并映射一个宿主机端…

x-cmd pkg | dua - 一个可以方便地了解给定目录的磁盘空间使用情况的工具

目录 简介用户首次快速实验指南技术特点竞品和相关项目进一步阅读 简介 dua 是 Disk Usage Analyzer 的简写&#xff0c;该工具可以快速查看给定目录的磁盘空间使用情况。 对于想要深入了解磁盘空间使用情况并有效管理存储的用户来说&#xff0c;Dua 是一个很有价值的工具。通…

华硕飞行堡垒键盘全部失灵【除电源键】

华硕飞行堡垒FX53VD键盘全部失灵【除电源键】 前言一、故障排查二、发现问题三、使用方法总结 前言 版本型号&#xff1a; 型号 ASUS FX53VD&#xff08;华硕-飞行堡垒&#xff09; 板号&#xff1a;GL553VD 故障情况描述&#xff1a; 键盘无法使用&#xff0c;键盘除开机键外…

轮式自主移动机器人的研究发展与ROS环境搭建

前言&#xff1a; 在轮式自主移动机器人的研发过程中&#xff0c;编程技术的重要性不言而喻。编程不仅涉及到机器人各种功能模块的协调运作&#xff0c;还决定了机器人能否实现复杂的行为决策和控制。首先&#xff0c;编程技术为机器人提供了“大脑”&#xff0c;从而使其能够感…

暑假请停止躺平!0门槛赚22万奖金!

在这个数字化时代&#xff0c;数据成为推动社会进步的重要力量。您是否想运用手中的数据&#xff0c;为治理与环保领域贡献力量&#xff1f;现在&#xff0c;机会就在眼前&#xff01; 2024年厦门市大数据创新应用大赛正在火热进行中&#xff0c;我们诚邀全国高等院校在校学生…

工厂现场多功能帮手,三防平板改善管理体验

随着制造业的智能化变革&#xff0c;信息化、自动化和智能化逐渐成为工厂管理的新常态。在这一波技术浪潮中&#xff0c;三防平板作为一种多功能的工作工具&#xff0c;正在逐步改善工厂现场的管理体验。 一、三防平板的定义与特点 三防平板&#xff0c;顾名思义&#xff0c;是…

『 Linux 』利用UDP套接字实现简单群聊

文章目录 服务端通过传入命令处理实现远程命令执行使用Windows编辑UDP客户端实现Windows远程控制Linux接收套接字的其他信息UDP套接字简单群聊服务端UDP套接字简单群聊客户端运行测试及分离输入输出 参考代码 服务端通过传入命令处理实现远程命令执行 『 Linux 』利用UDP套接字…

springboot医疗远程管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图详细视频演示技术栈系统测试为什么选择我官方认证玩家&#xff0c;服务很多代码文档&#xff0c;百分百好评&#xff0c;战绩可查&#xff01;&#xff01;入职于互联网大厂&#xff0c;可以交流&#xff0c;共同进步。有保障的售后 代码参考数据库参…

el-table实现动态添加行,并且有父子级联动下拉框

<template><div><el-button click"addRow">添加行</el-button><el-table :data"tableData" style"width: 100%"><el-table-column label"序号"type"index"width"100"align"…

【ARM+Codesys 客户案例 】基于RK3568/A40i/STM32+CODESYS开发的控制器在自动输送分拣系统上的应用,支持定制

2021年“京东618” 累计下单金额超3438亿元,再次刷新纪录! 从下单到收货&#xff0c;各种货品均可在短短几天内通过四通八达的物流网络送达全国任何一个家庭。电子商务和快递物流的迅猛发展对仓储、分拣、配送效率和准确性均提出了更高的要求&#xff0c;加速了智能物流的发展。…

如何免费获取乡镇级边界数据geoJson数据

如何免费获取乡镇级边界数据geoJson数据 我们可以通过 阿里云数据可视化平台 &#xff0c;可以获取到中国各个省份/区级/县级的json数据&#xff0c;但是区级和县级&#xff0c;并没有包含街道和乡镇的数据 获取乡镇级边界数据 1.下载bigemap全能版 安装好后选择你要导出的…

C++竞赛初阶L1-13-第五单元-循环嵌套(29~30课)537: T456456 质因数分解

题目内容 已知正整数 n 是两个不同的质数的乘积&#xff0c;试求出较大的那个质数。 输入格式 输入只有一行&#xff0c;包含一个正整数 n&#xff08;6<n<109&#xff09;。 输出格式 输出只有一行&#xff0c;包含一个正整数 p&#xff0c;即较大的那个质数。 样例…

《黑神话:悟空》媒体评分解禁 M站均分82

《黑神话&#xff1a;悟空》媒体评分现已解禁&#xff0c;截止发稿时&#xff0c;M站共有43家媒体评测&#xff0c;均分为82分。 部分媒体评测&#xff1a; God is a Geek 100&#xff1a; 毫无疑问&#xff0c;《黑神话&#xff1a;悟空》是今年最好的动作游戏之一&#xff…

ant design pro v6 如何做好角色管理

先上图&#xff1a; 整个角色管理是如何做的吗&#xff1f; 首先你要处理后端&#xff0c;要先把角色存到用户那。 这是用户管理部分的内容&#xff1a; 可以看到一个用户是有多个角色的。 看到没有&#xff0c;存的是数组 数组的是一个 role 对象 role 对象是这样&#xf…

在选择或推荐数据恢复软件之前,您如何测试和审查它?

数据恢复软件可以帮助您从各种存储设备中检索丢失或删除的文件&#xff0c;例如硬盘驱动器&#xff0c;USB闪存驱动器&#xff0c;存储卡或智能手机。但是&#xff0c;并非所有数据恢复软件都是一样的&#xff0c;根据您的情况和需求&#xff0c;有些软件的性能可能比其他软件更…

网安入门—信息收集

1.定义 信息收集是指收集有关目标应用程序和系统的相关信息。这些信息可以帮助攻击者了解目标系统的架构、技术实现细节、运行环境、网络拓扑结构、安全措施等方面的信息&#xff0c;以便我们在后续的渗透过程更好的进行。 2.分类 主动信息收集和被动信息收集 区别&#xf…

微信公众号发送模板消息使用说明

一、获取access_token def get_access():appid secret url fhttps://api.weixin.qq.com/cgi-bin/token?grant_typeclient_credential&appid{appid}&secret{secret}res requests.get(url).json()return res 返回结果如下&#xff1a; {access_token: 83_TAxuwdt…

android FD_SET_chk问题定位

android FD_SET_chk问题定位 一、FD报错二、问题定位2.1 APM定位2.2 adb定位2.3. 代码获取FD数 三、FD优化 一、FD报错 App在运行中记录报错如下&#xff0c;FD_SET&#xff0c;这个问题大概是文件描述符&#xff08;File Descriptor&#xff0c;简称FD&#xff09;超过了最大…

MySQL InnoDB引擎四大特性ACID实现方案分析

文章目录 概要InnoDb引擎ACID模型的实现方案小结 概要 对于Mysql&#xff0c;事物的支撑并不依赖于Server层&#xff0c;不同的存储引擎对于事物的支持也不一样&#xff0c;对于我们常用的InnoDB引擎&#xff0c;其提供了一套基于【ACID模型】的事物完整的解决方案。为什么MyIS…

云计算实训32——安装nginx(修改端口为8080)、roles基本用法、使用剧本安装nginx、使用roles实现lnmp

一、安装nginx并更改其端口 编辑hosts配置文件 [rootmo ~]# vim /etc/ansible/hosts 创建目录 [rootmo ~]# mkdir /etc/ansible/playbook 编辑配置文件 [rootmo ~]# vim /etc/ansible/playbook/nginx.yml 执行测试 [rootmo ~]# ansible-playbook /etc/ansible/playbook/n…