ThreadPoolExecutor源码详解

Java中的线程池

谈到池化,通常我们会想到线程池、连接池、对象池等池子。通常池化都会有以下这些共同的好处:

  1. 减少资源的消耗。在线程池中重复利用已经创建的线程可以减少创建和销毁的损耗。
  2. 方便资源的管理。在线程池中我们可以制定线程数量、什么情况下会创建线程、线程满了怎么拒绝等。

在Java线程池通常指的是ThreadPoolExecutor

类的声明

public class ThreadPoolExecutor extends AbstractExecutorService {}

构造方法

ThreadPoolExecutor提供以下几个构造方法:

  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

实际上所有的构造方法都是最后一个构造方法的重载:

  • 当没有提供线程工厂ThreadFactory时,使用Executors.defaultThreadFactory
  • 当没有提供拒绝策略RejectedExecutionHandler时,使用defaultHandler, 这个defaultHandler对应的是抛弃策略AbortPolicy
    // 抛弃策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

这里虽然提供了很多的参数,但是需要了解这些参数的作用需要往下看ThreadPoolExecutor的执行策略。这里先给一个概述:

  • int corePoolSize: 核心线程池大小
  • int maximumPoolSize: 最大线程池大小
  • long keepAliveTime, TimeUnit unit: 空闲线程最大存活时间
  • ThreadFactory threadFactory: 线程工厂
  • BlockingQueue<Runnable> workQueue: 阻塞队列
  • RejectedExecutionHandler handler: 拒绝策略

线程池的状态和工作线程数量

在开始看任务执行前,我们需要先看一个原子变量ctl,这个变量隐含了两个概念一个是工作线程的数量,另一个是线程池的状态。
这两个值通过运算组合在一起,其中低位保存工作线程数量,高位保存状态。这里高位和低位的分隔是第29位,这意味着最多只能有2^29 - 1大概500万个工作线程。

线程池的状态包含以下几种:

  • RUNNING(运行中): 接受新任务和处理队列中的任务
  • SHUTDOWN(): 不再接受新任务,但是仍然会执行队列中的任务
  • STOP(停止): 不接受新任务,不执行队列中的任务,中断正在进行的任务
  • TIDYING(清理): 所有任务都执行完成,workerCount为0,线程过渡为TIDYING状态执行terminated()钩子方法
  • TERMINATED(): terminated已经执行

状态的改变:

  • RUNNING -> SHUTDOWN: 调用shutdown()
  • RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()
  • SHUTDOWN -> TIDYING: 当队列和线程池都空了
  • STOP -> TIDYING: 当线程池空了
  • TIDYING -> TERMINATED: 当terminated方法执行完毕
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~COUNT_MASK; }private static int workerCountOf(int c)  { return c & COUNT_MASK; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}private void decrementWorkerCount() {ctl.addAndGet(-1);}

任务的执行

execute概念讲解前,需要先厘清两种线程池,一种称为核心线程池,另一种称为最大线程池。注意这里虽然是这么叫,但是实际上只有一个线程池,corePoolSize可以想成是一个边界。

0 ... corePoolSize ... maximumPoolSize

边界里的劳工干到死除非有让他停下的理由,边界外的闲了一段时间就放假。

当提交一个任务给线程池时,会按以下的逻辑执行:

  1. 如果工作线程数量小于corePoolSize,那么核心线程池添加一个工作线程执行任务。
  2. 如果线程池还在运行,那么添加到工作队列。这里会有一个双重检测再判断一次线程池的状态。如果线程池不在运行状态了,从队列移除执行拒绝策略。如果工作线程数量为0那么添加一个到最大线程池。
  3. 如果既无法添加到线程池,又无法添加到队列中,执行拒绝策略。
    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

工作者线程

添加工作者线程有两个参数,第一个Runnable firstTask意味着这个线程第一个执行的任务,这个值可能是null意味着启动一个工作者线程但是不需要直接执行任务,而是到队列之类的地方拿,后面getTask会分析。
第二个参数意味着这个工作线程是否是添加到核心线程池。前面也说了其实只有一个池子,corePoolSize只是相当于中间的边界,从下面方法的实现我们也可以看出来,当coretrue时如果线程数量超过corePoolSize则添加失败。

    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

方法的实现中前面标签retry部分是判断能不能添加,后面部分是添加的实际过程。

  • 添加一个工作者线程,这个过程会从线程池中创建一个新的线程。
  • 将这个工作线程添加进HashSet<Worker> workers中,注意本身HashSet是线程不安全的,所以实际上所有对它的操作都在持有锁的情况下进行。
  • 更新largestPoolSize
  • 启动线程。如果失败调用addWorkerFailed

启动线程实际执行的run方法:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {runWorker(this);}}

工作者线程不断从队列中(getTask())拿任务来执行。如果发现没有任务了就尝试自我毁灭(processWorkerExit())了。
工作者线程在执行的时候会先给自己上个排他锁,所以当workerisLocked的时候意味着正在执行任务。
任务的执行分为:beforeExecute(wt, task); -> task.run(); -> afterExecute(task, ex);,判断中断线程在beforeExecute之前。

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

