阻塞队列之七:DelayQueue延时队列

一、DelayQueue简介

  是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的(PriorityQueue实际存放Delayed接口对象),即队头对象的延迟到期时间最短(队列顶端总是最小的元素)。注意:不能将null元素放置到这种队列中。

  DelayQueue在poll/take的时候,队列中元素会判定这个elment有没有达到超时时间,如果没有达到,poll返回null,而take进入等待状态。但是,除了这两个方法,队列中的元素会被当做正常的元素来对待。例如,size方法返回所有元素的数量,而不管它们有没有达到超时时间。而协调的Condition available只对take和poll是有意义的。

二、DelayQueue源码分析

2.1、DelayQueue的lock

DelayQueue使用一个可重入锁和这个锁生成的一个条件对象进行并发控制。

    private final transient ReentrantLock lock = new ReentrantLock();
//内部用于存储对象
private final PriorityQueue<E> q = new PriorityQueue<E>();/*** Thread designated to wait for the element at the head of* the queue. This variant of the Leader-Follower pattern* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to* minimize unnecessary timed waiting. When a thread becomes* the leader, it waits only for the next delay to elapse, but* other threads await indefinitely. The leader thread must* signal some other thread before returning from take() or* poll(...), unless some other thread becomes leader in the* interim. Whenever the head of the queue is replaced with* an element with an earlier expiration time, the leader* field is invalidated by being reset to null, and some* waiting thread, but not necessarily the current leader, is* signalled. So waiting threads must be prepared to acquire* and lose leadership while waiting.*/private Thread leader = null;/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/private final Condition available = lock.newCondition();

2.2、成员变量

要先了解下DelayQueue中用到的几个关键对象:

2.2.1、Delayed, 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

此接口的实现必须定义一个 compareTo()方法,该方法提供与此接口的 getDelay()方法一致的排序。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {

DelayQueue是一个BlockingQueue,其泛型类的参数是Delayed接口对象。

Delayed接口:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);   //返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
}

Comparable接口:

public interface Comparable<T> {public int compareTo(T o);
}

Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。

2.2.2、PriorityQueue,优先级队列存放有序对象

优先队列的比较基准值是时间。详解见《阻塞队列之八:PriorityBlockingQueue优先队列》

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { private final PriorityQueue<E> q = new PriorityQueue<E>();
}

总结:DelayQueue内部是使用PriorityQueue实现的,DelayQueue = BlockingQueue + PriorityQueue + Delayed。

2.3、构造函数

    public DelayQueue() {}    public DelayQueue(Collection<? extends E> c) {this.addAll(c);}
    public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);}

超时的参数被忽略,因为是无界的。不会阻塞或超时。

