文章目录
- Java 线程池概述
- ThreadPoolExecutor 构造方法
- 线程池拒绝策略
- 工作流程
- 并发库中的线程池
- CachedThreadPool
- FixedThreadPool
- SingleThreadExecutor
- ScheduledThreadPool
- ThreadPoolExecutor 源码分析
- 线程池状态表示
- 获取 runState
- 获取 workerCount
- 生成 ctl
- 提交任务 execute()
- 为什么需要二次检查
- 创建工作线程 addWorker()
- 工作线程 Worker
- 主逻辑 runWorker
- 获取任务 getTask()
- 工作线程的退出
- RUNNING 状态所有任务执行完成
- shutdown 关闭线程池
- 所有线程等待新任务
- 所有线程繁忙
- 队列中剩余少量的任务
- 写在最后
Java 线程池概述
Java 语言中创建线程看上去就像创建一个对象,仅仅调用 new Thread()
即可,但实际上创建线程比创建对象复杂得多。创建对象仅仅是在 JVM 的堆里分配一块内存,而创建线程,需要 调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本很高。因此,线程是一个重量级的对象,应该避免频繁的创建和销毁。
线程池没有采用一般的池化资源设计方法(例如:连接池、对象池),因为我们无法获取一个启动的 Thread 对象,然后动态地将需要执行的任务 Runnable task
提交给线程执行。目前业界地线程池设计,普遍采用生产者-消费者模型,线程池的使用方为 Producer,而线程池中的 工作线程为 Consumer。
ThreadPoolExecutor 构造方法
Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor。ThreadPoolExecutor 的构造函数如下:
corePoolSize
:线程池保有的最小线程数(核心线程数)。如果线程池中的线程数小于 corePoolSize,提交任务时会创建一个核心线程,该任务作为新创建的核心线程第一个执行的任务。maximumPoolSize
:最大线程数。如果提交任务时任务队列已经满了,且当前工作线程数小于 maximumPoolSize,会创建新的工作线程用于执行该任务;反之如果工作线程数大于等于 maximumPoolSize,则执行拒绝策略。keepAliveTime & unit
:一个线程如果在时间单位为 unit 的 keepAliveTime 时间内没有执行任务,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收。workQueue
:任务队列,为 BlockingQueue 实现类。threadFactory
:线程工厂,通过该参数可以自定义如何创建线程。ThreadFactory 是一个接口,里面是有一个newThread
方法等待实现:Thread newThread(Runnable r);//接口方法默认为public abstract
handler
:任务的拒绝策略,如果线程池中 所有的线程都在忙碌,且任务队列已经满了(前提是任务队列是有界队列),此时提交任务线程池会拒绝执行。决绝的策略可以通过该参数指定。
温馨提示:线程池的静态工厂类 Executors 提供了很多开箱即用的线程池,可以帮助快速创建线程池,但提供的线程池很多使用的是 无界队列 LinkedBlockingQueue,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理。
在阅读完本节后我们知道,在生产环境中使用线程池时需要设置 ThreadPoolExecutor 构造方法的 workQueue 参数为 ArrayBlockingQueue
等有界阻塞队列。
线程池拒绝策略
上一小节提到,构造方法中的 RejectedExecutionHandler handler
参数可以用于自定义任务拒绝策略。ThreadPoolExecutor 已经提供了 4 种拒绝策略:
-
CallerRunsPolicy:提交任务的线程自己去执行该任务。
-
AbortPolicy:默认的拒绝策略,会抛出 RejectedExecutionException。
-
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
-
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
默认拒绝策略为 AbortPolicy,该拒绝策略抛出 RejectedExecutionException 为运行时异常,编译器不会强制 catch,开发人员可能会忽略,因此默认拒绝策略要慎重使用。
如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中, 自定义的拒绝策略往往和 降级策略 配合使用。
例如:将任务信息插入数据库或者消息队列,配置 XXL-JOB 定时任务扫描失败任务表,将执行失败的任务交给专用于补偿的线程池去进行补偿。
工作流程
线程池中有几个重要的概念:核心线程池(CorePool)、**空闲线程池(IdlePool)**以及 任务队列。下图为我绘制的线程池工作流程图,包含上述三个概念模型,cpSize 核心线程池中当前的线程数、cpCap 核心线程池容量、ipSize 空闲线程池中当前线程数。
我来简述下提交任务 task 时,线程池的执行流程:
-
如果核心线程池未满,即 cpSize 小于 cpCap,通过线程工厂 创建一个核心线程,将 task 作为新线程的第一个任务。
-
如果 核心线程池已满,但是任务队列仍然有空间,将 task 添加到任务队列。核心线程在执行完手头的任务后,会从任务队列中获取新的任务,继续执行。如果任务队列为空,核心线程会阻塞在任务获取阶段,直到有 新的任务提交到任务队列。
-
如果任务队列已满,则创建空闲线程,并将 task 作为第一个执行的任务。空闲线程如果执行完手头的任务,也会从任务队列中获取新的任务。
如果任务队列为空,空闲线程会阻塞,直到 超出 keepalive 设定的时间 或 获取到新的任务执行。如果等待新任务超时,空闲线程的生命周期就会结束了。 -
如果空闲线程数+核心线程数已经达到了 maximumPoolSize,创建新线程的方法会失败,此时提交的任务将被拒绝,拒绝策略由 RejectedHandler 负责执行。
并发库中的线程池
java.util.concurrent.Executors
提供了通用线程池创建方法,去创建不同配置的线程池,该工具类目前提供了五种不同的线程池创建配置:
CachedThreadPool
CachedThreadPool 是一种用来 处理大量短时间工作任务的线程池,会在先前构建的线程可用时重用已创建的工作线程,但是当工作线程空闲超过 60s,则会从线程池中移除。
任务队列为 SynchronousQueue,它是一个不存储元素的阻塞队列(容量 0),提交任务的操作必须等待工作线程的移除操作,反之亦然。
为什么使用 SynchronousQueue 作为任务队列?
个人想法:线程池的工作逻辑是,提交任务时如果 核心线程数达到 corePoolSize 且任务队列已满,则会创建空闲线程执行。因为 SynchronousQueue 容量为 0 天然是满的,且 corePoolSize 被设置为 0,这意味着创建任务时如果没有可用线程,就会立即创建一个新线程来处理任务。
这使得 CachedThreadPool 在执行大量短期异步任务时更加高效,避免了任务对线程资源的等待,符合设计初衷:快速执行大量的短暂任务。
FixedThreadPool
核心线程数和最大线程数相等,使用的是 无界任务队列 LinkedBlockingQueue。如果当前的工作线程数已经达到 nThreads,任务将被添加到任务队列中等待执行。如果有工作线程退出,下一次提交任务时将会有新的工作线程被创建来补足线程池。
SingleThreadExecutor
工作线程限制为 1 的 FixedThreadExecutor,它 保证了所有任务的都是被顺序执行。
ScheduledThreadPool
ScheduledThreadPoolExecutor 允许安排一个任务在延迟指定时间后 执行,还可以 周期性地执行任务。周期性调度任务有两种类型:固定延迟和固定频率。固定延迟 是在上一个任务结束和下一个任务开始之间保持固定的延迟,而 固定频率 是以固定的频率执行任务,不管任务的执行时间多长。
ScheduledThreadPoolExecutor 中定义了内部类 DelayedWorkQueue 作为任务队列,DelayedWorkQueue 是基于堆的数据结构。队列中的元素为 RunnableScheduledFuture 类型:
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
RunnableScheduledFuture 接口继承关系如下图所示:
- Delayed 接口继承了 Comparable 接口,
getDelay
方法返回任务剩余的延迟时间,返回值小于等于 0 说明延迟的时间已过;compareTo
方法用于比较任务下一次的执行时间,用于维护小顶堆属性(父节点任务的执行时间小于儿子节点)。 - RunnableFuture 接口的 run 方法定义了需要执行的任务逻辑,Future 接口用于获取异步任务的执行结果。
DelayedWorkQueue#take 方法用于获取下一个需要执行的定时任务,代码及详细注释如下:
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;// 上锁, 避免堆数据访问产生的数据竞争lock.lockInterruptibly();try {for (;;) {// 堆顶元素: Delayed#getDelay 延迟时间最小的任务RunnableScheduledFuture<?> first = queue[0];if (first == null)// 堆中任务空, 等待新任务入堆available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// delay小于等于0, 说明延迟时间已过, 可以执行;// finishPoll 弹出堆顶任务return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)// leader为等待堆顶任务到达执行时间的线程// leader 非空说明已经有线程正在等待堆顶任务可执行, 因此当前线程为 follower, 需要等待直到堆顶元素变更available.await();else {// 当前线程是等待堆顶元素的 leader 线程, 设置 leader 属性Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待任务延迟的时间available.awaitNanos(delay);} finally {// await期间会释放锁, leader可能因为新任务的加入而失效(当前线程可能等待的不再是堆顶任务)// 所以await超时后, 需要判断leader是否为当前线程, 为当前线程才能设为nullif (leader == thisThread)leader = null;}}}}} finally {// 任务队列非空, leader为空说明没有线程等待堆顶元素可执行, 此时唤醒 follower 线程, 尝试获取堆顶的任务if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}
ThreadPoolExecutor 源码分析
线程池状态表示
ThreadPoolExecutor 最重要的状态参数为:线程池状态(rs) 以及 活跃线程数(wc)。ThreadPoolExecutor 使用一个 Integer 变量 ctl
存储这两个状态参数:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
Integer 长度位 32 bits,ctl 中最高的三位 (29-31) 存储线程池状态,低 29 位 (0~28) 存储活跃线程数,因此线程池中活跃线程数理论上限为 2 29 − 1 2^{29}-1 229−1。
了解了 ThreadPoolExecutor 的这种设计之后,我们来看看状态相关的位运算:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
CAPACITY
表示线程池中活跃线程的理论上限 2 29 − 1 2^{29}-1 229−1,COUNT_BITS
表示线程数位数( 32 − 3 = 29 32-3=29 32−3=29)。
RUNNING 、SHUTDOWN、STOP、TIDYING、TERMINATED 为线程池的五种状态。根据代码,这五种状态表示如下图所示:
RUNNING
:可接收新的任务,并且处理队列中排队的任务;SHUTDOWN
:不接收新的任务,但会处理队列中剩下的任务;STOP
:不接收新任务,不处理队列中的任务,并且中断进行中的任务;TIDYING
:所有的任务终止,工作线程数 (workerCount) 等于 0;TERMINATED
:线程池关闭,terminated()
方法完成。
获取 runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
runStateOf
方法从 ctl 获取线程池运行状态,保留 ctl 的最高的三位,其余位设置为 0。以 STOP 状态、3 个活跃线程数的 ctl 为例,求 rs 的过程如下:
获取 workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
workerCountOf 获取线程池中的活跃线程数,即保留 ctl 的 0-28 位,将 29-31 位设置为 0。
生成 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctlOf
通过状态值和线程数值计算出 ctl,就是对 rs 和 wc 进行或运算,保留 wc 的 0-28 位和 rs 的 29-31 位。
提交任务 execute()
ThreadPoolExecutor#execute
方法用于提交任务给线程池执行,代码以及详细注释如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池状态参数 ctlint c = ctl.get();// 如果活跃线程数小于核心线程池容量corePoolSize, addWorker创建新线程, 以command作为第一个任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;// 创建新线程失败, 更新 ctlc = ctl.get();}// 创建核心线程失败, 尝试将任务添加到任务队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次检查, 如果线程池不在运行状态, 需要回滚刚刚入队的任务if (!isRunning(recheck) && remove(command))// 移除任务成功, 执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0)// 线程池为运行状态, 但是没有工作线程, 创建线程处理任务队列中的任务addWorker(null, false);}// 任务添加到队列失败, (1)线程池状态不是RUNNING状态 或 (2)任务队列已满// 尝试增加非核心线程, 执行 command 任务, 如果线程池不为RUNNING, addWorker会返回falseelse if (!addWorker(command, false))// 线程池不为RUNNING, 新增非核心线程失败, 执行任务拒绝策略reject(command);}
这段代码的主要逻辑很简洁:
- 当 wc 小于 corePoolSize 时,创建核心线程执行 command 任务;
- 如果核心线程数已满,则将任务缓存在任务队列中 (
workQueue.offer
),工作线程完成手头上的任务后,从任务队列中获取新任务。 - 如果任务队列也满了,offer 方法返回 false,尝试增加非核心线程执行 command。如果线程创建失败,
reject
执行任务拒绝策略。
除此之外,我想在本篇博客中探讨下 execute 方法的一些实现细节,并给出我自己的观点用于抛砖引玉。
为什么需要二次检查
大家请看下面这段有关二次检查的代码,在阅读源码时,我产生了疑问: 为什么需要二次检查 ?该操作 解决了什么场景下的数据竞争 ?
// ...
c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get(); // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);
}
// ...
执行二次检查的前提是:
- 线程池在执行语句 (1) 的时候,是运行状态;
- 任务队列未满,command 被添加至队列,(2) 处的
offer
方法返回 true;
假设没有二次检查,会发生什么?
场景 1:在语句 (1) 后,语句(2) 执行前,线程池使用者调用了 shutdownNow
方法将线程池工作线程关闭,清空任务队列中的任务。时序图如下:
我们先来看看 shutdownNow 调用的 drainQueue 方法:
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}
drainQueue 方法用于移除任务队列 workQueue 中的 Runnable 任务,这些未执行的任务作为 shutdownNow 方法的返回值,通知方法调用者哪些任务未执行。
如果按照上图中的执行序列,线程池的状态已经为 STOP,任务队列也被清空,但是新提交的任务 command 却被添加到任务队列中。这导致这个新任务不会被运行、也不会执行拒绝策略、也无法通过 shutdownNow 返回的任务列表通知调用者。
这严重降低了线程池的健壮性,难以想象一个已提交的任务消失在线程池中!
场景 2:线程池处于运行状态,corePoolSize 设置为 0,阻塞队列的容量大于 0。
线程池刚启动时,提交任务 command 显然无法创建核心线程执行,任务会被缓冲在任务队列中,直到任务队列容量到达上限,线程池才会创建非核心线程执行任务。这导致 大量任务将不能及时被处理,甚至可能永远得不到执行!
场景示意图如下(图中任务队列容量为 4,corePoolSize 等于 0):
二次检查解决了上述两种场景的问题吗?当然!!
c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get(); // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0) // -----(4)addWorker(null, false);
}
针对场景 1:
如果在语句 (1) 和 语句(2) 之间,shutdownNow 被调用并执行完成,然后语句 (2) 将新任务 command 加入任务队列。在语句 (3) 重新获取最新的 ctl,此时就能得知线程池的状态已经为 STOP,使用 remove 方法回滚入队的任务,并执行 reject 方法拒绝执行任务 。
针对场景 2:
如果线程池状态为 RUNNING,但因为线程中没有线程,语句(4) 判断为 true,创建非核心线程处理任务队列中的任务,防止异步任务长时间处于队列中得不到处理 的情况。
创建工作线程 addWorker()
addWorker 用于创建工作线程,我将其分为两部分分析:
- 第一部分:根据外层死循环判断 ThreadPoolExecutor 的运行状态 是否能够创建线程。如果可以创建线程,通过内层死循环 CAS 更新状态参数 ctl,直到更新成功或线程池状态发生改变。
第一部分的含详细注释的代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {// retry为外层循环retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 仅 (1) RUNNING状态 或 (2) SHUTDOWN状态+队列中仍有任务+firstTask为空 时 创建工作线程// firstTask为空, 说明活跃线程数不满足线程池运行的最小数量if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// for内层循环for (;;) {int wc = workerCountOf(c);// 如果线程数达到容量上限, 不可创建新线程// 如果core为true, 线程数大于等于corePoolSize, 不能创建核心线程// 如果 core 为 false, 线程数大于等于 maximumPoolSize, 不可以创建非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS更新 ctl, 如果成功, 则退出 retry 循环, 执行创建流程if (compareAndIncrementWorkerCount(c))break retry;// CAS更新失败, 重新读取 ctlc = ctl.get();if (runStateOf(c) != rs)// 状态发生改变, 重新执行大循环continue retry;// else: 线程数改变导致CAS失败, 继续for循环即可}}// ... 省略第二部分
}
- 第二部分:状态更新成功后,执行真正的线程创建逻辑,包括:工作线程添加至 Worker 集合、启动 Thread 对象。
第二部分详细代码注释如下:
private boolean addWorker(Runnable firstTask, boolean core) {// ... 省略第一部分boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 持有锁的情况下获取 ctl, 防止 shutdown、shutdownNow 导致的状态变更int rs = runStateOf(ctl.get());// 运行状态为 RUNNING或运行状态为 SHUTDOWN 且 firstTask为空 才允许启动工作线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 线程可能已经启动, 抛出异常(例如: 自定义的ThreadFactory#newThread 方法多次调用返回同一个 Thread 对象)if (t.isAlive())throw new IllegalThreadStateException();workers.add(w); // 添加到 HashSet 中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 成功添加到 workers 集合, 在这里真正启动工作线程t.start();workerStarted = true;}}} finally {if (! workerStarted)// 启动线程失败(可能线程已经启动 或 线程池状态发生改变), 将worker从workers中移除, 扣减 workerCountaddWorkerFailed(w);}return workerStarted;
}
大部分代码阅读注释即可了解原理,这里提一下我阅读时产生疑惑的地方:
疑惑一:firstTask
等于 null 代表什么?为什么判断能否创建线程时,处于 SHUTDOWN 状态还需要 firstTask 等于 null ?
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;
疑惑二:为什么需要在持有 mainLock 后,需要重新检查运行状态 rs
?
先来看疑惑一,firstTask 等于 null 出现的场景有:
- 预启动核心线程(所有包含
prestart
单词的方法)
public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);
}public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;
}
- 在工作线程退出时,替换死亡的工作线程。(
processWorkerExit
方法)
private void processWorkerExit(Worker w, boolean completedAbruptly) {// ...int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
- 提交的新任务被缓冲在队列,但活跃线程数 workCount 等于 0。(
execute
方法)
在 addWorker 方法中,Running 状态可以创建工作线程,SHUTDOWN 状态仅可以在 firstTask 等于 null 的条件下创建线程。这符合 SHUTDOWN 状态的设计初衷:不接受新的任务、仅处理已添加至阻塞队列中的任务。
除了预启动场景,execute 场景和 processWorkerExit场景 均是为了确保已经添加到任务队列中的任务不被放弃,能够成功执行。
再来看疑惑二:为什么在持有 mainLock 的情况下获取运行状态 rs?
这是为了防止 shutdown、shutdownNow 方法关闭线程池,改变运行状态。
为了确保 shutdown 和 shutdownNow 方法执行时 worker 集合的稳定,从而保证方法执行过程的原子性,这两种方法都会 在持有 mainLock 的情况下,修改 runState。
因此,如果创建 worker 时 rs 发生了改变从而不应该增加工作线程,应该退出创建流程。(例如 RUNNING 变为 STOP 状态,此时不应该创建线程,因为任务都被丢弃了)。
mainLock.lock();int rs = runStateOf(ctl.get());// 确保运行状态 rs 可以创建新的线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// ...}
mainLock.unlock();
下面是我绘制的 addWorker 工作流程图,作为本小节的总结:
工作线程 Worker
ThreadPoolExecutor 中的线程资源被包装为 Worker 对象,它持有一个 Thread 对象,实现了 Runnable 接口,又继承了 AQS,因此也具有锁的性质。
需要指出的是,它没有利用 AQS 中的 CLH 队列管理等待资源的线程,因为 Worker 并 不存在多个线程争抢所有权,它的 lock 方法仅由内部持有的 线程调用。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // AQS state 属性初始化为 -1this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 线程的执行逻辑就是 runWorker方法public void run() {runWorker(this);}// runWorker方法中, 线程在执行任务前持有锁, 将state更改为 1public void lock() { acquire(1); }// shutdown 关闭空闲线程时, 使用 tryLock 尝试获取锁 public boolean tryLock() { return tryAcquire(1); }// 任务执行完成释放锁, state更改为 0public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }// ...
}
AQS 在 Worker 中的主要作用是维护 state 属性。Worker 构造函数中,state
初始化为 -1,执行 runWorker() 方法时会被设置为 0。state 等于 0 说明线程是空闲的,state 等于 1 说明线程正在处理任务。
Worker#lock()
方法在仅在 runWorker 方法中被调用,线程在执行任务前调用该方法持有锁,将state更改为 1。Worker#unlock()
方法在执行完任务后被调用,释放锁,将 state 更改为 0。Worker#tryLock()
方法在 shutdown() 方法中被调用,用于中断空闲的工作线程,因为空闲的 Worker state 等于 0,因为 tryLock 能返回 true。
主逻辑 runWorker
runWorker 代码及详细注释如下:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 工作线程的第一个任务, 创建核心线程 或 线程池已满创建非核心线程时, firstTask非空Runnable task = w.firstTask;w.firstTask = null;// 将 state 由初始值 -1 修改为 0w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// getTask如果为空, 说明任务队列中已经没有任务可以执行, 工作线程正常退出while (task != null || (task = getTask()) != null) {w.lock(); // 在执行任务前, 清除线程的中断标记(较为费解, 随后详细解释)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务前的钩子方法, 继承ThreadPoolExecutor的类可重写beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run(); } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务完成后的钩子方法, 继承ThreadPoolExecutor的类可重写afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock(); // state修改为0, 工作线程空闲}}completedAbruptly = false;} finally {// 处理线程退出:// 1. 从 worker 集合中移除当前工作线程// 2. 如果活跃线程数不满足线程池运行的最低要求, 或者线程因为执行异常而终止, 创建新线程替换processWorkerExit(w, completedAbruptly);}}
工作线程的运行流程概括起来为:
getTask
从线程池中获取 Runnable 任务;- 按照
beforeExecute
、Runnable#run
、afterExecute
的顺序执行,beforeExecute 和 afterExecute 为 ThreadPoolExecutor 提供的两个扩展点,子类可以重写这两个方法满足打点、日志等自定义需求。 - 如果任务顺利执行,进行下一轮循环,通过
getTask
获取新任务。
如果 getTask 返回 null,说明任务队列中没有任务 或者 当前线程因为线程池关闭而被中断。 - 如果任务 或 钩子函数执行时抛出了异常,线程同样会退出,completedAbruptly 为 true。
在讲解完工作线程的主要流程后,我们来讨论下面这个 if 语句的含义:
Thread wt = Thread.currentThread();
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();
// ...
这段代码执行的目的是:
工作线程 worker 已经领取了一个任务准备执行,如果线程池状态为 RUNNING 或 SHUTDOWN,应该确保当前线程的中断标记被清除,从而不影响任务的执行。Thread.interrupted()
方法会 返回当前线程的中断标记,并将线程中断标记清空。
如果线程池的状态为 STOP,且当前线程未被中断,wt.interrupt()
为当前线程打上中断标记。
下面我来分类讨论,帮助大家更好的理解:
runStateAtLeast(ctl.get(), STOP) == true && !wt.isInterrupted() == true
当前线程池的状态至少为 STOP,当前线程却没有中断标记。if 判断为 true,中断当前线程;(runStateAtLeast(ctl.get(), STOP) == false
、Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == false
。
if 判断为 false,当前线程的状态为 RUNNING 或 SHUTDOWN,且已经有一个即将执行的任务,Thread.interrupted()
将中断标记清除。(runStateAtLeast(ctl.get(), STOP) == false
、Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true
、!wt.isInterrupted() == true
。
这种情况非常反直觉,但是有可能出现的。下图操作序列很好说明了这种情况:因为 错误地将 STOP 中断标记给清除,所以 if 也会判断为 true,执行wt.interrupt()
中断当前线程。
-
(runStateAtLeast(ctl.get(), STOP) == false
、Thread.interrupted() == false && runStateAtLeast(ctl.get(), STOP) == true
这种情况类似上一种,只是线程池状态设置为 STOP,还未中断当前线程,if 操作会返回 false。
获取任务 getTask()
工作线程通过 getTask 从任务队列中获取任务,如果 getTask 返回 null,线程就会退出 runWorker 中的死循环。
getTask 何时返回 null?
条件一:线程池状态为STOP、TIDYING、TERMINATED;或者是 SHUTDOWN且工作队列为空。
条件二:工作线程 wc 大于最大线程数或当前工作线程已经超时, 且还有其他工作线程或任务队列为空。
当前线程超时的条件:(【核心线程可以超时】或【线程数大于核心线程数】)且 上一轮循环从阻塞队列的 poll
方法超时返回。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c); // rs保留c的高3位, 低29位全部清零// 大小顺序为TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING// 条件一if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;}int wc = workerCountOf(c);// timed表示当前线程是否能够超时(设置了【核心线程超时】或线程数超过了核心线程)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 条件二if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 可能有多个线程同时满足条件二, 需要使用cas扣减if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true} catch (InterruptedException retry) {timedOut = false;}}
}
getTask 的流程图如下:
随后,我将在【工作线程的退出】章节,详细介绍 不同场景线程池回收工作线程的过程 ,会结合 getTask 方法分析。
工作线程的退出
RUNNING 状态所有任务执行完成
这种场景下,会将工作线程的数量减少到核心线程数大小。
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}
timed
表示是否允许线程因为超时被回收;timedOut
记录上一轮循环中,线程从阻塞队列获取任务是否超时了。
假设线程池核心线程数为2,最大线程数为4。线程数低于核心线程数时,使用execute 提交任务便会创建核心线程;线程数达到 2 后,任务被添加至阻塞队列,如果阻塞队列也满了,将工作线程逐渐增加到 4。当全部任务执行完成后:
-
工作队列为空,四个线程阻塞在
workQueue.poll
上,各自等待 keepAliveTime 时间后,超时返回,timedOut 设置为 true。 -
进入下一轮循环,因为 wc 等于 4 大于 corePoolSize=2,因此四个线程 timed 均为 true,从而
timed&timedOut
为 true 且 当前任务队列为空,情况二成立,4 个线程都可以被超时回收。 -
四个线程尝试 CAS 扣减 wc 为 3(仅有一个线程能扣减成功,getTask 返回 null)。其余三个线程继续循环,直到线程数达到核心线程数,timed 等于 false。
shutdown 关闭线程池
调用 shutdown()
后,线程池状态流转为 SHUTDOWN,随后向所有的空闲工作线程发送中断信号。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers(); // 中断所有空闲线程onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}
处于 getTask
获取任务阶段的工作线程是空闲的,并没有锁定 Worker。我将分三种情况探讨工作线程如何响应中断信号。
- 任务全部完成,所有线程在等待;
- 任务队列中积压了大量任务,所有线程在繁忙;
- 队列中剩余的任务少于空闲线程数;
所有线程等待新任务
// getTask(): 条件一
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;
}
...
try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true
} catch (InterruptedException retry) {timedOut = false;
}
中断信号将阻塞的线程唤醒,进入下一轮循环。当到达条件一处,检查到 rs 等于SHUTDOWN,且工作队列为空,满足条件,扣减线程数后返回null。在runWorker 中退出循环,结束线程。
所有线程繁忙
此时任务队列中积压了很多任务,工作线程因为 shutdown 而被中断,在获取任务时 调用 poll 或 take 方法都会抛出 InterruptedException 异常,然后被 catch 捕获,重新进行循环。
第二次循环到达条件一,虽然 rs 为 SHUTDOWN,但是工作队列非空,不满足退出条件。
// 工作队列非空, 条件1不满足
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;
}
timedOut 为 false,不是因为 poll 超时而返回,因此条件 2 也不满足:
// timedOut false
if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}
因此,shutdown 方法在线程池繁忙的情况下,相当于让 正在获取任务的线程空转了一次,不影响线程池运行。
队列中剩余少量的任务
假设情形:
线程池状态已经是SHUTDOWN,但任务队列中剩余两个任务,A、B、C、D四个线程同时通过条件一和条件二,尝试从阻塞队列中获取任务。线程A、B成功获取任务,而线程 C、D因队列为空而阻塞。
线程A、B执行完任务后再次调用 getTask()
,条件一的判断为true(线程池运行状态为SHUTDOWN且工作队列为空),于是返回 null,线程退出 runWorker 死循环,准备进行回收。
final void runWorker(Worker w) {boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {...}}finally {// 回收退出的线程processWorkerExit(w, completedAbruptly);
}
在回收前,还需要执行 processWorkerExit 方法。在该方法中会将 worker 移除出 worker 集合,并调用tryTerminate()。
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 执行任务时抛出异常退出, 而非getTask()返回null退出, 需要更新ctl属性反映线程数的变化if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 统计完成的任务数workers.remove(w); // 将Worker对象移除工作线程集合} finally {mainLock.unlock();}tryTerminate();...
}
在 tryTerminate 中,线程A、B判断线程池状态为 SHUTDOWN 且工作队列为空,不会在第一个 if 处返回。
然后判断出当前workers中的工作线程数不为0(因为线程C、D正阻塞),然后调用 interruptIdleWorkers(ONLY_ONE)
。
注意:此时线程A、线程B的线程数已经从ctl扣减,Worker实例也从workers中移除。
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 线程池状态为SHUTDOWN, 但仍然有线程阻塞在take或poll方法处if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}...}
}
interruptIdleWorkers
的入参 onlyOne 为true,因此只会中断一个空闲线程,然后break循环。假设先中断线程C,线程C从阻塞中被唤醒,抛出InterruptedException异常,被 catch 住异常后重新进行一轮循环,发现条件一满足,更新 ctl 并返回null。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 正在执行任务的Worker是无法获取锁的, 因此这里只能回收空闲线程if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt(); } catch (SecurityException ignore) {} finally {w.unlock();}}// 仅中断一个空闲线程if (onlyOne)break;}} finally {mainLock.unlock();}
}
随后,线程 D 可以由上一个退出的线程中断唤醒(例如线程 C),从而让工作线程优雅地退出。
写在最后
感谢各位读者阅读本片博客,本篇博客的创作过程中参考了大量资料,笔者也详细阅读了 ThreadPoolExecutor 源码。笔者将很多阅读源码的思考融入本篇博客,尽可能去体会 Doug Lea 大神每一行代码的用意。这些细节可能很少有博客涉及,因此很可能存在纰漏和理解错误。如果有异见,欢迎在评论区指教,笔者将虚心倾听。
创作过程耗时费力,但我乐在其中(钻研源码的过程和分享知识是让人快乐的事情),如果大家喜欢这种图文结合、代码详细注释的写作风格,就给我点一个免费的赞吧!