线程池主要解决了两个问题:
第一个是当大量执行异步任务的时候提供较好的性能;在不使用线程池的时候,每次需要执行一个异步任务都需要新建一个 Thread 来进行,而线程的创建和销毁都是需要时间的,所以可以通过线程池来实现线程的复用,从而解决这个问题。
同时线程池也提供了一种资源限制和管理的手段,比如可以限制线程的个数、动态的增加线程等;ThreadPoolExecutor 保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
1)介绍
1.1)使用案例
ThreadPoolExecutor
是 Java 中 java.util.concurrent
包的一部分,用于管理线程池的实现。它提供了一种灵活的方法来 创建和管理线程池,以便有效地执行大量的任务。它允许开发人员配置线程池的各种参数,如核心线程数、最大线程数、线程闲置时间、任务队列等。简单来说,它就是一个内部维护了一个线程池的任务执行器(Executor)。
在正式开始阅读源码之前,先来看一下应该如何使用这个它来执行任
public class ThreadPoolExecutorSourceRead {public static void main(String[] args) {// 构造线程池执行器ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,10,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100));for (int i = 10; i >= 0; i--) {threadPoolExecutor.execute(() -> {try {Thread.sleep(1000);System.out.println("Hello");System.out.println(threadPoolExecutor.isTerminated());} catch (InterruptedException e) {throw new RuntimeException(e);}});}// 关闭线程池threadPoolExecutor.shutdown();try {if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {threadPoolExecutor.shutdownNow();}} catch (InterruptedException e) {threadPoolExecutor.shutdownNow();}}
}
1.2)类图结构与继承关系
ThreadPoolExecutor
类的类图:
继承关系:ThreadPoolExecutor
继承了 AbstractExecutorService
, AbstractExecutorService
又实现了 ExecutorService
接口,然后 ExecutorService
实现了 Executor
接口。
其中,Executor
接口是 Java 并发框架中的一个核心接口,用于 将任务的提交任务的执行解耦。它的设计目标是提供一种标准的方法来执行提交的任务,而不需要关心任务是如何执行的(例如,使用单个线程、线程池、异步调用等),这个接口是一个函数式接口,里面只有一个 execute(Runnable command); 方法。
/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.*/void execute(Runnable command);
常见的和 Executor
接口有关的类有以下几个:
Executors
工具类,里面提供了静态的工厂方法来创建常用的 Executor 实现- 本节中讲的
ThreadPoolExecutor
是该接口最常用的实现,用于基于创建的线程池来执行任务 SingleThreadExecutor
, 使用单个工作线程执行任务,任务按提交顺序执行。- **
ScheduledThreadPoolExecutor
,**支持任务定时和周期性执行的Executor
实现
1.3)关键属性
说完了类的继承关系,下面按照类图中的顺序讲解一下 ThreadPoolExecutor
中需要了解的属性:
其中的成员变量 ctl
是一个 Integer 类型的原子变量,用来记录线程池的状态和线程池中的线程个数,类似于前面的 ReentrantReadWriteLock
使用一个变量来存储两个信息。
其中的 mainLock
是独占锁,用来控制新增 Worker
线程操作的原子性。termination
是这个锁对应的一个 Condition,线程调用**termination.awaitNanos(nanos)
** 方法的时候会处于阻塞的情况;再通过**termination.signalAll();
** 等方法来唤醒线程。
Worker
类实现了 Runnable
接口,是具体承载任务的对象,其代理了 Thread 线程,通过静态代理的方式代理了线程的 run()
方法,使得线程池管理线程更加方便。它继承 AQS 自己实现了简单的不可重入锁;其中 state=0 表示锁未被获取的状态,state=1 表示已经被获取的状态,state=-1 是 Worker 的默认状态,创建线程的时候设置初始状态会将 state 设置为 -1,将其设置为 -1 的原因后面会提到;其中的变量 firstTask 记录这个工作线程执行的第一个任务,thread 是具体执行任务的线程。
DefaultThreadFactory
是线程工厂,通过使用线程工厂提供的 newThread 方法可以便捷的创建线程
poolNumber
是一个静态的原子变量,用来统计线程工厂的个数,没当线程工厂被实例化之后,会使用 CAS 操作使得原子变量做一个递增操作。threadNumber
用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程名称的一部分。
2)预备知识
在正式开始源码的阅读之前,我们需要先了解一些预备知识。
2.1)线程池状态及转换方式
线程池有如下几种状态:
- RUNNING:接受新任务并且处理阻塞队列中的任务。
- SHUTDOWN:拒绝新任务并且处理阻塞队列中的任务。
- TIDYING:所有任务都执行完成(包括阻塞队列中的任务)后当前线程池活动线程数为 0,将要调用 terminated 方法。
- TERMINATED:终止状态,
terminated()
方法调用完成以后的状态。
它们之间的转换方式是这样的:
- RUNNING→SHUTDOWN:显示的调用
shutdown()
方法。 - RUNNING 或者 SHUTDOWN → STOP:显示的调用
shutdownNow()
方法的时候。 - SHUTDOWN → TIDYING:当线程池和任务队列都为空的时候。
- TIDYING→TERMINATED:当
terminated()
方法执行完成的时候。
2.2)线程池的参数
线程池的参数有以下几种:
corePoolSize
:线程池核心线程个数workQueue
:用于保存等待执行的任务阻塞队列,比如基于数组的有界 ArrayBlockingQueue、基于链表的无界 LinkedBlockingQueue、最多只有一个元素的同步队列 SynchronousQueue 以及优先级队列 PriorityBlockingQueue 等。maximumPoolSize
:线程池的最大线程数量ThreadFactory
:创建线程的工厂RejectedExecutionHandler
:饱和策略,当任务队列满并且线程个数达到最大容量的时候采取的策略,比如 AbortPolicy 抛出异常、CallerRunsPolicy 使用调用者所在的线程来运行任务、DiscardOldestPolicy 调用 poll 丢弃一个任务,执行当前的任务、以及 DiscardPolicy 默默丢弃不抛弃异常。keeyAliveTime
:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置的状态,则这些闲置的线程存活的最大时间TimeUnit
:存活时间的时间单位
2.3)线程池的类型
线程池的类型有以下几种,可以通过 Executors 工具类提供的方法快速的创建
newFixedThreadPool
:创建一个核心线程个数和最大线程个数都为 nThreads 的线程池,并且阻塞队列的长度为 Integer.MAX_VALUE。keeyAliveTIme 为 0,说明只要线程个数比核心线程个数多就执行回收的操作
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}// 使用自定义线程创建工厂public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}
newSingleThreadExecutor
:常见一个核心线程个数和最大线程个数都为 1 的线程池,并且阻塞队列的长度为 Integer.MAX_VALUE。且 keeyAliveTime = 0,即只要线程个数比核心线程数多就回收。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 使用线程工厂创建public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}
newCachedThreadPool
:创建一个按需创建线程的线程池,初始的线程个数为 0,最多的线程个数为 Integer.MAX_VALUE,并且阻塞队列为同步队列。它的线程空闲存活时间为 60s。加入同步队列的任务会被马上执行,且同步队列中最多只有一个任务。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// 使用线程工厂创建public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}
2.4)线程池的执行逻辑
线程池中任务的执行是依赖于 Worker 实例化对象的,它被创建后会存储在
private final HashSet<Worker> workers = new HashSet<Worker>();
中,Worker 是一个实现了 Runnable 接口的类,它的内部存储着一个真正执行任务的线程,它的创建方式是这样的:
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 创建线程}Thread newThread(Runnable r);
将 this 也就是 Worker 对象本身传递给线程工厂来创建,当通过 start 方法启动这个线程的时候,其实执行的就是 Worker 中重写的 run 方法。Worker 类是实际执行任务的类,它的 run()
方法的逻辑就是通过循环不断的从任务队列 workQueue
中获取任务,然后执行。
线程池就是维护根据上面提到的,诸如核心线程数,存活时间等参数来维护 workers,实时的增加或者删除其中的线程;通过将任务加入到 workQueue
队列中来让这些 worker 来执行这些任务。
3)源码分析
3.1)线程池执行任务
这个方法的作用是根据线程池当前的状态来决定让这个任务立刻执行、添加到线程池还是。执行拒绝策略
ThreadPoolExecutor
的实现是一个生产消费模型,当用户添加任务到线程池相当于生产者生产元素,而 Worker
来执行任务相当于消费元素;这个方法的执行逻辑在源码的注释中已经写的很清楚了,下面是翻译后的内容:
- 如果运行的线程少于
corePoolSize
,尝试启动一个新线程,并将给定的命令作为其第一个任务。调用addWorker
会原子地检查运行状态和工作线程数量,从而防止错误报警导致不应增加线程的情况,通过返回false
来阻止这种情况。 - 如果任务可以成功排队,那么我们仍然需要再次确认是否应该添加一个线程(因为自上次检查以来现有的线程可能已经终止)或者线程池在进入该方法之后已经关闭。因此,我们会重新检查状态,并在必要时回滚任务的排队操作(如果线程池已经停止),或者如果没有线程则启动一个新线程。
- 如果我们不能将任务排队,那么我们会尝试添加一个新线程。如果添加失败,我们就知道线程池已经关闭或已饱和,因此拒绝该任务。
public void execute(Runnable command) {// (1)检查传入的参数,如果为 null 会抛出 NPE 异常if (command == null)throw new NullPointerException();// (2)获取当前线程池的状态 + 线程个数的组合值 int c = ctl.get();// (3)检查线程池的个数是否小于 corePoolSize,如果小于则开启新的线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// (4)如果线程池处于 Running 状态,添加任务到阻塞队列if (isRunning(c) && workQueue.offer(command)) {// (4.1)二次检查int recheck = ctl.get();// 如果线程池的状态不是 RUNNING 则删除队列中的任务,执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// (4.2)如果当前线程池为空,则添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// (5)阻塞队列满了,尝试添加新的线程,如果失败了就执行拒绝策略else if (!addWorker(command, false))reject(command);}
上面需要特别注意的是代码(5)的执行时机,也就是当代码 (4)执行失败的处于什么状态
此时向任务队列中添加任务失败,或者是线程池不处于 RUNNING 状态(除了 RUNNING 状态以外都无法添加任务或需要执行拒绝策略),此时执行这段代码来尝试重新开启一个线程来执行任务,如果此时仍然添加失败,我们就知道线程池已经关闭或已饱和,因此拒绝该任务。
3.2)添加 Worker
这个方法是用来添加一个新的 Worker 对象到 workers 中,参数为初始任务(firstTask)、是否是核心线程(core)。
在添加 Worker 的时候需要保证并发安全性,且需要实时的监测线程池的状态,并且还含有执行任务,修改参数等代码,所以长度会很长,这里分成两个部分来讲解;先来看第一部分,这一部分通过 CAS 操作尝试去修改 Worker 的个数,当修改成功后才会去执行真正的实例化代码。
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 1)检查队列是否在必要的时候为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 2)在循环中通过 CAS 来增加线程的个数for (;;) {int wc = workerCountOf(c);// 2.1)如果线程数目超过指定的上限,返回 falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 2.2)使用 CAS 来增加线程的个数if (compareAndIncrementWorkerCount(c))break retry;// 2.3)如果 CAS 失败了,先去检查线程池的状态是否变化,// 如果变化则跳到外层循环重新获取线程池状态,否则循环继续尝试 CAS。c = ctl.get();if (runStateOf(c) != rs)continue retry;}// 。。。 第二段代码}
代码(1)是检测线程池状态的代码,通过逻辑运算,可以将其转换成以下的格式
s >= SHUTDOWN && (rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())
这段逻辑的含义是这样的:
- 如果当前线程池状态
s
大于或等于SHUTDOWN
(即线程池已经开始关闭或已经关闭)。 - 然后进一步检查以下情况之一:
- 重新检查的状态
rs
不是SHUTDOWN
,表示线程池的状态可能已经改变。 firstTask
不为空,意味着有一个新的任务尝试提交。- 任务队列是空的。
- 重新检查的状态
当上述条件都满足时,通常表示线程池不应接受新的任务,因此会拒绝任务的提交。
然后,线程在循环中不断通过 CAS 操作来尝试修改 WorkCount 的值(ctl 的一部分),如果修改成功,则继续执行下一部分代码。
// 3)CAS 成功了boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 3.1)创建 workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 3.2)加锁,防止并发问题final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 3.3)重新检查线程池的状态,线程池处于 SHUTDOWN 方法会停用if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 如果线程被启用,线程工厂创建的线程出了问题,直接抛出异常if (t.isAlive())throw new IllegalThreadStateException();// 3.4)添加一个任务执行单位workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 3.5)如果添加任务成功就执行任务if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
这一部分代码是成功通过 CAS 添加 Worker 的个数后执行的,但是现在任务还没有正式开始执行。在将 worker 添加到 workers 之前通过独占锁保证了线程安全。
最终,当添加成功的时候,就会启动工作线程。
3.3)工作线程执行
当用户提交任务到线程池之后,是通过 Worker 来执行的,Worker 是 ThreadPoolExecutor 中的一个内部类,它的构造方法是这样的:
Worker(Runnable firstTask) {setState(-1); // 在调用 runWorkker 方法前禁止中断this.firstTask = firstTask;// 通过线程工厂来创建一个线程this.thread = getThreadFactory().newThread(this);}
在构造方法的时候设置 Worker 的状态为 -1,是为了避免 Worker 在调用 runWorker 方法之前被中断,当其他线程调用线程池 shutdownNow 的时候,如果 Worker 状态大于等于零则会被中断。这里设置了 -1 就不会被中断了。
而当执行 runWorker 方法的时候,会将其 state 设置为 0,此时就可以被中断了。
Worker
类实现了 Runnable 接口,也就是实现了 run()
方法,这个方法的内部调用的就是 runWorker()
方法来完成线程执行逻辑。
/** Delegates main run loop to outer runWorker */public void run() { runWorker(this); }
下面来看一下 runWorker
方法的具体实现:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许 Worker 被中断boolean completedAbruptly = true;try {// 循环执行任务,有任务的时候会一直循环while (task != null || (task = getTask()) != null) {w.lock();// (1)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行前的准备工作beforeExecute(wt, task);Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;// 统计当前的线程完成了多少任务w.completedTasks++;w.unlock(); }}completedAbruptly = false;} finally {// 清理工作processWorkerExit(w, completedAbruptly);}}
先去判断是否有任务需要执行,如果有就通过 task.run() 方法来执行(task 也是实现了 Runnable 接口的对象),在执行任务之前会 Worker 加上锁,这样说为了避免在执行任务期间,其他线程调用 shutdown 方法导致正在执行的任务被中断,shutdown 方法的逻辑的只中断被阻塞挂起的线程,这个在后面分析源码的时候会提到。
上面的 (1)代码是处理线程池的停止和线程的中断逻辑的,它的判断逻辑是这样的:
这个条件判断分为两部分:
runStateAtLeast(ctl.get(), STOP)
检查线程池的状态是否至少是STOP
状态,这意味着有请求停止线程池。(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
检查当前线程是否已被中断,并且线程池状态也是STOP
。Thread.interrupted()
会清除当前线程的中断状态并返回之前中断的状态。 如果以上任意条件满足,并且工作线程wt
尚未被中断(!wt.isInterrupted()
),则执行下一行的中断操作。
当线程在循环中结束后,最终执行会清理工作,调用 processWorkerExit,其代码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly)decrementWorkerCount();// 统计当前线程完成任务的个数,并且从工作集中删除当前的 Workerfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}// 尝试设置线程状态为 TERMINATED,当前是 SHUTDOWN 状态并且工作// 队列为空的时候或者当前为 STOP 状态,线程池中没有活动线程tryTerminate();// 如果当前线程个数小于核心数目,增加新的 Workerint c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}
代码中先加入全局锁,然后统计线程池完成任务的个数,最后删除 Worker 对象。
然后尝试将线程池的状态设置为 TERMINATED,最终检测当前线程池中的线程个数是否小于核心线程数,如果是则会新增一个工作线程,并将其 firstTask 设置为 null。
3.4)shutdown 操作
调用 shutdown 方法后,线程池不会再接受新的任务了,但是工作队列中的任务还是会执行的,该方法会立刻返回,不会等待队列任务完成再返回。
public void shutdown() {// 锁,注意这个锁是 ThreadPoolExecutor 中的锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查当前线程是否有权限执行checkShutdownAccess();// 设置状态为 SHUTDOWN,如果已经是则直接返回advanceRunState(SHUTDOWN);// 给 Workers 设置中断标志interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试中断tryTerminate();}
其中比较关键的方法是 advanceRunState()
方法和 interruptIdleWorkers()
两个方法,下面来分别看一下它们的实现:
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();// 当前状态已经比 SHUTDOWN 还要高,也就是以下三种情况// STOP、TIDYING、TERMINATEDif (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}// 用于判断线程池的运行状态(c)是否至少达到了给定的状态级别(s)private static boolean runStateAtLeast(int c, int s) {return c >= s;}
上面的方法中先判断当前状态是否已经达到了 SHUTDOWN,如果是直接返回,否则就通过 CAS 操作将其状态设置为 SHUTDOWN。
private void interruptIdleWorkers(boolean onlyOne) {// 上锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 如果线程没有被中断,并且获取锁成功,也就是说这个线程没有执行任务if (!t.isInterrupted() && w.tryLock()) {try {// 尝试中断线程t.interrupt();} catch (SecurityException ignore) {} finally {// unlock worker,将其 state 设置为 0w.unlock();}}if (onlyOne)// 如果只需要中断一个线程的话,释放锁break;}} finally {// 释放锁mainLock.unlock();}}
上面的方法接收一个布尔值参数 onlyOne 表明中断一个线程还是全部,然后在循环工作集,中断未被中断的线程,在中断线程的时候会使用 Worker 的独占锁,而正在执行任务的线程会持有自己的锁,无法被释放,所以这里其实只中断了未执行任务的锁。
3.5)shutdownNow() 方法
看完了不会中断执行任务线程的 shutdown 方法, shutdownNow 方法则会中断所有线程,无论它们是否在执行任务。
这个方法与前面的 shutdown 方法的区别就是调用的方法不通,在这里中断线程调用的是 interruptWorkers()
,会中断所有的线程。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
这里不会去尝试获取线程的锁,而是直接执行中断的方法,这里如果线程的 state 小于零(未启动的时候初始值为 -1),此时不会去中断线程。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}