基于SpringBoot自定义线程池实现多线程执行方法,以及多线程之间的协调和同步

前言

在服务端开发中,多线程开发是非常重要的。因为多线程可以同时处理多个请求,从而提高应用程序的性能,大大改善用户体验。

一、先来了解三个问题

1.在SpringBoot项目中为啥需要自定义线程池?

(1)在SpringBoot项目中,通常会有很多异步的任务需要执行,比如发送邮件、短信、推送等。如果这些任务都直接在主线程中执行,会导致主线程被阻塞,影响用户的体验。因此,通常会使用线程池来管理这些异步任务,从而提高系统的性能和并发能力。
(2)SpringBoot默认提供了一个线程池,但是它的默认配置可能并不适合所有的应用场景。如果应用中的异步任务比较密集,可能会导致线程池中的线程不足,从而影响系统的性能。此时,就需要自行定义线程池,根据应用的实际情况来配置线程池的大小和其他参数,以达到最优的性能表现。
(3)另外,自行定义线程池还可以避免线程池满载时的任务被拒绝执行的问题,从而提高系统的稳定性。

2.java.util.concurrent.CountDownLatch这个类有啥作用?

(1)CountDownLatch 是 Java 中的一个同步工具类,用于协调多个线程之间的执行。它可以让某个线程等待直到倒计时器计数器为 0,然后再继续执行。 
(2)CountDownLatch 的作用是,它可以让一个或多个线程等待其他线程执行完毕后再继续执行。在某些场景下,我们需要等待多个线程都执行完毕后才能进行下一步操作,这时候就可以使用 CountDownLatch。
(3)CountDownLatch 的使用方式是,首先创建一个计数器,然后在需要等待的线程中调用计数器的 countDown() 方法,每次调用会将计数器减 1。在需要等待的线程中调用 await() 方法,该方法会一直阻塞直到计数器为 0。当计数器为 0 时,所有等待的线程都会被唤醒,继续执行下一步操作。
(4)例如,我们可以在主线程中创建一个 CountDownLatch,然后将其传递给多个子线程,子线程在执行完任务后调用 countDown() 方法,主线程在需要等待子线程执行完毕后再继续执行时调用 await() 方法,这样就可以实现多个线程之间的协调和同步。

3.同一个类里面,for循环调用异步方法会被串行同步?

(1)在SpringBoot的自定义线程池中,同一个类里面,for循环调用异步方法会被串行同步执行的原因是因为异步方法默认使用的是调用线程的线程池,而在同一个类中,for循环中的所有异步方法都是由同一个调用线程调用的,因此它们会使用同一个线程池,导致它们被串行同步执行。
(2)要解决这个问题,可以在异步方法上添加@Async注解,并在调用异步方法的地方使用代理对象调用。这样每次调用异步方法时,都会使用新的线程池,避免了同一个线程池被多个异步方法共享的问题,从而实现并行执行。另外,为了避免for循环中的异步方法过多导致线程池资源耗尽,可以考虑使用线程池的拒绝策略来处理任务过多的情况。

二、示例代码

1.自定义线程池

(1)CommonThreadPoolConfig.java

