3.3.8 ThreadPoolExecutor的关闭方法
首先查看shutdownNow方法,可以从RUNNING状态转变为STOP
// shutDownNow方法,shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了。
public List<Runnable> shutdownNow() {
// 声明返回结果
List<Runnable> tasks;
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不关注这个方法……
checkShutdownAccess();
// 将线程池状态修改为STOP
advanceRunState(STOP);// 无论怎么,直接中断工作线程。
interruptWorkers();
// 将阻塞队列的任务全部扔到List集合中。
tasks = drainQueue();
} finally {
// 释放锁
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 将线程池状态修改为STOP
private void advanceRunState(int STOP) {
// 死循环。
for (;;) {
// 获取ctl属性的值
int c = ctl.get();
// 第一个判断:如果当前线程池状态已经大于等于STOP了,不管了,告辞。
if (runStateAtLeast(c, STOP) ||
// 基于CAS,将ctl从c修改为STOP状态,不修改工作线程个数,但是状态变为了STOP
// 如果修改成功结束
ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c))))
break;
}
}
// 无论怎么,直接中断工作线程。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历HashSet,拿到所有的工作线程,直接中断。
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 移除阻塞队列,内容全部扔到List集合中
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 阻塞队列自带的,直接清空阻塞队列,内容扔到List集合
q.drainTo(taskList);
// 为了避免任务丢失,重新判断,是否需要编辑阻塞队列,重新扔到List
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {
// 死循环。
for (;;) {
// 拿到ctl
int c = ctl.get();
// 如果是RUNNING,直接告辞。
// 如果状态已经大于等于TIDYING,马上就要凉凉,直接告辞。
// 如果状态是SHUTDOWN,但是阻塞队列还有任务,直接告辞。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果还有工作线程
if (workerCountOf(c) != 0) {
// 再次中断工作线程
interruptIdleWorkers(ONLY_ONE);
// 告辞,等你工作线程全完事,我这再尝试进入到TERMINATED状态
return;
}
// 加锁,为了可以执行Condition的释放操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态修改为TIDYING状态,如果成功,继续往下走
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {
// 这个方法是空的,如果你需要在线程池关闭后做一些额外操作,这里你可以自行实现
terminated();
} finally {
// 最终修改为TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。
// 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作
// 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
再次shutdown方法,可以从RUNNING状态转变为SHUTDOWN
shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务
public void shutdown() {
// 加锁。。
final ReentrantLock mainLock = this.mainLock;mainLock.lock();
try {
// 不看。
checkShutdownAccess();
// 里面是一个死循环,将线程池状态修改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 说了,这个是为了ScheduleThreadPoolExecutor准备的,不管
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试结束线程
tryTerminate();
}
// 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没有中断,那么就去获取Worker的锁,基于tryLock可知,不会中断正在干活的线程
if (!t.isInterrupted() && w.tryLock()) {try {
// 会中断空闲线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.4 线程池的核心参数设计规则
线程池的使用难度不大,难度在于线程池的参数并不好配置。
主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,甚至还有混合型的。
因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。
《Java并发编程实践》
想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。
需要将项目部署到测试环境或者是沙箱环境中,结果各种压测得到一个相对符合的参数。
如果每次修改项目都需要重新部署,成本太高了。
此时咱们可以实现一个动态监控以及修改线程池的方案。
因为线程池的核心参数无非就是:
● corePoolSize:核心线程数
● maximumPoolSize:最大线程数
● workQueue:工作队列
线程池中提供了获取核心信息的get方法,同时也提供了动态修改核心属性的set方法。
[image]
也可以采用一些开源项目提供的方式去做监控和修改
比如hippo4j就可以对线程池进行监控,而且可以和SpringBoot整合。
Github地址:https://github.com/opengoofy/hippo4j
官方文档:https://hippo4j.cn/docs/user_docs/intro
3.5 线程池处理任务的核心流程
基于addWorker添加工作线程的流程切入到整体处理任务的位置