Java并发编程-线程池

Java并发编程-线程池

  • 线程池运行原理
  • 线程池生命周期
  • 线程池的核心参数
  • 线程池的阻塞队列
  • 线程池的拒绝策略
  • 线程池的种类
    • newFixedThreadPool
    • newSingleThreadExecutor
    • newCachedThreadPool
    • newScheduledThreadPool
  • 创建线程池
    • jdk的Executors(不建议,会导致OOM)
    • jdk的ThreadPoolExecutor(阿里开发手册推荐)
    • Spring内置的ThreadPoolTaskExecutor(开发常用)

线程池(Thread Pool) 是一种并发编程技术,用于管理一组线程,以便复用这些线程来执行多个任务。使用线程池的核心目的就是用来减少线程的创建和销毁的开销,从而能提高系统的响应性能,同时线程池对线程的管理也能避免线程创建过多导致内存溢出。

线程池运行原理

线程池运行原理

线程池生命周期

线程池生命周期

  1. running(运行状态): 会接收新任务并且会处理队列中的任务
  2. shutdown(关闭状态): 不会接收新任务并且会处理队列中的任务,任务处理完之后会中断所有线程
  3. stop(停止状态): 不会接收新任务并且不会处理队列中的任务,并且会直接中断所有线程
  4. tidying(整理状态): 所有线程都停止之后,线程池的状态就会转为tidying,一旦达到此状态,就会调用线程池的terminated(),此方法内部是空的,可由程序员自定义
  5. terminated(终止状态): terminated()执行完之后就会转变为terminated状态

线程池的核心参数

new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
  1. corePoolSize(核心线程数): 即使空闲也不会被回收的线程数量;
  2. maximumPoolSize(最大线程数): 线程池允许创建的最大线程数(含核心线程);
  3. keepAliveTime(空闲线程存活时间): 非核心线程空闲时的存活时间(超时后回收);
  4. unit(时间单位): keepAliveTime 的时间单位(如秒、毫秒);
  5. workQueue(任务队列): 用于缓存未执行任务的阻塞队列;
  6. ThreadFactory(线程工厂): 这是一个接口,用于创建新的线程,可自定义线程创建方式;
  7. RejectedExecutionHandler(拒绝策略): 当任务队列满且线程数达到上限时的处理策略;

应用程序大致可分为两种类型,IO密集型任务和CPU密集型任务。
IO密集型任务,一般指文件读写、DB读写、网络请求等,核心线程数大小设置为2N+1
CPU密集型任务,一般指计算型代码、Bitmap转换、Gson转换等,特点是高并发,任务执行时间短,核心线程数大小设置为N+1,可减少线程上下文的切换。
可通过这段代码获取CPU的逻辑线程数 int poolSize = Runtime.getRuntime().availableProcessors();

线程池的阻塞队列

workQueue:当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务。

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO
  2. LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO
  3. DelayedWorkQueue:一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
  4. SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
LinkedBlockingQueueArrayBlockingQueue
默认无界,支持有界强制有界
底层是链表底层是数组
是懒惰的,创建节点的时候添加数据提前初始化 Node 数组
入队会生成新 NodeNode需要是提前创建好的
两把锁(头尾)一把锁

线程池的拒绝策略

  1. AbortPolicy: 默认的拒绝策略,当任务无法提交的时候就会抛出 RejectedExecutionException 异常
  2. CallerRunsPolicy: 当任务无法提交的时候就会把这个任务交给调用线程执行,这样就能避免任务被丢弃,但是有可能会导致调用者线程被阻塞
  3. DiscardPolicy: 当任务无法提交时,直接丢弃该任务,不做任何处理,如果任务不重要就可以用这个拒绝策略直接丢弃
  4. DiscardOldestPolicy: 当任务无法提交时,丢弃任务队列中最旧的任务,然后尝试重新提交当前任务,适合在需要尽快处理新任务的情况下使用

线程池的种类

newFixedThreadPool

Executors.newFixedThreadPool(nThreads); 创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待,特点如下:

  1. 核心线程数和最大线程数相同,意味者线程池能处理的任务就是 核心线程数 + 队列长度
  2. 队列使用了 LinkedBlockingQueue,长度没有设置,意味者里面用了一个无界队列,需要注意任务过多导致的内存溢出的问题
  3. 适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

