Java 线程池 ThreadPoolExecutor 底层原理与源码分析

引言

我们探讨了 ThreadPoolExecutor 的基本概念、内部机制以及部分源码实现。本文将继续深入研究该类的更多细节,并结合提供的文档内容,进一步解析线程池的工作流程、任务提交和执行的具体过程,以及如何通过自定义配置来优化性能。

一、线程池的任务提交与执行
1.1 execute() 方法剖析

execute() 方法是向线程池提交任务的主要入口点。当调用此方法时,ThreadPoolExecutor 首先会尝试将任务分配给现有的工作线程;如果所有线程都在忙,则根据当前状态决定是否创建新的线程或把任务放入队列中等待处理。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);} else if (!addWorker(command, false))reject(command);
}
  • 检查核心线程数:首先检查当前工作线程数量是否小于核心线程数,如果是则尝试添加一个新的核心线程。
  • 尝试入队:若核心线程已满,则尝试将任务放入任务队列中。如果成功,再做一次运行状态的检查以确保线程池仍然处于运行状态。
  • 创建非核心线程:如果任务队列也满了,但总线程数还未达到最大限制,则创建一个非核心线程来执行新任务。
  • 拒绝策略:如果以上条件都不满足,则根据预设的拒绝策略处理这个多余的任务。
1.2 addWorker() 方法详解

addWorker() 方法负责创建并启动新的 Worker 实例。它接收两个参数:一个是待执行的任务,另一个是指明是否为添加核心线程的布尔值。该方法的核心逻辑包括:

  • 检查线程池的状态,确保其不是正在关闭或已经关闭。
  • 尝试增加线程计数器(ctl),这涉及到对 ReentrantLock 的获取和 CAS 操作。
  • 创建一个新的 Worker 对象,并启动其内部线程。
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheckthrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
二、任务的执行与回收
2.1 runWorker() 方法解析

一旦 Worker 被创建并启动,它就会进入 runWorker() 方法中的无限循环,不断从任务队列中取出任务并执行。每个 Worker 都持有一个 firstTask,这是它第一次要执行的任务;之后它会持续从队列中拉取新的任务直到线程池被关闭或没有更多任务为止。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
2.2 getTask() 方法分析

getTask() 方法用于从任务队列中获取下一个待执行的任务。它的实现考虑到了线程池的不同状态以及任务队列的特点,因此采用了较为复杂的逻辑来保证高效性和正确性。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
  • 检查线程池状态:首先确认线程池是否处于可以接受新任务的状态。
  • 减少线程计数:如果线程池正在停止或者任务队列为空,则直接减少工作线程计数并返回 null
  • 判断是否允许超时回收:基于配置和当前线程数,决定是否应该让空闲线程超时退出。
  • 尝试获取任务:根据是否允许超时选择不同的阻塞队列操作——poll()take()。如果成功获取到任务,则返回该任务供 Worker 执行;否则继续循环等待。
三、线程池的关闭与清理
3.1 shutdown() 和 shutdownNow()

shutdown() 方法平滑地关闭线程池,不再接受新的任务,但在完成所有已提交的任务后才会完全终止。而 shutdownNow() 则试图立即停止所有正在执行的任务,并返回尚未开始的任务列表。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}
3.2 tryTerminate() 方法

tryTerminate() 方法是一个辅助函数,用来尝试彻底终止线程池。它会在每次调用 shutdown()shutdownNow() 后被触发,检查是否有任何剩余的工作需要完成,如果没有则最终关闭线程池。

private void tryTerminate() {Retry:for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}}
}
四、总结

通过对 ThreadPoolExecutor 更深层次的源码解析,我们可以看到它是如何巧妙地利用锁、条件变量和原子操作来实现高效的线程管理和任务调度。理解这些内部机制不仅有助于我们在实际项目中更好地使用线程池,还能启发我们在设计其他并发组件时借鉴类似的技术方案。希望这篇文章能帮助读者更加全面地掌握这一重要的 Java 并发工具,并为其后续的学习和实践提供坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql--基础篇--视图,存储过程,触发器