2.4、入队

    public boolean add(E e) {return offer(e);}public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {//添加元素后peek还是e,重置leader,通知条件队列 leader = null;available.signal();}return true;} finally {lock.unlock();}}

2.5、出队

public E poll() {  final ReentrantLock lock = this.lock;  lock.lock();  try {  E first = q.peek();  if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //队列为空或者延迟时间未过期  return null;  else  return q.poll();  } finally {  lock.unlock();  }  
}  /** * take元素,元素未过期需要阻塞 */  
public E take() throws InterruptedException {  final ReentrantLock lock = this.lock;  lock.lockInterruptibly();  try {  for (;;) {  E first = q.peek();  if (first == null)  available.await(); //队列空,加入条件队列  else {  long delay = first.getDelay(TimeUnit.NANOSECONDS); //获取剩余延迟时间  if (delay <= 0) //小于0,那就poll元素  return q.poll();  else if (leader != null) //有延迟,检查leader,不为空说明有其他线程在等待,那就加入条件队列  
                    available.await();  else {   Thread thisThread = Thread.currentThread();  leader = thisThread; //设置当前为leader等待  try {  available.awaitNanos(delay); //条件队列等待指定时间  } finally {  if (leader == thisThread) //检查是否被其他线程改变,没有就重置,再次循环  leader = null;  }  }  }  }  } finally {  if (leader == null && q.peek() != null) //leader为空并且队列不空,说明没有其他线程在等待,那就通知条件队列  
            available.signal();  lock.unlock();  }  
}  /** * 响应超时的poll */  
public E poll(long timeout, TimeUnit unit) throws InterruptedException {  long nanos = unit.toNanos(timeout);  final ReentrantLock lock = this.lock;  lock.lockInterruptibly();  try {  for (;;) {  E first = q.peek();  if (first == null) {  if (nanos <= 0)  return null;  else  nanos = available.awaitNanos(nanos);  } else {  long delay = first.getDelay(TimeUnit.NANOSECONDS);  if (delay <= 0)  return q.poll();  if (nanos <= 0)  return null;  if (nanos < delay || leader != null)  nanos = available.awaitNanos(nanos);  else {  Thread thisThread = Thread.currentThread();  leader = thisThread;  try {  long timeLeft = available.awaitNanos(delay);  nanos -= delay - timeLeft;  } finally {  if (leader == thisThread)  leader = null;  }  }  }  }  } finally {  if (leader == null && q.peek() != null)  available.signal();  lock.unlock();  }  
}  /** * 获取queue[0],peek是不移除的 */  
public E peek() {  final ReentrantLock lock = this.lock;  lock.lock();  try {  return q.peek();  } finally {  lock.unlock();  }  
}  

三、JDK或开源框架中使用

ScheduledThreadPoolExecutor中使用了DelayedWorkQueue。

应用场景

下面的应用场景是来源于网上,虽然借用DelayedQueue可以快速找到要“失效”的对象,但DelayedQueue内部的PriorityQueue的(插入、删除时的排序)也耗费资源。

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
d)session超时管理,网络应答通讯协议的请求超时处理。

 

四、示例

1、缓存示例

  1. 当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间
  2. 为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数
  3. 当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程

以下是Sample,是一个缓存的简单实现。共包括三个类Pair、DelayItem、Cache。如下:

package com.dxz.concurrent.delayqueue;public class Pair<K, V> {public K key;public V value;public Pair() {}public Pair(K first, V second) {this.key = first;this.value = second;}@Overridepublic String toString() {return "Pair [key=" + key + ", value=" + value + "]";}}

以下是Delayed的实现

package com.dxz.concurrent.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class DelayItem<T> implements Delayed {/** Base of nanosecond timings, to avoid wrapping */private static final long NANO_ORIGIN = System.nanoTime();/*** Returns nanosecond time offset by origin*/final static long now() {return System.nanoTime() - NANO_ORIGIN;}/*** Sequence number to break scheduling ties, and in turn to guarantee FIFO* order among tied entries.*/private static final AtomicLong sequencer = new AtomicLong(0);/** Sequence number to break ties FIFO */private final long sequenceNumber;/** The time the task is enabled to execute in nanoTime units */private final long time;private final T item;public DelayItem(T submit, long timeout) {this.time = now() + timeout;this.item = submit;this.sequenceNumber = sequencer.getAndIncrement();}public T getItem() {return this.item;}public long getDelay(TimeUnit unit) {long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);return d;}public int compareTo(Delayed other) {if (other == this) // compare zero ONLY if same objectreturn 0;if (other instanceof DelayItem) {DelayItem x = (DelayItem) other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}
}

以下是Cache的实现,包括了put和get方法,还包括了可执行的main函数。

package com.dxz.concurrent.delayqueue;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;public class Cache<K, V> {private static final Logger LOG = Logger.getLogger(Cache.class.getName());private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();private Thread daemonThread;public Cache() {Runnable daemonTask = new Runnable() {public void run() {daemonCheck();}};daemonThread = new Thread(daemonTask);daemonThread.setDaemon(true);daemonThread.setName("Cache Daemon");daemonThread.start();}private void daemonCheck() {if (LOG.isLoggable(Level.INFO))LOG.info("cache service started.");for (;;) {try {DelayItem<Pair<K, V>> delayItem = q.take();if (delayItem != null) {// 超时对象处理Pair<K, V> pair = delayItem.getItem();cacheObjMap.remove(pair.key, pair.value); // compare and// remove
                }} catch (InterruptedException e) {if (LOG.isLoggable(Level.SEVERE))LOG.log(Level.SEVERE, e.getMessage(), e);break;}}if (LOG.isLoggable(Level.INFO))LOG.info("cache service stopped.");}// 添加缓存对象public void put(K key, V value, long time, TimeUnit unit) {V oldValue = cacheObjMap.put(key, value);if (oldValue != null) {boolean result = q.remove(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, oldValue), 0L));System.out.println("remove:="+result);}long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));}public V get(K key) {return cacheObjMap.get(key);}public DelayQueue<DelayItem<Pair<K, V>>> getQ() {return q;}public void setQ(DelayQueue<DelayItem<Pair<K, V>>> q) {this.q = q;}// 测试入口函数public static void main(String[] args) throws Exception {Cache<Integer, String> cache = new Cache<Integer, String>();cache.put(1, "aaaa", 60, TimeUnit.SECONDS);cache.put(1, "aaaa", 10, TimeUnit.SECONDS);//cache.put(1, "ccc", 60, TimeUnit.SECONDS);cache.put(2, "bbbb", 30, TimeUnit.SECONDS);cache.put(3, "cccc", 66, TimeUnit.SECONDS);cache.put(4, "dddd", 54, TimeUnit.SECONDS);cache.put(5, "eeee", 35, TimeUnit.SECONDS);cache.put(6, "ffff", 38, TimeUnit.SECONDS);cache.put(1, "aaaa", 70, TimeUnit.SECONDS);for(;;) {Thread.sleep(1000 * 2);{for(Object obj : cache.getQ().toArray()) {System.out.print(((DelayItem)obj).toString());System.out.println(",");}System.out.println();}}}
}

结果片段1:(重复key的Delayed对象将从DelayedQueue中移除

remove:=true
remove:=true
七月 04, 2017 11:28:36 上午 com.dxz.concurrent.delayqueue.Cache daemonCheck
信息: cache service started.
DelayItem [sequenceNumber=3, time=30000790187, item=Pair [key=2, value=bbbb]],
DelayItem [sequenceNumber=6, time=35000842411, item=Pair [key=5, value=eeee]],
DelayItem [sequenceNumber=7, time=38000847189, item=Pair [key=6, value=ffff]],
DelayItem [sequenceNumber=5, time=54000835925, item=Pair [key=4, value=dddd]],
DelayItem [sequenceNumber=4, time=66000803499, item=Pair [key=3, value=cccc]],
DelayItem [sequenceNumber=9, time=70000900437, item=Pair [key=1, value=aaaa]],

结果片段2:(队头对象将最先过时,可以被take()出来,这段代码在daemonCheck()方法中,即对超时对象的处理,如这里是清理session集合对象)

...
DelayItem [sequenceNumber=3, time=30000665600, item=Pair [key=2, value=bbbb]],
DelayItem [sequenceNumber=6, time=35000689152, item=Pair [key=5, value=eeee]],
DelayItem [sequenceNumber=7, time=38000694272, item=Pair [key=6, value=ffff]],
DelayItem [sequenceNumber=5, time=54000685398, item=Pair [key=4, value=dddd]],
DelayItem [sequenceNumber=4, time=66000679595, item=Pair [key=3, value=cccc]],
DelayItem [sequenceNumber=9, time=70000728406, item=Pair [key=1, value=aaaa]],DelayItem [sequenceNumber=6, time=35000689152, item=Pair [key=5, value=eeee]],
DelayItem [sequenceNumber=5, time=54000685398, item=Pair [key=4, value=dddd]],
DelayItem [sequenceNumber=7, time=38000694272, item=Pair [key=6, value=ffff]],
DelayItem [sequenceNumber=9, time=70000728406, item=Pair [key=1, value=aaaa]],
DelayItem [sequenceNumber=4, time=66000679595, item=Pair [key=3, value=cccc]],
...

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/293689.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

研究表明:喝酒“上脸”是基因突变,不仅容易老年痴呆,还容易得胃癌

全世界只有3.14 % 的人关注了爆炸吧知识本文转自科研大匠“喝酒上脸的人能喝&#xff01;”这句话&#xff0c;不管来自天南还是海北的&#xff0c;在酒桌上&#xff0c;肯定都耳熟能详有没有&#xff1f;其实&#xff0c;喝酒“上脸”并不意味着能喝&#xff0c;而是一种基因突…

PHP中如何配置smarty框架实现PHP代码和HTML代码分离

header(Cache-Control:Private);//保留用户填写的信息 session_start();//开启缓存 define(MYCMS,UTF-8);//定义网站编码常量 define(ROOT,str_replace(\\,/,realpath(dirname((__FILE__))./../)));//定义根目录常量 ../是返回上级目录 define(TPL,ROOT./tpl);//定义网页模板的…

Hook API (C++)

一、基本概念&#xff1a; 钩子(Hook)&#xff0c;是Windows消息处理机制的一个平台,应用程序可以在上面设置子程以监视指定窗口的某种消息&#xff0c;而且 所监视的窗口可以是其他进程所创建的。当消息到达后&#xff0c;在目标窗口处理函数之前处理它。钩子机制允许应用程序…

本科、硕士、博士的区别(终极版)

全世界只有3.14 % 的人关注了爆炸吧知识本文转自募格学术本科生和研究生到底有何区别&#xff1f;硕士和博士又有什么不同&#xff1f;这是很多人都有的困惑&#xff0c;对于这个问题的说法也有很多版本&#xff0c;我们挑选了几个比较经典的版本&#xff0c;以期能和大家一同探…

C# Jpush 极光推送消息推送

简介消息推送&#xff08;Push&#xff09;指运营人员通过自己的产品或第三方工具对用户移动设备进行的主动消息推送。用户可以在移动设备锁定屏幕和通知栏看到push消息通知&#xff0c;通知栏点击可唤起APP并去往相应页面。我们平时在锁屏上看到的微信消息等等都属于APP消息推…

Linux 环境变量 $PATH

我们知道查阅文件属性的指令 ls 完整文件名为&#xff1a;/bin/ls(这是绝对路径)&#xff0c;那为什么可以在任何地方执行/bin/ls 这个指令呢&#xff1f; 为什么在任何目录下输入 ls 就一定可以显示出一些讯息而不会说找不到该 /bin/ls 指令呢&#xff1f; 这是因为环境变量 …

文件项目SVN+TortoiseSVN+Subclipse使用总结

近来使用开辟的过程中涌现了一个小问题&#xff0c;顺便记录一下原因和方法--文件项目 一、SVN、TortoiseSVN、Subclipse分析 团队开辟技术&#xff1a; (1)单元测试&#xff1b;(2)版本控制&#xff1b; (3)项目主动化&#xff1b; SCM:软件配置管理&#xff0c;包含SVN&#…

PHP中常见的五种设计模式

设计模式只是为 Java架构师准备的 — 至少您可能一直这样认为。实际上&#xff0c;设计模式对于每个人都非常有用。如果这些工具不是 “架构太空人” 的专利&#xff0c;那么它们又是什么&#xff1f;为什么说它们在 PHP 应用程序中非常有用&#xff1f;本文解释了这些问题。 设…

Java常用类集接口以及实现方式总结

最近学习map-reduce原理以及map-reduce编程&#xff0c;于是顺带着学习下Java编程&#xff0c;对于Java常用的数据结构和类集&#xff0c;我总结到mind图中&#xff0c;便于理清相互之间的关系 package leiji; import java.util.ArrayList; import java.util.List; import java…

Android之Launcher分析和修改1——Launcher默认界面配置(default_workspace)

www.cnblogs.com/mythou/p/3153880.html 最近工作都在修改Launcher&#xff0c;所以打算把分析源码和修改源码的过程记录下来&#xff0c;最近会写一些关于Launcher的分析和修改博文。因为我是修改4.0.3的Launcher&#xff0c;所以后面文章里面的Launcher都是基于Android4.0.…

在 .NET Core 中如何让 Entity Framework Core 在日志中记录由 LINQ 生成的SQL语句

在开发中&#xff0c;我们想在调试中查看EF Core执行的sql语句&#xff0c;可以使用SQL Studio Manager Tools工具&#xff0c;另一种方式是使用EF Core提供的日志。在ASP.NET Core使用Entity Framework Core的日志.早在Entity Framework Core1.0 ,使用相关的ILoggerProvider I…

如果你喜欢上了一个程序员小伙 献给所有程序员女友(来自ITeye博客的文章 作者:talent2012)...

程序员向来是善于幽默自嘲的群体&#xff0c;但从某种程度上影响了咱程序员在广大女同胞心中的印象啊&#xff5e;&#xff5e; 于是写下此篇&#xff08;有从别处看到的3句加进来的&#xff09;&#xff0c; 就算是为咱程序员做个广告&#xff5e;&#xff5e;要是觉得有点过的…

豆瓣9.6分!这部BBC的纪录片太让人震撼!

全世界只有3.14 % 的人关注了爆炸吧知识英国广播公司BBC的纪录片素来就是高质量的代名词&#xff0c;推出的《地球无限》(Planet Earth)、《地球的力量》(Earth The Power of the Planet)、《冷血生命》(Life In Cold Blood)等片不仅在英国播放时获得极高收视&#xff0c;还获得…

vim-snipmate编写snippet的语法

vim-snipmate真的很好用&#xff0c;以前好多编写代码的问题得到完美的解决。还附带提升我对vim的理解和信心&#xff0c;在这里感谢一下作者。thank you。 1、现说一下我浓缩的重要语法。 1、定义是下面这样&#xff0c;注意中间必须是一个制表符<TAB>不能用空格代替。 …

Android之Launcher分析和修改2——Icon修改、界面布局调整、壁纸设置

上一篇文章说了如何修改Android自带Launcher2的默认界面设置&#xff08;http://www.cnblogs.com/mythou/p/3153880.html&#xff09;。 今天主要是说说Launcher里面图标、布局、壁纸等的设置问题。毕竟我们一般修改Launcher&#xff0c;这些都是需要修改的地方&#xff0c;也是…

捷径 - The certain shortcut

赛斯高汀(Seth Godin)的博客&#xff1a;http://sethgodin.typepad.com/seths_blog/2013/05/the-certain-shortcut.html The shortcut thats sure to work, every time: 所谓的捷径向来如此&#xff1a; Take the long way. 脚踏实地&#xff0c;一步一个脚印(遍历) Do t…

高效率三大法则总结

原则一&#xff1a;专注 之前经常听到很多人所谓的牛人&#xff0c;说什么边干活的时候可以开着msn、qq&#xff0c;工作和娱乐两不误&#xff0c;边听下属汇报的同时边写下午会议的发言稿&#xff0c;然而科学事实证明人类的大脑硬件结构决定了根本不可能一脑两用&#xff0c;…

N 年沉淀,腾讯这套系统终于开源!

大家好&#xff0c;我是鱼皮&#xff0c;前段时间给大家介绍了字节跳动开源的两套设计系统&#xff0c;分别是 Arco Design 和抖音 Semi Design。而就在几天前&#xff0c;腾讯终于也开源了自家的设计系统 TDesgin &#xff01;这次&#xff0c;终于能介绍自己公司的项目了。如…

在php中使用sockets:从新闻组中获取文章

PHP能打开远程或本地主机上的Socket端口。本文是一个使用Socket的小例子&#xff1a;连 接到一个Usenet新闻组服务器&#xff0c;同服务器对话&#xff0c;从新闻组中下载一些文章。在php中打开一个socket 使用fsockopen()打开一个socket.这个函数在php3和php4种都可以使用。函…

Android之Launcher分析和修改3——Launcher启动和初始化

前面两篇文章都是写有关Launcher配置文件的修改&#xff0c;代码方面涉及不多&#xff0c;今天开始进入Launcher代码分析。 我们开机启动Launcher&#xff0c;Launcher是由Activity Manager启动的&#xff0c;而Activity Manager是由system server启动。 原创博文&#xff0c…