Springboot 多线程分批切割处理 大数据量List集合 ,实用示例
Java使用多线程批次查询大量数据(Callable返回数据)方式
问题背景
业务要求对未完工的几十万甚至百万工单进行发短信提醒以及消息通知。所以每次查询需要将这海量数据查询出来。如果使用单线程,将会导致接口卡顿,影响用户体验。而且持久层框架使用in()方法也会有查询参数的限制。
解决思路
通过拆分查询参数列表的方式,按照需求拆分成多个等量大小的子列表进行in()查询。使用到的工具类如下
- hutool工具类中的拆分方法,ListUtil.split(list, 1000)
- CompletableFuture 异步调用线程
- ThreadPoolTaskExecutor 线程池
技术实现
线程池配置类
@Configuration
@Slf4j
public class ThreadPoolConfig {@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(10);threadPoolTaskExecutor.setQueueCapacity(200);threadPoolTaskExecutor.setMaxPoolSize(100);threadPoolTaskExecutor.setThreadNamePrefix("sendMsg");threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}
}
逻辑方法
List<Order> orders = new ArrayList<>();
List<CompletableFuture<List<Order>>> futureList = new ArrayList<>();
//海量列表进行相关拆分成多个等量子列表
List<List<Order>> split = ListUtil.split(list, 1000);
for (List<Order> each : split) {CompletableFuture<List<Order>> future = CompletableFuture.supplyAsync(() -> {//业务处理...}, sendMsgThreadPool);futureList.add(future);
}
//1等待所有子线程处理完成,或者使用方式2等待
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
try {//合并数据for (CompletableFuture<List<Order>> each : futureList) {if (Objects.nonNull(each) && !CollectionUtils.isEmpty(each.get())) {//2设置等待每个子线程的超时时间orders.addAll(each.get(10, TimeUnit.SECONDS));}}
} catch (Exception e) {//异常
}
拓展
当查询参数过多(超过1000个或者超过表中比例25%)的时候,有可能mysql优化器会选择全表扫描的方式导致索引失效。所以每次in方法参数不建议超过1000个
如果涉及到多线程之间数据合并,建议使用join或者CompletableFuture类下的allOf()方法等待所有线程处理完毕再进行合并。如果使用静态变量,可能会引发ConcurrentModificationException异常。