线程池使用实战

线程池使用实战

  • 1. 线程池使用
    • 1.1 例子1
    • 1.2 例子2

1. 线程池使用

 // 使用 ThreadPoolExecutor 创建线程池private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                   // 核心线程数Runtime.getRuntime().availableProcessors() * 2,                   // 最大线程数60,                   // 线程空闲时间TimeUnit.SECONDS,new ArrayBlockingQueue<>(100), // 阻塞队列大小new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用线程执行任务);

解释

  • executorService 是一个线程池,用于在多线程环境下并发执行任务。线程池的配置如下:

  • 核心线程数(corePoolSize):Runtime.getRuntime().availableProcessors(),即根据 CPU 可用核心数设置核心线程数,这意味着它将使用与 CPU 核心数相同的线程数来保持一定的并发性能和资源利用率。

  • 最大线程数(maximumPoolSize):设置为 Runtime.getRuntime().availableProcessors() * 2。最大线程数是线程池允许的最大并发线程数,当队列已满且核心线程都在工作时,会使用非核心线程来处理任务。这一配置保证了可以根据需求增加线程以应对高负载。

  • 线程空闲时间(keepAliveTime):当线程池中线程数超过核心线程数时,多余的线程会在空闲超过 60 秒后被终止。

  • 阻塞队列:new ArrayBlockingQueue<>(100) 表示线程池使用一个固定大小为 100 的阻塞队列存放任务。如果所有线程都在忙碌,且队列已满,新任务将根据拒绝策略处理。

  • 拒绝策略:ThreadPoolExecutor.CallerRunsPolicy 表示当任务无法被线程池处理时(例如线程池已满并且队列已满),该策略将由提交任务的线程来执行任务,而不是抛出异常。这在某些场景下可以防止任务丢失,但会减缓主线程的执行速度。

1.1 例子1