Executors.newSingleThreadExecutor(); 创建一个单线程的线程池,所有任务将按照提交的顺序依次执行,特点如下:

  1. 核心线程数为 1,最大线程数是 1,意味者只有一个线程
  2. 队列使用了 LinkedBlockingQueue,容量没有限制,需要注意任务过多导致的内存溢出的问题
  3. 适用于对执行顺序有要求的任务
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

Executors.newCachedThreadPool(); 创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程,特点如下:

  1. 核心线程数为 0,最大线程数是 Integer.MAX_VALUE,这意味着能处理的任务数没有限制,同时创建出来的线程 60S 内没有处理任务就会被回收掉
  2. 队列使用了 SynchronousQueue,不存放任务,需要注意线程创建过多导致的内存溢出的问题
  3. 适合处理大量短期任务,也就是任务数比较密集,但每个任务执行时间较短的情况
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

newScheduledThreadPool

Executors.newScheduledThreadPool(corePoolSize); 创建一个可调度任务的线程池,适用于需要延迟执行或定期执行任务的场景,特点如下:

  1. 核心线程数可设置,最大线程数是 Integer.MAX_VALUE,意味者可以接收的任务没有限制
  2. 队列使用了 DelayedWorkQueue,支持延迟执行和定期执行任务,需要注意线程过多导致的内存溢出的问题
  3. 适合用于执行一些定时任务的场景,比如定时提醒
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// scheduled 提交任务到线程池中。
// 参数一,提交的任务;参数二,任务执行的延迟时间;参数三,时间单位
scheduled.schedule(() -> {System.out.println("1234");
}, 10, TimeUnit.SECONDS);

创建线程池

jdk的Executors(不建议,会导致OOM)

创建方式参考【线程池的种类】这一章节。Executors返回的线程池对象的弊端如下:

  1. FixedThreadPool和SingleThreadPool
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
  2. CachedThreadPool和ScheduledThreadPool
    允许的创建线程数量为 Iteger.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

jdk的ThreadPoolExecutor(阿里开发手册推荐)

int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.MINUTES;
// 阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
// 线程池工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 异常处理策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 手动构造线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

