深入剖析Java线程池的核心概念与源码解析:从Executors、Executor、execute逐一揭秘

文章目录

  • 文章导图
  • 前言
  • Executors、Executor、execute对比剖析
  • Executors生成的线程池?
  • 线程池中的 execute 方法
    • execute 方法的作用
    • execute的工作原理
      • 拒绝策略
    • 源码分析工作原理
      • 基本知识
        • 线程的状态
        • 线程池的状态
        • 线程池状态和线程状态总结
        • 线程池的状态信息和线程数量信息(ctl)
      • execute->addWorker->runWorker->getTask流程
      • execute方法
        • 步骤详细解释:
        • 关键方法:
        • 总结:
      • addWorker
        • Worker线程
        • 步骤详细解释:
        • 方法的关键点:
        • 总结:
      • runWorker
        • 代码解读:
      • getTask
        • 代码解读:
        • 特别注意:

线程池系列文章可参考下表,目前已更新完毕…

线程池系列:文章
Java基础线程池深入剖析Java线程池的核心概念与源码解析:从Executors、Executor、execute逐一揭秘
CompletableFuture线程池从用法到源码再到应用场景:全方位了解CompletableFuture及其线程池
SpringBoot默认线程池(@Async和ThreadPoolTaskExecutor)探秘SpringBoot默认线程池:了解其运行原理与工作方式(@Async和ThreadPoolTaskExecutor)
SpringBoot默认线程池和内置Tomcat线程池你是否傻傻分不清SpringBoot默认线程池和内置Tomcat线程池?

文章导图

image-20240601182910930

前言

在日常编码中,特别是在处理并发编程时,Java 提供了很多便捷工具帮助我们高效运行。不过你是否也曾被 Executors、Executor 和 execute 这些名字搞得一头雾水?

它们长得这么像,究竟有什么区别呢?接下来跟着我一探究竟吧!
在这里插入图片描述

Executors、Executor、execute对比剖析

java.util.concurrent.Executors是一个工具类,提供了一些静态方法,用于创建各种类型的 ExecutorService 实例。例如,newFixedThreadPool 可以创建一个固定大小的线程池,newCachedThreadPool 可以创建一个可缓存的线程池等。它简化了创建线程池的过程,提供了一些默认的配置选项。
java.util.concurrent.Executor是一个接口,定义了一个单一的方法 execute,用于提交任务到执行器中。这个接口是线程池的基础接口,它没有提供直接创建线程池的方法。
java.util.concurrent.Executor#executeExecutor 接口中唯一的方法,用于将任务提交给执行器执行。这个方法通常用于提交实现了 Runnable 接口的任务。

简而言之,Executors 是一个工具,用于创建各种类型的线程池实例。Executor 是一个接口,定义了提交任务到执行器的方法。execute 方法是 Executor 接口定义的唯一方法,用于提交任务给执行器执行。

如果你需要创建线程池,可以使用 Executors 类提供的方法来创建,然后使用返回的线程池实例来提交任务。具体看我下面的这个例子就理解了!:

//Executors工具类创建线程池默认是ExecutorService,ExecutorService又继承了Executor,它是线程池最底层的基础接口!
Executor executor = Executors.newFixedThreadPool(5);
//然后用executor线程池去执行execute方法,接收一个Runnable任务
executor.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName());}
});

image-20240530222442006

Executors生成的线程池?

说到了Executors,就必须谈下用它生成的线程池,在阿里Java规约中,对Executors有一个专门的规约:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lKKPHsVd-1596103123455)(EE1DC468E4A74F7AB21E05EF9D46AFB0)]

原因其实也很简单,会有各种OOM风险,也不便于管理,详细的我都帮你们总结成思维导图了!

  • Executors.newWorkStealingPool方法之外,其他方法都有OOM风险。
  • 我们可以发现Executors创建的线程池底层其实还是基于ThreadPoolExecutor的方式

image-20240531214546898

线程池中的 execute 方法

讲完了Executors生成的线程池,接下来当然是看看这个线程池的核心方法execute了

execute 方法的作用

execute 方法是线程池的核心方法之一,可以用来提交任务并执行,当然也可以用submit,但是实际核心逻辑都是一样的。

不管是上面的Executors生成的线程池,还是自己定义的ThreadPoolExecutor线程池,实际去执行线程的时候都会调用到我们的execute 方法,

  • 它用于向线程池提交一个任务,供线程池调度执行。

  • 它接受一个 Runnable 对象作为参数,并将其提交给内部的工作队列。

