线程池核心原理浅析

前言

由于系统资源是有限的,为了降低资源消耗,提高系统的性能和稳定性,引入了线程池对线程进行统一的管理和监控,本文将详细讲解线程池的使用、原理。


为什么使用线程池

池化思想

线程池主要用到了池化思想,池化思想在计算机领域十分常见,主要用于减少资源浪费、提高性能等。

池化思想主要包含以下几个方面:

fuxing

一些常见的资源池包括线程池、数据库连接池、对象池、缓存池、连接池等。

池化思想可以提高系统的性能,因为它减少了资源的创建和销毁次数,避免了不必要的开销。通过池化,系统可以更好地应对高并发情况,降低资源竞争,提高响应速度。

什么是线程池

根据池化思想,在一个系统中,为了避免线程频繁的创建和销毁,让线程可以复用,引入了线程池的概念。线程池中,总有那么几个活跃线程。

当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程退回到池子,方便其他人使用。

简单说就是,在使用线程池后,创建线程变成了从线程池中获得空闲线程,关闭线程编程了向池子里归还线程。

大致流程如下:

fuxing

## 为什么使用线程池 Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的使用

fuxing

## ThreadPoolExecutor ThreadPoolExecutor 的创建方法总体来说可分为 2 种:

  • 通过 ThreadPoolExecutor 构造函数
  • 通过 Executors 类创建

通过构造函数

1.1. 入参含义

这个也是推荐使用的方法,因为通过 Executors 类创建可能会导致 OOM,如下图阿里开发规范中的描述。

fuxing

构造函数入参:

 
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

构造函数入参含义:

fuxing

1.2. 阻塞队列

workQueue 可选的 BlockingQueue:

fuxing

1.3. 拒绝策略

fuxing

如下图,上述拒绝策略均实现 RejectedExecutionHandler 接口,且为 ThreadPoolExecutor 的内部类。

fuxing

