引言
我们探讨了 ThreadPoolExecutor
的基本概念、内部机制以及部分源码实现。本文将继续深入研究该类的更多细节,并结合提供的文档内容,进一步解析线程池的工作流程、任务提交和执行的具体过程,以及如何通过自定义配置来优化性能。
一、线程池的任务提交与执行
1.1 execute() 方法剖析
execute()
方法是向线程池提交任务的主要入口点。当调用此方法时,ThreadPoolExecutor
首先会尝试将任务分配给现有的工作线程;如果所有线程都在忙,则根据当前状态决定是否创建新的线程或把任务放入队列中等待处理。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);} else if (!addWorker(command, false))reject(command);
}
- 检查核心线程数:首先检查当前工作线程数量是否小于核心线程数,如果是则尝试添加一个新的核心线程。
- 尝试入队:若核心线程已满,则尝试将任务放入任务队列中。如果成功,再做一次运行状态的检查以确保线程池仍然处于运行状态。
- 创建非核心线程:如果任务队列也满了,但总线程数还未达到最大限制,则创建一个非核心线程来执行新任务。
- 拒绝策略:如果以上条件都不满足,则根据预设的拒绝策略处理这个多余的任务。
1.2 addWorker() 方法详解
addWorker()
方法负责创建并启动新的 Worker
实例。它接收两个参数:一个是待执行的任务,另一个是指明是否为添加核心线程的布尔值。该方法的核心逻辑包括:
- 检查线程池的状态,确保其不是正在关闭或已经关闭。
- 尝试增加线程计数器(
ctl
),这涉及到对ReentrantLock
的获取和 CAS 操作。 - 创建一个新的
Worker
对象,并启动其内部线程。
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheckthrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
二、任务的执行与回收
2.1 runWorker() 方法解析
一旦 Worker
被创建并启动,它就会进入 runWorker()
方法中的无限循环,不断从任务队列中取出任务并执行。每个 Worker
都持有一个 firstTask
,这是它第一次要执行的任务;之后它会持续从队列中拉取新的任务直到线程池被关闭或没有更多任务为止。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
2.2 getTask() 方法分析
getTask()
方法用于从任务队列中获取下一个待执行的任务。它的实现考虑到了线程池的不同状态以及任务队列的特点,因此采用了较为复杂的逻辑来保证高效性和正确性。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
- 检查线程池状态:首先确认线程池是否处于可以接受新任务的状态。
- 减少线程计数:如果线程池正在停止或者任务队列为空,则直接减少工作线程计数并返回
null
。 - 判断是否允许超时回收:基于配置和当前线程数,决定是否应该让空闲线程超时退出。
- 尝试获取任务:根据是否允许超时选择不同的阻塞队列操作——
poll()
或take()
。如果成功获取到任务,则返回该任务供Worker
执行;否则继续循环等待。
三、线程池的关闭与清理
3.1 shutdown() 和 shutdownNow()
shutdown()
方法平滑地关闭线程池,不再接受新的任务,但在完成所有已提交的任务后才会完全终止。而 shutdownNow()
则试图立即停止所有正在执行的任务,并返回尚未开始的任务列表。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}
3.2 tryTerminate() 方法
tryTerminate()
方法是一个辅助函数,用来尝试彻底终止线程池。它会在每次调用 shutdown()
或 shutdownNow()
后被触发,检查是否有任何剩余的工作需要完成,如果没有则最终关闭线程池。
private void tryTerminate() {Retry:for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}}
}
四、总结
通过对 ThreadPoolExecutor
更深层次的源码解析,我们可以看到它是如何巧妙地利用锁、条件变量和原子操作来实现高效的线程管理和任务调度。理解这些内部机制不仅有助于我们在实际项目中更好地使用线程池,还能启发我们在设计其他并发组件时借鉴类似的技术方案。希望这篇文章能帮助读者更加全面地掌握这一重要的 Java 并发工具,并为其后续的学习和实践提供坚实的基础。