一、阿里Java开发规范,为啥禁止直接使用Executors创建线程池
- newFixdThreadPool 及 singleThreadPool 中默认队列长度为 Integer.MAX_VALUE,如果线程执行比较耗时,执行任务的线程在队列中产生大量堆积,进而有导致虚拟机OOM 的风险。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}//默认长度 Integer.MAX_VALUEpublic LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
- newCachedThreadPool 允许创建最大线程数量为 Integer.MAX_VALUE,当大量任务需要执行时,可能会导致大量线程的创建,每个线程默认占1M,线程占据资源过多或队列中线程产生堆积,导致 CPU 过高或 java虚拟机 OOM 的问题 。
// 最大线程数 默认为:Integer.MAX_VALUEpublic static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
- 使用ThreadPoolExecutor 来构造线程池, 建议明确指定线程池的核心参数:最小线程数、最大线程数,队列长度、拒绝策略等等,确保线程池运稳定运行。
二、如何合理配置线程池的大小
- CPU 密集型: 主要执行计算任务,响应时间较快,任务 cpu 的利用率很高。线程数的配置由服务器 CPU 核心数来决定, 建议CPU 核心数=同时最大执行线程数,如 CPU 核心数为 8,表明服务器最多能同时执行 8个线程。创建过多的线程反而会导致线程之间上下文切换,降低执行效率。推荐:线程池的最大线程数= cpu 核心数+1。
- IO 密集型: 主要进行 IO 操作,执行 IO 操作(文件读写、网络通信等)相对比较耗时,cpu相对处于空闲状态, 导致 cpu 的利用率不高,建议可以增加线程的数量。IO密集型操作建议结合线程的等待时长来做判断,等待时间越长,线程数相对配置的越多。一般建议配置 cpu 核心数的 2 倍。 基础公式:线程池设定最佳线程数目 = ((线程池配置的线程等待时间+线程 CPU 时间)/ 线程 CPU 时间 )* CPU 数目 。公式中线程 cpu等待时间是计算出程序中单个线程在 cpu 上运行的时间。
三、线程池中的线程的初始化
线程池创建后,线程池默认没有初始化,简而言之就是线程池中没有线程,在提交任务之后才会创建线程。 在并发系统中,为提高系统的吞吐量,建议在线程池创建之后立即创建线程。通过以下两个方法可以实现线程的初始化:
ExecutorService executorService = new ThreadPoolExecutor(10,20,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());ThreadPoolExecutor threadPoolExecutor=(ThreadPoolExecutor)executorService;//方式1 初始化所有核心线程threadPoolExecutor.prestartAllCoreThreads();//方式2 初始化一个核心线程threadPoolExecutor.prestartCoreThread();
四、线程池的关闭
- shutdown():不会立即终止线程池,等所有任务队列中的任务都执行完后才终止,同时也不会接受新的任务 。
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务且清空任务队列,返回未执行的任务。
五、线程池核心参数支持动态调整及线程池监控
- setCorePoolSize:设置核心线程数
public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}
- setMaximumPoolSize:设置线程池最大线程数目
public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();this.maximumPoolSize = maximumPoolSize;if (workerCountOf(ctl.get()) > maximumPoolSize)interruptIdleWorkers();}
- beforeExecute: 获取执行之前的核心参数
- afterExecute: 获取执行之后的核心参数
protected void beforeExecute(Thread t, Runnable r) {BlockingQueue<Runnable> blockingQueue = getQueue();log.info("执行前! 线程池名称:{},初始线程数:{},核心线程数:{},活跃的任务数量:{},已经执行的任务数:{},任务总数:{},允许最大的线程数:{}," +"线程允许空闲时间:{},队列的容量:{}",name,getPoolSize(),getCorePoolSize(),getActiveCount(),getCompletedTaskCount(),getTaskCount(),getMaximumPoolSize(),getKeepAliveTime(TimeUnit.MILLISECONDS),(blockingQueue.size() + blockingQueue.remainingCapacity()));}/*** @description: 线程监控使用* @param: r* @param: t* @return: void* @author: * @date: 2022/1/17 16:58*/protected void afterExecute(Runnable r, Throwable t) {BlockingQueue<Runnable> blockingQueue = getQueue();log.info("执行后! 线程池名称:{},初始线程数:{},核心线程数:{},活跃的任务数量:{},已经执行的任务数:{},任务总数:{},允许最大的线程数:{}," +"线程允许空闲时间:{},队列的容量:{}",name,getPoolSize(),getCorePoolSize(),getActiveCount(),getCompletedTaskCount(),getTaskCount(),getMaximumPoolSize(),getKeepAliveTime(TimeUnit.MILLISECONDS),(blockingQueue.size() + blockingQueue.remainingCapacity()));}
六、推荐开源线程池监控 Hippo4j
- 动态可观测线程池框架,为业务系统提高线上运行保障能力
官网:Hippo4j | Hippo4j