定时器
就是需要周期性的执行任务,也叫调度任务,在JDK中有个类Timer是支持周期性执行,但是这个类不建议使用了。
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor线程池,在Executors默认创建了两种:
newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务。
newScheduledThreadPool: 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候。
方法:
schedule:只执行一次,任务还可以延时执行
scheduleAtFixedRate:提交固定时间间隔的任务
scheduleWithFixedDelay:提交固定延时间隔执行的任务
两者的区别:间隔的时间定义不一样
建议在提交给ScheduledThreadPoolExecutor的任务要住catch异常。否则不能周期性执行任务。
基本原理
在之前将BlockingQueue<T>的时候有个叫DelayQueue<E extends Delayed>堵塞队列,这个就是实现延迟执行,在ScheduledThreadPoolExecutor实现时间间隔执行的原理与DelayQueue原理差不多
在ScheduledThreadPoolExecutor中有个静态类DelayedWorkQueue,该类也是一个延时队列。
构造方法是调用了父类构造,将队列换成了延时队列DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);}
在ThreadPoolExecutor谈到了获取队列中Runnable对象即执行take方法,看一下DelayedWorkQueue的take()方法,在延时是调用了Condition的awaitNanos()方法进行延时执行,
private final Condition available = lock.newCondition(); public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}
周期性执行是在执行完后会再次将当前任务放入到线程池中,再次等待延时执行。
//在调用scheduleAtFixedRate()方法是会调用delayedExecute方法将当前Runnable对象添加到队列当中
//等待执行
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
//在ScheduledFutureTask中,当线程池调用好了Runnable对象的run方法的时候,会调用reExecutePeriodic()方法将任务再次放入到线程池中,所以如果在执行报错了,那么就不会放入到线程池中,/*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
CompletionService
这个类了解一下就好了,在使用Future并发执行的时候一般都是将多个Future对象用数组或集合保存起来,然后在循环数组或集合调用get方法获取结果集,但是如果使用了CompletionService会将率先执行的结果集获取到,就是利用了堵塞队列原理实现的这种效果
用法:
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);CompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool);completionService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return null;}});completionService.take().get();