深入分析Java线程池——ThreadPoolExecutor

文章目录

  • Java 线程池概述
    • ThreadPoolExecutor 构造方法
    • 线程池拒绝策略
    • 工作流程
    • 并发库中的线程池
      • CachedThreadPool
      • FixedThreadPool
      • SingleThreadExecutor
      • ScheduledThreadPool
  • ThreadPoolExecutor 源码分析
    • 线程池状态表示
      • 获取 runState
      • 获取 workerCount
      • 生成 ctl
    • 提交任务 execute()
      • 为什么需要二次检查
      • 创建工作线程 addWorker()
    • 工作线程 Worker
      • 主逻辑 runWorker
      • 获取任务 getTask()
    • 工作线程的退出
      • RUNNING 状态所有任务执行完成
      • shutdown 关闭线程池
        • 所有线程等待新任务
        • 所有线程繁忙
        • 队列中剩余少量的任务
  • 写在最后

Java 线程池概述

Java 语言中创建线程看上去就像创建一个对象,仅仅调用 new Thread() 即可,但实际上创建线程比创建对象复杂得多。创建对象仅仅是在 JVM 的堆里分配一块内存,而创建线程,需要 调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本很高。因此,线程是一个重量级的对象,应该避免频繁的创建和销毁。


线程池没有采用一般的池化资源设计方法(例如:连接池、对象池),因为我们无法获取一个启动的 Thread 对象,然后动态地将需要执行的任务 Runnable task 提交给线程执行。目前业界地线程池设计,普遍采用生产者-消费者模型,线程池的使用方为 Producer,而线程池中的 工作线程为 Consumer

ThreadPoolExecutor 构造方法

Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor。ThreadPoolExecutor 的构造函数如下:

  • corePoolSize:线程池保有的最小线程数(核心线程数)。如果线程池中的线程数小于 corePoolSize,提交任务时会创建一个核心线程,该任务作为新创建的核心线程第一个执行的任务。
  • maximumPoolSize:最大线程数。如果提交任务时任务队列已经满了,且当前工作线程数小于 maximumPoolSize,会创建新的工作线程用于执行该任务;反之如果工作线程数大于等于 maximumPoolSize,则执行拒绝策略。
  • keepAliveTime & unit:一个线程如果在时间单位为 unit 的 keepAliveTime 时间内没有执行任务,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收。
  • workQueue:任务队列,为 BlockingQueue 实现类。
  • threadFactory:线程工厂,通过该参数可以自定义如何创建线程。ThreadFactory 是一个接口,里面是有一个 newThread 方法等待实现:
    Thread newThread(Runnable r);//接口方法默认为public abstract
    
  • handler:任务的拒绝策略,如果线程池中 所有的线程都在忙碌,且任务队列已经满了(前提是任务队列是有界队列),此时提交任务线程池会拒绝执行。决绝的策略可以通过该参数指定。

温馨提示:线程池的静态工厂类 Executors 提供了很多开箱即用的线程池,可以帮助快速创建线程池,但提供的线程池很多使用的是 无界队列 LinkedBlockingQueue,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理。
在阅读完本节后我们知道,在生产环境中使用线程池时需要设置 ThreadPoolExecutor 构造方法的 workQueue 参数为 ArrayBlockingQueue 等有界阻塞队列。


线程池拒绝策略

上一小节提到,构造方法中的 RejectedExecutionHandler handler 参数可以用于自定义任务拒绝策略。ThreadPoolExecutor 已经提供了 4 种拒绝策略:

  • CallerRunsPolicy:提交任务的线程自己去执行该任务。

  • AbortPolicy:默认的拒绝策略,会抛出 RejectedExecutionException。

  • DiscardPolicy:直接丢弃任务,没有任何异常抛出。

  • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。


默认拒绝策略为 AbortPolicy,该拒绝策略抛出 RejectedExecutionException 为运行时异常,编译器不会强制 catch,开发人员可能会忽略,因此默认拒绝策略要慎重使用
如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中, 自定义的拒绝策略往往和 降级策略 配合使用

例如:将任务信息插入数据库或者消息队列,配置 XXL-JOB 定时任务扫描失败任务表,将执行失败的任务交给专用于补偿的线程池去进行补偿


工作流程

线程池中有几个重要的概念:核心线程池(CorePool)、**空闲线程池(IdlePool)**以及 任务队列。下图为我绘制的线程池工作流程图,包含上述三个概念模型,cpSize 核心线程池中当前的线程数、cpCap 核心线程池容量、ipSize 空闲线程池中当前线程数。

请添加图片描述

