线程池使用实战
- 1. 线程池使用
- 1.1 例子1
- 1.2 例子2
1. 线程池使用
// 使用 ThreadPoolExecutor 创建线程池private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // 核心线程数Runtime.getRuntime().availableProcessors() * 2, // 最大线程数60, // 线程空闲时间TimeUnit.SECONDS,new ArrayBlockingQueue<>(100), // 阻塞队列大小new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用线程执行任务);
解释
-
executorService 是一个线程池,用于在多线程环境下并发执行任务。线程池的配置如下:
-
核心线程数(corePoolSize):Runtime.getRuntime().availableProcessors(),即根据 CPU 可用核心数设置核心线程数,这意味着它将使用与 CPU 核心数相同的线程数来保持一定的并发性能和资源利用率。
-
最大线程数(maximumPoolSize):设置为 Runtime.getRuntime().availableProcessors() * 2。最大线程数是线程池允许的最大并发线程数,当队列已满且核心线程都在工作时,会使用非核心线程来处理任务。这一配置保证了可以根据需求增加线程以应对高负载。
-
线程空闲时间(keepAliveTime):当线程池中线程数超过核心线程数时,多余的线程会在空闲超过 60 秒后被终止。
-
阻塞队列:new ArrayBlockingQueue<>(100) 表示线程池使用一个固定大小为 100 的阻塞队列存放任务。如果所有线程都在忙碌,且队列已满,新任务将根据拒绝策略处理。
-
拒绝策略:ThreadPoolExecutor.CallerRunsPolicy 表示当任务无法被线程池处理时(例如线程池已满并且队列已满),该策略将由提交任务的线程来执行任务,而不是抛出异常。这在某些场景下可以防止任务丢失,但会减缓主线程的执行速度。
1.1 例子1
/*** 将信息传送给阿里方OSS** @param transmissionRequestDto 包括上传时间与文件类型 上传时间如20241025*/@Async@Overridepublic void transmission(TransmissionRequestDto transmissionRequestDto) {// 检查传输请求参数是否有效if (null == transmissionRequestDto || CollectionUtils.isEmpty(transmissionRequestDto.getUploadTime())) {throw new BusinessException("参数错误!");}// 查询文章全部信息List<TeachPapers> teachPapers = teachPapersMapper.find2024Papers();log.info("KeyAchievementOpServiceimpl:: transmission =>2024年论文信息共计:{}条", teachPapers.size());if (null == teachPapers || teachPapers.isEmpty()) {log.warn("KeyAchievementOpServiceimpl:: transmission => 未找到论文信息");}// 记录开始执行时间long startTimeMillis = System.currentTimeMillis();log.info("KeyAchievementOpServiceimpl:: transmission =>OSS下载地址传输 ,任务开始执行,当前时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));// 初始化成功更新计数器int totalSuccessfulUpdates = 0;// 定义批次大小int batchSize = 50;// 用于存储所有异步任务的FutureList<CompletableFuture<Integer>> futures = new ArrayList<>();// 将teachPapers分批并行处理for (int i = 0; i < teachPapers.size(); i += batchSize) {int end = Math.min(i + batchSize, teachPapers.size());List<TeachPapers> batch = teachPapers.subList(i, end);// 如果批次为空,则跳过if (CollUtil.isEmpty(batch)) {continue;}// 将每个批次提交为异步任务,并添加到 CompletableFuture 列表中CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {return processTeachPapers(batch, transmissionRequestDto);} catch (Exception e) {log.error("KeyAchievementOpServiceimpl:: transmission => 处理批次过程中发生异常,跳过该批次:message{}", e.getMessage(), e);return 0; // 异常时返回0,表示此批次没有成功更新}},executorService);futures.add(future);}// 等待所有批次处理完成for (CompletableFuture<Integer> future : futures) {try {// 获取每个批次的成功更新数量totalSuccessfulUpdates += future.get();} catch (Exception e) {// 记录异常信息log.error("批次处理过程中发生异常: ", e);}}// 关闭线程池executorService.shutdown();try {// 等待线程池终止if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {log.warn("KeyAchievementOpServiceimpl:: transmission => 线程池在超时后仍未终止,强制关闭线程池");executorService.shutdownNow();}} catch (InterruptedException e) {// 如果中断,则关闭线程池log.error("KeyAchievementOpServiceimpl:: transmission => 线程池等待终止时被中断,强制关闭线程池,message:{}", e.getMessage(), e);// 如果中断,则关闭线程池executorService.shutdownNow();}// 记录结束执行时间long endTimeMillis = System.currentTimeMillis();long executionTimeMillis = endTimeMillis - startTimeMillis;// 记录任务完成日志,包括耗时和成功更新数量log.info("KeyAchievementOpServiceimpl:: transmission =>OSS下载地址传输 ,任务执行完成,共耗时:{} 秒,累计成功更新总数: {}", executionTimeMillis / 1000.0, totalSuccessfulUpdates);}
解释代码:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {return processTeachPapers(batch, transmissionRequestDto);} catch (Exception e) {log.error("KeyAchievementOpServiceimpl:: transmission => 处理批次过程中发生异常,跳过该批次:message{}", e.getMessage(), e);return 0; // 异常时返回0,表示此批次没有成功更新}},executorService
);
futures.add(future);
该部分代码使用 CompletableFuture 来异步执行任务,并将结果存入 futures 列表。
-
CompletableFuture.supplyAsync:supplyAsync 方法用于异步执行任务,并返回一个 CompletableFuture 对象。任务的实际执行由线程池 executorService 管理。
-
任务执行内容:在 supplyAsync 方法中,任务执行内容是 processTeachPapers(batch, transmissionRequestDto) 方法,该方法用于处理数据批次 batch。如果在执行过程中发生异常,会捕获异常并记录日志,同时返回 0 表示该批次没有成功更新。
-
futures.add(future);:将 CompletableFuture 对象 future 添加到 futures 列表中。这样,可以在所有异步任务提交后,对 futures 列表中的所有 CompletableFuture 进行统一处理,等待任务完成并收集结果。
1.2 例子2
/*** 异步获取最新领域成果* <p>* 本方法旨在从技术树节点中提取并处理最新成果信息,以异步方式执行以提高系统响应性* 它首先查询所有技术树节点,然后分批处理这些节点,以避免一次性加载过多数据导致内存溢出* 使用多线程处理每个批次,以加速任务执行*/@Async@Overridepublic void getLatestResult() {// 记录任务开始执行的时间long startTimeMillis = System.currentTimeMillis();log.info("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果 ,任务开始执行,当前时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));try {// 查询已生成重点成果的golaxy_vocab_idList<Long> generatedIds = techniqueTreeNodeMapper.generatedIdRecords();// 查询未生成记录的TechniqueTreeNode集合List<TechniqueTreeNode> techniqueTreeNodeList = techniqueTreeNodeMapper.selectAll().stream().filter(node -> !generatedIds.contains(node.getGolaxyVocabId())).collect(Collectors.toList());// 定义批次大小,用于分批处理数据int batchSize = 100;// 分批处理技术树节点List<List<TechniqueTreeNode>> batches = new ArrayList<>();for (int i = 0; i < techniqueTreeNodeList.size(); i += batchSize) {int end = Math.min(i + batchSize, techniqueTreeNodeList.size());batches.add(techniqueTreeNodeList.subList(i, end));}// 使用线程池处理每个批次,以并行方式处理数据for (List<TechniqueTreeNode> batch : batches) {executorService.submit(() -> processBatch(batch));}} catch (Exception e) {// 记录任务执行中的异常信息log.error("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果,任务执行失败:{}", e.getMessage(), e);}// 记录任务完成执行的时间long endTimeMillis = System.currentTimeMillis();// 计算任务执行耗时long executionTimeMillis = endTimeMillis - startTimeMillis;log.info("KeyAchievementOpServiceimpl:: getLatestResult =>获取最新领域成果 ,任务执行完成,共耗时:{} 小时", executionTimeMillis / 3600000.0);}
for (List<TechniqueTreeNode> batch : batches) {executorService.submit(() -> processBatch(batch));
}
此段代码使用 executorService 线程池并发处理每个批次的节点:
- 通过遍历 batches 列表,将每个批次提交为一个异步任务。
- 每个批次的处理任务封装在 processBatch 方法中,通过 submit 方法提交给 executorService 执行。
- 这样,多个批次可以并发处理,从而加速整个任务的执行。