1、视图 MySQL视图&#xff08;View&#xff09;是一个虚拟表&#xff0c;其内容由查询定义。同真实的表一样&#xff0c;视图包含一系列带有名称的列和行数据。但是&#xff0c;视图并不在数据库中以存储的数据值集形式存在。行和列数据来自由定义视图的查询所引用的表&#…

某团 mtgsig1.2 | sdkVersion: 3.0.0 签名算法分析记录(2025/1/9)

【作者主页】&#xff1a;小鱼神1024 【擅长领域】&#xff1a;JS逆向、小程序逆向、AST还原、验证码突防、Python开发、浏览器插件开发、React前端开发、NestJS后端开发等等 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;不提供完整代码&#…

(二)最长公共子序列、最长上升子序列、最大子段和、三角形最小路径和、矩阵连乘、0-1背包

最近刚考完算法设计分析课的考试&#xff0c;复习总结一下期末考试的几道算法题吧 目录 LCR 095. 最长公共子序列 300. 最长递增子序列 53. 最大子数组和 LCR 100. 三角形最小路径和 矩阵连乘问题 0-1背包 LCR 095. 最长公共子序列 给定两个字符串 text1 和 text2&#xff…

聚类系列 (二)——HDBSCAN算法详解

在进行组会汇报的时候&#xff0c;为了引出本研究动机&#xff08;论文尚未发表&#xff0c;暂不介绍&#xff09;&#xff0c;需要对DBSCAN、OPTICS、和HDBSCAN算法等进行详细介绍。在查询相关资料的时候&#xff0c;发现网络上对于DBSCAN算法的介绍非常多与细致&#xff0c;但…

通义灵码在跨领域应用拓展之物联网篇

目录 一.引言 二.通义灵码简介 三.通义灵码在物联网领域的设备端应用 1.传感器数据采集 (1).不同类型传感器的数据读取 (2).数据转换与预处理 2.设备控制指令接收和执行 (1).指令解析与处理 (2).设备动作执行 四.通义灵码在物联网领域的云端平台应用 1.数据存储和管…

python_excel列表单元格字符合并、填充、复制操作

读取指定sheet页&#xff0c;根据规则合并指定列&#xff0c;填充特定字符&#xff0c;删除多余的列&#xff0c;每行复制四次&#xff0c;最后写入新的文件中。 import pandas as pd""" 读取指定sheet页&#xff0c;根据规则合并指定列&#xff0c;填充特定字…

C++ STL 中的 vector 总结

1. 什么是 std::vector&#xff1f; std::vector 是 C STL 提供的动态数组容器&#xff0c;可以动态调整大小并存储任意类型的元素。 与普通数组相比&#xff0c;std::vector 更加灵活&#xff0c;提供了丰富的操作接口。 2. 基本特性 动态大小&#xff1a;支持在运行时动态增…

DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决

01 问题复现 在DolphinScheduler中有如下一个Shell任务&#xff1a; current_timestamp() { date "%Y-%m-%d %H:%M:%S" }TIMESTAMP$(current_timestamp) echo $TIMESTAMP sleep 60 在DolphinScheduler将工作流执行策略设置为并行&#xff1a; 定时周期调度设置…

【stm32+K210项目】基于K210与STM32协同工作的智能垃圾分类系统设计与实现(完整工程资料源码)

视频效果演示: 基于K210与STM32协同工作的智能垃圾分类系统设计与实现 目录: 目录 视频效果演示: 目录:

CISAW-ES应急服务方向信息安全事件分级

网络安全事件事件分级 网络安全事件分为四级&#xff1a;特别重大网络安全事件、重大网络安全事大网络安全事件、一般网络安全事件。 1&#xff0e;特别重大网络安全事件 符合下列情形之一的&#xff0c;为特别重大网络安全事件。 &#xff08;1&#xff09;重要网络和信息系…

油猴支持阿里云自动登陆插件