若以上策略仍无法满足实际应用需要,完全可以自已扩展 RejectedExecutionHandler 接口。

 
public interface RejectedExecutionHandler {
/**
* @param r 当前请求执行的任务
* @param executor 当前的线程池
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

示例:

 
public class RejectedExecutionDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(new Date() + ":Thread ID is" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask = new MyTask();
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
(r, executor) -> System.out.println(r.hashCode() + "is discard")
);
for (int i = 0; i < 100; i++) {
executorService.submit(myTask);
Thread.sleep(10);
}
}
}

上述示例中,mytask 执行需要花费100毫秒,因此,必然会导致一些任务被直接丢弃。在实际应用中,我们可以将更详细的信息记录到日志中,来分析任务丢失情况和系统负载。

fuxing

通过 Executors

Executors 类扮演着线程池工厂的角色,通过该类可以取得一个拥有定功能的线程池。

该类可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
2.1. FixedThreadPool

固定线程数的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂时存在任务队列中,待有线程空闲时,在处理队列中的任务。

FixedThreadPool 使用的无界任务队列 LinkedBlockingQueue,可能造成内存泄露。

 
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
2.2. SingleThreadExecutor

只有一个工作线程的线程池,当多于 1 个任务被提交时,会存到任务队列中。该线程池使用的无界任务队列 LinkedBlockingQueue,可能造成内存泄露。

 
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
2.3. CachedThreadPool

根据实际情况调整线程数的线程池,线程池的线程数量不确定,若有空闲线程可复用,则会优先使用。若所有线程均在工作,此时新的任务则会创建新的线程优先处理。所有线程在任务执行完毕后,将返回线程池进行复用。

corePoolSize 被设置为0,maximumPoolSize 被设置为无界,存活时间设置为 60s,空闲线程超过60秒后将会被
终止。极端情况线程创建过多,会导致内存泄露。

 
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

ScheduledThreadPoolExecutor

简介

如下图, ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor,它主要用来定期执行任务,功能与 Timer 类似且更加强大,可以在构造函数中指定多个对应的后台线程数。

fuxing

使用

可通过 Executors 创建,源码如下:

 
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

这里的返回值是 ScheduledExecutorService,根据时间对线程进行调度。有三个主要方法:

 
public interface ScheduledExecutorService extends ExecutorService {
/**
* 给定时间对任务进行调度
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* 周期性对任务进行调度
* 以第一个任务的开始时间 initialDelay + period
* 第一个任务在 initialDelay + period 执行
* 第二个任务在 initialDelay + period * 2 执行
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 周期性对任务进行调度
* 上一个任务结束后,再经过 period 时间开始执行
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}

如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

ForkJoinPool

fork 是开启子进程,join 是等待,意思是分支子进程结束后才能得到结果,实际开发中,若频繁的 fork 开启线程可能严重影响系统性能,所以引入了 ForkJoinPool。

大致流程是,向 ForkJoinPool 线程池中提交一个 ForkJoinTask 任务,就是将任务分解成多个小任务,等任务全部完成后进行处理,这里采用了分治的思想,具体我将在后续单独展开,这里不多做赘述。

ForkJoin 可能出现两个问题:

  1. 子线程积累过多,可能导致系统性能严重下降;
  2. 调用层次过深,可能导致栈溢出。

线程池的任务提交

execute()

该方法用于提交不需要返回值的任务,且无法判断任务是否被线程池执行成功。

源码见下面的线程池原理章节。

submit()

该方法用于提交需要返回值的任务。线程池会返回 Future 对象,可以判断任务是否执行成功,还可以通过 Future 的get()方法来获取返回值。

get()方法会阻塞当前线程直到任务完成,还可以设置超时时间,到时立即返回,不过这时有可能任务没有执行完。

 
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

线程池的关闭

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。

它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt() 来中断线程,所以无法响应中断的任务可能永远无法终止。

两种方法存在一定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,表示线程池关闭成功,这时调用isTerminaed方法会返回true。

至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

线程池执行原理

执行源码

 
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前工作线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程去执行任务,成功则return
if (addWorker(command, true))
return;
// 添加失败,ctl有变化,需重新获取
c = ctl.get();
}
// 判断是否为RUNNING,此时核心线程数已满,需加入任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 检查若不是RUNNING则将任务从队列移除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 正常则添加一个非核心空线程,执行队列中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 表示核心线程满了,队列也满了,创建非核心线程,执行任务
else if (!addWorker(command, false))
// 最大线程数也满了,走拒绝策略
reject(command);
}

流程图

fuxing


参考:
[1] 魏鹏. Java并发编程的艺术.
[2] 葛一鸣/郭超. 实战Java高并发程序设计.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机科学速成课】笔记一

文章目录 写在前面1.计算机的早期历史2.电子计算机3.布尔运算和逻辑门4.二进制5.算术逻辑单元-ALU6.寄存器和内存 写在前面 所有的一切源于这样一个网站——CS自学指南。 这是新手小白入门计算机科学必要了解的知识——【计算机科学速成课】[40集全/精校] - Crash Course Comp…

Dragonfly 拓扑的路由算法

Dragonfly 拓扑的路由算法 1. Dragonfly 上的路由 (1)最小路由(2)非最小路由 2. 评估3. 存在问题 (1)吞吐量限制(2)较高的中间延迟 references Dragonfly 拓扑的路由算法 John Kim, William J. Dally 等人在 2008 年的 ISCA 中提出技术驱动、高度可扩展的 Dragonfly 拓扑。而…

在做题中学习(53): 寻找旋转数组中的最小值

153. 寻找旋转排序数组中的最小值 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a;O(logn)->很可能就是二分查找 思路&#xff1a;再看看题目要求&#xff0c;可以画出旋转之后数组中元素的大小关系&#xff1a; 首先&#xff0c;数组是具有二段性的(适配二分查…

数据库(MySQL)—— 索引

数据库&#xff08;MySQL&#xff09;—— 索引 什么是索引创建索引使用 CREATE INDEX 语句使用 ALTER TABLE 语句在创建表时定义索引特殊类型索引注意事项 举个例子无索引的情况有索引的情况为什么索引快索引的结构 今天我们来看看MySQL中的索引&#xff1a; 什么是索引 MyS…

财政部、交通运输部:推动北斗导航等新技术与交通基础设施融合

财政部、交通运输部&#xff1a;推动北斗导航等新技术与交通基础设施深度融合 近日&#xff0c;为深入贯彻落实中共中央、国务院关于加快建设交通强国、数字中国等决策部署&#xff0c;推进公路水路交通基础设施数字转型、智能升级、融合创新&#xff0c;加快发展新质生产力&a…

VisualGDB:Linux动态库项目创建、编译及库的使用

此篇接上篇 《VisualGDB:为Linux项目添加系统依赖库》,在本篇中我们重点分享一下如何基于VisualGDB 在VS中创建Linux动态库项目,如何编译及使用创建的动态库。 一、VisualGDB创建Linux动态库项目 如下,我们创建一个Linux下的动态库项目MyMath 二、编译动态库 我们稍微…

哈夫曼编码python算法实现(图片版)

一、问题&#xff1a; 请使用哈夫曼编码方法对给定的字符串&#xff0c;进行编码&#xff0c;以满足发送的编码总长度最小&#xff0c;且方便译码。“AABBCCDDEEABCDDCDBAEEAAA” 二、过程&#xff1a; 三、结果&#xff1a;

手动实现简易版RPC(四)

手动实现简易版RPC(四) 往期内容 手动实现简易版RPC&#xff08;一&#xff09;&#xff1a;RPC简介及系统架构 手动实现简易版RPC&#xff08;二&#xff09;&#xff1a;简单RPC框架实现 手动实现简易版RPC(三)&#xff1a;mock数据生成 前言 接上几篇博客我们实现了最…

【6D位姿估计】FoundationPose 跑通demo 训练记录

前言 本文记录在FoundationPose中&#xff0c;跑通基于CAD模型为输入的demo&#xff0c;输出位姿信息&#xff0c;可视化结果。 然后分享NeRF物体重建部分的训练&#xff0c;以及RGBD图为输入的demo。 1、搭建环境 方案1&#xff1a;基于docker镜像&#xff08;推荐&#xf…

重置密码之后无法ssh登录

背景描述 我这边有个服务器S&#xff0c;我从ServerA可以ssh上去&#xff0c;但是我从堡垒机B无法ssh上去&#xff1b;一开始以为是密码问题&#xff0c;手动重置密码&#xff0c;但是依然无法登录进去&#xff1b;一直提示密码错误&#xff1b;改了好几次密码都不行 问题原因…

5.9号模拟前端面试10问

5.9号模拟前端面试10问 1.html语义化的理解 HTML语义化是指使用具有明确含义的HTML标签来描述内容&#xff0c;而不仅仅是使用<div>和<span>等通用容器标签。语义化的HTML代码更易于阅读和维护&#xff0c;同时也有助于搜索引擎优化&#xff08;SEO&#xff09;。…

达梦数据库限制用户登录IP测试

达梦数据库创建用户时可以限制登录ip和时间段。 创建测试测试用户 create user test1 identified by Test_1234 ALLOW_IP "192.168.100.101"; 限定该用户只能通过192.168.100.101地址登录数据库 连接测试 上图可见&#xff0c;192.168.100.101客户端可以连接上19…

wish、亚马逊怎么给店铺引流?怎么运用自养号测评提高流量的转化率?

作为全球知名的跨境电商平台&#xff0c;wish、亚马逊为卖家提供了一个拓展海外市场的机会。然而&#xff0c;在wish、亚马逊平台上建立和经营一家成功的店铺需要有效的引流策略。那么&#xff0c;Wish、亚马逊怎样才能给店铺引流呢&#xff1f; 一、Wish、亚马逊怎么给店铺引…

C++学习笔记——仿函数

文章目录 仿函数——思维导图仿函数是什么仿函数的优势理解仿函数仿函数的原理举例 仿函数——思维导图 仿函数是什么 使用对象名调用operator&#xff08;&#xff09;函数看起来像是在使用函数一样&#xff0c;因此便有了仿函数的称呼&#xff1b;仿函数存在的意义是&#x…

javaMail快速部署——发邮件喽~

目录 功能阐述 前序步骤 &#xff08;1&#xff09;到QQ邮箱中获取到授权码 代码实现 坑 今天在写一个修改密码的功能的时候要用到邮箱的发送&#xff0c;然后因为这个项目比较老旧了&#xff0c;采用的是javaWeb和jsp的配置&#xff0c;对于我只使用过springBoot整合的ja…

苹果新款 M4 芯片专注于 AI

爆炸性消息&#xff01;苹果的新一代 M4 芯片来了&#xff01;这家伙拥有 38 万亿次操作的超强神经引擎&#xff0c;速度比苹果 A11 芯片的 NPU 快 60 倍&#xff01;虽然它的速度还没有达到 Snapdragon X Elite 的 45 TOPS&#xff0c;但苹果自夸 M4 将提供与最新 PC 芯片相同…

带你入门React

目录 前言一&#xff0c;基本配置1.1 环境搭建1.2 页面初始化渲染二&#xff0c;基础学习2.1 结构与样式开发2.2 数据展示2.3 行内样式2.4 条件渲染2.5 列表渲染2.6 点击事件 三&#xff0c;页面更新3.1 组件数据3.2 组件数据共享 总结 前言 笔者之前的工作经验都局限于Vue&am…

ICode国际青少年编程竞赛- Python-2级训练场-for循环中的变量

ICode国际青少年编程竞赛- Python-2级训练场-for循环中的变量 1、 for i in range(4):Dev.turnLeft()# 将i1作为Dev移动的步数Dev.step(i 1)2、 for i in range(4):Spaceship.step(i 1)Dev.step(3)Dev.step(-3)3、 for i in range(5):Dev.step(5 - i)Dev.turnRight()4、 …

开源文档管理系统Paperless-ngx如何在Linux系统运行并发布至公网

文章目录 1. 部署Paperless-ngx2. 本地访问Paperless-ngx3. Linux安装Cpolar4. 配置公网地址5. 远程访问6. 固定Cpolar公网地址7. 固定地址访问 Paperless-ngx是一个开源的文档管理系统&#xff0c;可以将物理文档转换成可搜索的在线档案&#xff0c;从而减少纸张的使用。它内置…

【Pytorch】1.读取训练数据集

导入Dataset类 from torch.utils.data import Dataset # 注意是Dataset&#xff08;大写&#xff09;的才是类通过jupyter我们可以阅读一下Dataset类的具体使用方法 help(Dataset) # 或者直接 Dataset??我们可以看到具体对Dataset类的解释 从蓝色字体我们可以得出 所有的代…