execute的工作原理

线程池采用的是一种生产者-消费者的模型,如下图:
FixedThreadPool的execute()方法运行示意图

工作原理如下:

  1. 任务提交
    • 当外部提交一个任务到线程池时,线程池会根据当前的运行状态和线程数量决定如何处理这个任务。
  2. 任务队列和线程的管理
    • 如果当前运行的线程少于核心线程数(corePoolSize),线程池会创建一个新的工作线程来执行这个任务。
    • 如果当前运行的线程数达到核心线程数,任务会被放入任务队列等待执行。
    • 如果任务队列已满,并且当前运行的线程数少于最大线程数(maximumPoolSize),线程池会创建新的工作线程来处理任务。
    • 如果任务队列已满,并且线程池中的线程数已经达到最大线程数,线程池会根据拒绝策略处理这个任务。
  3. 任务执行
    • 工作线程从任务队列中获取任务并执行。
    • 执行完一个任务后,工作线程会继续从任务队列中获取下一个任务,直到任务队列为空。
  4. 线程的回收和销毁
    • 如果一个线程在一定时间内(keepAliveTime)没有获取到新的任务,且当前线程数超过核心线程数,那么该线程将被终止。
    • 当所有任务完成后,核心线程会继续等待新任务,而非核心线程会被回收。

image-20240601013614020

拒绝策略

策略名称描述适用场景
AbortPolicy默认的拒绝策略。当任务无法提交到线程池时,抛出 RejectedExecutionException 异常。希望在任务无法处理时立即得到通知,并进行相应处理的场景。
CallerRunsPolicy调用者运行策略。当任务无法提交到线程池时,由提交任务的线程(即调用者线程)执行该任务。希望降低任务提交速度,并且不希望丢弃任务的场景。
DiscardPolicy丢弃策略。当任务无法提交到线程池时,直接丢弃该任务,不进行任何处理。可以接受任务丢失,并且不希望对系统产生过多负载的场景。
DiscardOldestPolicy丢弃最旧策略。当任务无法提交到线程池时,丢弃任务队列中最旧的未处理任务,然后尝试重新提交当前任务。希望抛弃旧任务,优先处理新任务的场景。

在实际操作里,建议选默认的AbortPolicyCallerRunsPolicy策略。

AbortPolicy策略:

  • AbortPolicy是最保险安全的,简单粗暴,一满就拒绝,无论什么情况都能保证系统不会因线程池出问题。
  • 缺点也明显,一满就可能丢线程,没执行到你想执行的业务逻辑。

CallerRunsPolicy策略:

  • CallerRunsPolicy策略是,当线程池塞不下新任务时,会让调用线程来跑,既不丢任务,又能适当减缓新任务速度,减轻线程池压力,特别适合需要高执行保证的场景。
  • 但它的缺点正好相反,不丢线程,但流量大时可能出问题。

想象一下,某天早上,系统访问量突然猛增,线程池马上满了,剩下的都被拒绝策略打回给调用者线程处理,就像高速公路后半段的车都要掉头走别的路,你说后面的高速会不会堵得瘫痪?

系统也是一样,这个拒绝策略在这种情况下可能让系统崩溃,在前端页面上可能出现假死、卡顿、超时未响应等问题。

当然,如果默认的策略不满足你的需求也可以通过实现 RejectedExecutionHandler 接口来定义自己的拒绝策略,以满足特殊的业务需求。

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 自定义处理逻辑}
}

源码分析工作原理

上面讲到了线程池的工作原理或者也是工作流程,那么为什么是这样的呢?

知其然知其所以然,那我们来看看源码不就知道了!

基本知识

线程的状态

注意我们这里涉及到了线程池的状态,注意区别我们常见的线程状态,线程状态如下

线程状态指的是单个线程在其生命周期中所处的状态。Java 中的线程有以下几种状态:

img

状态描述
NEW线程已创建但尚未启动。
RUNNABLE线程正在 Java 虚拟机中执行。
BLOCKED线程被阻塞,等待监视器锁。
WAITING线程无限期等待另一个线程执行特定操作。
TIMED_WAITING线程在指定的等待时间内等待另一个线程执行特定操作。
TERMINATED线程已退出。

代码示例:

Thread thread = new Thread(() -> {// 线程任务
});System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE 或其他状态
线程池的状态

线程池的状态有5种,他们的状态转换如下图所示,可以看到和线程的状态完全它们不是一回事。