/*** 将信息传送给阿里方OSS** @param transmissionRequestDto 包括上传时间与文件类型 上传时间如20241025*/@Async@Overridepublic void transmission(TransmissionRequestDto transmissionRequestDto) {// 检查传输请求参数是否有效if (null == transmissionRequestDto || CollectionUtils.isEmpty(transmissionRequestDto.getUploadTime())) {throw new BusinessException("参数错误!");}// 查询文章全部信息List<TeachPapers> teachPapers = teachPapersMapper.find2024Papers();log.info("KeyAchievementOpServiceimpl:: transmission =>2024年论文信息共计:{}条", teachPapers.size());if (null == teachPapers || teachPapers.isEmpty()) {log.warn("KeyAchievementOpServiceimpl:: transmission => 未找到论文信息");}// 记录开始执行时间long startTimeMillis = System.currentTimeMillis();log.info("KeyAchievementOpServiceimpl:: transmission =>OSS下载地址传输 ,任务开始执行,当前时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));// 初始化成功更新计数器int totalSuccessfulUpdates = 0;// 定义批次大小int batchSize = 50;// 用于存储所有异步任务的FutureList<CompletableFuture<Integer>> futures = new ArrayList<>();// 将teachPapers分批并行处理for (int i = 0; i < teachPapers.size(); i += batchSize) {int end = Math.min(i + batchSize, teachPapers.size());List<TeachPapers> batch = teachPapers.subList(i, end);// 如果批次为空,则跳过if (CollUtil.isEmpty(batch)) {continue;}// 将每个批次提交为异步任务,并添加到 CompletableFuture 列表中CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {return processTeachPapers(batch, transmissionRequestDto);} catch (Exception e) {log.error("KeyAchievementOpServiceimpl:: transmission => 处理批次过程中发生异常,跳过该批次:message{}", e.getMessage(), e);return 0; // 异常时返回0,表示此批次没有成功更新}},executorService);futures.add(future);}// 等待所有批次处理完成for (CompletableFuture<Integer> future : futures) {try {// 获取每个批次的成功更新数量totalSuccessfulUpdates += future.get();} catch (Exception e) {// 记录异常信息log.error("批次处理过程中发生异常: ", e);}}// 关闭线程池executorService.shutdown();try {// 等待线程池终止if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {log.warn("KeyAchievementOpServiceimpl:: transmission => 线程池在超时后仍未终止,强制关闭线程池");executorService.shutdownNow();}} catch (InterruptedException e) {// 如果中断,则关闭线程池log.error("KeyAchievementOpServiceimpl:: transmission => 线程池等待终止时被中断,强制关闭线程池,message:{}", e.getMessage(), e);// 如果中断,则关闭线程池executorService.shutdownNow();}// 记录结束执行时间long endTimeMillis = System.currentTimeMillis();long executionTimeMillis = endTimeMillis - startTimeMillis;// 记录任务完成日志,包括耗时和成功更新数量log.info("KeyAchievementOpServiceimpl:: transmission =>OSS下载地址传输 ,任务执行完成,共耗时:{} 秒,累计成功更新总数: {}", executionTimeMillis / 1000.0, totalSuccessfulUpdates);}

解释代码:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {return processTeachPapers(batch, transmissionRequestDto);} catch (Exception e) {log.error("KeyAchievementOpServiceimpl:: transmission => 处理批次过程中发生异常,跳过该批次:message{}", e.getMessage(), e);return 0; // 异常时返回0,表示此批次没有成功更新}},executorService
);
futures.add(future);

该部分代码使用 CompletableFuture 来异步执行任务,并将结果存入 futures 列表。

  • CompletableFuture.supplyAsync:supplyAsync 方法用于异步执行任务,并返回一个 CompletableFuture 对象。任务的实际执行由线程池 executorService 管理。

  • 任务执行内容:在 supplyAsync 方法中,任务执行内容是 processTeachPapers(batch, transmissionRequestDto) 方法,该方法用于处理数据批次 batch。如果在执行过程中发生异常,会捕获异常并记录日志,同时返回 0 表示该批次没有成功更新。

  • futures.add(future);:将 CompletableFuture 对象 future 添加到 futures 列表中。这样,可以在所有异步任务提交后,对 futures 列表中的所有 CompletableFuture 进行统一处理,等待任务完成并收集结果。

1.2 例子2

 /*** 异步获取最新领域成果* <p>* 本方法旨在从技术树节点中提取并处理最新成果信息,以异步方式执行以提高系统响应性* 它首先查询所有技术树节点,然后分批处理这些节点,以避免一次性加载过多数据导致内存溢出* 使用多线程处理每个批次,以加速任务执行*/@Async@Overridepublic void getLatestResult() {// 记录任务开始执行的时间long startTimeMillis = System.currentTimeMillis();log.info("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果 ,任务开始执行,当前时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));try {// 查询已生成重点成果的golaxy_vocab_idList<Long> generatedIds = techniqueTreeNodeMapper.generatedIdRecords();// 查询未生成记录的TechniqueTreeNode集合List<TechniqueTreeNode> techniqueTreeNodeList = techniqueTreeNodeMapper.selectAll().stream().filter(node -> !generatedIds.contains(node.getGolaxyVocabId())).collect(Collectors.toList());// 定义批次大小,用于分批处理数据int batchSize = 100;// 分批处理技术树节点List<List<TechniqueTreeNode>> batches = new ArrayList<>();for (int i = 0; i < techniqueTreeNodeList.size(); i += batchSize) {int end = Math.min(i + batchSize, techniqueTreeNodeList.size());batches.add(techniqueTreeNodeList.subList(i, end));}// 使用线程池处理每个批次,以并行方式处理数据for (List<TechniqueTreeNode> batch : batches) {executorService.submit(() -> processBatch(batch));}} catch (Exception e) {// 记录任务执行中的异常信息log.error("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果,任务执行失败:{}", e.getMessage(), e);}// 记录任务完成执行的时间long endTimeMillis = System.currentTimeMillis();// 计算任务执行耗时long executionTimeMillis = endTimeMillis - startTimeMillis;log.info("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果 ,任务执行完成,共耗时:{} 小时", executionTimeMillis / 3600000.0);}
for (List<TechniqueTreeNode> batch : batches) {executorService.submit(() -> processBatch(batch));
}

此段代码使用 executorService 线程池并发处理每个批次的节点:

  • 通过遍历 batches 列表,将每个批次提交为一个异步任务。
  • 每个批次的处理任务封装在 processBatch 方法中,通过 submit 方法提交给 executorService 执行。
  • 这样,多个批次可以并发处理,从而加速整个任务的执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/59442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

asp.net网站项目如何设置定时器,定时获取数据

在 Global.asax.cs 文件中编写代码来初始化和启动定时器。Global.asax.cs 文件定义了应用程序全局事件&#xff0c;比如应用程序的启动和结束。在这里&#xff0c;我们将在应用程序启动时初始化和启动定时器。 using System; using System.Timers;public class Global : Syste…

【数据仓库】Hive 拉链表实践

背景 拉链表是一种数据模型&#xff0c;主要是针对数据仓库设计中表存储数据的方式而定义的&#xff1b;顾名思义&#xff0c;所谓拉链表&#xff0c;就是记录历史。记录一个事务从开始一直到当前状态的所有变化的信息。 拉链表可以避免按每一天存储所有记录造成的海量存储问题…

Mysql在oracle的安装与配置(怕忘)

1、获取iso 安装oracle:https://mirrors.tuna.tsinghua.edu.cn/openeuler/openEuler-24.03-LTS/ISO/x86_64/openEuler-24.03-LTS-x86_64-dvd.iso openEuler-22.03-LTS-x86_64-dvd.iso 2、安装os 手动设置固定IP,建议大家网卡vm net8 网关:x.x.x.2 DNS:114.114.114.11…

日常工作采坑,关于图片压缩哪些坑一次性踩完。

文章目录 0.前言1.代码实现2.压缩工具包的配置 0.前言 首先说明一下这个图片压缩为什么那么艰难&#xff0c;主要原因还是在于需求过于奇葩。比较奇葩的原因有如下几点&#xff1a;   1.图片是一个很大的文件&#xff0c;我长这么大还没见过这个大的文件。图下可以图片文件可…

语音识别ic赋能烤箱,离线对话操控,引领智能厨房新体验

一、智能烤箱产品的行业背景 随着科技的飞速发展&#xff0c;智能家居已经成为现代家庭的新宠。智能烤箱作为智能家居的重要组成部分&#xff0c;正逐渐从高端市场走向普通家庭。消费者对于烤箱的需求不再仅仅局限于基本的烘焙功能&#xff0c;而是更加注重其智能化、便捷化和…

LeetCode 每日一题 2024/10/28-2024/11/3

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 10/28 685. 冗余连接 II10/29 3211. 生成不含相邻零的二进制字符串10/30 3216. 交换后字典序最小的字符串10/31 3165. 不包含相邻元素的子序列的最大和11/1 3259. 超级饮料…

faiss用于大数据量的向量检索

背景:10亿(Billion级别)的数据应该是一个很大的数据了,尤其是维度在768+级别(还有1024,1536等),这个数据量我做了一个实验,shape为(1kw,768)的array(numpy)占内存为30G(float32格式),如果能降低为float16更好不过,但似乎faiss没有这种方法或者精度有所损失。 …

一文详解开源ETL工具Kettle!

一、Kettle 是什么 Kettle 是一款开源的 ETL&#xff08;Extract - Transform - Load&#xff09;工具&#xff0c;用于数据抽取、转换和加载。它提供了一个可视化的设计环境&#xff0c;允许用户通过简单的拖拽和配置操作来构建复杂的数据处理工作流&#xff0c;能够处理各种数…

D59【python 接口自动化学习】- python基础之异常

day59 捕获异常常见问题 学习日期&#xff1a;20241105 学习目标&#xff1a;异常 -- 75 避坑指南&#xff1a;编写捕获异常程序时经常出现的问题 学习笔记&#xff1a; 捕获位置设置不当 设置范围不当 捕获处理设置不当 嵌套try-except语法错误 总结 位置&#xff0c;范围…

深度学习在大数据处理中的应用

深度学习在大数据处理中扮演着至关重要的角色&#xff0c;其应用广泛且深入。以下是一些深度学习在大数据处理中的具体应用&#xff1a; 1. 自然语言处理&#xff08;NLP&#xff09; 深度学习技术在大数据处理中的自然语言处理方面取得了显著进展。语义理解方面&#xff0c;…

Java开发配置文件的详情教程配置文件类型

学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、手把手教你开发炫酷的vbs脚本制作(完善中……&#xff09; 4、牛逼哄哄的 IDEA编程利器技巧(编写中……&#xff09; 5、面经吐血整理的 面试技…

应审稿人要求| pseudo bulk差异分析

一、写在前面 最近有粉丝提问&#xff0c;收到了如下的审稿人意见&#xff1a; 审稿人认为在单细胞测序过程中&#xff0c;利用findMarker通过Wilcox获得的差异基因虽然考虑到了不同组别细胞数量的不同&#xff0c;但是未能考虑到每组样本数量的不同。因此作者希望纳入样本水平…

Android13 系统/用户证书安装相关分析总结(二) 如何增加一个安装系统证书的接口

一、前言 接着上回说&#xff0c;最初是为了写一个SDK的接口&#xff0c;需求大致是增加证书安装卸载的接口&#xff08;系统、用户&#xff09;。于是了解了一下证书相关的处理逻辑&#xff0c;在了解了功能和流程之后&#xff0c;发现settings中支持安装的证书&#xff0c;只…

矩阵特殊打印方式

小伙伴们大家好&#xff0c;好几天没更新了&#xff0c;主要有个比赛。从今天起继续给大家更新&#xff0c;今天给大家带来一种新的题型&#xff1a;矩阵特殊打印方式。 螺旋打印矩阵 解题思路 首先给大家看一下什么是螺旋方式打印&#xff1a; 就像这样一直转圈圈。 我想大多…

IO同步异步/阻塞非阻塞

同步和异步&#xff1a;当前线程是否需要等待方法调用执行完毕。 阻塞和非阻塞&#xff1a;当前接口数据还未准备就绪时&#xff0c;线程是否被阻塞挂起 同步和异步其实是处理框架这种高层次维度来看待问题的&#xff0c;而阻塞和非阻塞往往是针对底层的系统调用方法来抉择&a…

C语言 流程控制语句

时间&#xff1a;2024.11.5 一、学习内容 流程控制语句&#xff1a; 通过一些语句&#xff0c;控制程序的执行流程。 1、顺序结构 从上往下依次执行&#xff0c;是程序默认的执行过程。 2、if的第一种格式 if(关系表达式) { 语句体&#xff1b; } //考试奖励&#xff1a;…

03集合基础

目录 1.集合 Collection Map 常用集合 List 接口及其实现 Set 接口及其实现 Map 接口及其实现 Queue 接口及其实现 Deque 接口及其实现 Stack类 并发集合类 工具类 2.ArrayList 3.LinkedList 单向链表的实现 1. 节点类&#xff08;Node&#xff09; 2. 链表类&a…

HTMLCSS:3D 旋转卡片的炫酷动画

效果演示 这段代码是一个HTML和CSS的组合&#xff0c;用于创建一个具有3D效果的动画卡片。 HTML <div class"obj"><div class"objchild"><span class"inn6"><h3 class"text">我是谁&#xff1f;我在那<…

总结:Vue2中双向绑定不生效的排查方法及原理

之前陆陆续续的学习了Vue2的双向绑定,深度监视,但是真正使用时,需要将它们融会贯通,还是需要刻意的练习和记忆的。我常常遇到的问题是,当页面上某element UI控件与data中的某属性进行了双向绑定,但是,要么是data中的属性数据发生了更新之后页面未实时更新,要么是页面上…

网络自动化03:简单解释send_config_set方法并举例

目录 拓扑图设备信息 netmiko涉及方法send_config_set()方法的简单示例代码输出结果代码解释导入模块配置信息config_device_interface_description 函数主程序块总结 send_config_set方法参数&#xff1a;1. enter_config_mode2. config_commands3. enter_config_mode4. error…