文章目录
- 概述
- ThreadPoolExecutor
- ThreadPoolExecutor 的主要属性
- Worker 主要属性
- 线程池的状态
- 线程池的状态流转
- 线程池提交任务的执行流程
- 线程数量的设置
- 线程池的种类
- FixedThreadPool
- CachedThreadPool
- SingleThreadExecutor
- ScheduledThreadPoolExecutor
- SingleThreadScheduledExecutor
- ForkJoinPool
- work-stealing
- ForkJoinPool 注意事项
概述
线程池,是资源池化思想的一种实现。线程是一种宝贵且有限的 CPU
资源,一个线程的创建跟销毁的成本是比较高的。
所以创建线程池,主要有以下两个目的:
- 复用线程:单个线程创建使用完毕后,可以不用立马销毁,而是把这个线程放入到线程池中,等待下次执行任务
- 管理线程:线程是一种宝贵且有限的
CPU
资源,其数量并不是无上限的- 可以通过线程池来限制创建线程的数量,也可以通过线程池来决定何时销毁冗余创建的线程
- 在线程池中存在多个线程时,可以通过线程池来协调每个线程的任务执行情况,从而避免出现线程处于空闲状态,造成线程资源的浪费
ThreadPoolExecutor
ThreadPoolExecutor
是一个通用的线程池的实现,类图如下所示:
可以看到,ThreadPoolExecutor
主要实现了 Executor
、ExecutorService
、AbstractExecutorService
ThreadPoolExecutor 的主要属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/private volatile RejectedExecutionHandler handler;/*** Timeout in nanoseconds for idle threads waiting for work.* Threads use this timeout when there are more than corePoolSize* present or if allowCoreThreadTimeOut. Otherwise they wait* forever for new work.*/private volatile long keepAliveTime;/*** If false (default), core threads stay alive even when idle.* If true, core threads use keepAliveTime to time out waiting* for work.*/private volatile boolean allowCoreThreadTimeOut;/*** Core pool size is the minimum number of workers to keep alive* (and not allow to time out etc) unless allowCoreThreadTimeOut* is set, in which case the minimum is zero.*/private volatile int corePoolSize;/*** Maximum pool size. Note that the actual maximum is internally* bounded by CAPACITY.*/private volatile int maximumPoolSize;/*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
- 内置一个
AtomicInteger
,高3
位用于表示线程池当前状态,低29
位用于表示线程个数 - 内置一个阻塞队列,用于核心线程都处于执行状态后保存任务
- 内置一个
ReentrantLock
独占锁,以及一个由锁对象创建出来的条件对象Condition termination
- 一个去重集合
HashSet<Worker> workers
,用于保存线程 - 历史出现过的最多线程数量
int largestPoolSize
- 已完成的任务数量
long completedTaskCount
- 由
volatile
关键字修饰的线程组ThreadFactory threadFactory
- 由
volatile
关键字修饰的拒绝策略RejectedExecutionHandler handler
- 由
volatile
关键字修饰的空闲线程最大存活时间long keepAliveTime
- 由
volatile
关键字修饰的是否允许核心线程超时boolean allowCoreThreadTimeOut
- 由
volatile
关键字修饰的核心线程数量int corePoolSize
- 由
volatile
关键字修饰的最大线程数量int maximumPoolSize
- 默认的拒绝策略
new AbortPolicy()
,即默认的拒绝策略是抛出异常
Worker 主要属性
Worker
即工作线程。Worker
继承了 AQS
并实现了 Runnable
接口,主要有以下几个属性:
Thread thread
:线程对象Runnable firstTask
:首次执行的任务
线程池的状态
线程池一共有 5
个状态,分别是:
RUNNING
:运行状态,接受新任务并且处理阻塞队列里的任务SHUTDOWN
:标记关闭状态,拒绝新任务,但是处理正在执行和阻塞队列里的任务STOP
:关闭状态,拒绝新任务并且清空阻塞队列,同时会中断所有线程TIDYING
:清场状态,线程池和阻塞队列都为空,将要调用terminated()
方法TERMINATED
:终止状态。这是当terminated()
方法调用完成之后的状态
线程池的状态流转
- 线程池一旦被创建,就是
RUNNING
状态,可以正常地执行任务,以及接受新任务 - 在
RUNNING
状态调用shutdown()
方法后,线程池就会切换到SHUTDOWN
状态,标记关闭状态下- 遍历线程池,当前所有空闲线程都会被调用
interrupt()
方法设置中断( 被设置中断的线程将会从线程池中移除) - 不再接受新任务,新提交的任务将直接丢弃
- 当前正在执行任务的线程把正在执行的任务继续完成,且这些线程将会继续从阻塞队列中获取任务执行
- 遍历线程池,当前所有空闲线程都会被调用
- 在
RUNNING
或者SHUTDOWN
状态调用shutdownNow()
方法后,线程池就会切换到STOP
状态- 不再接受新任务
- 遍历线程池,调用每个线程的
interrupt()
方法设置中断 - 遍历阻塞队列,移除所有任务
- 在
SHUTDOWN
或者STOP
状态持续一段时间后,当线程池中没有线程,并且阻塞队列也为空后,线程池将会切换到TIDYING
状态 TIDYING
状态将会做最后的收尾工作,确保所有资源都被释放
线程池提交任务的执行流程
- 当有新任务提交时,判断当前核心线程数是否小于最大核心线程数
- 如果小于最大核心线程数,则创建一个新线程执行任务
- 如果大于核心线程数,则尝试将任务提交到阻塞队列中
- 如果阻塞队列未满,则将任务提交到阻塞队列中
- 如果阻塞队列已满,则尝试开启新线程
- 如果当前线程数量小于最大线程数,则创建一个新线程执行任务
- 如果当前线程数量等于最大线程数,则执行拒绝策略
AbortPolicy
:直接抛出异常CallerRunsPolicy
:由提交任务的线程执行DiscardPolicy
:直接丢弃任务DiscardOldestPolicy
:将阻塞队列队头的任务取出并丢弃,然后重试提交
线程数量的设置
线程数量的经验计算公式为:
线程数 = CPU 核心数 *(1 + 平均等待时间 / 平均工作时间)
线程池的种类
FixedThreadPool
FixedThreadPool
,即固定数量线程池,特点是核心线程数量 = 最大线程数量,且阻塞队列容量较大甚至无界。
- 固定数量,意味着线程不会频繁地创建和销毁,在线程数量达到核心线程数量后,在执行任务过程中没有发生异常的情况下,线程数量将不会再发生变化
- 阻塞队列容量非常大,代表着任务可以被无条件地,一直不断地添加进来,可能会引发
GC
问题- 当任务的执行速度远小于任务的添加速度时,可能会导致阻塞队列中的节点数量特别巨大,吃光堆内存空间,导致
OOM
- 当任务的执行速度远小于任务的添加速度时,可能会导致阻塞队列中的节点数量特别巨大,吃光堆内存空间,导致
FixedThreadExecutor
适用于计算密集型任务, 确保 CPU
在长时间被单个工作线程使用的情况下, 尽可能少地创建、分配、销毁线程, 即适用于长期且数量可控的任务。
CachedThreadPool
CachedThreadPool
,即缓存线程池,特点是
- 核心线程数量为
0
,意味着在线程池空闲时,不会占用任何的线程资源 - 最大线程数非常巨大,意味着可以创建非常多的线程,可能会导致系统线程资源耗尽的问题
- 线程有较短的最大存活时间(如
60s
),意味着每个工作线程的存活时间比较短 - 阻塞队列不存储元素,意味着每次有新任务提交时,要么可以立马被一个空闲线程执行,要么可以立马创建一个新线程执行去执行
CachedThreadPool
适用于存在数量多且耗时少的任务场景,由于未限制线程最大数量,在任务的执行速度远小于任务的添加速度时,有可能会导致系统线程资源耗尽或引发 OOM
。
SingleThreadExecutor
SingleThreadExecutor
,即单线程线程池,特点是:
- 核心线程数量 = 最大线程数量 =
1
,意味着线程池中最多只会有一个线程,意味着所有提交的任务都会被串行化执行,任意时刻不会有同时两个任务在被同时执行 - 阻塞队列容量非常大甚至无界,代表着任务可以被无条件地,一直不断地添加进来,可能会引发
GC
问题
SingleThreadExecutor
适用于多个任务需要串行化执行的场景,由于阻塞队列无界,还是有可能引发 OOM
。
注意,Executors.newSingleThreadExecutor()
并不等价于 Executors.newFixedThreadPool(1)
,Executors
中的源码为:
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
可以看到,newSingleThreadExecutor()
方法创建出来的 ThreadPoolExecutor
对象被包了一层 FinalizableDelegatedExecutorService
,所以这两者不等价
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
,是定时任务线程池,可以使用它来实现定时任务,主要的构造方法为:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
可以看到,ScheduledThreadPoolExecutor
的特点是:
- 最大线程数无界
- 空闲线程的最大存活时间为
0
,即在线程池不会存储空闲线程 - 阻塞队列
DelayedWorkQueue
是一个底层数据结构为堆的延迟阻塞队列,无界
SingleThreadScheduledExecutor
SingleThreadScheduledExecutor
即单线程定时任务线程池,可以保证所有定时任务都会由一个线程串行执行
ForkJoinPool
ForkJoinPool
,是一个用于管理 ForkJoinWrokerThread
的线程池,ForkJoin
是一种基于分治思想的并发编程框架,通过将任务分解后使用多个线程并行计算,最后合并所有子任务的计算结果得到最终的计算结果。
ForkJoinPool
线程池可以把一个大任务拆分成小任务并行执行,但是任务类必须继承自 RecursiveTask
或者 RecursiveAction
work-stealing
工作窃取算法(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。主要实现的思路有一下几个关键点:
- 每个工作者线程都会对应一个双端任务队列
- 当某个工作者线程自己的任务队列中的任务执行完毕之后,将会从其他工作者线程的任务队列里窃取任务出来执行
- 工作者线程从自己的任务队列的头部来获取任务(
removeFirst
) - 工作者线程执行任务窃取时,将从其他工作者线程的任务队列的尾部来获取任务(
removeLast
)
ForkJoinPool 注意事项
- 任务类必须继承自
RecursiveTask
或者RecursiveAction
- 子任务间没有相互依赖
- 任务最好不要包含阻塞
IO
操作