使用线程池,并发获取数据并,整合到一起


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;public class ApplicationMainTest {// 手动构造线程池private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));public static void main(String[] args) {// 使用线程安全的 ConcurrentLinkedDeque 存储查询结果集ConcurrentLinkedDeque<List<Integer>> resultList = new ConcurrentLinkedDeque<>();threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作...");List<Integer> queryList = buildQueryData(); // 模拟数据查询操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作...");List<Integer> queryList = buildQueryData(); // 模拟数据查询操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});// 关闭线程池threadPoolExecutor.shutdown();while (!threadPoolExecutor.isTerminated()) { // 检查所有任务是否完成System.out.println("等待所有任务完成...");try {Thread.sleep(100); // 每100毫秒检查一次} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("所有任务已完成");System.out.println("分隔符==============");// 汇总结果List<Integer> mergedList = new ArrayList<>();for (List<Integer> list : resultList) {mergedList.addAll(list);}System.out.println("Merged List: " + mergedList);}private static List<Integer> buildQueryData() {List<Integer> list = new ArrayList<>();Random rand = new Random();for (int i = 0; i < 3; i++) {list.add(rand.nextInt(100));}return list;}}

Spring内置的ThreadPoolTaskExecutor(开发常用)

Spring框架内置线程池

package cn.study.com.configbean;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class AsyncScheduledTaskConfig implements AsyncConfigurer {@Bean("checkAsync")  // 将当前方法返回的对象,存到容器里public Executor scheduledTaskAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大线程数executor.setMaxPoolSize(10);//核心线程数executor.setCorePoolSize(5);//任务队列大小executor.setQueueCapacity(5);//线程存活时间 当超过30s后,线程池中存有的线程数量大于核心线程,触发超时回收executor.setKeepAliveSeconds(30);//拒绝处理策略:回收最老的任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());// or 自定义拒绝策略【1.记录错误信息 2.通知消息系统,发送告警信息】executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}});executor.setThreadNamePrefix("Async-Thread-Pool-");//初始化executor.initialize();return executor;}
}

使用注解,通过线程池异步执行方法。

@Async("checkAsync")
public Boolean deleteFolder(String bucketName, String path) {log.info("当前Minio文件夹清除线程:"+Thread.currentThread().getName());MinioFileManager minioFileManager = new MinioFileManager(minioHost, accessKey, secretKey);return minioFileManager.deleteFolder(bucketName, path);
}

依赖注入,并发执行任务。

@Autowired
private ExecutorService executorService;Future<List<User>> f1 = executorService.submit(()-> {List<User> userList = queryUserList();return userList;
});
Future<List<Order>> f2 = executorService.submit(()-> {List<Order> orderList = queryOrderList();return orderList;
});
// 汇总信息
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("user", f1.get());
resultMap.put("order", f2.get());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/77791.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【前沿】成像“跨界”测量——扫焦光场成像

01 背景 眼睛是人类认识世界的重要“窗口”&#xff0c;而相机作为眼睛的“延伸”&#xff0c;已经成为生产生活中最常见的工具之一&#xff0c;广泛应用于工业检测、医疗诊断与影音娱乐等领域。传统相机通常以“所见即所得”的方式记录场景&#xff0c;传感器捕捉到的二维图像…

TM1640学习手册及示例代码

数据手册 TM1640数据手册 数据手册解读 这里我们看管脚定义DIN和SCLK&#xff0c;一个数据线一个时钟线 SEG1~SEG8为段码&#xff0c;GRID1~GRID16为位码&#xff08;共阴极情况下&#xff09; 这里VDD给5V 数据指令 数据命令设置 地址命令设置 显示控制命令 共阴极硬件连接图…

uni-app 开发企业级小程序课程

课程大小&#xff1a;7.7G 课程下载&#xff1a;https://download.csdn.net/download/m0_66047725/90616393 更多资源下载&#xff1a;关注我 备注&#xff1a;缺少两个视频5-14 tabs组件进行基本的数据展示和搜索历史 处理searchData的删除操作 1-1导学.mp4 2-10小程序内…

判断点是否在多边形内

代码段解析: const intersect = ((yi > y) !== (yj > y)) && (x < (xj - xi) * (y - yi) / (yj - yi) + xi); 第一部分:(yi > y) !== (yj > y) 作用:检查点 (x,y) 的垂直位置是否跨越多边形的当前边。 yi > y 和 yj > y 分别检查边的两个端…

【redis】集群 如何搭建集群详解

文章目录 集群搭建1. 创建目录和配置2. 编写 docker-compose.yml完整配置文件 3. 启动容器4. 构建集群超时 集群搭建 基于 docker 在我们云服务器上搭建出一个 redis 集群出来 当前节点&#xff0c;主要是因为我们只有一个云服务器&#xff0c;搞分布式系统&#xff0c;就比较…

[langchain教程]langchain03——用langchain构建RAG应用

RAG RAG过程 离线过程&#xff1a; 加载文档将文档按一定条件切割成片段将切割的文本片段转为向量&#xff0c;存入检索引擎&#xff08;向量库&#xff09; 在线过程&#xff1a; 用户输入Query&#xff0c;将Query转为向量从向量库检索&#xff0c;获得相似度TopN信息将…

C语言复习笔记--字符函数和字符串函数(下)

在上篇我们了解了部分字符函数及字符串函数,下面我们来看剩下的字符串函数. strstr 的使用和模拟实现 老规矩,我们先了解一下strstr这个函数,下面看下这个函数的函数原型. char * strstr ( const char * str1, const char * str2); 如果没找到就返回NULL指针. 下面我们看下它的…

FreeRTOS中的优先级翻转问题及其解决方案:互斥信号量详解

FreeRTOS中的优先级翻转问题及其解决方案&#xff1a;互斥信号量详解 在实时操作系统中&#xff0c;任务调度是基于优先级的&#xff0c;高优先级任务应该优先于低优先级任务执行。但在实际应用中&#xff0c;有时会出现"优先级翻转"的现象&#xff0c;严重影响系统…

深度学习-全连接神经网络

四、参数初始化 神经网络的参数初始化是训练深度学习模型的关键步骤之一。初始化参数&#xff08;通常是权重和偏置&#xff09;会对模型的训练速度、收敛性以及最终的性能产生重要影响。下面是关于神经网络参数初始化的一些常见方法及其相关知识点。 官方文档参考&#xff1…

GIS开发笔记(9)结合osg及osgEarth实现三维球经纬网格绘制及显隐

一、实现效果 二、实现原理 按照5的间隔分别创建经纬线的节点,挂在到组合节点,组合节点挂接到根节点。可以根据需要设置间隔度数和线宽、线的颜色。 三、参考代码 //创建经纬线的节点 osg::Node *GlobeWidget::createGraticuleGeometry(float interval, const osg::Vec4 …

《Relay IR的基石:expr.h 中的表达式类型系统剖析》

TVM Relay源码深度解读 文章目录 TVM Relay源码深度解读一 、从Constant看Relay表达式的设计哲学1. 类定义概述2. ConstantNode 详解1. 核心成员2. 关键方法3. 类型系统注册 3. Constant 详解1. 核心功能 二. 核心内容概述(1) Relay表达式基类1. RelayExprNode 和 RelayExpr 的…

自动驾驶地图数据传输协议ADASIS v2

ADASIS&#xff08;Advanced Driver Assistance Systems Interface Specification&#xff09;直译过来就是 ADAS 接口规格&#xff0c;它要负责的东西其实很简单&#xff0c;就是为自动驾驶车辆提供前方道路交通相关的数据&#xff0c;这些数据被抽象成一个标准化的概念&#…

Flutter 状态管理 Riverpod

Android Studio版本 Flutter SDK 版本 将依赖项添加到您的应用 flutter pub add flutter_riverpod flutter pub add riverpod_annotation flutter pub add dev:riverpod_generator flutter pub add dev:build_runner flutter pub add dev:custom_lint flutter pub add dev:riv…

【EasyPan】MySQL主键与索引核心作用解析

【EasyPan】项目常见问题解答&#xff08;自用&持续更新中…&#xff09;汇总版 MySQL主键与索引核心作用解析 一、主键&#xff08;PRIMARY KEY&#xff09;核心作用 1. 数据唯一标识 -- 创建表时定义主键 CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,use…

IcePlayer音乐播放器项目分析及学习指南

IcePlayer音乐播放器项目分析及学习指南 项目概述 IcePlayer是一个基于Qt5框架开发的音乐播放器应用程序&#xff0c;使用Visual Studio 2013作为开发环境。该项目实现了音乐播放、歌词显示、专辑图片获取等功能&#xff0c;展现了桌面应用程序开发的核心技术和设计思想。 技…

vscode 打开新页签

目录 vscode 打开新页签 完整settings.json内容&#xff1a; vscode 打开新页签 .vscode目录中 新建settings.json 在 settings.json 文件中&#xff0c;添加或修改以下行&#xff1a; json "workbench.editor.enablePreview": false 这将禁用预览模式&#xff0…

C语言高频面试题——常量指针与指针常量区别

1. 常量指针&#xff08;Pointer to Constant&#xff09; 定义&#xff1a; 常量指针是指向一个常量数据的指针&#xff0c;即指针指向的内容不能通过该指针被修改。 语法&#xff1a; const int* ptr;或者&#xff1a; int const* ptr;解释&#xff1a; const修饰的是指…

c++基础·列表初始化

目录 一、列表初始化的核心优势 二、基础数据类型与数组初始化 1. 基础类型初始化 2. 数组初始化 三、类与结构体初始化 1. 构造函数匹配规则 2. 注意事项 四、标准容器初始化 五、聚合类型&#xff08;Aggregate Types&#xff09;初始化 1. 聚合类型定义 2. 初始化…

数据分析与产品、运营、市场之间如何有效对齐

数据分析的重要性在于它能够将海量的原始信息转化为可操作的洞察。以产品开发为例,通过用户行为数据的分析,产品经理可以清晰了解哪些功能被频繁使用,哪些设计导致用户流失,从而优化迭代方向。运营团队则依靠数据分析来监控供应链效率、预测需求波动,甚至通过实时数据调整…

[C]基础11.深入理解指针(3)

博客主页&#xff1a;向不悔本篇专栏&#xff1a;[C]您的支持&#xff0c;是我的创作动力。 文章目录 0、总结1、字符指针变量2、数组指针变量2.1 数组指针变量是什么&#xff1f;2.2 数组指针变量怎么初始化&#xff1f; 3、二维数组传参的本质4、函数指针变量4.1 函数指针变量…