源码角度看待线程池的执行流程

文章目录

  • 前言
  • 一、线程池的相关接口和实现类
    • 1.Executor接口
    • 2.ExecutorService接口
    • 3.AbstractExecutorService接口
    • 4.ThreadPoolExecutor 实现类
  • 二、ThreadPoolExecutor源码解析
    • 1.Worker内部类
    • 2.execute()方法
    • 3.addWorker()方法
  • 总结


前言

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
线程池在多线程编程中扮演着重要角色,它能够管理和复用线程,提高并发执行效率。

在之前的学习中,我们知道了线程池的基本流程如下,而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。
在这里插入图片描述


一、线程池的相关接口和实现类

1.Executor接口

public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}

Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。

2.ExecutorService接口

public interface ExecutorService extends Executor {//.....
}

ExecutorService接口继承了Executor接口,扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。

3.AbstractExecutorService接口

public abstract class AbstractExecutorService implements ExecutorService {//....
}

ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。

4.ThreadPoolExecutor 实现类

public class ThreadPoolExecutor extends AbstractExecutorService {//...
}

ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一,(ForkJoinPool是另一个),也是要掌握的关于线程池的重点区域;
ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;


二、ThreadPoolExecutor源码解析

在对源码进行解析之前,我们先看看官方给我们关于ThreadPoolExecutor的解析是什么:

The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* …

这句话什么意思呢?
主池控制状态ctl是一个原子整数封装?

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

再结合ctl的实例化,我们这个发现这个ctl是一个具有原子性的整数,再往后就是告诉我们这个具有原子性的整数是由两个概念组成的:workerCount工作线程数和runState运行状态。
他是一个32位的整数,具体表示形式位:
在这里插入图片描述
所以说,它的作用就是通过位运算来存储线程池的状态和活动线程数信息

1.Worker内部类