图3 线程池生命周期

ThreadPoolExecutor类存放线程池的状态信息很特别,是存储在一个int类型原子变量的高3位,而低29位用来存储线程池当前运行的线程数量。通过将线程池的状态和线程数量合二为一,可以做到一次CAS原子操作更新数据。

状态高3位值说明
RUNNING111运行状态,线程池被创建后的初始状态,能接受新提交的任务,也能处理阻塞队列中的任务。
SHUTDOWN000关闭状态,不再接受新提交的任务,但任可以处理阻塞队列中的任务。
STOP001停止状态,会中断正在处理的线程,不能接受新提交的任务,也不会处理阻塞队列中的任务。
TIDYING010所有任务都已经终止,有效工作线程为0。
TERMINATED011终止状态,线程池彻底终止。

代码示例:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);// 获取线程池状态
System.out.println(executor.isShutdown()); // false
System.out.println(executor.isTerminating()); // false
System.out.println(executor.isTerminated()); // falseexecutor.shutdown();// 获取线程池状态
System.out.println(executor.isShutdown()); // true
System.out.println(executor.isTerminating()); // true 或 false
System.out.println(executor.isTerminated()); // true 或 false
线程池状态和线程状态总结
比较项线程状态线程池状态
描述单个线程在其生命周期中所处的状态。线程池在其生命周期中所处的状态。
常见状态NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATEDRUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED
主要用途用于监控和调试单个线程的执行状态。用于管理和控制整个线程池的生命周期和任务处理行为。
示例代码Thread.getState()ThreadPoolExecutor.isShutdown(), ThreadPoolExecutor.isTerminating(), ThreadPoolExecutor.isTerminated()
线程池的状态信息和线程数量信息(ctl)

线程池常见属性和方法.png

// 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用来存储线程池运行状态,其余位数表示线程池的容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 线程池状态以常量值被存储在高三位中
private static final int RUNNING    = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
private static final int STOP       =  1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
private static final int TERMINATED =  3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
  • ctl变量的封箱拆箱相关的方法
// ctl变量的封箱拆箱相关的方法
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 从ctl中获取线程池的状态值
private static int workerCountOf(int c)  { return c & CAPACITY; } // 从ctl中获取线程池的数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //生成ctl值, rs 表示线程池状态,wc 表示当前线程池中 worker
(线程)数量,相与以后就是合并后的状态
  • ​ 线程池状态值比较
比较当前线程池 ctl 所表示的状态:线程池状态值的大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0
private static boolean isRunning(int c) { return c < SHUTDOWN; }
  • CAS设置ctl的值

    // 使用 CAS 方式 让 ctl 值 +1 ,成功返回 true, 失败返回 false
    private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
    }
    // 使用 CAS 方式 让 ctl 值 -1 ,成功返回 true, 失败返回 false
    private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
    }
    // 将 ctl 值减一,do while 循环会一直重试,直到成功为止
    private void decrementWorkerCount() {
    do {} while (!compareAndDecrementWorkerCount(ctl.get()));
    }

execute->addWorker->runWorker->getTask流程

ThreadPoolExecutor 类中的 execute 方法、addWorker 方法、runWorker 方法和 getTask 方法共同协作,以确保线程池的有效执行和任务管理。

  1. execute 方法
    • 是线程池接口 Executor 的实现,用户通过这个方法提交任务(Runnable 对象)给线程池。
    • execute 方法首先判断是否需要添加新的工作线程(Worker)来执行提交的任务,如果是,则调用 addWorker 方法。
  2. addWorker 方法
    • 负责创建新的工作线程(Worker),并将提交的任务作为线程的首个任务(firstTask)。
    • 该方法同样负责确认线程池的状态和工作线程的数量是否允许添加新的 Worker
    • 如果成功添加了 Worker,该 Worker 会通过 runWorker 方法开始执行。
  3. runWorker 方法
    • 由每个工作线程(Worker)调用,是工作线程的主要执行循环。
    • 在这个方法中,线程会循环地从任务队列中获取任务(通过调用 getTask 方法),并执行它们。
  4. getTask 方法
    • 从线程池的任务队列中获取待执行的任务。
    • 如果线程池处于关闭状态或者配置允许线程超时且没有任务可执行,getTask 可以返回 null,导致 runWorker 方法结束执行循环,从而允许工作线程结束。

大致流程如下:
image-20240601195340699

了解了大致流程以后,接下来我们再仔细剖析每个方法!

execute方法

