自定义线程池
自定义线程池好处参考:
线程是稀缺资源,如果被无限制的创建,不 仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、 调优和监控的框架。
自定义线程池ThreadPoolExecutor_自定义threadpoolexecutor-CSDN博客
代码:
package com.insigma.business.dwbt.qyssyh.help;/*** <简述>* <详细描述>** @author syf* @date 2024年06月24日 16:58*/import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolConfig {/*** 线程池维护线程所允许的空闲时间(单位秒)*/private int keepAliveSeconds = 120;/*** 时间单位,这里设为秒*/private final TimeUnit unit = TimeUnit.SECONDS;/*** 核心线程数 = cpu 核心数 + 1*/private final int core = Runtime.getRuntime().availableProcessors() + 1;/*** 最大线程数,可以根据具体需求调整*/private final int maximumPoolSize = 2 * Runtime.getRuntime().availableProcessors() + 1;/*** 队列最大长度*/private int queueCapacity = 150;/*** 阻塞队列,这里使用有界LinkedBlockingQueue* 设置容量为1000,超过这个数量的元素将被阻塞,直到队列中有可用空间*/private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueCapacity);@Beanpublic ThreadPoolExecutor threadPoolTaskExecutor() {ThreadPoolExecutor executor = new ThreadPoolExecutor(core,maximumPoolSize,keepAliveSeconds,unit,workQueue);/** ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)* ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,如果执行器已关闭,则丢弃.*/executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}}
一、多线程循环处理数据
场景:集合总数1万条,每次处理2000条。则需要开启五个线程处理场景。
//dd38NO 假设一万条数据集合List<DD38DTO> dd38NO = new ArrayList();// 每批处理的数据量(限制10000条数据,最多调用五个线程)int batchSize = 2000;// 计算完整的批次数量(没有余数的批次)int totalBatches = dd38NO.size() / batchSize;// 检查是否有余数来确定是否需要额外的批次if (dd38NO.size() % batchSize != 0) {// 如果有余数,则增加一个批次来处理剩余的记录totalBatches++;}// 存放所有CompletableFuture的列表List<CompletableFuture<List<DD38DTO> >> futuresAll = new ArrayList<>();// 循环处理每个批次for (int i = 0; i < totalBatches; i++) {int start = i * batchSize;int end = Math.min(start + batchSize, dd38NO.size());List<DD38DTO> dd38DTOS = dd38NO.subList(start, end);// 提交异步任务到线程池CompletableFuture<List<DD38DTO> > future = CompletableFuture.supplyAsync(() -> {//checkPersons 业务逻辑List<DD38DTO> list = checkPersons(dd38DTOS);return list;}, executor).exceptionally(ex -> {log.error("发生异常: " , ex.getMessage());return Collections.EMPTY_LIST;});futuresAll.add(future);}// 等待所有异步任务完成CompletableFuture.allOf(futuresAll.toArray(new CompletableFuture[0])).join();//allList 处理完的集合List<DD38DTO> allList = new ArrayList<>();for (CompletableFuture<List<DD38DTO> > future : futuresAll) {List<DD38DTO> dd38DTOS = dd38DTOS = future.get();allList.addAll(dd38DTOS);}
二、查询商品
场景:查询商品info
之后根据商品id ,分别查询sku、 spu 等信息
sku、 spu 操作则可以异步执行。
@Overridepublic List<SkuInfoEntity> getSkusBySpuId(Long spuId) {return this.list(new QueryWrapper<SkuInfoEntity>().eq("spu_id", spuId));}@Overridepublic SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {SkuItemVo skuItemVo = new SkuItemVo();//开启线程 supplyAsync返回结果CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {//1、sku基本信息的获取 pms_sku_infoSkuInfoEntity info = this.getById(skuId);skuItemVo.setInfo(info);return info;}, executor);//infoFuture 结束 异步执行CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {//3、获取spu的销售属性组合List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());skuItemVo.setSaleAttr(saleAttrVos);}, executor);//infoFuture 结束 异步执行CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {//4、获取spu的介绍 pms_spu_info_descSpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());skuItemVo.setDesc(spuInfoDescEntity);}, executor);//infoFuture 结束 异步执行CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {//5、获取spu的规格参数信息List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());skuItemVo.setGroupAttrs(attrGroupVos);}, executor);// Long spuId = info.getSpuId();// Long catalogId = info.getCatalogId();//2、sku的图片信息 pms_sku_images 新开 runAsync 没有返回结果CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);skuItemVo.setImages(imagesEntities);}, executor);/*CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {//3、远程调用查询当前sku是否参与秒杀优惠活动R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);if (skuSeckilInfo.getCode() == 0) {//查询成功SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {});skuItemVo.setSeckillSkuVo(seckilInfoData);if (seckilInfoData != null) {long currentTime = System.currentTimeMillis();if (currentTime > seckilInfoData.getEndTime()) {skuItemVo.setSeckillSkuVo(null);}}}}, executor);*///等到所有任务都完成//CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture).get();return skuItemVo;}