getTask会不断的尝试在队列中拿任务,除非超时或者终止线程池或者队列空了。

    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

拒绝策略

当线程池和阻塞队列无法执行任务时,由拒绝策略决定任务何去何从。

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

接口RejectedExecutionHandler定义了rejectedExecution方法,该方法接受两个参数一个是待执行的任务,另一个是当前线程池。调用的地方在ThreadPoolExecutor中的:

    final void reject(Runnable command) {handler.rejectedExecution(command, this);}

其中内置了以下几种策略:

  • CallerRunsPolicy: 调用者线程执行
  • AbortPolicy: 抛弃且抛出异常
  • DiscardPolicy: 抛弃无异常
  • DiscardOldestPolicy: 抛弃最老的任务执行当前的任务

CallerRunsPolicy 调用者线程执行

如果线程池没有shutdown则执行。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

AbortPolicy 抛弃并抛异常策略

这是默认的执行方式,拒绝执行并抛出一个异常。

    public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

DiscardPolicy 抛弃不抛异常策略

抛弃但是不做任何事情。

    public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

DiscardOldestPolicy 抛弃队列头的任务

抛弃队列中最老的任务然后执行当前任务。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

这里在Javadoc有提到以下的实现,这个实现优于DiscardOldestPolicy,它会触发被抛弃任务的回调或者记录一些额外的信息:

     new RejectedExecutionHandler() {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {Runnable dropped = e.getQueue().poll();if (dropped instanceof Future<?>) {((Future<?>)dropped).cancel(false);// also consider logging the failure}e.execute(r);  // retry

线程池监控

  • getPoolSize(): 返回当前线程池中的线程数量
  • getActiveCount(): 返回线程池中(大约)活跃线程的数量
  • getLargestPoolSize(): 返回线程池中最多的时候运行的线程数量
  • getTaskCount(): 返回(大约)提交过的任务数量
  • completedTaskCount(): 返回(大约)执行完成的任务数量

线程池预热

  • prestartCoreThread: 预热一个核心线程
  • prestartAllCoreThreads: 预热所有核心线程

设置核心线程池数量

动态设置核心线程数量,如果当前核心线程数大于设置值,那么中断那些空闲的线程。当前核心线程数小于设置值,那么预热差值或队列大小的小值。

    public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}

动态设置最大线程池数量

    public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();this.maximumPoolSize = maximumPoolSize;if (workerCountOf(ctl.get()) > maximumPoolSize)interruptIdleWorkers();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/34606.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

echarts 图表设置 滚动条

效果图&#xff1a; 代码实现&#xff1a; 第一种方式&#xff1a; 通过 dataZoom 属性缩放进行配置滚动条。 //给x轴设置滚动条 dataZoom: [{start:0,//默认为0end: 100-1500/31,//默认为100type: slider,show: true,xAxisIndex: [0],handleSize: 0,//滑动条的 左右2个滑…

【数据结构•堆】轮廓线

题目描述 轮廓线   • 每一个建筑物用一个三元组表示(L, H, R), 表示左边界, 高度和右边界。   • 轮廓线用X, Y, X, Y…这样的交替式表示。   • 右图的轮廓线为: (1, 11, 3, 13, 9, 0, 12, 7, 16,3, 19, 18, 22, 3, 23, 13, 29, 0) 。   • 给N个建筑&#xff0c;求…

初步制作做一个AI智能工具网站,持续更新

文章目录 介绍AI对话AI绘画AI音视频AI图片处理AI小工具体验 介绍 网页有五大部分&#xff1a;AI对话、AI绘画、AI音视频、AI 图片处理、AI小工具。 AI对话 AI对话是指人工智能技术在模拟人类对话交流方面的应用。通过使用自然语言处理和机器学习算法&#xff0c;AI对话系统可…

SpringBoot复习:(40)@EnableConofigurationProperties注解的用法

一、配置文件&#xff1a; server.port9123 二、配置类&#xff1a; package cn.edu.tju.config;import com.mysql.fabric.Server; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigu…

‘大数据技术与应用’和‘数据科学与大数据技术’有什么区别

一、侧重点不同 ‘大数据技术与应用’主要侧重于大数据的存储、处理和分析技术、包括数据挖掘、机器学习、数据仓库、分布式计算等方面的研究&#xff0c;旨在开发大数据相关的应用程序和系统&#xff0c;以满足商业和企业的需求。 ‘数据科学与大数据技术’则更加注重数据本…

Linux 终端命令之文件浏览(3) less

Linux 文件浏览命令 cat, more, less, head, tail&#xff0c;此五个文件浏览类的命令皆为外部命令。 hannHannYang:~$ which cat /usr/bin/cat hannHannYang:~$ which more /usr/bin/more hannHannYang:~$ which less /usr/bin/less hannHannYang:~$ which head /usr/bin/he…

Vue: el-form 自定义校验规则

Vue 的 el-form 组件可以使用自定义校验规则进行表单验证。自定义校验规则可以通过传递一个函数来实现&#xff0c;该函数接受要校验的字段的值作为参数&#xff0c;并返回一个布尔值或一个 Promise 对象。 下面是一个示例&#xff0c;演示如何在 el-form 中使用自定义校验规则…

Python“牵手”京东工业商城商品详情数据方法介绍

京东工业平台&#xff08;imall.jd.com&#xff09;是一个 B2B 电商平台&#xff0c;提供了丰富的工业品类商品&#xff0c;涵盖了机械、化工、建材、劳保用品等品类。如果您需要采集京东工业平台的商品详情数据&#xff0c;可以尝试以下步骤&#xff1a; 选定目标品类和 SKU …

433. 最小基因变化

基因序列可以表示为一条由 8 个字符组成的字符串&#xff0c;其中每个字符都是 A、C、G 和 T 之一。 假设我们需要调查从基因序列 start 变为 end 所发生的基因变化。一次基因变化就意味着这个基因序列中的一个字符发生了变化。 例如&#xff0c;"AACCGGTT" -->…

React组件实例的三大属性

React组件实例的三大属性分别是&#xff1a;state、props和refs。 State属性&#xff1a;用来存储组件内部的状态&#xff0c;只能在组件内部修改。当state被修改时&#xff0c;React会重新渲染组件。 Props属性&#xff1a;用来传递父组件的数据到子组件中&#xff0c;是组件…

Redis_哨兵模式

9. 哨兵模式 9.1 简介 当主库宕机&#xff0c;在从库中选择一个&#xff0c;切换为主库。 问题: 主库是否真正宕机?哪一个从库可以作为主库使用?如何实现将新的主库的信息通过给从库和客户端&#xff1f; 9.2 基本流程 哨兵主要任务&#xff1a; 监控选择主库通知 会有…

Win11中使用pip或者Cython报错 —— error: Microsoft Visual C++ 14.0 is required.

第一步&#xff1a;下载Visual Studio 2019 下载地址&#xff1a; https://learn.microsoft.com/zh-cn/visualstudio/releases/2019/release-notes 第二步&#xff1a;安装组件 选择单个组件&#xff0c;勾选以下两个组件 其他错误&#xff1a; 无法打开文件“python37.li…

AWS上传私有windows server2019镜像64位

一.制作自己的镜像 我使用的是esxi&#xff0c;建立一个windows虚拟机&#xff0c;开启。 根据aws官方文档&#xff0c;虚拟机里的系统重要需要注意以下几点&#xff1a; 1.只有一张网卡&#xff0c;ip获取配置成dhcp。 2.关闭系统防火墙。 3.开启windows rdp 远程功能。 …

iOS开发-WebRTC本地直播高分辨率不显示画面问题

iOS开发-WebRTC本地直播高分辨率不显示画面问题 在之前使用WebRTC结合ossrs进行推流时候&#xff0c;ossrs的播放端无法看到高分辨率画面问题。根据这个问题&#xff0c;找到了解决方案。 一、WebRTC是什么 WebRTC是什么呢&#xff1f; WebRTC (Web Real-Time Communicatio…

【ES】笔记-函数参数默认值

函数参数默认值 ES6 允许给函数参数赋值初始值 1. 形参初始值 具有默认值的参数&#xff0c;一般放到最后 function add(a,b,c10){return abc}let resultadd(1,2);console.log(result);2. 与解构赋值结合 function connect({host"127.0.0.1",username,password,port…

FlinkCDC能读取到快照,但是无法输出更新数据

FlinkCDC能读取到快照&#xff0c;但是无法输出更新数据 发现是并行度问题&#xff0c;当我的并行度超过1的时候就无法捕获更新。 public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "lcq");Configurati…

python selenium如何保存网站的cookie用于下次自动登录

## 一、python selenium如何保存网站的cookie 使用Selenium保存网站的Cookie非常简单。下面是一个示例&#xff0c;展示了如何使用Selenium打开网站&#xff0c;然后保存获取到的Cookie&#xff1a; from selenium import webdriver# 初始化浏览器 browser webdriver.Chrome…

数据结构与算法-数组(附阿里面试题)

一 面试经典&#xff1a; 给你一个文件里面包含全国人民&#xff08;14亿&#xff09;的年龄数据&#xff08;0~180&#xff09;&#xff0c;现在要你统计每一个年龄 有多少人&#xff1f; 给定机器为 单台2CPU2G内存。不得使用现成的容器&#xff0c;比如map等。&am…

多个 Github 账户访问 Github

文章目录 多个 Github 账户访问 Github背景步骤 参考 多个 Github 账户访问 Github 背景 如果我想在这台电脑上同时使用两个 Github 账号怎么办呢&#xff1f; 你主机上的 SSH 公钥只能标识出一个账号。如果需要使用另外一个git账号&#xff0c;访问仓库&#xff0c;你需要创…

Java基础篇--String 类

Java中的String类是用于处理字符串的核心类之一。它属于Java的标准库&#xff0c;并提供了许多操作字符串的方法。 String类是不可变的&#xff0c;这意味着一旦创建了一个String对象&#xff0c;它的值就不能被改变。当对字符串进行操作时&#xff0c;实际上是创建了一个新的…