这段代码是Java中的ThreadPoolExecutor类的核心方法之一:execute。此方法负责处理接收到的Runnable任务,并根据线程池的状态(包括核心线程、工作队列、最大线程数等)来决定如何处理这个任务。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取当前线程池的状态int c = ctl.get();// 步骤1:如果当前运行的线程数少于 corePoolSize,则创建一个新线程来执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))  // 调用 addWorker 方法尝试增加一个新的工作线程return;c = ctl.get();  // 如果 addWorker 失败(添加线程失败或线程池已关闭),则重新获取线程池状态}// 步骤2:如果任务可以成功加入工作队列if (isRunning(c) && workQueue.offer(command)) {  // 尝试将任务加入工作队列int recheck = ctl.get();  // 再次检查线程池状态if (!isRunning(recheck) && remove(command))  // 如果线程池不再运行,并且成功从队列中移除任务reject(command);  // 拒绝任务else if (workerCountOf(recheck) == 0)  // 如果线程池仍然运行但没有正在执行的线程addWorker(null, false);  // 尝试添加一个新的工作线程}// 步骤3:如果工作队列已满或池已满,尝试创建一个新的线程,如果失败则拒绝任务else if (!addWorker(command, false))reject(command);  // 如果添加新线程失败,拒绝任务
}
步骤详细解释:
  1. 第一步:如果当前运行的线程数少于核心线程数(corePoolSize),尝试创建一个新线程来执行任务

    • if (workerCountOf(c) < corePoolSize):检查当前运行的线程数是否少于核心线程数。
    • if (addWorker(command, true)):调用 addWorker 方法尝试创建一个新线程,如果成功,新线程将执行给定的任务 command
      • addWorker(command, true)返回 true 表示已成功创建并启动了一个新线程来处理任务。
      • 如果 addWorker 失败(如无法新建线程,或线程池状态已变),则继续步骤2。
  2. 第二步:如果成功地将任务放入队列

    • if (isRunning(c) && workQueue.offer(command)):检查线程池是否处于运行状态,并尝试将任务添加到工作队列中(非阻塞操作)。
    • if (!isRunning(recheck) && remove(command)):重新检查线程池的状态,如果线程池不再运行,并且成功从工作队列中移除了任务(即任务被停止),则执行拒绝策略 reject(command)
    • else if (workerCountOf(recheck) == 0): 如果没有任何线程在运行,尝试创建一个新线程以确保至少有一个线程在执行任务。
  3. 第三步:如果无法将任务加入队列(即队列满了)

    • else if (!addWorker(command, false)):尝试新增一个非核心线程来执行任务,如果再次失败(比如线程数超过最大限制 maxPoolSize),则执行拒绝策略 reject(command)
关键方法:
  • ctl: 是一个控制变量,它用来控制线程池的状态和线程数量。

  • workerCountOf(c): 是一个方法,用来从控制变量c中提取当前的工作线程数量。

  • addWorker(Runnable firstTask, boolean core):该方法用于创建并启动新线程,将新线程添加到线程池中执行任务。

  • isRunning(int c):检查线程池是否处于运行状态。

  • workQueue.offer(command):尝试将任务加入工作队列。

  • remove(Runnable task):从工作队列中移除指定任务。

  • reject(Runnable task):拒绝任务,通常是通过调用RejectedExecutionHandler

总结:

这段代码通过三个步骤确保线程池以最佳方式处理新提交的任务:

  1. 优先使用核心线程处理任务。
  2. 如果核心线程已满,将任务放入队列。
  3. 如果队列已满,则使用备用策略(如创建新的非核心线程)处理任务,最后如果所有处理策略都失败,则拒绝新任务。

这种设计用于确保线程池的高效利用并提供灵活的任务处理机制,同时保护系统免受过载。

addWorker

Worker线程

Worker是通过继承AQS(AbstractQueuedSynchronizer),使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;//Worker持有的线程Runnable firstTask;//初始化的任务,可以为null
}

Worker 类是线程池中负责实际执行任务的工作线程。它封装了一个线程,并且负责从任务队列中取出任务并执行。WorkerThreadPoolExecutor 中的主要作用包括:

  1. 执行任务Worker 从任务队列中取出任务并执行这些任务。
  2. 维护线程存活:即使当前没有任务可执行,Worker 线程也可以保持存活,以便随时准备处理新的任务。
  3. 生命周期管理Worker 管理着线程的生命周期,包括启动、运行和终止。
