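1 Requirements
In our project we needed to process more than one million records. The data had to be read from a MySQL database and then pushed to another platform by calling its API. The deadline was tight, so the push had to finish quickly. A single thread was too slow, so we used multiple threads to push the data in parallel.
2 Multithreading configuration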
2.1 application.yml
thread-pool:
  core-pool-size: 4
  max-pool-size: 16
  queue-capacity: 80
  keep-alive-seconds: 120
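The sketch below shows what these four numbers do at runtime. It uses the JDK's java.util.concurrent.ThreadPoolExecutor, which Spring's ThreadPoolTaskExecutor creates internally, with the same values: core 4, max 16, queue 80 and keep-alive 120 s. Every task blocks until released, so the pool state can be read while the tasks are in flight. The class name PoolGrowthDemo is only for illustration. The main point: the pool does not grow past the core size until the queue is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /** Returns {threads after 84 tasks, queued after 84 tasks, threads after 85 tasks}. */
    public static int[] observe() {
        // Same values as application.yml: core 4, max 16, keep-alive 120 s, queue capacity 80
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 16, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(80));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no worker pulls anything off the queue meanwhile
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Tasks 1-4 start the 4 core threads; tasks 5-84 fill the queue
            for (int i = 0; i < 84; i++) {
                pool.execute(blocker);
            }
            int threadsAfter84 = pool.getPoolSize();
            int queuedAfter84 = pool.getQueue().size();
            // The queue is now full, so task 85 makes the pool start a 5th (non-core) thread
            pool.execute(blocker);
            return new int[] {threadsAfter84, queuedAfter84, pool.getPoolSize()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        // prints: threads=4 queued=80 -> threads=5
        System.out.printf("threads=%d queued=%d -> threads=%d%n", r[0], r[1], r[2]);
    }
}
```

If tasks arrive faster than 4 threads can handle them, up to 80 will wait in the queue before extra threads start. Keep-alive then removes those extra threads after 120 s of idleness.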
2.2 Create ThreadPoolProperties
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {

    /**
     * Number of core threads. They are started as tasks arrive and are kept even when idle.
     */
    private int corePoolSize;

    /**
     * Maximum number of threads. Threads beyond the core size are only created once the queue is full.
     */
    private int maxPoolSize;

    /**
     * Capacity of the queue that buffers tasks waiting to run.
     */
    private int queueCapacity;

    /**
     * Idle time, in seconds, after which threads beyond the core size are destroyed.
     */
    private int keepAliveSeconds;
}
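Spring binds the kebab-case keys in application.yml (core-pool-size, max-pool-size, ...) to these camelCase fields, but it does not validate them here. As far as I can tell, an invalid combination only surfaces when the executor bean is initialized. At that point the underlying ThreadPoolExecutor constructor throws IllegalArgumentException, for example when max-pool-size is smaller than core-pool-size or the keep-alive is negative.

The hypothetical helper below runs the same check up front. It assumes that Spring uses a LinkedBlockingQueue for a positive queue-capacity and a SynchronousQueue otherwise.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSettingsCheck {

    /** Returns true if the JDK accepts these settings, false if the constructor rejects them. */
    public static boolean accepts(int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds) {
        // Assumed to mirror Spring: a bounded queue for a positive capacity, else a hand-off queue
        BlockingQueue<Runnable> queue = queueCapacity > 0
                ? new LinkedBlockingQueue<Runnable>(queueCapacity)
                : new SynchronousQueue<Runnable>();
        try {
            // Throws IllegalArgumentException if core < 0, max <= 0, max < core or keep-alive < 0
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue);
            pool.shutdown(); // threads start lazily, so none were created here
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(4, 16, 80, 120)); // true: the values from application.yml
        System.out.println(accepts(16, 4, 80, 120)); // false: max-pool-size < core-pool-size
    }
}
```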
2.3 Create ThreadPoolConfig
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class ThreadPoolConfig {

    private final ThreadPoolProperties threadPoolProperties;

    @Autowired
    public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
        this.threadPoolProperties = threadPoolProperties;
    }

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        // Threads are named thread-pool-1, thread-pool-2, ..., which makes them easy to spot in logs
        executor.setThreadNamePrefix("thread-pool-");
        // No explicit initialize() call: Spring runs it via afterPropertiesSet() when the bean is created
        return executor;
    }
}
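With these settings the pool holds at most max-pool-size + queue-capacity = 16 + 80 = 96 tasks at a time. The default rejection policy is ThreadPoolExecutor.AbortPolicy, so the next submission is rejected. ThreadPoolTaskExecutor reports that rejection as a TaskRejectedException.

The plain-JDK sketch below reproduces the limit. The ThreadFactory only approximates setThreadNamePrefix("thread-pool-"), and PoolSaturationDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSaturationDemo {

    /** Submits up to `attempts` blocking tasks and returns how many were accepted. */
    public static int acceptedBeforeRejection(int attempts) {
        AtomicInteger threadCount = new AtomicInteger();
        // Rough stand-in for setThreadNamePrefix("thread-pool-"): thread-pool-1, thread-pool-2, ...
        ThreadFactory namedThreads =
                r -> new Thread(r, "thread-pool-" + threadCount.incrementAndGet());
        // AbortPolicy is the default in both the JDK and Spring; it is spelled out for clarity
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 16, 120, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(80), namedThreads, new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold every thread and queue slot until the end
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Reached once 16 tasks are running and 80 are queued
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeRejection(200)); // prints 96
    }
}
```

The push method in section 3 submits only 4 tasks per call, so it stays far below this limit. A much larger partition count, or many concurrent calls, could reach it. In that case execute() throws and the calling method fails partway through its submit loop.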
3 Multi-threaded batch data processing
public RequestResult multiThreadPush() {
    List<HistoryStudent> historyStudentList = historyStudentMapper.getList(0, 65867);
    // Split the list into 4 partitions, one task per partition
    List<List<HistoryStudent>> partitionData = partitionData(historyStudentList, 4);
    ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
    // Counter: one count per partition; await() returns once every task has counted down
    CountDownLatch latch = new CountDownLatch(partitionData.size());
    for (List<HistoryStudent> historyStudents : partitionData) {
        executor.execute(() -> {
            try {
                for (HistoryStudent historyStudent : historyStudents) {
                    // Process a single record
                    //processSingleData(historyStudent);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Count down even if processing failed, otherwise await() would block forever
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return RequestResult.success();
}
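multiThreadPush depends on project classes (HistoryStudent, RequestResult, SpringUtil, historyStudentMapper). The self-contained sketch below therefore reproduces only its split / execute / await structure with JDK types.

Three substitutions are assumptions for illustration:
- Integers stand in for records.
- Adding them to a running sum stands in for the push.
- A fixed pool from Executors replaces the Spring bean.

Checking the sum confirms that every record was handled exactly once by the time latch.await() returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class LatchBatchDemo {

    /** Splits records into `partitions` batches, handles each on a pool thread, waits for all. */
    public static long processAll(List<Integer> records, int partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        CountDownLatch latch = new CountDownLatch(partitions);
        AtomicLong sum = new AtomicLong(); // stand-in for "records pushed"
        int batchSize = records.size() / partitions;
        try {
            for (int i = 0; i < partitions; i++) {
                int from = i * batchSize;
                int to = (i == partitions - 1) ? records.size() : from + batchSize;
                List<Integer> batch = records.subList(from, to);
                pool.execute(() -> {
                    try {
                        for (Integer record : batch) {
                            sum.addAndGet(record); // stand-in for processSingleData(record)
                        }
                    } finally {
                        latch.countDown(); // always count down so await() cannot hang
                    }
                });
            }
            latch.await(); // block until every batch has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batches", e);
        } finally {
            pool.shutdown();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            records.add(i);
        }
        System.out.println(processAll(records, 4)); // prints 5050
    }
}
```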
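/**
 * Splits dataList into partitionSize sublists of equal size; the last one also takes the remainder.
 * The sublists are subList views of dataList, so dataList must not be modified while they are in use.
 */
private List<List<HistoryStudent>> partitionData(List<HistoryStudent> dataList, int partitionSize) {
    List<List<HistoryStudent>> partitions = new ArrayList<>();
    int size = dataList.size();
    int batchSize = size / partitionSize;
    for (int i = 0; i < partitionSize; i++) {
        int fromIndex = i * batchSize;
        // The last partition runs to the end of the list and picks up the leftover elements
        int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;
        partitions.add(dataList.subList(fromIndex, toIndex));
    }
    return partitions;
}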
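partitionData gives every partition size / partitionSize elements and hands the remainder to the last one. For the 65867 records above and 4 partitions, that means 16466, 16466, 16466 and 16469. When the list is shorter than the partition count, batchSize is 0 and every element lands in the last partition, so only one thread does any work.

The sketch below compares that rule with a hypothetical balanced variant whose partition sizes differ by at most one. Both method names are illustrative and not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionDemo {

    /** Same splitting rule as partitionData: equal batches, the last one takes the remainder. */
    public static <T> List<List<T>> partitionLastTakesRest(List<T> data, int parts) {
        List<List<T>> result = new ArrayList<>();
        int batchSize = data.size() / parts;
        for (int i = 0; i < parts; i++) {
            int from = i * batchSize;
            int to = (i == parts - 1) ? data.size() : from + batchSize;
            result.add(data.subList(from, to));
        }
        return result;
    }

    /** Hypothetical variant: the first size % parts batches get one extra element. */
    public static <T> List<List<T>> partitionBalanced(List<T> data, int parts) {
        List<List<T>> result = new ArrayList<>();
        int base = data.size() / parts;
        int extra = data.size() % parts;
        int from = 0;
        for (int i = 0; i < parts; i++) {
            int to = from + base + (i < extra ? 1 : 0);
            result.add(data.subList(from, to));
            from = to;
        }
        return result;
    }

    /** Size of each partition, for printing and comparison. */
    public static List<Integer> sizes(List<? extends List<?>> partitions) {
        List<Integer> out = new ArrayList<>();
        for (List<?> p : partitions) {
            out.add(p.size());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ten = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sizes(partitionLastTakesRest(ten, 4))); // prints [2, 2, 2, 4]
        System.out.println(sizes(partitionBalanced(ten, 4)));      // prints [3, 3, 2, 2]
    }
}
```

For 65867 records the balanced rule gives 16467, 16467, 16467 and 16466, which is practically the same as the original. The difference only matters when the list can be shorter than the number of partitions.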