Java多线程篇(3)——线程池

文章目录

  • 线程池
  • ThreadPoolExecutor源码分析
    • 1、如何提交任务
    • 2、如何执行任务
    • 3、如何停止过期的非核心线程
    • 4、如何使用拒绝策略
  • ScheduledThreadPoolExecutor源码分析

线程池

快速过一遍基础知识
7大参数
corePoolSize : 核心线程数
maximumPoolSize: 最大线程数
keepAliveTime: 空闲线程存活时间
TimeUnit: 时间单位
BlockingQueue:任务队列
ThreadFactory: 创建线程的工厂
RejectedExecutionHandler:拒绝策略

拒绝策略
AbortPolicy:中止策略,线程池会抛出异常并中止执行此任务;
CallerRunsPolicy:把任务交给添加此任务的(main)线程来执行;
DiscardPolicy:忽略此任务,忽略最新的一个任务;
DiscardOldestPolicy:忽略最早的任务,最先加入队列的任务。

内置的线程池
SingleThreadExecutor(单线程):1 - 1 - Interge.MAX(核心线程-最大线程-队列长度)
FixedThreadPool(固定大小):N - N - Interge.MAX
CachedThreadPool(缓存):0 - Integer.MAX - 0
ScheduledThreadPool(定时):线程池的另一个关于定时的分支

为什么不推荐使用内置的线程池?
SingleThreadExecutor和FixedThreadPool无法控制队列长度可能导致OOM ,而CachedThreadPool无法控制线程数量可能导致大量的线程创建。


ThreadPoolExecutor源码分析

先不考虑ScheduledThreadPool,后面再单独说明定时线程池。

1、如何提交任务

ThreadPoolExecutor#execute

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//新建核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {//双重检测int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);//如果当前没有正在运行的线程,则新增一个非核心线程(任务为null,表示线程的任务将会从阻塞队列中获取)else if (workerCountOf(recheck) == 0)addWorker(null, false);}//新建非核心线程else if (!addWorker(command, false))reject(command);}

也就是
在这里插入图片描述
submit和execute的区别
在这里插入图片描述
其实没啥太大的区别,submit最后也是调用的execute,只不过在调用之前封装了task为FutureTask,表示有返回值的任务,最后将返回值返回
不过有一点需要注意的是。FutureTask,不仅会返回结果,还会把原本runnable中的异常吃了。所以submit提交的任务如果抛异常了,外部是无法感知的
FutureTask#run
在这里插入图片描述
测试结果
在这里插入图片描述

2、如何执行任务