我来简述下提交任务 task 时,线程池的执行流程:

  1. 如果核心线程池未满,即 cpSize 小于 cpCap,通过线程工厂 创建一个核心线程,将 task 作为新线程的第一个任务。

  2. 如果 核心线程池已满,但是任务队列仍然有空间,将 task 添加到任务队列。核心线程在执行完手头的任务后,会从任务队列中获取新的任务,继续执行。如果任务队列为空,核心线程会阻塞在任务获取阶段,直到有 新的任务提交到任务队列

  3. 如果任务队列已满,则创建空闲线程,并将 task 作为第一个执行的任务。空闲线程如果执行完手头的任务,也会从任务队列中获取新的任务。
    如果任务队列为空,空闲线程会阻塞,直到 超出 keepalive 设定的时间 或 获取到新的任务执行。如果等待新任务超时,空闲线程的生命周期就会结束了。

  4. 如果空闲线程数+核心线程数已经达到了 maximumPoolSize,创建新线程的方法会失败,此时提交的任务将被拒绝,拒绝策略由 RejectedHandler 负责执行。


并发库中的线程池

java.util.concurrent.Executors 提供了通用线程池创建方法,去创建不同配置的线程池,该工具类目前提供了五种不同的线程池创建配置:

CachedThreadPool

CachedThreadPool 是一种用来 处理大量短时间工作任务的线程池,会在先前构建的线程可用时重用已创建的工作线程,但是当工作线程空闲超过 60s,则会从线程池中移除。

任务队列为 SynchronousQueue,它是一个不存储元素的阻塞队列(容量 0),提交任务的操作必须等待工作线程的移除操作,反之亦然。

在这里插入图片描述


为什么使用 SynchronousQueue 作为任务队列

个人想法:线程池的工作逻辑是,提交任务时如果 核心线程数达到 corePoolSize 且任务队列已满,则会创建空闲线程执行。因为 SynchronousQueue 容量为 0 天然是满的,且 corePoolSize 被设置为 0,这意味着创建任务时如果没有可用线程,就会立即创建一个新线程来处理任务。

这使得 CachedThreadPool 在执行大量短期异步任务时更加高效,避免了任务对线程资源的等待,符合设计初衷:快速执行大量的短暂任务

FixedThreadPool

核心线程数和最大线程数相等,使用的是 无界任务队列 LinkedBlockingQueue。如果当前的工作线程数已经达到 nThreads,任务将被添加到任务队列中等待执行。如果有工作线程退出,下一次提交任务时将会有新的工作线程被创建来补足线程池。

SingleThreadExecutor

工作线程限制为 1 的 FixedThreadExecutor,它 保证了所有任务的都是被顺序执行

ScheduledThreadPool

ScheduledThreadPoolExecutor 允许安排一个任务在延迟指定时间后 执行,还可以 周期性地执行任务。周期性调度任务有两种类型:固定延迟和固定频率。固定延迟 是在上一个任务结束和下一个任务开始之间保持固定的延迟,而 固定频率 是以固定的频率执行任务,不管任务的执行时间多长。


ScheduledThreadPoolExecutor 中定义了内部类 DelayedWorkQueue 作为任务队列,DelayedWorkQueue 是基于堆的数据结构。队列中的元素为 RunnableScheduledFuture 类型:

private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

RunnableScheduledFuture 接口继承关系如下图所示:

  • Delayed 接口继承了 Comparable 接口,getDelay 方法返回任务剩余的延迟时间,返回值小于等于 0 说明延迟的时间已过;compareTo 方法用于比较任务下一次的执行时间,用于维护小顶堆属性(父节点任务的执行时间小于儿子节点)。
  • RunnableFuture 接口的 run 方法定义了需要执行的任务逻辑,Future 接口用于获取异步任务的执行结果。

DelayedWorkQueue#take 方法用于获取下一个需要执行的定时任务,代码及详细注释如下:

public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;// 上锁, 避免堆数据访问产生的数据竞争lock.lockInterruptibly();try {for (;;) {// 堆顶元素: Delayed#getDelay 延迟时间最小的任务RunnableScheduledFuture<?> first = queue[0];if (first == null)// 堆中任务空, 等待新任务入堆available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// delay小于等于0, 说明延迟时间已过, 可以执行;// finishPoll 弹出堆顶任务return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)// leader为等待堆顶任务到达执行时间的线程// leader 非空说明已经有线程正在等待堆顶任务可执行, 因此当前线程为 follower, 需要等待直到堆顶元素变更available.await();else {// 当前线程是等待堆顶元素的 leader 线程, 设置 leader 属性Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待任务延迟的时间available.awaitNanos(delay);} finally {// await期间会释放锁, leader可能因为新任务的加入而失效(当前线程可能等待的不再是堆顶任务)// 所以await超时后, 需要判断leader是否为当前线程, 为当前线程才能设为nullif (leader == thisThread)leader = null;}}}}} finally {// 任务队列非空, leader为空说明没有线程等待堆顶元素可执行, 此时唤醒 follower 线程, 尝试获取堆顶的任务if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}

ThreadPoolExecutor 源码分析

线程池状态表示

ThreadPoolExecutor 最重要的状态参数为:线程池状态(rs) 以及 活跃线程数(wc)。ThreadPoolExecutor 使用一个 Integer 变量 ctl 存储这两个状态参数:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Integer 长度位 32 bits,ctl 中最高的三位 (29-31) 存储线程池状态,低 29 位 (0~28) 存储活跃线程数,因此线程池中活跃线程数理论上限为 2 29 − 1 2^{29}-1 2291

