Executor框架
jdk5开始,把工作单元与执行机制分离开来,工作单元包括Runable和Callable,执行机制由Executor框架来提供。
Executor框架简介
Executor框架的两级调度模型
- Java线程被一对一映射为本地操作系统线程
- java线程启动会创建一个本地操作系统线程
- java线程终止操作系统线程也会被回收
- 操作系统会调度所有线程并将它们分配给可用的cpu
- 在上层,java多线程程序通常把应用分解为若干任务,然后使用用户级调度器将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上
- 应用通过Executor框架控制上层调度,下层由操作系统内核控制,下层调度不受应用程序控制。
Executor框架的结构与成员
Executor框架结构
-
主要由任务,任务执行和异步计算结果组成
- 任务:包括被执行任务需要实现的接口:Runnable和Callable
- 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorServie接口。(ThreadPoolExecutor和ScheduledThreadPoolExecutor两个关键类实现了)
- 异步计算的结果:包括接口Future和实现Future接口的FutureTask类
-
类和接口
-
- Executor是一个接口,是Executor框架的基础,将任务的提交与任务的执行分离开来
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
- Future接口和实现Future接口的FutureTask类,代表异步计算的结果
- Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
-
使用:
- 主线程创建实现Runnable或Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))
- 可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者吧Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))
- 执行ExecutorService.submit(…),将会返回一个实现Future接口的对象,也可以创建FutureTask然后直接交给ExecutorService执行
- 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel来取消此任务
Executor框架的成员
-
ThreadPoolExecutor:通常使用工厂类Executors创建
-
FixedThreadPool:创建固定线程数的FixedThreadPool
-
适用于为了满足资源管理的需求,需要限制当前线程数量的应用场景,适用于负载比较重的服务器
-
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
-
-
SingleThreadExecutor:创建单个线程
-
适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
-
-
CachedThreadPool:创建一个会根据需要创建新线程
-
大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载教轻的服务器
-
public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
-
-
-
ScheduledThreadPoolExecutor:通常使用工厂类Executors创建
-
创建ScheduledThreadPoolExecutor:包含若干线程的ScheduledThreadPoolExecutor
-
适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求需要限制后台线程数量的应用场景
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
-
-
创建SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor
-
适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景
-
public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize) public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)
-
-
-
Future接口
-
Future接口和Future接口的FutureTask类用来表示异步计算的结果
-
把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutors或ScheduledThreadPoolExecutors时,会返回给一个FutureTask对象
-
<T> Future<T> submit(Callable<T> task) <T> Future<T> submit(Runnable task, T result) Future<> submit(Runnable task)
-
-
Runnable接口和Callable接口
-
Runnable不会返回结果,Callable会返回结果
-
可以把Runnable包装成Callable
-
public static Callable<Object> callable(Runnable task)
-
可以把Runnable和一个待返回的结果包装成一个Callable
-
public static <T> Callable<T> callable(Runnable task, T result)
-
把callable对象提交给执行时,submit会返回一个FutureTask对象
- 执行get方法等待任务执行完成,任务执行完成后get方法将返回该任务的结果
-
ThreadPoolExecutor详解
构成组件
- corePool:核心线程池大小
- maximumPool:最大线程池大小
- BlockingQueue:用来暂时保存任务的工作队列
- RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者饱和时,execute方法将要调用的Handler
创建类型
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
FixedThreadPool详解
固定线程数线程池
public static ExecutorService newFixedThreadPool(int nThreads){return new ThreadPoolExecutor(nThreads,nThreads, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
- corePoolSize 和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数
- 当啊线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止
- keepAliveTime设置为0,多余空闲线程会被立即终止
流程
- 如果当前运行线程数少于corePoolSize,则创建新线程来执行任务
- 在线程池完成预热后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
- 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务执行
- 使用无界队列LinkedBlockingQueue作为线程池工作队列,使用无界队列作为工作队列会对线程池带来影响
- 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
- 由于上条,使用无界队列时,maximumPoolSize将是一个无效参数
- 由于上条和上上条,使用无界队列时keepAliveTIme将是一个无效参数
- 由于使用无界队列,运行中的FixedThreadPool不会拒绝任务
SingleThreadExecutor详解
使用单个worker线程的Executor
public static ExecutorService newSingleThreadExecutor(){return new ThreadPoolExecutor(1,1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
- corePoolSize 和maximumPoolSize都被设置为1
- keepAliveTime设置为0,多余空闲线程会被立即终止
- 使用无界队列LinkedBlockingQueue作为线程池的工作队列
工作流程
- 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务
- 在线程池完成预热后,将任务加入LinkedBlockingQueue
- 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool详解
根据需要创建新线程的线程池
public static ExecutorService newCachedThreadPool(int nThreads){return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
}
- corePoolSize设置为0
- maximumPoolSize被设置为Integer.MAX_VALUE
- keepAliveTime设置为60L,线程池中的空闲线程等待新任务的最长时间是60秒,空闲线程超过60秒将会被终止
- 使用没有容量的SynchronousQueue作为线程池的工作队列
- 如果主线程提交的任务速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,会因为创建过多线程为耗尽cpu和内存资源
流程:
- 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正则执行SynchronousQueue.poll,那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行步骤2
- 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll.这种情况下,步骤1 将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成
- 在步骤2中新创建将任务执行完后,会执行SynchronousQueue.poll,这个poll操作会让空闲线程最多在SynchronousQueue中等待60s,如果60s内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务,否则这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
ScheduledThreadPoolExecutor详解
- 继承自ThreadPoolExecutor
- 主要用来在给定的延迟之后运行任务,会定期执行任务。
- 可以在构造函数中指定多个对应的后台线程数
ScheduledThreadPoolExecutor运行机制
- DelayQueue是一个无界队列。
主要两部分
- 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
- 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改
- 使用DelayQueue作为任务队列
- 获取任务的方式不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor的实现
ScheduledFutureTask成员变量:
- long time:表示这个任务将要被执行的具体时间
- long sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
- long period:表示任务执行的间隔周期
DelayQueue封装了一个优先级队列,这个优先级队列会将队列的任务根据time排列,小的在前,如果time相同,比较sequenceNumber,小的在前
ScheduledThreadPoolExecutor执行某个周期任务步骤
- 线程从DelayQueue中获取已到期的ScheduledFutureTask,到期任务是指ScheduledFutureTask的time大于等于当前时间
- 获取Lock
- 获取周期任务
- 如果PriorityQueue为空,当前线程到Condition中等待
- 如果PriorityQueue的头元素的time时间比当前时间大,到condition中等待到time时间
- 获取PriorityQueue的头元素,如果不为空,则唤醒condition中等待的所有线程
- 释放Lock
- 线程执行ScheduledFutureTask
- 线程修改ScheduledFutureTask的time后边变为下次将要被执行的时间
- 线程把这个修改time之后的ScheduledFutureTask放回到DelayQueue中
- 获取Lock
- 添加任务
- 向PriorityQueue添加任务
- 如果添加的任务是头元素,唤醒Condition中等待的所有线程
- 释放Lock
FutureTask详解
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
简介
- FutureTask除了实现Future接口外,还实现了Runnable接口。
- FutureTask可以交给Executor执行,也可以由调用线程直接执行
- FutureTask可以处于3种状态
- 未启动:run方法还没有被执行前
- 执行FutureTask.get将导致调用线程阻塞
- 执行FutureTask.cancel将导致此任务永远不会被执行
- 已启动:run方法被执行过程中
- 执行FutureTask.get将导致调用线程阻塞
- 执行FutureTask.cancel(true)将以中断执行此任务线程的方式来试图停止任务
- 执行FutureTask.cancel(false)将不会对正在执行此任务的线程产生影响
- 已完成:run方法执行完后正常结束,或被取消,或执行run方法时抛出异常而异常结束
- 执行FutureTask.get将导致调用线程立即返回结果或抛出异常
- 执行FutureTask.cancel()返回false
- 未启动:run方法还没有被执行前
使用
- 可以把FutureTask交给Executor执行
- 可以通过ExecutorService.submit返回一个FutureTask,然后执行FutureTask.get或cancel方法
- 也可以单独使用FutureTask
当一个线程需要等待另一个线程把某个任务执行完后才能执行,此时可以使用FutureTask
实现
-
基于AQS实现,包含两种类型操作
- 至少一个acquire操作,阻塞调用线程,除非直到AQS状态允许这线程继续执行,FutureTask的acquire操作为get方法调用
- 至少一个release操作,这个操作改变AQS的状态,改变后的状态可以允许一个或多个阻塞线程被解除阻塞,FutureTask的release操作包括run和cancel
-
Sync是FutureTask的内部私有类,继承自AQS,FutureTask的所有公有方法都直接委托给了内部私有Sync
-
FutureTask.get方法会用AQS的acquireSharedInterruptibly方法,执行过程
- 调用AQS的acquireSharedInterryptibly方法,
- 回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功
- 成功条件:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
- 回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功
- 如果成功则get方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release操作
- 当其他线程执行release操作唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程
- 最后返回计算的结果或抛出异常
FutureTask.run方法
- 执行在构造函数中指定的任务(Callable.call)
- 以原子方式来更新同步,如果这个原则操作成功,就设置代表计算结果的变量result的值为callable.call的返回值,然后调用AQS.releaseShared
- AQS.releaseShared首先会回调在子类Sync中实现的tryReleaseShared来执行release操作,AQS.releaseShared,然后唤醒线程等待队列中的第一个线程
- 调用FutureTask.done1
- 调用AQS的acquireSharedInterryptibly方法,