步骤详细解释:

这段代码来自ThreadPoolExecutor类的addWorker方法,该方法用于向线程池中添加新的工作线程。如果线程池处于非关闭状态或者允许新任务进入,则会将传入的Runnable任务交给新线程来执行。以下是对该代码的逐行解释:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();  // 获取线程池状态控制变量int rs = runStateOf(c);  // 获取线程池当前运行状态// 检查线程池状态:// 1. 线程池状态是否 >= SHUTDOWN(即 STOP、TERMINATE 等),如果是且// 2. 当前状态是 SHUTDOWN,且 firstTask 为 null,且工作队列不为空,则继续添加。// 否则,不能添加新线程,直接返回 false。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 内层循环,尝试增加 worker 数量。for (;;) {int wc = workerCountOf(c);  // 获取当前工作线程数// 3. 如果工作线程数已达到上限(CAPACITY)或达到核心线程数(corePoolSize)或最大线程数(maximumPoolSize),则返回 false。if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// 4. 尝试通过 CAS(Compare-And-Swap)操作增加工作线程计数。如果成功,则跳出整个 retry 循环。if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // 5. 如果 CAS 失败,重新获取线程池状态控制变量// 6. 如果运行状态已改变,则返回到外层 retry 循环重新尝试。if (runStateOf(c) != rs)continue retry;// 否则,重新尝试内循环(CAS 操作失败的情况)。}}// 简单总结上面的CAS过程://(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas//(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了//(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas// 走到这里说明cas成功,线程数+1,但并未被执行boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建新 Worker,并分配 firstTaskw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;  // 获取主锁,确保添加操作的同步mainLock.lock();try {// 7. 再次检查池的运行状态,确保在保持锁定的一致性int rs = runStateOf(ctl.get());// 8. 如果状态允许添加新线程(即池处于 RUNNING 或 SHUTDOWN 且 firstTask 为 null),则进行进一步操作。if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 防止创建已启动的线程if (t.isAlive()) throw new IllegalThreadStateException();// 将 Worker 添加到工作集workers.add(w);// 更新最大池大小统计int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();  // 确保在退出时释放锁}if (workerAdded) {t.start();  // 启动工作线程workerStarted = true;}}} finally {// 如果线程启动失败,进行适当的清理if (! workerStarted)addWorkerFailed(w);}// 返回是否成功启动工作线程return workerStarted;
}
  1. 初始检查和状态验证

    • 使用一个无限循环 (retry: 标签) 不断尝试添加新的工作线程。
    • 从控制字段 (ctl) 中获取线程池状态,并获取当前的运行状态 (runStateOf(c)).
    • 检查线程池是否正在关闭 (rs >= SHUTDOWN),如果是且任务队列不为空,返回 false 退出。
  2. 递增工作线程数量

    • 内部循环再一次检查当前的工作线程数量 (workerCountOf(c)),并确保其未超过CAPACITY或给定的大小限制 (corePoolSizemaximumPoolSize)。
    • 使用CAS操作 (compareAndIncrementWorkerCount(c)) 递增工作线程计数,确保线程安全。
    • 如果CAS操作失败,重新获取 ctl 用于检查运行状态是否已更改。
  3. 实例化工作线程并添加到池中

    • 如果成功增加了工作线程计数,创建一个新的 Worker 实例,并分配传入的 firstTask
    • 获取Worker关联的线程实例 (w.thread) 并使用ReentrantLock (mainLock) 确保操作的同步。
    • 再次检查运行状态,如果状态允许则工作线程已添加入池中 (workers.add(w)), 并更新线程池大小统计 (largestPoolSize)。
  4. 启动新线程

    • 如果 Worker 成功添加,则启动新线程 (t.start()), 将 workerStarted 设置为 true
  5. 清理和失败处理

    • 最后,如果线程未成功启动,调用 addWorkerFailed 方法进行清理和失败处理。
    • 返回 workerStarted 表示工作线程是否成功启动。
方法的关键点:
  • 运行状态验证:多次检查和验证线程池的运行状态以确定是否应该添加新的工作线程。
  • 线程安全:使用CAS操作和锁来确保多线程环境中的线程安全。
  • 资源管理:同步地添加或移除线程,并在失败时进行恰当的资源清理。
总结:

该方法通过复杂且细致的状态检查和同步操作,确保只在适宜的条件下添加新的工作线程以处理任务,从而维护线程池的有效运行和资源的最佳利用。多次的状态检查和CAS操作确保了较高的并发安全性。