了解了 ThreadPoolExecutor 的这种设计之后,我们来看看状态相关的位运算:

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

CAPACITY 表示线程池中活跃线程的理论上限 2 29 − 1 2^{29}-1 2291COUNT_BITS 表示线程数位数( 32 − 3 = 29 32-3=29 323=29)。

RUNNING 、SHUTDOWN、STOP、TIDYING、TERMINATED 为线程池的五种状态。根据代码,这五种状态表示如下图所示:

在这里插入图片描述

  • RUNNING:可接收新的任务,并且处理队列中排队的任务;
  • SHUTDOWN:不接收新的任务,但会处理队列中剩下的任务;
  • STOP:不接收新任务,不处理队列中的任务,并且中断进行中的任务;
  • TIDYING:所有的任务终止,工作线程数 (workerCount) 等于 0;
  • TERMINATED:线程池关闭,terminated() 方法完成。

获取 runState

private static int runStateOf(int c)     { return c & ~CAPACITY; }

runStateOf 方法从 ctl 获取线程池运行状态,保留 ctl 的最高的三位,其余位设置为 0。以 STOP 状态、3 个活跃线程数的 ctl 为例,求 rs 的过程如下:
在这里插入图片描述


获取 workerCount

private static int workerCountOf(int c)  { return c & CAPACITY; }

workerCountOf 获取线程池中的活跃线程数,即保留 ctl 的 0-28 位,将 29-31 位设置为 0。

在这里插入图片描述

生成 ctl

private static int ctlOf(int rs, int wc) { return rs | wc; }

ctlOf 通过状态值和线程数值计算出 ctl,就是对 rs 和 wc 进行或运算,保留 wc 的 0-28 位和 rs 的 29-31 位。

提交任务 execute()

ThreadPoolExecutor#execute 方法用于提交任务给线程池执行,代码以及详细注释如下:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池状态参数 ctlint c = ctl.get();// 如果活跃线程数小于核心线程池容量corePoolSize, addWorker创建新线程, 以command作为第一个任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;// 创建新线程失败, 更新 ctlc = ctl.get();}// 创建核心线程失败, 尝试将任务添加到任务队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次检查, 如果线程池不在运行状态, 需要回滚刚刚入队的任务if (!isRunning(recheck) && remove(command))// 移除任务成功, 执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0)// 线程池为运行状态, 但是没有工作线程, 创建线程处理任务队列中的任务addWorker(null, false);}// 任务添加到队列失败, (1)线程池状态不是RUNNING状态 或 (2)任务队列已满// 尝试增加非核心线程, 执行 command 任务, 如果线程池不为RUNNING, addWorker会返回falseelse if (!addWorker(command, false))// 线程池不为RUNNING, 新增非核心线程失败, 执行任务拒绝策略reject(command);}

这段代码的主要逻辑很简洁:

  1. 当 wc 小于 corePoolSize 时,创建核心线程执行 command 任务;
  2. 如果核心线程数已满,则将任务缓存在任务队列中 (workQueue.offer),工作线程完成手头上的任务后,从任务队列中获取新任务。
  3. 如果任务队列也满了,offer 方法返回 false,尝试增加非核心线程执行 command。如果线程创建失败,reject 执行任务拒绝策略。

除此之外,我想在本篇博客中探讨下 execute 方法的一些实现细节,并给出我自己的观点用于抛砖引玉。


为什么需要二次检查

大家请看下面这段有关二次检查的代码,在阅读源码时,我产生了疑问: 为什么需要二次检查 ?该操作 解决了什么场景下的数据竞争

// ...
c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get();  // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);
}
// ...

执行二次检查的前提是:

  • 线程池在执行语句 (1) 的时候,是运行状态;
  • 任务队列未满,command 被添加至队列,(2) 处的 offer 方法返回 true;

假设没有二次检查,会发生什么

场景 1:在语句 (1) 后,语句(2) 执行前,线程池使用者调用了 shutdownNow 方法将线程池工作线程关闭,清空任务队列中的任务。时序图如下:


我们先来看看 shutdownNow 调用的 drainQueue 方法:

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}

drainQueue 方法用于移除任务队列 workQueue 中的 Runnable 任务,这些未执行的任务作为 shutdownNow 方法的返回值,通知方法调用者哪些任务未执行。

如果按照上图中的执行序列,线程池的状态已经为 STOP,任务队列也被清空,但是新提交的任务 command 却被添加到任务队列中。这导致这个新任务不会被运行、也不会执行拒绝策略、也无法通过 shutdownNow 返回的任务列表通知调用者

这严重降低了线程池的健壮性,难以想象一个已提交的任务消失在线程池中!


场景 2:线程池处于运行状态,corePoolSize 设置为 0,阻塞队列的容量大于 0。

线程池刚启动时,提交任务 command 显然无法创建核心线程执行,任务会被缓冲在任务队列中,直到任务队列容量到达上限,线程池才会创建非核心线程执行任务。这导致 大量任务将不能及时被处理,甚至可能永远得不到执行