package org.example.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** 公共线程池配置*/
@EnableAsync
@Configuration
public class CommonThreadPoolConfig {@Bean("CommonThreadPoolExecutor")public Executor syncExecutor() {// 获取可用处理器的Java虚拟机的数量int sum = Runtime.getRuntime().availableProcessors();System.out.println("系统最大线程数 -> " + sum);// 实例化自定义线程池ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置线程池中的核心线程数(最小线程数)// 线程池的核心线程数指的是线程池中一直存在的线程数量,即使它们没有任务可执行,处于空闲状态。// 如果线程池中的线程数小于核心线程数,则创建新线程来处理任务,即使其他空闲线程可用。// 如果线程池中的线程数已经等于核心线程数,那么新的任务就会被放入任务队列等待执行executor.setCorePoolSize(16);// 设置线程池中的最大线程数// 如果线程池中的线程数已经达到了核心线程数,并且任务队列已满,则创建新线程来处理任务。// 如果线程池中的线程数等于最大线程数,则任务将被拒绝。executor.setMaxPoolSize(64);// 设置线程池中任务队列的容量// 线程池中的任务队列用于存储还未被执行的任务,当线程池中的线程已经全部被占用时,新的任务会被放入任务队列中等待执行。如果任务队列已满,那么新的任务就会被拒绝执行。executor.setQueueCapacity(500);// 设置线程池中空闲线程的存活时间// 当线程池中的某个线程执行完任务后,如果当前线程池中的线程数大于核心线程数,那么这个空闲线程就会被放入线程池的等待队列中。// 在等待队列中的空闲线程,如果在`keepAliveSeconds`时间内没有被再次使用,就会被回收销毁,以释放系统资源。如果`keepAliveSeconds`设置为0,则表示空闲线程立即被回收销毁。executor.setKeepAliveSeconds(60);// 设置线程池中线程的名称前缀// 线程池中的每个线程都有一个唯一的名称,这个名称通常是由线程池的名称和线程的编号组成的。使用`setThreadNamePrefix()`方法可以在默认的线程名前面添加一个前缀,以便更好地区分不同的线程池。executor.setThreadNamePrefix("async-");// 设置线程池关闭时等待所有任务完成的时间。// 当调用executor.shutdown()方法关闭线程池时,线程池会等待一段时间,如果在这段时间内所有任务都完成了,线程池会正常关闭;如果还有任务没有完成,线程池将强制关闭,未完成的任务将被丢弃。executor.setAwaitTerminationSeconds(60);// 设置线程池中任务队列已满时的拒绝策略,当线程池中的任务队列已满,而且线程池中的线程已经达到了最大线程数时,新的任务就无法被执行。这时就需要设置拒绝策略来处理这种情况。// setRejectedExecutionHandler()方法提供了几种拒绝策略,包括:// 1. AbortPolicy:直接抛出RejectedExecutionException异常,阻止系统正常运行。// 2. CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。// 3. DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务。// 4. DiscardPolicy:不处理,直接丢弃掉当前任务。executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 设置线程池在关闭时是否等待所有任务完成// 如果设置为`true`,则在调用`shutdown()`方法时,线程池会等待所有已提交的任务执行完毕后再关闭。// 如果设置为`false`,则在调用`shutdown()`方法时,线程池会立即关闭,未执行的任务将被丢弃。executor.setWaitForTasksToCompleteOnShutdown(true);// 初始化线程池的配置executor.initialize();return executor;}
}

2.控制层

(1)UserController.java

package org.example.controller;import org.example.service.impl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;@Controller
@RequestMapping(value = "api")
public class UserController {@Autowiredprivate UserServiceImpl userService;/*** 基于线程池的异步接口*/@GetMapping(value = "threadPoolAsyncTest")@ResponseBody@CrossOriginpublic <T> T threadPoolAsyncTest () throws InterruptedException {return userService.threadPoolAsyncTest();}/*** 基于线程池的同步任务*/@GetMapping(value = "threadPoolSyncTasks")@ResponseBody@CrossOriginpublic <T> T threadPoolSyncTasks () {return userService.threadPoolSyncTasks();}
}

3.接口层

(1)IUserService.java

package org.example.service;public interface IUserService {<T> T threadPoolAsyncTest();<T> T threadPoolSyncTasks();
}

(2)IAsyncService.java

package org.example.service;import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;public interface IAsyncService {void sendSms(String mobile, String content);void sendEmail(String email, String content);Future<String> sendCode(String mobile) throws InterruptedException;void syncTasks();void asyncSaveTask(List<String> blockTaskList, CountDownLatch countDownLatch);
}

4.实现层

(1)UserServiceImpl.java

package org.example.service.impl;import org.example.service.IAsyncService;
import org.example.service.IUserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;@Primary
@Service
public class UserServiceImpl implements IUserService {private static final Logger log = LoggerFactory.getLogger(UserServiceImpl.class);@Autowiredprivate IAsyncService asyncService;@Autowiredprivate CommonThreadPoolConfig commonThreadPoolConfig;// commonThreadPoolConfig.destroy(); // 关闭自定义线程池@Overridepublic <T> T threadPoolAsyncTest() {try {long startTime = System.currentTimeMillis();String mobile = "13801380000";String email = "123@abc.com";String content = "你好,世界!";asyncService.sendSms(mobile, content);asyncService.sendEmail(email, content);Future<String> future = asyncService.sendCode(mobile);// String result = future.get(); // 阻塞获取结果String result = "OK";long endTime = System.currentTimeMillis();log.info("Main cost {} ms, Future return 【{}】", endTime - startTime, result);return (T) "success";} catch (Exception e) {return (T) "fail";}}@Overridepublic <T> T threadPoolSyncTasks() {long startTime = System.currentTimeMillis();HashMap<String, Object> responseObj = new HashMap<>();asyncService.syncTasks();responseObj.put("code", 200);responseObj.put("success", true);responseObj.put("msg", "开始同步任务");long endTime = System.currentTimeMillis();log.info("threadPoolSyncTasks -> 线程名:{},运行时长:{} ms", Thread.currentThread().getName(), endTime - startTime); // 3 msreturn (T) responseObj;}/*** 同步任务*/public void syncTasks() {// 构建一个有100000个任务的列表 [Task-1 ~ Task-10000]List<String> taskList = new ArrayList<>();for (int i = 0; i < 10001; i++) {taskList.add("Task-" + (i + 1));}// 每个区块可容纳1000条任务int blockSize = 1000;// 区块数量 10int blockSum = taskList.size() % blockSize == 0 ? taskList.size() / blockSize : taskList.size() / blockSize + 1;// 以区块分段的任务列表List<List<String>> targetTaskList = new ArrayList<>();for (int i = 0; i < blockSum - 1; i++) { // 10 - 1 = 9// 0 [Task-1 ~ Task-1000]// ...// 8 [Task-8001 ~ Task-9000]targetTaskList.add(i, taskList.subList(i * blockSize, blockSize * (i + 1)));}// 9 [Task-9001 ~ Task-10000]targetTaskList.add(blockSum - 1, taskList.subList((blockSum - 1) * blockSize, taskList.size()));System.out.println(targetTaskList);this.batchSaveTask(targetTaskList);}/*** 批量同步任务*/public void batchSaveTask(List<List<String>> targetTaskList) {// 主线程中创建一个CountDownLatch计数器,数值为9,然后将其传递给多个子线程CountDownLatch countDownLatch = new CountDownLatch(targetTaskList.size());try {long startTime = System.currentTimeMillis();for (int i = 0; i < targetTaskList.size(); i++) {List<String> blockTaskList = targetTaskList.get(i);asyncService.asyncSaveTask(blockTaskList, countDownLatch);}// 主线程在需要等待子线程执行完毕后再继续执行时调用 await() 方法countDownLatch.await();long endTime = System.currentTimeMillis();log.info("batchSaveTask -> 线程名:{},所有任务都执行完毕,运行时长:{} ms", Thread.currentThread().getName(), endTime - startTime); // 1016 ms} catch (Exception e) {e.printStackTrace();}}
}

(2)AsyncServiceImpl.java

package org.example.service.impl;import org.example.service.IAsyncService;
import org.example.service.IUserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;@Primary
@Service
public class AsyncServiceImpl implements IAsyncService {private static final Logger log = LoggerFactory.getLogger(AsyncServiceImpl.class);@Autowiredprivate IUserService userService;/*** 发送短信*/@Override@Async(value = "CommonThreadPoolExecutor")public void sendSms(String mobile, String content) {try {Thread.sleep(5000);// xxxHandle.sendSms(mobile, content)log.info("发送短信至 {} 成功,短信内容:{}", mobile, content);} catch (Exception e) {log.error("发送短信至 {} 失败,异常信息:{}", mobile, e);}}/*** 发送邮件*/@Override@Async(value = "CommonThreadPoolExecutor")public void sendEmail(String email, String content) {try {Thread.sleep(5000);// xxxHandle.sendEmail(email, content)log.info("发送邮件至 {} 成功,邮件内容:{}", email, content);} catch (Exception e) {log.error("发送邮件至 {} 失败,异常信息:{}", email, e);}}/*** 发送验证码*/@Override@Async(value = "CommonThreadPoolExecutor")public Future<String> sendCode(String mobile) throws InterruptedException {Thread.sleep(3000);log.info("尊敬的开发者,Thread: [{}], 为您服务...", Thread.currentThread().getName());return new AsyncResult<>("发送验证码至 " + mobile + " 成功");}@Async("CommonThreadPoolExecutor")public void syncTasks() {userService.syncTasks();}@Async("CommonThreadPoolExecutor")public void asyncSaveTask(List<String> tasks, CountDownLatch countDownLatch) {try {Thread.sleep(1000);log.info("asyncSaveTask -> 线程名:{},保存数量为{}的任务成功", Thread.currentThread().getName(), tasks.size());} catch (Exception e) {e.printStackTrace();} finally {// 子线程在执行完任务后调用countDown()方法,将计数器减1countDownLatch.countDown();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/189321.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正则表达式的基本语法

1.正则表达式基本语法 两个特殊的符号^和$。他们的作用是分别指出一个字符串的开始和结束。例子如下&#xff1a; "^The"&#xff1a;表示所有以"The"开始的字符串&#xff08;"There"&#xff0c;"The cat"等&#xff09;&#xff1…

SpringBoot Bean解析

Bean解析 IOC介绍 松耦合灵活性可维护 注解方式配置Bean 实现方式1: Component声明,直接类上进行添加注解, 同时保证包扫描能扫到即可实现方式2: 配置类中使用Bean Configuration public class BeanConfiguration implements SuperConfiguration{Bean("dog")Ani…

基于DigiThread的仿真模型调参功能

仿真模型调参是指通过调整模型内部的参数值&#xff0c;使仿真模型的输出更符合实际系统的行为或者预期结果的过程。 仿真过程中&#xff0c;往往需要频繁对模型参数进行调整&#xff0c;通过观察不同参数下系统整体的运行情况&#xff0c;实现系统的性能、可靠性和效率的优化…

一小时玩转【负载均衡】

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

搜索与回溯算法③

题目&#xff1a; 求0-9所有数字组成的k位不重复的数字。 说明&#xff1a;我们要找到所有不重复数字的k位组合。这个问题相对于前一个问题&#xff08;搜索与回溯算法②&#xff09;增加了一个约束&#xff1a;每个数字只能使用一次。这就需要在代码中加入剪枝逻辑来确保不…

初中数学网上考试系统的设计与实现

摘 要&#xff1a; 科技在人类的历史长流中愈洗愈精&#xff0c;不仅包括人们日常的生活起居&#xff0c;甚至还包括了考试的变化。之前的考试需要大量的时间和精力&#xff0c;组织者还需要挑选并考查结果&#xff0c;以及为了强制有效地进行考试所需要采取的一些步骤&#x…

RocketMQ Copilot 一款面向 Apache RocketMQ 的智能辅助运维系统

一、RocketMQ简介 ocketMQ是阿里巴巴研发的一款分布式消息中间件&#xff0c;后开源给Apache基金会&#xff0c;成为apache的顶级开源项目。它具有高性能、高可靠、高实时和分布式的特点。RocketMQ主要应用于解决应用耦合&#xff0c;消息分发&#xff0c;流量削锋等问题。 R…

flutter布局详解及代码示例(补充)

布局 基本布局 Container&#xff08;基本布局&#xff09;&#xff1a;最常见widgetPadding&#xff08;内边距布局&#xff09;&#xff1a;Container增加padding的布局Center&#xff08;居中布局&#xff09;&#xff1a;Container设置居中的布局Align&#xff08;对齐布…

Java零基础——vue篇

1.【熟悉】Vue简介 1.1 简介 它是一个构建用户界面的框架 Vue是一个前端框架 js jq https://www.pmdaniu.com/#file UI网站 UI 一般开发者使用蓝湖 工具 看着UI图 写接口 https://lanhuapp.com/web/#/item 是一个轻量级的MVVM&#xff08;Model-View-ViewModel&#xff0…

统计素数并求和(Python)

题目描述 统计素数并求和 本题要求统计给定整数 M M M 和 N N N 区间内素数的个数并对它们求和。 输入格式: 输入在一行中给出两个正整数 M M M 和 N ( 1 ≤ M ≤ N ≤ 500 ) N(1≤M≤N≤500) N(1≤M≤N≤500)。 输出格式: 在一行中顺序输出 M M M 和 N N N 区间内…

(使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))

使用vite搭建vue3项目&#xff08;vite vue3 vue router pinia element plus&#xff09; 初始化项目安装依赖&#xff0c;运行项目初始配置 初始化项目 1.需要在创建项目的位置cmd目录下执行 2. npm init vitelatest 回车 npm init vitelatest3.填上自己的项目名称 回车…

【开源】基于JAVA的厦门旅游电子商务预订系统

项目编号&#xff1a; S 030 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S030&#xff0c;文末获取源码。} 项目编号&#xff1a;S030&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 景点类型模块2.2 景点档案模块2.3 酒…

Django的回顾的第4天

1.模型层 1.1简介 你可能已经注意到我们在例子视图中返回文本的方式有点特别。 也就是说&#xff0c;HTML被直接硬编码在 Python代码之中。 def current_datetime(request):now datetime.datetime.now()html "<html><body>It is now %s.</body><…

[网鼎杯 2020 青龙组]singal 1

前言 在主函数中找到了一个vm的译码器&#xff0c;译码器主要是解释传入的opcode&#xff0c;然后对我们输入的字符操作&#xff0c;这里我们发现他是单字节比较的&#xff0c;方法很多可以使用单字节映射&#xff0c;也可以是使用符号化执行&#xff0c;当然也可以硬着头皮去…

17、神经网络的性能以及那些框架存在的意义

前几节,我们介绍了推理和训练的大致过程,以及训练过程中要用损失函数来作为评判预测值和真实值差距的标准。 在很多时候,一个神经网络从开始训练到训练完成是要经过很长的时间的,这是因为模型需要不断的校正自己学习到的参数,直到最终loss值降为0。 如果一轮迭代训练耗时…

canvas基础:渲染文本

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 示例…

Postman如何导入和导出接口文件

本文介绍2种导出和导入的操作方法&#xff1a;一种是分享链接&#xff0c;导入链接的方式&#xff08;需要登录&#xff09;&#xff1b;另一种是导出json文件&#xff0c;再次导入。下面将详细介绍。 由于第一种分享链接&#xff0c;导入链接的方式需要登录&#xff0c;所以推…

jsp 分页查询展示,实现按 上一页或下一页实现用ajax刷新内容

要实现按上一页或下一页使用 Ajax 刷新内容&#xff0c;可以按照以下步骤进行操作&#xff1a; 1. 在前端页面中添加两个按钮&#xff0c;分别为“上一页”和“下一页”。当用户点击按钮时&#xff0c;触发 Ajax 请求。 2. 在后端控制器中接收 Ajax 请求&#xff0c;并根据传…

KNN回归-GridSearchCV模型调优(波士顿房价)

数据集简介 数据介绍 波士顿房价数据集(Boston Housing Dataset) 是一个经典的用于回归分析的数据集。它包含了波士顿地区506个街区的房价信息以及与房价相关的13个特征。这个数据集的目标是根据这些特征来预测波士顿地区房屋的中位数价格(以千美元为单位) 数据说明 Data S…

Vue 3.0 组合式API 生命周期钩子

文章目录 前言配置项api图表on配置项api后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;vue.js &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#xff0…