遇到的以下问题,都已在脚本中解决: 获取到的元素赋值在页面显示,但是底层的value并没有改写,导致请求就是获取不到数据元素的加载时机不定,尤其是弱网情况下,只靠延迟还是有可能获取不到,且登陆不丝滑,通过元素发现机制,解决此问题并做到丝滑登陆根据密钥计算校验码之…

简聊MySQL的顺序读写和随机读写

在MySQL数据库中&#xff0c;顺序读写和随机读写的应用区别主要体现在以下几个方面&#xff1a; 一、定义回顾 顺序读写&#xff1a;数据按照物理地址的连续性进行读写操作&#xff0c;通常用于处理大型文件或连续的数据块。随机读写&#xff1a;数据分散在磁盘的不同位置进行…

[Git] git reset --hard / git reset --soft

git reset --hard 功能&#xff1a;重置索引&#xff08;暂存区&#xff09;和工作目录到指定的提交状态。这意味着它会丢弃所有未提交的更改和已暂存的更改。 适用场景&#xff1a;当你想要完全放弃当前工作目录中的所有更改并回退到某个特定提交状态时&#xff0c;可以使用这…

C语言基本知识复习浓缩版:scanf函数

C语言基本知识复习浓缩版&#xff1a;scanf函数 1 scanf()函数用于读取用户的键盘输入 2 scanf()函数的基本形式&#xff1a;scanf("参数列表1",参数列表2): 参数列表1&#xff1a;用户键盘输入的数据的类型&#xff0c;用占位符表示。 参数列表2&#xff1a;用户键…

Ubuntu 24.04 LTS系统安装Docker踩的坑

一开始我跟着Docker给出的官网文档 Ubuntu | Docker Docs 流程走&#xff0c;倒腾了两个多小时&#xff0c;遇到了各种坑&#xff0c;最后放弃了。在我们使用脚本安装Docker命令前&#xff0c;我们先把已经安装的Docker全部卸载掉。 卸载Docker 1.删除docker及安装时自动安装…

B树与B+树:数据库索引的秘密武器

想象一下&#xff0c;你正在构建一个超级大的图书馆&#xff0c;里面摆满了各种各样的书籍。B树和B树就像是两种不同的图书分类和摆放方式&#xff0c;它们都能帮助你快速找到想要的书籍&#xff0c;但各有特点。 B树就像是一个传统的图书馆摆放方式&#xff1a; 1. 书籍摆放&…

城市生命线安全综合监管平台

【落地产品&#xff0c;有需要可留言联系&#xff0c;支持项目合作或源码合作】 一、建设背景 以关于城市安全的重要论述为建设纲要&#xff0c;聚焦城市安全重点领域&#xff0c;围绕燃气爆炸、城市内涝、地下管线交互风险、第三方施工破坏、供水爆管、桥梁坍塌、道路塌陷七…

关于内网外网,ABC类地址,子网掩码划分

本文的三个关键字是&#xff1a;内网外网&#xff0c;ABC类地址&#xff0c;子网掩码划分。围绕以下问题展开&#xff1a; 如何从ip区分外网、内网&#xff1f;win和linux系统中&#xff0c;如何查询自己的内网ip和外网ip。开发视角看内外网更多是处于安全考虑&#xff0c;接口…

后端Java开发:第十二天

第十二天&#xff1a;封装 - 理解与应用 欢迎来到今天的学习内容&#xff01;今天&#xff0c;我们将一起深入探讨 Java 中的 封装&#xff08;Encapsulation&#xff09;。封装是面向对象编程的四大基本特性之一&#xff0c;它的核心思想是把对象的状态&#xff08;属性&…

成为LabVIEW自由开发者

成为LabVIEW自由开发者的体验可以非常丰富且具有挑战性&#xff0c;同时也充满了自我成长和多样化项目的机会。 ​ 1. 高度的灵活性与自由度 工作时间与地点&#xff1a;作为自由开发者&#xff0c;你可以自由选择工作时间和地点。你可以在家工作&#xff0c;也可以选择在咖啡…