SpringBoot异步任务(2)|(线程池使用)
文章目录
- SpringBoot异步任务(2)|(线程池使用)
- @[TOC]
- 前言
- 一、使用场景
- 二、springboot添加异步任务
- 1.配置线程池
- 2.线程池的使用
- 总结
文章目录
- SpringBoot异步任务(2)|(线程池使用)
- @[TOC]
- 前言
- 一、使用场景
- 二、springboot添加异步任务
- 1.配置线程池
- 2.线程池的使用
- 总结
章节
第一章链接: SpringBoot异步任务(1)|(异步任务执行以及回调)
前言
线程池开启异步任务在springboot中的使用
一、使用场景
项目中有一个批量调度的任务,客户上传批量的文章,让后将这些文章去进行任务处理
二、springboot添加异步任务
1.配置线程池
在springboot容器中配置线程池,后续使用直接将bean注入使用即可
@Configuration
@EnableAsync
public class ExecutorEmbPoolConfig {private static final Logger logger = LoggerFactory.getLogger(ExecutorEmbPoolConfig.class);@Value("${embedding.pool.corePoolSize:20}")private int corePoolSize = 20;@Value("${embedding.pool.maxPoolSize:20}")private int maxPoolSize = 20;@Value("${embedding.pool.queueCapacity:100000}")private int queueCapacity = 100000;private String namePrefix = "embedding-service-";@Bean(name = "embeddingServiceExecutor")public ThreadPoolTaskExecutor asyncServiceExecutor() {logger.debug("start embedding embeddingServiceExecutor");ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// 允许回收核心线程executor.setAllowCoreThreadTimeOut(true);// CALLER_RUNS: 不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//执行初始化executor.initialize();return executor;}
}
2.线程池的使用
@Resource(name = "embeddingServiceExecutor")private ThreadPoolTaskExecutor executor;@Scheduled(cron = "0/30 * * * * ?")public void FileToMilvesJob() {//定义计数器List<DocumentMilvusRecord> documentMilvusRecords = recordService.findByStatus(RecordStatus.WAIT);if (CollectionUtils.isEmpty(documentMilvusRecords)) {return;}List<DocumentMilvusRecord> excuteList;if (documentMilvusRecords.size() > 50) {excuteList = documentMilvusRecords.subList(0, 50);} else {excuteList = documentMilvusRecords;}log.info("本次任务需要执行任务“{}条", excuteList.size());for (DocumentMilvusRecord record : excuteList) {recordService.updateRecordStatus(record);executor.execute(() -> {try {docEmbeddingCreate(record); // 执行业务逻辑} catch (Exception e) {log.error(e.getMessage());}});}}
总结
上面的方式实现了自定义一个线程池,然后执行任务的时候获取线程池并执行任务。