摘要
本文介绍了线程池基本概念、线程及线程池状态、java中线程池提交task后执行流程、Executors线程池工具类、最后介绍在springboot框架下使用线程池和定时线程池,以及task取消
线程池基本
背景
线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务
线程池优势
- 降低执行task重复创建销毁线程的消耗
- 提高执行task响应速度。切换到task时,无需创建线程,直接将task“挂”到线程执行
线程状态
线程共有6个状态,分别是new、runnable、waiting、time_waiting、blocked、terminal。其中runnable状态包括running、ready。且runnable和waiting、time_waiting、blocked、terminal之间切换。
new
线程实例化初始状态
RUNNABLE(running、ready)
running:正在运行
ready:running状态线程占用cpu时间片完(主动调用yield)后状态。
yield():向调度程序提示当前线程愿意放弃cpu的使用,调度程序可以忽略此提示。很少使用,可用于调试,它可能有助于重现由于竞争条件而产生的错误。
waiting
等待状态,例如调用wait、join、park方法,线程等待
time_waiting
限时等待,例如调用sleep(time),wait(time),join(time),parkNanos(),parkUntil(thread)
blocked
阻塞状态,例如等待锁,或等待进入synchronized块
terminal
线程终止
线程池状态
RUNNING
可接收新提交的任务,可处理队列中任务。
SHUTDOWN
不再接收新提交的任务,但是可处理队列中的任务。
STOP
不再接收新提交的任务,且不再处理队列中的任务。
TIDYING
所有任务已经被终结,工作线程数为0,该状态会执行钩子函数terminated()
TERMINATED
已执行完毕terminated()方法。
状态转换
RUNNING -> SHUTDOWN:调用shutdown()方法后。
(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()方法后。
SHUTDOWN -> TIDYING:当队列中任务为空,线程池中任务为空。
TIDYING -> TERMINATED:terminated()钩子方法执行完毕之后。
Java线程池实践
ThreadPool参数
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,TimeUnit,threadFactory,RejectedExecutionHandler
核心线程数
corePoolSize:线程池中存活线程数量,除非allowCoreThreadTimeOut=true时会被kill
最大线程数
maximumPoolSize:线程池允许最大线程数,达到keepAliveTime后会被kill
等待时间和单位
keepAliveTime:多于核心线程之外的线程,超过该时间的线程会被kill
TimeUnit:keepAliveTime时间单位
阻塞队列
workQueue:保存未执行的Runnable 的task
线程工厂
threadFactory:创建线程的工厂类,默认Executors.defaultThreadFactory()
拒绝策略
RejectedExecutionHandler:因队列满、且超过最maxThreadPool限制的task处理策略
- AbortPolicy:拒绝task,抛出RejectedExecutionException 。是ThreadPoolExecutor和ScheduledThreadPoolExecutor的默认拒绝策略
- DiscardPolicy:丢弃task,无异常
- DiscardOldestPolicy:丢弃未处理的最old的task,无异常
- CallerRunsPolicy:拒绝task,抛回给提交task的线程执行
static volatile int count = 0;public static void main(String[]args){ //可接纳6+1=7个任务,再多则会执行默认拒绝策略ThreadPoolExecutor executor = new ThreadPoolExecutor(5,6,1000l, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));Runnable task = new Runnable() {@Overridepublic void run() {System.out.println(count++);try {Thread.sleep(1000000000l);} catch (InterruptedException e) {throw new RuntimeException(e);}}};for(int i=0;i<8;i++){executor.execute(task);//抛出RejectedExecutionException}System.out.println(executor.getActiveCount());}
Task提交流程
coreThread -- queue -- maxThread -- handler
- 若运行线程少于corePoolSize,尝试以给task启动一个新的core线程
- task进入BlockQueue排队
- BlockQueue已满,添加新线程。失败则拒绝该task,执行拒绝策略
Thread回收
回收逻辑:Runnable状态线程数ctl减1;从工作线程HashSet中移除工作线程Worker对象
- 回收超过keepAliveTime之外的非core线程
- 回收超过keepAliveTime的core线程,且allowCoreThreadTimeOut=true
线程池任务管理
获取活跃线程数
ThreadPoolExecutor.getActiveCount()。线程池中正在运行的线程个数,包括核心线程+队列满后新建的非核心线程
static volatile int count = 0;public static void main(String[]args) throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(5,9,1000l,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));Runnable task = new Runnable() {@Overridepublic void run() {System.out.println(count++);try {Thread.sleep(1000000000l);} catch (InterruptedException e) {throw new RuntimeException(e);}}};for(int i=0;i<8;i++){executor.execute(task);} //输出为7:5个在核心线程,1个在队列,2个为新建线程System.out.println("活跃线程:"+executor.getActiveCount());}
获取提交的task状态
ThreadPoolExecutor.getTaskCount():历史提交的task总数
ThreadPoolExecutor.getCompletedTaskCount():已完成task个数
取消任务
futrure.cancel()
task提交后状态
- 线程未启动
- 线程正在执行
- 线程已结束
task未启动,cancel后会从线程池的阻塞队列中remove掉task;
task已结束,cancel对task不会有任何影响
task正在执行:通过interrupted标志位,在sleep/join/wait处抛出中断异常终止task执行
无future第三方库
该场景适用于调用第三方库无future返回值情况。可以通过在第三方库中例如cancelTask等方法中重写
Future.cancel()取消task
Java Executors线程池工具
Executors是java一个线程池工具,便捷创建例如单个、固定数量、定时等特征的线程池。不同特征的线程池,其参数不同。
newSingleThreadPool
适用于提交的任务按顺序执行场景:核心线程数=最大线程数=1,一直存活,如果任务异常中断了线程,则创建一个新线程;使用链表阻塞无界队列;默认线程工厂Executors.defaultThreadFactory();默认阻塞策略:AbortPolicy
短期提交任务过多,则队列溢出,内存溢出
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
cachedThreadPool
适用于执行许多短期(60s)异步任务场景:核心线程为0,最大线程为Integer.MAX_VALUE,存活60s,使用同步队列SynchronousQueue;默认线程工厂Executors.defaultThreadFactory();默认阻塞策略:AbortPolicy
短期提交任务过多,则一直创建线程,内存溢出
SynchronousQueue(同步队列):队列中不存储元素,元素的offer和take阻塞对应
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
适用于执行IO较少异步任务场景:核心线程=最大线程=输入参数,线程创建后不回收,使用LinkedBlockingQueue阻塞无界队列;默认线程工厂Executors.defaultThreadFactory();默认阻塞策略:AbortPolicy
短期提交任务过多,则队列溢出,内存溢出
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
newThreadScheduledExecutor
适用于单一定时任务:核心线程数=1,最大线程数=Integer.MAX_VALUE,一直存活,使用优先级队列DelayedWorkQueue定时,默认线程工厂Executors.defaultThreadFactory();默认阻塞策略:AbortPolicy
DelayedWorkQueue优先级队列,底层使用“堆”数据结构实现
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
SpringBoot线程池
SpringBoot线程池在JUC包下ThreadPoolExecutor基础上进行封装,通过注入Bean或注解的方式使用。SpringBoot提供的线程池包括普通提交task的线程池和定时线程池。
提交task方式
通过@Configuration注解和@Bean注解,注入定义的ThreadPoolTaskExecutor对象,通过@Autowired注入使用的threadPoolTaskExecutor对象,通过threadPoolTaskExecutor的submit()或execute()方法提交任务。
注意:@Autowired注入@Bean注解对象时,默认是@Bean对象的方法名
@Configuration
public class TaskThreadPoolConfig {
//将线程池注入Spring IOC@Beanpublic ThreadPoolTaskExecutor threadPoolTimeoutExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}//注入线程池对象@AutowiredThreadPoolTaskExecutor threadPoolTimeoutExecutor;//提交任务
public void test(){threadPoolTimeoutExecutor.submit(()->{runTask();});}
}
注解方式
通过@Configuration注解和@Bean注解,注入定义的ThreadPoolTaskExecutor对象,通过@Async注解异步执行方法,方法可以返回task的Future句柄,或返回void
注意:@Bean注解可以添加线程池名字,异步方法@Async可以指定线程池名字
@Configuration
public class TaskThreadPoolConfig {@Bean(value = "test-threadpool")//定义线程池名public ThreadPoolTaskExecutor threadPoolTimeoutExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}//方法上添加注解,该方法异步执行,方法可以返回Future任务句柄,或void@Async(value = "test-threadpool")//指定线程池public Future<Object> runTask(){System.out.println("--");return new AsyncResult<>(true);}
}
取消任务
获取task的Future句柄,通过future.cancel(true)取消任务。其内部则通过翻转interrupted标志位,遇到例如sleep、wait、join等则抛出中断异常,终止执行task的线程。
@AutowiredThreadPoolTaskExecutor threadPoolTimeoutExecutor;public static void main(String []args){Future<?> future = threadPoolTimeoutExecutor.submit(()->{try {Thread.sleep(1000l);} catch (InterruptedException e) {throw new RuntimeException(e);}});if(!future.isDone()){//task未结束future.cancel(true);//翻转执行task的线程的中断标志位,遇到例如sleep,join,wait等抛出中断异常终止task}}
觉得不错,点个👍吧,😄