runWorker

if (workerAdded) {t.start();  // 启动工作线程workerStarted = true;
}

在上面的addWorker方法中,我们看到,如果线程添加成功(workerAdded=true)了以后,会执行t.start()启动线程

// Worker 类的 run 方法,是每个 worker 线程的执行主体
final void runWorker(Worker w) {Thread wt = Thread.currentThread(); // 获取当前线程的引用Runnable task = w.firstTask; // 取出传入的第一个任务w.firstTask = null; // 清空第一个任务,避免重复引用导致内存泄漏w.unlock(); // 解锁,允许中断// 初始值为 true,以标记线程是由于未处理的异常突然完成任务boolean completedAbruptly = true;try {// 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时while (task != null || (task = getTask()) != null) {w.lock(); // 对 worker 加锁,开始执行任务// 如果线程池停止或当前线程被中断,则确保线程被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务之前可以进行的钩子方法beforeExecute(wt, task);// 异常变量,用于捕获任务执行中的异常Throwable thrown = null;try {task.run(); // 运行任务} catch (RuntimeException x) {thrown = x; throw x; // 捕获运行时异常} catch (Error x) {thrown = x; throw x; // 捕获错误} catch (Throwable x) {thrown = x; throw new Error(x); // 捕获其他可抛出的异常} finally {// 执行任务之后的钩子方法,传递任务和异常信息afterExecute(task, thrown);}} finally {// 任务执行完毕,清除引用,完成此任务task = null;// 完成任务计数增加w.completedTasks++;// 解锁 workerw.unlock();}}// 如果 while 循环完成并没有异常,则设置 completedAbruptly 为 falsecompletedAbruptly = false;} finally {// 线程退出处理,针对异常终止或正常终止的线程做不同的退出处理processWorkerExit(w, completedAbruptly);}
}
代码解读:
  1. 线程开始运行,获取当前线程引用,并尝试执行Worker的第一个任务。
  2. w.unlock()允许在执行任务时可以中断当前线程,比如取消任务或关闭线程池的情况。
  3. completedAbruptly标志位用于跟踪任务是否突然完成(如抛出异常)。
  4. try块中循环执行以下步骤:
    • 从任务队列中获取下一个任务 (getTask())。
    • 执行Worker的加锁操作,以确保任务的执行过程中不会被其他线程干扰。
    • 如果线程池正在停止,确保当前线程被中断;如果线程池没有停止,确保线程不被中断。
    • 在开始执行任务之前调用beforeExecute()
    • 执行任务 (task.run())。
    • 使用try-catch捕捉执行任务过程中抛出的异常。
    • 在任务执行完毕后调用afterExecute()
  5. 每完成一个任务后,将该任务设置为null,完成任务数 (completedTasks) 加一,然后进行解锁操作。
  6. 如果while循环正常结束,将completedAbruptly设置为false,表示工作线程正常完成,没有发生异常。
  7. 最后,在finally块中调用processWorkerExit()方法处理工作线程的退出。如果completedAbruptly仍为true,意味着工作线程异常退出,这个方法将进行相应的清理工作。

以上流程确保了任务能够在工作线程中被正确处理执行,同时确保在任务执行前后能进行相应的额外处理(如资源释放、计数器维护等),并且在工作线程因异常结束时做出适当的清理。

getTask