场景示意图如下(图中任务队列容量为 4,corePoolSize 等于 0):


二次检查解决了上述两种场景的问题吗?当然!!

c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get();  // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0) // -----(4)addWorker(null, false);
}

针对场景 1

如果在语句 (1) 和 语句(2) 之间,shutdownNow 被调用并执行完成,然后语句 (2) 将新任务 command 加入任务队列。在语句 (3) 重新获取最新的 ctl,此时就能得知线程池的状态已经为 STOP,使用 remove 方法回滚入队的任务,并执行 reject 方法拒绝执行任务


针对场景 2
如果线程池状态为 RUNNING,但因为线程中没有线程,语句(4) 判断为 true,创建非核心线程处理任务队列中的任务,防止异步任务长时间处于队列中得不到处理 的情况。


创建工作线程 addWorker()

addWorker 用于创建工作线程,我将其分为两部分分析:

  • 第一部分:根据外层死循环判断 ThreadPoolExecutor 的运行状态 是否能够创建线程。如果可以创建线程,通过内层死循环 CAS 更新状态参数 ctl,直到更新成功或线程池状态发生改变。

第一部分的含详细注释的代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {// retry为外层循环retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 仅 (1) RUNNING状态 或 (2) SHUTDOWN状态+队列中仍有任务+firstTask为空 时 创建工作线程// firstTask为空, 说明活跃线程数不满足线程池运行的最小数量if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// for内层循环for (;;) {int wc = workerCountOf(c);// 如果线程数达到容量上限, 不可创建新线程// 如果core为true, 线程数大于等于corePoolSize, 不能创建核心线程// 如果 core 为 false, 线程数大于等于 maximumPoolSize, 不可以创建非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS更新 ctl, 如果成功, 则退出 retry 循环, 执行创建流程if (compareAndIncrementWorkerCount(c))break retry;// CAS更新失败, 重新读取 ctlc = ctl.get();if (runStateOf(c) != rs)// 状态发生改变, 重新执行大循环continue retry;// else: 线程数改变导致CAS失败, 继续for循环即可}}// ... 省略第二部分
} 
  • 第二部分:状态更新成功后,执行真正的线程创建逻辑,包括:工作线程添加至 Worker 集合、启动 Thread 对象。

第二部分详细代码注释如下:

private boolean addWorker(Runnable firstTask, boolean core) {// ... 省略第一部分boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 持有锁的情况下获取 ctl, 防止 shutdown、shutdownNow 导致的状态变更int rs = runStateOf(ctl.get());// 运行状态为 RUNNING或运行状态为 SHUTDOWN 且 firstTask为空 才允许启动工作线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 线程可能已经启动, 抛出异常(例如: 自定义的ThreadFactory#newThread 方法多次调用返回同一个 Thread 对象)if (t.isAlive())throw new IllegalThreadStateException();workers.add(w); // 添加到 HashSet 中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 成功添加到 workers 集合, 在这里真正启动工作线程t.start();workerStarted = true;}}} finally {if (! workerStarted)// 启动线程失败(可能线程已经启动 或 线程池状态发生改变), 将worker从workers中移除, 扣减 workerCountaddWorkerFailed(w);}return workerStarted;
}

大部分代码阅读注释即可了解原理,这里提一下我阅读时产生疑惑的地方:

疑惑一firstTask 等于 null 代表什么?为什么判断能否创建线程时,处于 SHUTDOWN 状态还需要 firstTask 等于 null ?

 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;

疑惑二:为什么需要在持有 mainLock 后,需要重新检查运行状态 rs


先来看疑惑一,firstTask 等于 null 出现的场景有:

  • 预启动核心线程(所有包含 prestart 单词的方法)
public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);
}public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;
}
  • 在工作线程退出时,替换死亡的工作线程。(processWorkerExit 方法)
private void processWorkerExit(Worker w, boolean completedAbruptly) {// ...int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
  • 提交的新任务被缓冲在队列,但活跃线程数 workCount 等于 0。(execute方法)

在 addWorker 方法中,Running 状态可以创建工作线程,SHUTDOWN 状态仅可以在 firstTask 等于 null 的条件下创建线程。这符合 SHUTDOWN 状态的设计初衷:不接受新的任务、仅处理已添加至阻塞队列中的任务

除了预启动场景,execute 场景和 processWorkerExit场景 均是为了确保已经添加到任务队列中的任务不被放弃,能够成功执行。


再来看疑惑二:为什么在持有 mainLock 的情况下获取运行状态 rs?

这是为了防止 shutdown、shutdownNow 方法关闭线程池,改变运行状态。
为了确保 shutdown 和 shutdownNow 方法执行时 worker 集合的稳定,从而保证方法执行过程的原子性,这两种方法都会 在持有 mainLock 的情况下,修改 runState

因此,如果创建 worker 时 rs 发生了改变从而不应该增加工作线程,应该退出创建流程。(例如 RUNNING 变为 STOP 状态,此时不应该创建线程,因为任务都被丢弃了)。

mainLock.lock();int rs = runStateOf(ctl.get());// 确保运行状态 rs 可以创建新的线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// ...}
mainLock.unlock();

下面是我绘制的 addWorker 工作流程图,作为本小节的总结:


工作线程 Worker

ThreadPoolExecutor 中的线程资源被包装为 Worker 对象,它持有一个 Thread 对象,实现了 Runnable 接口,又继承了 AQS,因此也具有锁的性质。

需要指出的是,它没有利用 AQS 中的 CLH 队列管理等待资源的线程,因为 Worker 并 不存在多个线程争抢所有权,它的 lock 方法仅由内部持有的 线程调用。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // AQS state 属性初始化为 -1this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 线程的执行逻辑就是 runWorker方法public void run() {runWorker(this);}// runWorker方法中, 线程在执行任务前持有锁, 将state更改为 1public void lock()        { acquire(1); }// shutdown 关闭空闲线程时, 使用 tryLock 尝试获取锁 public boolean tryLock()  { return tryAcquire(1); }// 任务执行完成释放锁, state更改为 0public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }// ...
}

