Java并发编程-线程池
- 线程池运行原理
- 线程池生命周期
- 线程池的核心参数
- 线程池的阻塞队列
- 线程池的拒绝策略
- 线程池的种类
- newFixedThreadPool
- newSingleThreadExecutor
- newCachedThreadPool
- newScheduledThreadPool
- 创建线程池
- jdk的Executors(不建议,会导致OOM)
- jdk的ThreadPoolExecutor(阿里开发手册推荐)
- Spring内置的ThreadPoolTaskExecutor(开发常用)
线程池(Thread Pool) 是一种并发编程技术,用于管理一组线程,以便复用这些线程来执行多个任务。使用线程池的核心目的就是用来减少线程的创建和销毁的开销,从而能提高系统的响应性能,同时线程池对线程的管理也能避免线程创建过多导致内存溢出。
线程池运行原理
线程池生命周期
- running(运行状态): 会接收新任务并且会处理队列中的任务
- shutdown(关闭状态): 不会接收新任务并且会处理队列中的任务,任务处理完之后会中断所有线程
- stop(停止状态): 不会接收新任务并且不会处理队列中的任务,并且会直接中断所有线程
- tidying(整理状态): 所有线程都停止之后,线程池的状态就会转为tidying,一旦达到此状态,就会调用线程池的terminated(),此方法内部是空的,可由程序员自定义
- terminated(终止状态): terminated()执行完之后就会转变为terminated状态
线程池的核心参数
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize(核心线程数): 即使空闲也不会被回收的线程数量;
- maximumPoolSize(最大线程数): 线程池允许创建的最大线程数(含核心线程);
- keepAliveTime(空闲线程存活时间): 非核心线程空闲时的存活时间(超时后回收);
- unit(时间单位): keepAliveTime 的时间单位(如秒、毫秒);
- workQueue(任务队列): 用于缓存未执行任务的阻塞队列;
- ThreadFactory(线程工厂): 这是一个接口,用于创建新的线程,可自定义线程创建方式;
- RejectedExecutionHandler(拒绝策略): 当任务队列满且线程数达到上限时的处理策略;
应用程序大致可分为两种类型,IO密集型任务和CPU密集型任务。
IO密集型任务,一般指文件读写、DB读写、网络请求等,核心线程数大小设置为2N+1
CPU密集型任务,一般指计算型代码、Bitmap转换、Gson转换等,特点是高并发,任务执行时间短,核心线程数大小设置为N+1,可减少线程上下文的切换。
可通过这段代码获取CPU的逻辑线程数 int poolSize = Runtime.getRuntime().availableProcessors();
线程池的阻塞队列
workQueue:当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO
- DelayedWorkQueue:一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
LinkedBlockingQueue | ArrayBlockingQueue |
---|---|
默认无界,支持有界 | 强制有界 |
底层是链表 | 底层是数组 |
是懒惰的,创建节点的时候添加数据 | 提前初始化 Node 数组 |
入队会生成新 Node | Node需要是提前创建好的 |
两把锁(头尾) | 一把锁 |
线程池的拒绝策略
- AbortPolicy: 默认的拒绝策略,当任务无法提交的时候就会抛出 RejectedExecutionException 异常
- CallerRunsPolicy: 当任务无法提交的时候就会把这个任务交给调用线程执行,这样就能避免任务被丢弃,但是有可能会导致调用者线程被阻塞
- DiscardPolicy: 当任务无法提交时,直接丢弃该任务,不做任何处理,如果任务不重要就可以用这个拒绝策略直接丢弃
- DiscardOldestPolicy: 当任务无法提交时,丢弃任务队列中最旧的任务,然后尝试重新提交当前任务,适合在需要尽快处理新任务的情况下使用
线程池的种类
newFixedThreadPool
Executors.newFixedThreadPool(nThreads); 创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待,特点如下:
- 核心线程数和最大线程数相同,意味者线程池能处理的任务就是 核心线程数 + 队列长度
- 队列使用了 LinkedBlockingQueue,长度没有设置,意味者里面用了一个无界队列,需要注意任务过多导致的内存溢出的问题
- 适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor
Executors.newSingleThreadExecutor(); 创建一个单线程的线程池,所有任务将按照提交的顺序依次执行,特点如下:
- 核心线程数为 1,最大线程数是 1,意味者只有一个线程
- 队列使用了 LinkedBlockingQueue,容量没有限制,需要注意任务过多导致的内存溢出的问题
- 适用于对执行顺序有要求的任务
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool
Executors.newCachedThreadPool(); 创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程,特点如下:
- 核心线程数为 0,最大线程数是 Integer.MAX_VALUE,这意味着能处理的任务数没有限制,同时创建出来的线程 60S 内没有处理任务就会被回收掉
- 队列使用了 SynchronousQueue,不存放任务,需要注意线程创建过多导致的内存溢出的问题
- 适合处理大量短期任务,也就是任务数比较密集,但每个任务执行时间较短的情况
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
newScheduledThreadPool
Executors.newScheduledThreadPool(corePoolSize); 创建一个可调度任务的线程池,适用于需要延迟执行或定期执行任务的场景,特点如下:
- 核心线程数可设置,最大线程数是 Integer.MAX_VALUE,意味者可以接收的任务没有限制
- 队列使用了 DelayedWorkQueue,支持延迟执行和定期执行任务,需要注意线程过多导致的内存溢出的问题
- 适合用于执行一些定时任务的场景,比如定时提醒
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// scheduled 提交任务到线程池中。
// 参数一,提交的任务;参数二,任务执行的延迟时间;参数三,时间单位
scheduled.schedule(() -> {System.out.println("1234");
}, 10, TimeUnit.SECONDS);
创建线程池
jdk的Executors(不建议,会导致OOM)
创建方式参考【线程池的种类】这一章节。Executors返回的线程池对象的弊端如下:
- FixedThreadPool和SingleThreadPool
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM - CachedThreadPool和ScheduledThreadPool
允许的创建线程数量为 Iteger.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
jdk的ThreadPoolExecutor(阿里开发手册推荐)
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.MINUTES;
// 阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
// 线程池工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 异常处理策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 手动构造线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
使用线程池,并发获取数据并,整合到一起
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;public class ApplicationMainTest {// 手动构造线程池private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));public static void main(String[] args) {// 使用线程安全的 ConcurrentLinkedDeque 存储查询结果集ConcurrentLinkedDeque<List<Integer>> resultList = new ConcurrentLinkedDeque<>();threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作...");List<Integer> queryList = buildQueryData(); // 模拟数据查询操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作...");List<Integer> queryList = buildQueryData(); // 模拟数据查询操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});// 关闭线程池threadPoolExecutor.shutdown();while (!threadPoolExecutor.isTerminated()) { // 检查所有任务是否完成System.out.println("等待所有任务完成...");try {Thread.sleep(100); // 每100毫秒检查一次} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("所有任务已完成");System.out.println("分隔符==============");// 汇总结果List<Integer> mergedList = new ArrayList<>();for (List<Integer> list : resultList) {mergedList.addAll(list);}System.out.println("Merged List: " + mergedList);}private static List<Integer> buildQueryData() {List<Integer> list = new ArrayList<>();Random rand = new Random();for (int i = 0; i < 3; i++) {list.add(rand.nextInt(100));}return list;}}
Spring内置的ThreadPoolTaskExecutor(开发常用)
Spring框架内置线程池
package cn.study.com.configbean;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class AsyncScheduledTaskConfig implements AsyncConfigurer {@Bean("checkAsync") // 将当前方法返回的对象,存到容器里public Executor scheduledTaskAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大线程数executor.setMaxPoolSize(10);//核心线程数executor.setCorePoolSize(5);//任务队列大小executor.setQueueCapacity(5);//线程存活时间 当超过30s后,线程池中存有的线程数量大于核心线程,触发超时回收executor.setKeepAliveSeconds(30);//拒绝处理策略:回收最老的任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());// or 自定义拒绝策略【1.记录错误信息 2.通知消息系统,发送告警信息】executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}});executor.setThreadNamePrefix("Async-Thread-Pool-");//初始化executor.initialize();return executor;}
}
使用注解,通过线程池异步执行方法。
@Async("checkAsync")
public Boolean deleteFolder(String bucketName, String path) {log.info("当前Minio文件夹清除线程:"+Thread.currentThread().getName());MinioFileManager minioFileManager = new MinioFileManager(minioHost, accessKey, secretKey);return minioFileManager.deleteFolder(bucketName, path);
}
依赖注入,并发执行任务。
@Autowired
private ExecutorService executorService;Future<List<User>> f1 = executorService.submit(()-> {List<User> userList = queryUserList();return userList;
});
Future<List<Order>> f2 = executorService.submit(()-> {List<Order> orderList = queryOrderList();return orderList;
});
// 汇总信息
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("user", f1.get());
resultMap.put("order", f2.get());