final void runWorker(Worker w) {.....// 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时while (task != null || (task = getTask()) != null) {

上面runWorker里面的getTask() 也是 ThreadPoolExecutor 中的一个方法,用于从线程池的工作队列中获取待执行的任务。它处理线程池的状态和工作队列,以决定是否提供一个任务、等待任务或终止线程。

private Runnable getTask() {boolean timedOut = false; // 上一次调用 poll() 是否超时?for (;;) { // 无限循环,直到从工作队列获取到任务或者确定线程应该终止int c = ctl.get(); // 获取当前线程池控制状态int rs = runStateOf(c); // 从控制状态中提取运行状态// 如果线程池正在 SHUTDOWN 或更高状态,并且工作队列为空或者线程池正在 STOP 状态// 那么此线程应该被终止if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // 减少工作线程的计数器return null; // 返回 null,表示没有任务执行,线程应该退出}int wc = workerCountOf(c); // 从控制状态中提取当前工作线程数// 判断是否允许核心线程超时或当前工作线程数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 如果当前工作线程数大于最大线程数,或者(允许超时并且已经超时)且(工作线程数大于1或工作队列为空)// 那么尝试减少工作线程并返回 nullif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null; // 成功减少工作线程计数,则当前线程应该退出continue; // 如果减少失败,重新进行循环}try {// 根据队列是否允许线程超时来从工作队列获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r; // 如果成功获取到任务,则返回这个任务timedOut = true; // 如果 poll() 调用返回了 null,则表示超时} catch (InterruptedException retry) {timedOut = false; // 如果在等待任务时被中断,则重置超时标志}}
}
代码解读:
  1. 循环尝试:该方法在一个无限循环中运行,直到获取一个任务或确定线程应当退出。
  2. 状态检查:检查线程池的当前状态,以及当前线程是否应当因为线程池状态(如关闭)或工作队列状态(如空)而终止。
  3. 线程数调整:根据线程池配置(allowCoreThreadTimeOutcorePoolSizemaximumPoolSize)和当前状态(超时、工作队列空),决定是否终止当前线程。
  4. 任务获取
    • 如果不需要终止当前线程,则尝试从工作队列中获取任务。
    • 根据是否允许超时,使用 poll(带超时)或 take(无超时)从队列中获取任务。
  5. 超时和中断处理:处理在等待任务时发生的超时和中断事件。
特别注意:
  • 这个方法负责管理线程是否持续等待新任务或者是从工作队列中获取任务时是否采取超时策略,这对于线程池的效率和资源管理非常关键。
  • 方法通过精细的逻辑检查线程池的状态和工作队列的状态,合理地控制线程的生命周期,确保不会有过多线程空闲或占用资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20534.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RedisSearch与Elasticsearch:技术对比与选择指南

码到三十五 &#xff1a; 个人主页 数据时代&#xff0c;全文搜索已经成为许多应用程序中不可或缺的一部分。RedisSearch和Elasticsearch是两个流行的搜索解决方案&#xff0c;它们各自具有独特的特点和优势。本文简单探讨一些RedisSearch和Elasticsearch之间的技术差异。 目录…

9款实用而不为人知的小众软件推荐!

AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频https://aitools.jurilu.com/ 在电脑软件的浩瀚海洋中&#xff0c;除了那些广为人知的流行软件外&#xff0c;还有许多简单、干净、功能强大且注重实用功能的小众软件等待我们…

[NISACTF 2022]sign_crypto(LATEX)

题目&#xff1a; 我们看出这是LATEX编码&#xff0c;破解之后&#xff1a; 看出每个“\”之后的第一个字母连起来即使&#xff1a;nss....&#xff0c;在大写即可得到flag。

Sui Nami Bags对NFT使用案例进行创新

在四月的Sui Basecamp活动中&#xff0c;与会者体验了一系列Sui技术&#xff0c;这些技术以Nami Bags的形式呈现&#xff0c;这些数字礼包里满是来自Sui生态的NFT和优惠券。通过Enoki&#xff08;Mysten Labs的新客户参与平台&#xff09;提供支持&#xff0c;即使没有加密钱包…

OpenCV学习 基础图像操作(十七):泛洪与分水岭算法

原理 泛洪填充算法和分水岭算法是图像处理中的两种重要算法&#xff0c;主要用于区域分割&#xff0c;但它们的原理和应用场景有所不同&#xff0c;但是他们的基础思想都是基于区域迭代实现的区域之间的划分。 泛洪算法 泛洪填充算法&#xff08;Flood Fill&#xff09;是一…

修改element-ui el-radio颜色

修改element-ui el-radio颜色 需求效果图代码实现 小结 需求 撤销扣分是绿色&#xff0c;驳回是红色 效果图 代码实现 dom <el-table-columnlabel"操作"width"200px"><template v-slot"scope"><el-radio-group v-model"s…

Vue插槽与作用域插槽

title: Vue插槽与作用域插槽 date: 2024/6/1 下午9:07:52 updated: 2024/6/1 下午9:07:52 categories: 前端开发 tags:VueSlotScopeSlot组件通信Vue2/3插槽作用域API动态插槽插槽优化 第1章&#xff1a;插槽的概念与原理 插槽的定义 在Vue.js中&#xff0c;插槽&#xff08;…

c++(七)

c&#xff08;七&#xff09; 内联函数内联函数的特点为什么要有内联函数内联函数是如何工作的呢 类型转换异常处理智能指针单例模式懒汉模式饿汉模式 VS中数据库的相关配置 内联函数 修饰类的成员函数&#xff0c;关键字&#xff1a;inline inline 返回值类型 函数名(参数列…

vue-el-steps 使用2[代码示例]

效果图 代码 element代码 <template> <div class"app-container"> <el-form :model"queryForm" size"small" :inline"true"> <el-form-item label"内容状态"> <el-button-group> <el-bu…

as keyof GlobalStore

解释 as keyof GlobalStore 在 TypeScript 中&#xff0c;as keyof GlobalStore 是一种类型断言语法。它告诉 TypeScript&#xff0c;返回的值是一个特定类型的值&#xff0c;这里是 GlobalStore 类型的键。这在编译时有助于确保类型安全。 关键点&#xff1a; 类型断言&…

构建智慧银行保险系统的先进技术架构

随着科技的不断发展&#xff0c;智慧银行保险系统正日益受到关注。在这个数字化时代&#xff0c;构建一个先进的技术架构对于智慧银行保险系统至关重要。本文将探讨如何构建智慧银行保险系统的先进技术架构&#xff0c;以提升服务效率、降低风险并满足客户需求。 ### 1. 智慧银…

qwen-moe

一、定义 qwen-moe 代码讲解&#xff0c; 代码qwen-moe与Mixtral-moe 一样&#xff0c; 专家模块qwen-moe 开源教程Mixture of Experts (MoE) 模型在Transformer结构中如何实现&#xff0c;Gate的实现一般采用什么函数&#xff1f; Sparse MoE的优势有哪些&#xff1f;MoE是如…

统计信号处理基础 习题解答10-6

题目 在例10.1中&#xff0c;把数据模型修正为&#xff1a; 其中是WGN&#xff0c;如果&#xff0c;那么方差&#xff0c;如果&#xff0c;那么方差。求PDF 。把它与经典情况PDF 进行比较&#xff0c;在经典的情况下A是确定性的&#xff0c;是WGN&#xff0c;它的方差为&#…

5.算法讲解之-二分查找(简单易懂)

1.简介 1.二分查找的思路简单易懂&#xff0c;较难的是如何处理查找过程中的边界条件&#xff0c;当较长时间没写二分查找的时候就容易忘记如何处理边界条件。 2.只有多写代码&#xff0c;多做笔记就不易忘记边界条件 2.算法思路 正常查找都是从头到尾查找一个数字是否在数组中…

使用pycharm+opencv进行视频抽帧(可以用来扩充数据集)+ labelimg的使用(数据标准)

一.视频抽帧 1.新创建一个空Pycharm项目文件&#xff0c;命名为streach zhen 注&#xff1a;然后要做一个前期工作 创建opencv环境 &#xff08;1&#xff09;我们在这个pycharm项目的终端里面输入下面的命令&#xff1a; pip install opencv-python --user -i https://pypi.t…

[数据集][目标检测]猕猴桃检测数据集VOC+YOLO格式1838张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1838 标注数量(xml文件个数)&#xff1a;1838 标注数量(txt文件个数)&#xff1a;1838 标注…

sensitive-word 敏感词 v0.17.0 新特性之 IPV4 检测

敏感词系列 sensitive-word-admin 敏感词控台 v1.2.0 版本开源 sensitive-word-admin v1.3.0 发布 如何支持分布式部署&#xff1f; 01-开源敏感词工具入门使用 02-如何实现一个敏感词工具&#xff1f;违禁词实现思路梳理 03-敏感词之 StopWord 停止词优化与特殊符号 04-…

Jupyter Notebook快速搭建

Jupyter Notebook why Jupyter Notebook Jupyter Notebook 是一个开源的 Web 应用程序&#xff0c;允许你创建和分享包含实时代码、方程、可视化和解释性文本的文档。其应用包括&#xff1a;数据清洗和转换、数值模拟、统计建模、数据可视化、机器学习等等。 Jupyter Notebo…

东芝机械人电池低报警解除与机器人多旋转数据清零

今天启动一台设备,触摸屏一直显示机器人报警(翻译过后为电池电量低),更换电池后关机重启后也不能消除,所以打开示教器,下面就来说说怎么解决此项问题(可以参考官方发的手册,已手册为主)。 一,设备 下面来看看机械手的照片与示教器的照片 四轴机械手(六轴机器人有可…

可视化大屏也在卷组件化设计了?分享一些可视化组件

hello&#xff0c;我是大千UI工场&#xff0c;这次分享一些可视化大屏的组件&#xff0c;供大家欣赏。&#xff08;本人没有源文件提供&#xff09;