ThreadPoolExecutor#addWorker

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))return false;for (;;) {//COUNT_MASK掩码,舍去前3位(因为前3位是状态位,后面的才是任务数)if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();if (runStateAtLeast(c, SHUTDOWN))continue retry;}}//上面主要是ctl++,其他很多都是检测boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一个worker,封装了firstTask//(worker也实现了Runnable,相当于对firstTask封装了一层)w = new Worker(firstTask);//这里线程的runable实现是worker而不是firstTaskfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//一些检测if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//Thread.start()->runnable.run()也就是worker.run()->runWorker(worker)t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

addWorker新建worker对象,封装了新建的线程对象和原始task。线程的执行调用如下:
thread.start()->runnable.run()也就是worker.run()->runWorker(worker)
在这里插入图片描述

ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//worker的task为null(addWorker传入的参数)则从阻塞队列中获取一个taskwhile (task != null || (task = getTask()) != null) {w.lock();//检测是否需要中止线程if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {//执行前回调beforeExecute(wt, task);try {//执行任务task.run();//执行后回调afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// finally 调用processWorkerExit(w, completedAbruptly);}}

所以runWorker就是如果worker手上有task,就先把手头上的task执行了,然后再(循环)去阻塞队列获取task执行。如果没有就直接去阻塞队列获取task执行。

那么 finally 那里的 processWorkerExit 是干嘛用的?

执行到processWorkerExit要么就是异常情况跳出循环(completedAbruptly=true),要么就是worker手上和阻塞队列均没有task跳出循环(completedAbruptly=false)。

private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果是异常退出的,此时workerCount还没调整,所以需要工作线程数减1if (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();//更新 完成任务数,以及移除workertry {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//尝试终止线程tryTerminate();int c = ctl.get();//如果不是异常退出,则根据配置计算需要的最小工作线程数//如果是异常退出,或者当前工作线程小于上面根据配置计算的最小工作线程//则都用一个新worker来替换原来的workerif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}//启动一个worker替换原来的workeraddWorker(null, false);}}

总之这段代码的主要作用是在工作线程退出时,更新线程池的状态、计数,以及根据配置来决定是否需要新的worker替代退出的工作线程,以保持线程池的正常运行。

3、如何停止过期的非核心线程

答案在getTask()。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 一些退出的状态就直接返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//是否需要超时淘汰boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//在确保当workQueue不为空时至少有一个工作线程的前提下//来淘汰超出 maximumPoolSize 或者超时的线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//阻塞获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//标记超时timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

其实线程池并没有标记谁是核心线程,谁是非核心线程,只关心核心线程和非核心线程的数量。也就是说无论是哪个线程在获取任务时都有可能被标记为timeOut,并且每次获取任务都会根据核心线程数,最大线程数,当前线程数,timeout标记等判断是否需要当前worker,如果不需要就返回null,跳出runWorker的循环,进而结束线程。

4、如何使用拒绝策略

在提交任务的时候,如果addWorker失败就会进入拒绝策略的逻辑。

 public void execute(Runnable command) {//...//加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {//...if (! isRunning(recheck) && remove(command))//双重检测失败进入拒绝策略reject(command);//...               }//新建非核心线程else if (!addWorker(command, false))//非核心线程添加失败,进入拒绝策略reject(command);
}final void reject(Runnable command) {handler.rejectedExecution(command, this);
}

ScheduledThreadPoolExecutor源码分析

.schedule():延迟执行,只执行一次。
.scheduleAtFixedRate():固定频率执行,按照固定的时间间隔来调度任务。
.scheduleWithFixedDelay():固定延迟执行,在上一次任务完成后的固定延迟之后再次执行任务。

无论是哪种都会先将task封装成 ScheduledFutureTask,然后调用 delayedExecute
scheduleAtFixedRate为例:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0L)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),//scheduleWithFixedDelay与scheduleAtFixedRate的区别就只在这里//scheduleWithFixedDelay 传的是 -unit.toNanos(period)//后续会根据这个值的正负来判断是固定频率还是固定延迟unit.toNanos(period),sequencer.getAndIncrement());//封装成 ScheduledFutureTask RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//调用 delayedExecutedelayedExecute(t);return t;}

delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {//task添加到队列//这同样也是自己实现的一个延迟队列,大概的逻辑就是:先按时间排,如果时间一样就按插入的顺序排。super.getQueue().add(task);//一些检测if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);else//保证有足够的woker正在工作ensurePrestart();}}void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)//addWorker跟就上面的是一样的了addWorker(null, true);else if (wc == 0)addWorker(null, false);}

那么凭什么将Worker的task封装成 ScheduledFutureTask 能起到持续调用的效果,来看看他的 run 方法。
ScheduledFutureTask#run

        public void run() {//一些检测if (!canRunInCurrentRunState(this))cancel(false);//如果不是周期性任务就只调用一次(period不为0则表示不是周期性任务)else if (!isPeriodic())super.run();//如果是周期性任务就在调用完之后//设置下次调用时间并将任务放回队列且保证有足够的woker正在工作else if (super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}

ScheduledFutureTask#setNextRunTime

        private void setNextRunTime() {long p = period;//根据period的正负来区分是固定频率还是固定延迟if (p > 0)time += p;elsetime = triggerTime(-p);}

ScheduledThreadPoolExecutor#reExecutePeriodic

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(task)) {//放回队列super.getQueue().add(task);if (canRunInCurrentRunState(task) || !remove(task)) {//保证有足够的woker正在工作ensurePrestart();return;}}task.cancel(false);}

所以ScheduledThreadPoolExecutor的总体框架设计和上面的ThreadPoolExecutor是一样的(毕竟是他的子类)。
最主要的区别在于ScheduledThreadPoolExecutor里worker使用的task是自己内部实现的 ScheduledFutureTask 类,而该类的run方法在执行完后会设置下一次的执行时间并将任务放回队列中等待执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/82850.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java基于SpringBoot的藏区特产销售系统的研究与实现

今天为大家带来的是基于 Java SpringBootVue 的藏区特产销售系统&#xff0c;大家有兴趣的可以看一下 博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 文章目…

提升效率,实现组织与创造的完美结合——Agenda for Mac

现代快节奏的生活中&#xff0c;时间管理和任务协调变得尤为关键。如果您正在寻找一个功能全面、简洁易用的日历、任务和笔记管理工具&#xff0c;那么Agenda for Mac定会成为您的得力助手。这款强大的应用程序将帮助您提高工作效率&#xff0c;实现组织与创造的完美结合。 Ag…

北邮22级信通院数电:Verilog-FPGA(2)modelsim北邮信通专属下载、破解教程

北邮22信通一枚~ 跟随课程进度更新北邮信通院数字系统设计的笔记、代码和文章 持续关注作者 迎接数电实验学习~ 获取更多文章&#xff0c;请访问专栏&#xff1a; 北邮22级信通院数电实验_青山如墨雨如画的博客-CSDN博客 目录 1.下载 2.解压打开 3.modelsim初安装 4.…

Java代码质量评估工具

概述 Java代码的质量评估主要包括代码的可维护性、健壮性、以及在运行时能达到既定的性能目标&#xff0c;可维护性主要包括代码的可读性、在关键的代码上提供详细注释、在设计类、方法以及代码逻辑时符合设定的编码规范&#xff0c;健壮性主要包括编写代码时应使用常用的设计…

k8s(Kubernetes)集群部署--使用 kubeadm方式部署

k8s集群部署--使用 kubeadm方式部署 一、测试所需环境&#xff08;三台均要执行&#xff09;二、配置准备&#xff08;三台均要执行&#xff09;1. 重命名hostname、添加hosts2. 关闭防火墙、selinux与swap3. 添加网桥过滤及内核转发配置文件4.同步时间5.安装ipset及ipvsadm 三…

Mysql002:(库和表)操作SQL语句

目录&#xff1a; 》SQL通用规则说明 SQL分类&#xff1a; 》DDL&#xff08;数据定义&#xff1a;用于操作数据库、表、字段&#xff09; 》DML&#xff08;数据编辑&#xff1a;用于对表中的数据进行增删改&#xff09; 》DQL&#xff08;数据查询&#xff1a;用于对表中的数…

【Verilog教程】2.3 Verilog 数据类型

Verilog 最常用的 2 种数据类型就是线网&#xff08;wire&#xff09;与寄存器&#xff08;reg&#xff09;&#xff0c;其余类型可以理解为这两种数据类型的扩展或辅助。 线网&#xff08;wire&#xff09; wire 类型表示硬件单元之间的物理连线&#xff0c;由其连接的器件输…

十分钟理解OSPF路由协议

十分钟理解OSPF路由协议 1.RIP的缺陷以跳数为度量值最大跳数为15更新路由表采用全更新收敛速度慢 2.RIP与OSPF比较OSPF概述运行OSPF协议之前运行OSPF协议之后 3.OSPF协议工作过程1.发现邻居2.建立邻接关系3.传递链路状态信息4.计算路由 4.OSPF分区域管理 有RIP协议&#xff0c;…

Visual Studio Code配置开发Maven项目、Spring Boot项目

配置开发Maven项目、Spring Boot项目 配置全局配置项目配置注意 Maven项目开发安装插件创建项目启动项目 Spring Boot项目开发安装插件创建项目启动项目 其他插件 配置 全局配置 ctrlshiftp打开搜索setting.json&#xff0c;这个setting.json配置属于全局配置 配置全局的Java与…

Python配置与测试利器:Hydra + pytest的完美结合

简介&#xff1a;Hydra 和 pytest 可以一起使用&#xff0c;基于 Hydra Pytest 的应用可以轻松地管理复杂配置&#xff0c;并编写参数化的单元测试&#xff0c;使得Python开发和测试将变得更为高效。 安装&#xff1a; pip install hydra-core pytest案例源码&#xff1a;my…

毕业设计|基于stm32单片机的app视频遥控抽水灭火小车设计

基于stm32单片机的app视频遥控抽水灭火水泵小车设计 1、项目简介1.1 系统构成1.2 系统功能 2、部分电路设计2.1 L298N电机驱动电路设计2.2 继电器控制电路设计 3、部分代码展示3.1 小车控制代码3.1 水泵控制代码 4 演示视频及代码资料获取 1、项目简介 视频简介中包含资料http…

用selenium和xpath定位元素并获取属性值以及str字符型转json型

页面html如图所示&#xff1a; 要使用xpath定位这个div元素&#xff0c;并且获取其属性data-config的内容值。 from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Optionshost127.0.0.1 port10808 …

(图论) 1020. 飞地的数量 ——【Leetcode每日一题】

❓ 1020. 飞地的数量 难度&#xff1a;中等 给你一个大小为 m x n 的二进制矩阵 grid &#xff0c;其中 0 表示一个 海洋单元格、1 表示一个 陆地单元格。 一次 移动 是指从一个陆地单元格走到另一个相邻&#xff08;上、下、左、右&#xff09;的陆地单元格或跨过 grid 的边…

python基础语法(四)

感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接 &#x1f412;&#x1f412;&#x1f412;个人主页 &#x1f978;&#x1f978;&#x1f978;C语言 &#x1f43f;️&#x1f43f;️&#x1f43f;️C语言例题 &#x1f423;&#x1f413;&#x1f3c0;python 这…

9.19 QT作业

完成文本编辑器的保存工作 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QFontDialog> //字体对话框 #include<QFont> //字体类 #include<QMessageBox> //消息对话框 #inclu…

SpringMVC学习|JSON讲解、Controller返回JSON数据、Jackson、JSON乱码处理、FastJson

JSON讲解 JSON(JavaScript Object Notation,JS 对象标记)是一种轻量级的数据交换格式&#xff0c;目前使用特别 广泛。 采用完全独立于编程语言的文本格式来存储和表示数据。 简洁和清晰的层次结构使得 JSON成为理想的数据交换语言。 易于人阅读和编写&#xff0c;同时也易于机…

岩土工程安全监测中振弦采集仪连接振弦传感器时注意事项

岩土工程安全监测中振弦采集仪连接振弦传感器时注意事项 岩土工程安全监测是保障工程稳定和安全的重要手段之一&#xff0c;而振弦采集仪则是岩土工程安全监测的常用设备之一&#xff0c;可以用于实时监测地下水位、土体变形、岩体应力等。其中&#xff0c;振弦传感器是振弦采…

解决报错:npm ERR! code 1

我是 npm install --legacy-peer-deps 成功了 解决方案&#xff1a; 升级swiper库&#xff1a;你可以尝试升级你的项目中的swiper库到5.2.0或更高版本&#xff0c;以满足vue-awesome-swiper的需求。你可以使用以下命令来进行升级&#xff1a; npm install swiperlatest 注意…

Hadoop:YARN、MapReduce、Hive操作

目录 分布式计算概述 YARN概述 YARN架构 核心架构 辅助架构 MapReduce 概述 配置相关文件 提交MapReduce到YARN Hive Hive架构 Hive在VMware部署 Hive的启动 数据库操作 数据表操作 内部表操作 外部表操作 数据加载和导出 数据加载LOAD 数据加载 - INSERT SEL…

Flink sql 1.17笔记

环境准备 # 启动hadoop集群 # 启动Flink yarn session (base) [link999hadoop102 flink-1.17.0]$ bin/yarn-session.sh -d# 启动finksql客户端 (base) [link999hadoop102 flink-1.17.0]$ bin/sql-client.sh -s yarn-session# 如果有初始化文件 bin/sql-client.sh embedded -s …