AQS 在 Worker 中的主要作用是维护 state 属性。Worker 构造函数中,state 初始化为 -1,执行 runWorker() 方法时会被设置为 0。state 等于 0 说明线程是空闲的,state 等于 1 说明线程正在处理任务

  • Worker#lock() 方法在仅在 runWorker 方法中被调用,线程在执行任务前调用该方法持有锁,将state更改为 1。
  • Worker#unlock() 方法在执行完任务后被调用,释放锁,将 state 更改为 0。
  • Worker#tryLock() 方法在 shutdown() 方法中被调用,用于中断空闲的工作线程,因为空闲的 Worker state 等于 0,因为 tryLock 能返回 true。

主逻辑 runWorker

runWorker 代码及详细注释如下:

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 工作线程的第一个任务, 创建核心线程 或 线程池已满创建非核心线程时, firstTask非空Runnable task = w.firstTask;w.firstTask = null;// 将 state 由初始值 -1 修改为 0w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// getTask如果为空, 说明任务队列中已经没有任务可以执行, 工作线程正常退出while (task != null || (task = getTask()) != null) {w.lock(); // 在执行任务前, 清除线程的中断标记(较为费解, 随后详细解释)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务前的钩子方法, 继承ThreadPoolExecutor的类可重写beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run(); } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务完成后的钩子方法, 继承ThreadPoolExecutor的类可重写afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock(); // state修改为0, 工作线程空闲}}completedAbruptly = false;} finally {// 处理线程退出:// 1. 从 worker 集合中移除当前工作线程// 2. 如果活跃线程数不满足线程池运行的最低要求, 或者线程因为执行异常而终止, 创建新线程替换processWorkerExit(w, completedAbruptly);}}

工作线程的运行流程概括起来为:

  1. getTask 从线程池中获取 Runnable 任务;
  2. 按照 beforeExecuteRunnable#runafterExecute 的顺序执行,beforeExecute 和 afterExecute 为 ThreadPoolExecutor 提供的两个扩展点,子类可以重写这两个方法满足打点、日志等自定义需求。
  3. 如果任务顺利执行,进行下一轮循环,通过 getTask 获取新任务。
    如果 getTask 返回 null,说明任务队列中没有任务 或者 当前线程因为线程池关闭而被中断
  4. 如果任务 或 钩子函数执行时抛出了异常,线程同样会退出,completedAbruptly 为 true。

在讲解完工作线程的主要流程后,我们来讨论下面这个 if 语句的含义:

Thread wt = Thread.currentThread();
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();
// ...

这段代码执行的目的是:

工作线程 worker 已经领取了一个任务准备执行,如果线程池状态为 RUNNING 或 SHUTDOWN,应该确保当前线程的中断标记被清除,从而不影响任务的执行。Thread.interrupted() 方法会 返回当前线程的中断标记,并将线程中断标记清空

如果线程池的状态为 STOP,且当前线程未被中断,wt.interrupt() 为当前线程打上中断标记。

下面我来分类讨论,帮助大家更好的理解:

  • runStateAtLeast(ctl.get(), STOP) == true && !wt.isInterrupted() == true
    当前线程池的状态至少为 STOP,当前线程却没有中断标记。if 判断为 true,中断当前线程;
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == false
    if 判断为 false,当前线程的状态为 RUNNING 或 SHUTDOWN,且已经有一个即将执行的任务,Thread.interrupted() 将中断标记清除。
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true!wt.isInterrupted() == true
    这种情况非常反直觉,但是有可能出现的。下图操作序列很好说明了这种情况:因为 错误地将 STOP 中断标记给清除,所以 if 也会判断为 true,执行 wt.interrupt() 中断当前线程。
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == false && runStateAtLeast(ctl.get(), STOP) == true
    这种情况类似上一种,只是线程池状态设置为 STOP,还未中断当前线程,if 操作会返回 false。


获取任务 getTask()

