优点
ThreadPoolExecutor 提供了强大的灵活性和自定义参数的能力,可以根据实际需求来灵活配置线程池的行为。
位置
java.util.concurrent 包下
构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)\public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
核心参数
corePoolSize:线程池的基本大小,即使没有任务执行时也保持存活的线程数。
maximumPoolSize:线程池所能容纳的最大线程数(包括核心线程)。
keepAliveTime:非核心线程空闲等待新任务的最长时间,在这个时间内如果没有新任务提交,那么非核心线程将会被终止。
unit:与keepAliveTime搭配使用的单位,如TimeUnit.SECONDS或TimeUnit.MILLISECONDS等。
workQueue:任务队列,用于保存待处理的任务,可以是无界队列(例如LinkedBlockingQueue)或有界队列(例如ArrayBlockingQueue)。
threadFactory:创建新线程的工厂,默认使用Executors.defaultThreadFactory(),也可以自定义来设置线程名称、优先级等属性。
handler:拒绝策略,当线程池和工作队列都已满,无法接收新的任务时,会调用此策略进行处理。AbortPolicy(直接抛出异常);CallerRunsPolicy(由调用者线程自己执行任务);DiscardPolicy(直接丢弃新任务);DiscardOldestPolicy(丢弃队列中最旧的任务并尝试提交当前任务)。
线程池工作流程
当一个任务提交到ThreadPoolExecutor时:
如果当前运行的线程数量小于corePoolSize,则创建一个新的线程来执行任务。
若当前线程数等于corePoolSize但小于maximumPoolSize,且工作队列已满,则创建新的线程来执行任务。
如果线程数已经达到maximumPoolSize并且工作队列也满了,则根据RejectedExecutionHandler策略处理该任务。
线程池状态
线程池具有五种状态:RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED。
状态转换通常发生在调用诸如shutdown(), shutdownNow(), 或者所有任务完成后
重要方法
shutdown():线程池不再接受新的任务,但会继续处理已经提交的任务。
shutdownNow():尝试停止所有正在执行的任务,并返回尚未开始执行的任务列表。
execute(Runnable command):提交一个Runnable任务给线程池执行。
submit(Callable<T> task):提交一个Callable任务,能够返回结果。
awaitTermination(long timeout, TimeUnit unit):阻塞当前线程,直到线程池关闭或者超时发生。
isTerminated():检查线程
示例
示例1:
package org.springblade.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class Test {@org.junit.jupiter.api.Testpublic void ThreadPoolExecutorExample() {// 自定义线程工厂ThreadFactory namedThreadFactory = new NamedThreadFactory();// 创建有界任务队列BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);// 定义拒绝策略RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // 核心线程数4, // 最大线程数60L, // 空闲线程存活时间(单位:秒)TimeUnit.SECONDS, // 时间单位workQueue, // 工作队列namedThreadFactory, // 线程工厂handler // 拒绝策略);// 提交任务到线程池for (int i = 0; i < 20; i++) {final int taskId = i;Runnable worker = new Runnable() {@Overridepublic void run() {System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());try {Thread.sleep(500); // 模拟耗时操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task " + taskId + " finished");}};executor.execute(worker);}// 关闭线程池executor.shutdown();while (!executor.isTerminated()) {// 等待所有任务执行完毕}System.out.println("所有任务已完成,线程池已关闭");}
}/*** 定义线程创建逻辑,设置线程名称前缀等*/
class NamedThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;NamedThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "MyTaskPool-" + poolNumber.getAndIncrement() + "-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}
//我们提交了20个任务到线程池。由于工作队列容量有限(10),当队列满后会尝试增加线程直到达到最大线程数(4)。
//如果在达到最大线程数后仍有新的任务提交,将根据指定的拒绝策略处理这些任务。
// 在本例中,我们选择了 CallerRunsPolicy,这意味着超出线程池处理能力的任务将会由调用者线程(主线程)直接执行。
//执行结果
//Task 0 is running by MyTaskPool-1-thread-1
// Task 14 is running by main
// Task 1 is running by MyTaskPool-1-thread-2
// Task 13 is running by MyTaskPool-1-thread-4
// Task 12 is running by MyTaskPool-1-thread-3
// Task 12 finished
// Task 1 finished
// Task 0 finished
// Task 13 finished
// Task 14 finished
// Task 4 is running by MyTaskPool-1-thread-3
// Task 19 is running by main
// Task 5 is running by MyTaskPool-1-thread-4
// Task 3 is running by MyTaskPool-1-thread-1
// Task 2 is running by MyTaskPool-1-thread-2
// Task 5 finished
// Task 4 finished
// Task 19 finished
// Task 3 finished
// Task 8 is running by MyTaskPool-1-thread-1
// Task 2 finished
// Task 9 is running by MyTaskPool-1-thread-2
// Task 7 is running by MyTaskPool-1-thread-3
// Task 6 is running by MyTaskPool-1-thread-4
// Task 8 finished
// Task 10 is running by MyTaskPool-1-thread-1
// Task 6 finished
// Task 9 finished
// Task 7 finished
// Task 15 is running by MyTaskPool-1-thread-2
// Task 11 is running by MyTaskPool-1-thread-4
// Task 16 is running by MyTaskPool-1-thread-3
// Task 10 finished
// Task 17 is running by MyTaskPool-1-thread-1
// Task 11 finished
// Task 15 finished
// Task 16 finished
// Task 18 is running by MyTaskPool-1-thread-4
// Task 17 finished
// Task 18 finished
// 所有任务已完成,线程池已关闭
示例2:
package org.springblade.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
public class Test {@org.junit.jupiter.api.Testpublic void ThreadPoolExecutorExample() {// 定义核心线程数、最大线程数、空闲线程存活时间以及时间单位int corePoolSize = 5;int maximumPoolSize = 10;long keepAliveTime = 60; // 60秒TimeUnit unit = TimeUnit.SECONDS;// 创建一个有界的LinkedBlockingQueue作为任务队列,容量为20BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);// 提交任务到线程池for (int i = 0; i < 30; i++) {final int taskId = i;Runnable worker = () -> {System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task " + taskId + " finished");};executor.execute(worker);}// 关闭线程池(这里没有立即关闭,而是等待所有任务执行完毕)executor.shutdown();while (!executor.isTerminated()) {// 等待所有任务执行完毕}System.out.println("所有任务已完成,线程池已关闭");}
}//执行结果
// Task 0 is running by pool-1-thread-1
// Task 1 is running by pool-1-thread-2
// Task 3 is running by pool-1-thread-4
// Task 2 is running by pool-1-thread-3
// Task 4 is running by pool-1-thread-5
// Task 25 is running by pool-1-thread-6
// Task 26 is running by pool-1-thread-7
// Task 27 is running by pool-1-thread-8
// Task 29 is running by pool-1-thread-10
// Task 28 is running by pool-1-thread-9
// Task 2 finished
// Task 0 finished
// Task 3 finished
// Task 5 is running by pool-1-thread-1
// Task 6 is running by pool-1-thread-4
// Task 7 is running by pool-1-thread-3
// Task 1 finished
// Task 8 is running by pool-1-thread-2
// Task 4 finished
// Task 9 is running by pool-1-thread-5
// Task 25 finished
// Task 10 is running by pool-1-thread-6
// Task 27 finished
// Task 28 finished
// Task 26 finished
// Task 13 is running by pool-1-thread-7
// Task 29 finished
// Task 14 is running by pool-1-thread-10
// Task 11 is running by pool-1-thread-8
// Task 12 is running by pool-1-thread-9
// Task 7 finished
// Task 15 is running by pool-1-thread-3
// Task 5 finished
// Task 16 is running by pool-1-thread-1
// Task 6 finished
// Task 17 is running by pool-1-thread-4
// Task 8 finished
// Task 18 is running by pool-1-thread-2
// Task 9 finished
// Task 19 is running by pool-1-thread-5
// Task 10 finished
// Task 20 is running by pool-1-thread-6
// Task 12 finished
// Task 21 is running by pool-1-thread-9
// Task 14 finished
// Task 11 finished
// Task 22 is running by pool-1-thread-8
// Task 23 is running by pool-1-thread-10
// Task 13 finished
// Task 24 is running by pool-1-thread-7
// Task 15 finished
// Task 17 finished
// Task 16 finished
// Task 18 finished
// Task 19 finished
// Task 20 finished
// Task 23 finished
// Task 22 finished
// Task 24 finished
// Task 21 finished