每个Woker类的对象,都代表线程池中的一个工作线程。
Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {	private static final long serialVersionUID = 6138294804551838833L;//用于存储当前工作线程的引用。final Thread thread;//用于存储该工作线程要执行的第一个任务。Runnable firstTask;//用于记录该工作线程已经完成的任务数量volatile long completedTasks;//初始化工作线程的状态,设置第一个任务并创建一个线程对象Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//Runnable 接口的 run 方法,调用了 runWorker(this),将工作线程自身作为参数传递给 runWorker方法public void run() {runWorker(this);}//因为继承了AbstractQueuedSynchronizer 类,一下方法都是基于线程安全的方法//.....       }

当ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet workders工作线程集合,统一由线程池进行管理。
当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用

2.execute()方法

ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。
在分析execute()方法之前,也来看看官方是怎么解释这个方法的:

    /** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/

简单来说就是将execute()方法分成三个步骤:
1.如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;
2.如果一个任务可以成功排队(工作线程的数量大于核心线程数),并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;
3.如果我们不能排队任务,那么我们尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了,因此拒绝该任务。

源码:

/* *@param 要提交给线程池的任务
*/
public void execute(Runnable command) {//如果传入的任务为空,抛出空指针异常。if (command == null)throw new NullPointerException();//获取当前线程池的状态和活动线程数。int c = ctl.get();//如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//如果可以通过addWorker方法创建一个新的工作线程来执行任务//因为当前线程数小于核心线程数,所以第二个参数穿入true代表创建的是核心线程if (addWorker(command, true))return;//创建成功,直接返回。//创建失败,获取更新后的线程池状态c = ctl.get();}//如果线程池状态是运行中且工作队列能够接受新任务if (isRunning(c) && workQueue.offer(command)) {//任务进入工作队列//重新获取线程池的状态和工作线程数int recheck = ctl.get();//如果线程池不是运行状态,则删除任务if (! isRunning(recheck) && remove(command))//执行拒绝策略reject(command);// 如果工作线程数等于零,通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果无法将任务添加到队列中,也无法创建新的工作线程,那么拒绝任务的执行else if (!addWorker(command, false))reject(command);}

3.addWorker()方法

在分析execute()方法中,发现execute()方法多次调用了addWorker()创建一个工作线程,用于执行当前线程任务。
addWorker()可以分为两个执行部分,检查线程池的状态和工作线程数量和创建并执行工作线程。
第1部分:检查线程池的状态和工作线程数量

//参数:1.传入的任务,2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {//获取线程池运行状态int c = ctl.get();int rs = runStateOf(c);//如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//检查工作线程数量for (;;) {//获取当前工作线程数int wc = workerCountOf(c);//如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)//三元运算符表示的是当前要的是核心线程还是非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//不再创建新的线程//通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取线程池状态,检查是否发生变化c = ctl.get();  if (runStateOf(c) != rs)continue retry;//...}}}

第二部分:创建并执行线程工程

		//....//用于判断工作线程是否启动和保存boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);//获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread;   if (t != null) {//获取线程池的reentrantLock主锁,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查线程池运行状态int rs = runStateOf(ctl.get());//如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果工作线程已经在运行(存活),抛出非法线程状态异常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//保存工作线程workers.add(w);//记录线程池的最大工作线程数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//如果工作线程没有成功启动,则调用添加失败的方法if (! workerStarted)addWorkerFailed(w);}//返回线程启动状态return workerStarted;

总结

execute()方法是ThreadPoolExecutor线程池执行的开始,它完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/59226.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMq深度学习

什么是RabbitMq? RabbitMQ是一个开源的消息队列中间件&#xff0c;它实现了高级消息队列协议&#xff08;AMQP&#xff09;。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息&#xff0c;使不同的应用程序能够相互之间进行…

Llama-2大模型本地部署研究与应用测试

最近在研究自然语言处理过程中&#xff0c;正好接触到大模型&#xff0c;特别是在年初chatgpt引来的一大波AIGC热潮以来&#xff0c;一直都想着如何利用大模型帮助企业的各项业务工作&#xff0c;比如智能检索、方案设计、智能推荐、智能客服、代码设计等等&#xff0c;总得感觉…

C语言网络编程实现广播

1.概念 如果同时发给局域网中的所有主机&#xff0c;称为广播 我们可以使用命令查看我们Linux下当前的广播地址&#xff1a;ifconfig 2.广播地址 以192.168.1.0 (255.255.255.0) 网段为例&#xff0c;最大的主机地址192.168.1.255代表该网段的广播地址&#xff08;具体以ifcon…

开源的经济影响:商业与社区的平衡

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

ChatGPT 一条命令总结Mysql所有知识点

想学习Mysql的同学,可以使用ChatGPT直接总结mysql所有的内容与知识点大纲 输入 总结Mysql数据库所有内容大纲与大纲细分内容 ChatGPT不光生成内容,并且直接完成了思维导图。 AIGC ChatGPT ,BI商业智能, 可视化Tableau, PowerBI, FineReport, 数据库Mysql Oracle, Offi…

K 次取反后最大化的数组和【贪心算法】

1005 . K 次取反后最大化的数组和 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。 重复这个过程恰好 k 次。可以多次选择同一个下标 i 。 以这种方式修改数组后&#xff0c;返回数组 可能…

word 调整列表缩进

word 调整列表缩进的一种方法&#xff0c;在试了其他方法无效后&#xff0c;按下图所示顺序处理&#xff0c;编号和文字之间的空白就没那么大了。 即右键word上方样式->点击修改格式->定义新编号格式->字体->取消勾选 “……对齐到网格”->确定

AndroidStudio3.5.2修改项目项目包名

公司项目要打造成产品进行演示&#xff0c;需要更换不同的包名进行安装在同一设备上&#xff0c;即所谓的马甲包 更改步骤基本一样 https://blog.csdn.net/qq_35270692/article/details/78336049 需要注意的是&#xff0c;按照上边的步骤修改完后&#xff0c;如果项目中有数据…

机器学习基础之《分类算法(4)—案例:预测facebook签到位置》

一、背景 1、说明 2、数据集 row_id&#xff1a;签到行为的编码 x y&#xff1a;坐标系&#xff0c;人所在的位置 accuracy&#xff1a;定位的准确率 time&#xff1a;时间戳 place_id&#xff1a;预测用户将要签到的位置 3、数据集下载 https://www.kaggle.com/navoshta/gr…

TCP数据报结构分析(面试重点)

在传输层中有UDP和TCP两个重要的协议&#xff0c;下面将针对TCP数据报的结构进行分析 关于UDP数据报的结构分析推荐看UDP数据报结构分析&#xff08;面试重点&#xff09; TCP结构图示 TCP报头结构的分析 一.16位源端口号 源端口表示发送数据时&#xff0c;发送方的端口号&am…

Flutter开发- iOS 问题CocoaPods not installed or not in valid state

解决问题方案&#xff1a; 1、先检查本机CocoaPods是否安装&#xff0c;通过gem list 查看是否安装 打开终端&#xff0c;执行gem list&#xff0c;出现图中的数据即为已安装。未安装看第4 步 2、已经安装了CocoaPods&#xff0c;还出现了图中的提示&#xff0c;你可能已经猜…

java内存模型讨论及案例分析

常用内存选项 -Xmx&#xff1a; 最大堆大小 -Xms&#xff1a;最小堆大小 -Xss &#xff1a;线程堆栈大小&#xff0c;默认1M 生产环境最好保持 Xms Xmx java内存研究 内存布局 可见&#xff1a; 堆大小 新生代 老年代&#xff0c;新生代EFrom SurvivorTo Survivor。新…

Particle Life粒子生命演化的MATLAB模拟

Particle Life粒子生命演化的MATLAB模拟 0 前言1 基本原理1.1 力影响-吸引排斥行为1.2 距离rmax影响 2 多种粒子相互作用2.1 双种粒子作用2.1 多种粒子作用 3 代码 惯例声明&#xff1a;本人没有相关的工程应用经验&#xff0c;只是纯粹对相关算法感兴趣才写此博客。所以如果有…

【已解决】Java 后端使用数组流 Array.stream() 将数组格式的 Cookie 转换成字符串格式

&#x1f389;工作中遇到这样一个场景&#xff1a;远程调用某个接口&#xff0c;该接口需要用户的 Cookie 信息进行权限认证&#xff0c;认证通过之后才可以打通并返回数据。 在后端拿到 httpServletRequest 后&#xff0c;调用 getCookies() 方法&#xff0c;返回的是一个 Coo…

WPF基础入门-Class6-WPF通知更改

WPF基础入门 Class6-WPF通知 1、显示页面&#xff1a; <Grid><StackPanel><TextBox Text"{Binding Name}"></TextBox><TextBox Text"{Binding Title}"></TextBox><Button Command"{Binding ShowCommand}&qu…

el-table动态生成多级表头的表格(js + ts)

展示形式&#xff1a; 详细代码&#xff1a; &#xff08;js&#xff09; <template><div><el-table :data"tableData" style"width: 100%"><el-table-column label"题目信息" align"center"><el-table-…

【C++】C++11的新特性(上)

引入 C11作为C标准的一个重要版本&#xff0c;引入了许多令人振奋的新特性&#xff0c;极大地丰富了这门编程语言的功能和表达能力。本章将为您介绍C11的一些主要变化和改进&#xff0c;为接下来的章节铺垫。 文章目录 引入 一、列表初始化 1、1 {} 初始化 1、2 std::initiali…

java 桥接模式

桥接模式 桥接模式简介桥接模式的实现总结 桥接模式简介 桥接模式&#xff08;Bridge&#xff09;是将抽象部分与它的实现部分分离&#xff0c;使它们都可以独立地变化。它是一种对象结构型模式&#xff0c;又称为柄体(Handle and Body)模式或接口(Interfce)模式。 桥接模式基于…

正则表达式 之 断言详解

正则表达式的先行断言和后行断言一共有 4 种形式&#xff1a; (?pattern) 零宽正向先行断言(zero-width positive lookahead assertion)(?!pattern) 零宽负向先行断言(zero-width negative lookahead assertion)(?<pattern) 零宽正向后行断言(zero-width positive lookb…

NEOVIM学习笔记

GitHub - blogercn/nvim-config: A pretty epic NeoVim setup 一直使用vim&#xff0c;每次到了新公司都要配置半天&#xff0c;而且常常配置失败&#xff0c;很多插件过期不好用。偶然看到别人的NEO VIM&#xff0c;就试着用了一下&#xff0c;感觉还不错。 用来开发和阅读C代…