工作线程通过 getTask 从任务队列中获取任务,如果 getTask 返回 null,线程就会退出 runWorker 中的死循环。

getTask 何时返回 null

条件一:线程池状态为STOP、TIDYING、TERMINATED;或者是 SHUTDOWN且工作队列为空

条件二:工作线程 wc 大于最大线程数或当前工作线程已经超时, 且还有其他工作线程或任务队列为空。

当前线程超时的条件:(【核心线程可以超时】或【线程数大于核心线程数】)且 上一轮循环从阻塞队列的 poll 方法超时返回。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c); // rs保留c的高3位, 低29位全部清零// 大小顺序为TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING// 条件一if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;}int wc = workerCountOf(c);// timed表示当前线程是否能够超时(设置了【核心线程超时】或线程数超过了核心线程)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 条件二if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 可能有多个线程同时满足条件二, 需要使用cas扣减if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true} catch (InterruptedException retry) {timedOut = false;}}
}

getTask 的流程图如下:


随后,我将在【工作线程的退出】章节,详细介绍 不同场景线程池回收工作线程的过程 ,会结合 getTask 方法分析。

工作线程的退出

RUNNING 状态所有任务执行完成

这种场景下,会将工作线程的数量减少到核心线程数大小。

int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}

timed 表示是否允许线程因为超时被回收;timedOut 记录上一轮循环中,线程从阻塞队列获取任务是否超时了。

假设线程池核心线程数为2,最大线程数为4。线程数低于核心线程数时,使用execute 提交任务便会创建核心线程;线程数达到 2 后,任务被添加至阻塞队列,如果阻塞队列也满了,将工作线程逐渐增加到 4。当全部任务执行完成后:

  1. 工作队列为空,四个线程阻塞在 workQueue.poll 上,各自等待 keepAliveTime 时间后,超时返回,timedOut 设置为 true。

  2. 进入下一轮循环,因为 wc 等于 4 大于 corePoolSize=2,因此四个线程 timed 均为 true,从而 timed&timedOut 为 true 且 当前任务队列为空,情况二成立,4 个线程都可以被超时回收。

  3. 四个线程尝试 CAS 扣减 wc 为 3(仅有一个线程能扣减成功,getTask 返回 null)。其余三个线程继续循环,直到线程数达到核心线程数,timed 等于 false。

shutdown 关闭线程池

调用 shutdown() 后,线程池状态流转为 SHUTDOWN,随后向所有的空闲工作线程发送中断信号

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers(); // 中断所有空闲线程onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}

处于 getTask 获取任务阶段的工作线程是空闲的,并没有锁定 Worker。我将分三种情况探讨工作线程如何响应中断信号。

  • 任务全部完成,所有线程在等待;
  • 任务队列中积压了大量任务,所有线程在繁忙;
  • 队列中剩余的任务少于空闲线程数;
所有线程等待新任务
// getTask(): 条件一
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;
}
...
try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true
} catch (InterruptedException retry) {timedOut = false;
}

中断信号将阻塞的线程唤醒,进入下一轮循环。当到达条件一处,检查到 rs 等于SHUTDOWN,且工作队列为空,满足条件,扣减线程数后返回null。在runWorker 中退出循环,结束线程。

所有线程繁忙

此时任务队列中积压了很多任务,工作线程因为 shutdown 而被中断,在获取任务时 调用 poll 或 take 方法都会抛出 InterruptedException 异常,然后被 catch 捕获,重新进行循环。

第二次循环到达条件一,虽然 rs 为 SHUTDOWN,但是工作队列非空,不满足退出条件。

// 工作队列非空, 条件1不满足
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;
}

timedOut 为 false,不是因为 poll 超时而返回,因此条件 2 也不满足:

// timedOut false
if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}

因此,shutdown 方法在线程池繁忙的情况下,相当于让 正在获取任务的线程空转了一次,不影响线程池运行。

队列中剩余少量的任务

假设情形

线程池状态已经是SHUTDOWN,但任务队列中剩余两个任务,A、B、C、D四个线程同时通过条件一和条件二,尝试从阻塞队列中获取任务。线程A、B成功获取任务,而线程 C、D因队列为空而阻塞。

线程A、B执行完任务后再次调用 getTask(),条件一的判断为true(线程池运行状态为SHUTDOWN且工作队列为空),于是返回 null,线程退出 runWorker 死循环,准备进行回收。

final void runWorker(Worker w) {boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {...}}finally {// 回收退出的线程processWorkerExit(w, completedAbruptly);
}

在回收前,还需要执行 processWorkerExit 方法。在该方法中会将 worker 移除出 worker 集合,并调用tryTerminate()。

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 执行任务时抛出异常退出, 而非getTask()返回null退出, 需要更新ctl属性反映线程数的变化if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 统计完成的任务数workers.remove(w); // 将Worker对象移除工作线程集合} finally {mainLock.unlock();}tryTerminate();...
}

在 tryTerminate 中,线程A、B判断线程池状态为 SHUTDOWN 且工作队列为空,不会在第一个 if 处返回。

然后判断出当前workers中的工作线程数不为0(因为线程C、D正阻塞),然后调用 interruptIdleWorkers(ONLY_ONE)

注意:此时线程A、线程B的线程数已经从ctl扣减,Worker实例也从workers中移除。

final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 线程池状态为SHUTDOWN, 但仍然有线程阻塞在take或poll方法处if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}...}
}

interruptIdleWorkers 的入参 onlyOne 为true,因此只会中断一个空闲线程,然后break循环。假设先中断线程C,线程C从阻塞中被唤醒,抛出InterruptedException异常,被 catch 住异常后重新进行一轮循环,发现条件一满足,更新 ctl 并返回null。

private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 正在执行任务的Worker是无法获取锁的, 因此这里只能回收空闲线程if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();    } catch (SecurityException ignore) {} finally {w.unlock();}}// 仅中断一个空闲线程if (onlyOne)break;}} finally {mainLock.unlock();}
}

随后,线程 D 可以由上一个退出的线程中断唤醒(例如线程 C),从而让工作线程优雅地退出。

写在最后

感谢各位读者阅读本片博客,本篇博客的创作过程中参考了大量资料,笔者也详细阅读了 ThreadPoolExecutor 源码。笔者将很多阅读源码的思考融入本篇博客,尽可能去体会 Doug Lea 大神每一行代码的用意。这些细节可能很少有博客涉及,因此很可能存在纰漏和理解错误。如果有异见,欢迎在评论区指教,笔者将虚心倾听

创作过程耗时费力,但我乐在其中(钻研源码的过程和分享知识是让人快乐的事情),如果大家喜欢这种图文结合、代码详细注释的写作风格,就给我点一个免费的赞吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/733875.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

漫谈技术成长

引言 相信很多程序员在自己的技术成长之路上&#xff0c;总会遇到许许多多的难关&#xff0c;有些难关咬咬牙就过去了&#xff0c;而有点难关则需要有一定的能力&#xff0c;才能克服。因此&#xff0c;本文主要围绕“技术成长” 话题&#xff0c;为何会选择技术方向&#xff0…

开源的Java图片处理库介绍

在 Java 生态系统中&#xff0c;有几个流行的开源库可以用于图片处理。这些库提供了丰富的功能&#xff0c;如图像缩放、裁剪、颜色调整、格式转换等。以下是几个常用的 Java 图片处理库的介绍&#xff0c;包括它们的核心类、主要作用和应用场景&#xff0c;以及一些简单的例子…

Normalizer(归一化)和MinMaxScaler(最小-最大标准化)的区别详解

1.Normalizer&#xff08;归一化&#xff09;&#xff08;更加推荐使用&#xff09; 优点&#xff1a;将每个样本向量的欧几里德长度缩放为1&#xff0c;适用于计算样本之间的相似性。 缺点&#xff1a;只对每个样本的特征进行缩放&#xff0c;不保留原始数据的分布形状。 公式…

C语言指针从入门到基础详解(非常详细)

1.内存和地址 我们知道电脑中的CPU在处理数据的时候需要在内存中读取数据处理后的数据也会放在内存中。把内存划分为一个个的内存单元每个单元的大小是一个字节。每个字节都有它对应的编号也就是它的地址&#xff0c;以便CPU可以快速的找到一个内存空间。C语言中我们把地址叫做…

MySQL-锁:共享锁(读)、排他锁(写)、表锁、行锁、意向锁、间隙锁,锁升级

MySQL-锁&#xff1a;共享锁&#xff08;读&#xff09;、排他锁&#xff08;写&#xff09;、表锁、行锁、意向锁、间隙锁 共享锁&#xff08;读锁&#xff09;、排他锁表锁行锁意向锁间隙锁锁升级 MySQL数据库中的锁是控制并发访问的重要机制&#xff0c;它们确保数据的一致性…

SQL中常见的DDL操作及示例,数据库操作及表操作

目录 一、数据库操作 1、创建数据库 2、查看所有数据库 3、使用数据库 4、删除数据库 二、表操作&#xff1a; 1、创建表 2、查看表结构 3、修改表结构 3.1 添加列 3.2 修改列数据类型 3.3 修改列名 3.4 删除列 3.5 修改表名 3.6 删除表 注意&#xff1a; 在数…

数字化解决方案的设计与实现:提升业务效率与用户体验

摘要&#xff1a;随着数字化时代的到来&#xff0c;越来越多的企业和组织开始寻求数字化解决方案来提升业务效率和改善用户体验。本文将探讨数字化解决方案的设计与实现过程&#xff0c;并介绍一些关键的技术和策略。 ## 引言 在当今竞争激烈的商业环境中&#xff0c;企业和组…

Unity 轮转图, 惯性, 自动回正, 点击选择

简单的实现 2D 以及 3D 的轮转图, 类似于 Web 中无限循环的轮播图那样. 文中所有代码均已同步至 github.com/SlimeNull/UnityTests 3D 轮转图: Assets/Scripts/Scenes/CarouselTestScene/Carousel.cs2D 轮转图: Assets/Scripts/Scenes/CarouselTestScene/UICarousel.cs 主要逻…

HashMap的底层实现

1、1.7版本的底层实现 HashMap在1.7版本中数据结构是数组链表&#xff0c; 1.1 put方法 put方法中操作步骤&#xff1a; &#xff08;1&#xff09;、对key计算相应的hash值&#xff0c;然后通过hash & table.length-1计算可以获得到在hash表中中相应的桶位置&#xff…

海外媒体宣发套餐如何利用3种方式洞察市场-华媒舍

在当今数字化时代&#xff0c;媒体宣发成为了企业推广产品和品牌的重要手段之一。其中&#xff0c;7FT媒体宣发套餐是一种常用而有效的宣传方式。本文将介绍这种媒体宣发套餐&#xff0c;以及如何利用它来洞察市场。 一、关键概念 在深入讨论7FT媒体宣发套餐之前&#xff0c;让…

golang实现正向代理和反向代理

文章目录 正向代理反向代理区别与联系:总结代理服务器实现正向代理反向代理正向代理 正向代理是客户端代理,它位于客户端和目标服务器之间。它的作用是保护客户端的隐私和安全。 如我们现在想要访问谷歌,但是由于某些原因,无法直接访问到谷歌,我们可以通过连接一台代理服务…

STM32_3-1点亮LED灯与蜂鸣器发声

STM32之GPIO GPIO在输出模式时可以控制端口输出高低电平&#xff0c;用以驱动Led蜂鸣器等外设&#xff0c;以及模拟通信协议输出时序等。 输入模式时可以读取端口的高低电平或电压&#xff0c;用于读取按键输入&#xff0c;外接模块电平信号输入&#xff0c;ADC电压采集灯 GP…

【NERF】入门学习整理(二)

【NERF】入门学习整理(二) 1. Hierarchicalsampling分层采样2. Loss定义(其实就是简单的均方差MSE)3. 隐式重建与显示重建1. Hierarchicalsampling分层采样 粗网络coarse,均匀采样64个点 缺点:如果仅使用粗网络会存在点位浪费和欠采样的问题,比比如空气中很多无效的点 精细…

【C语言】文件操作篇-----程序文件和数据文件,文件的打开和关闭,二进制文件和文本文件,fopen,fclose【图文详解】

欢迎来CILMY23的博客喔&#xff0c;本篇为【C语言】文件操作篇-----程序文件和数据文件&#xff0c;文件的打开和关闭&#xff0c;二进制文件和文本文件【图文详解】&#xff0c;感谢观看&#xff0c;支持的可以给个一键三连&#xff0c;点赞关注收藏。 前言 在了解完动态内存管…

运维随录实战(13)之docker搭建mysql集群(pxc)

了解 MySQL 集群之前,先看看单节点数据库的弊病 大型互联网程序用户群体庞大,所以架构需要特殊设计。单节点数据库无法满足大并发时性能上的要求。单节点的数据库没有冗余设计,无法满足高可用。单节点 MySQL无法承载巨大的业务量,数据库负载巨大常见 MySQL 集群方案 Re…

少儿编程 蓝桥杯青少组科技素养题 信息素养真题及解析第25套

少儿编程 科技素养 信息素养真题第25套 1、旅行结束之后&#xff0c;回到家的小蓝决定将照片备份在云端的网盘上。备份照片主要占用的是小蓝家的( )带宽 A、下行 B、上行 C、文件 D、数据 答案&#xff1a;B 考点分析&#xff1a;主要考查网络相关知识&#xff0c;要将照…

DHCP中继实验(华为)

思科设备参考&#xff1a; 一&#xff0c;技术简介 DHCP中继&#xff0c;可以实现在不同子网和物理网段之间处理和转发DHCP信息的功能。如果DHCP客户机与DHCP服务器在同一个物理网段&#xff0c;则客户机可以正确地获得动态分配的IP地址。如果不在同一个物理网段&#xff0c;…

JVM知识整体学习

前言&#xff1a;本篇没有任何建设性的想法&#xff0c;只是我很早之前在学JVM时记录的笔记&#xff0c;只是想从个人网站迁移过来。文章其实就是对《深入理解JVM虚拟机》的提炼&#xff0c;纯基础知识&#xff0c;网上一搜一大堆。 一、知识点脑图 本文只谈论HotSpots虚拟机。…

C# 视频转图片

在 C# 中将视频转换为图像可以使用 FFmpeg 库。下面是一个示例代码来完成这个任务&#xff1a; using System; using System.Diagnostics;class Program {static void Main(string[] args){string inputFile "input_video.mp4"; // 输入的视频文件路径string outpu…

【Leetcode打卡】递归回溯

【Leetcode打卡】递归回溯 784. 字母大小写全排列 class Solution { public:int find(string s,int pos){int ipos;while(i<s.size()){if(isalpha(s[i])){return i;}i;}return -1;}void turn(string& s,int pos){if(islower(s[pos])){s[pos]toupper(s[pos]);}else{s[po…