文章目录
- 1 问题背景
- 2 前言
- 3 4种常用的方法
- 4 代码
- 4.1 isTerminated()
- 4.2 线程池的任务总数是否等于已执行的任务数
- 4.3 `CountDownLatch`计数器
- 4.4 `CyclicBarrier`计数器
1 问题背景
真实生产环境的电商项目,常使用线程池应用于执行大批量操作达到高性能的效果。应用场景有批量补偿修正数据库历史数据、定时批量执行业务逻辑(涉及到百万级数据)、批量初始化新业务的数据等等。用到线程池,必须要知道任务是否执行完了,才能进行下一步业务操作。今天总结归纳4种常用的方法判断线程池是否执行完所有任务
2 前言
先给出解决方案,文末再贴出详细代码
参考自:面试突击35:如何判断线程池已经执行完所有任务了?
3 4种常用的方法
- 线程池提供的
isTerminated()
方法。缺点是需要调用shutdown()
关闭线程池 - 判断线程池的任务总数是否等于已执行的任务数。优点是无需关闭线程池。缺点是两个数值都是动态计算的,只是一个近似值
CountDownLatch
计数器。写法很优雅,且无需关闭线程池,但它的缺点是只能使用一次,CountDownLatch 创建之后不能被重复使用CyclicBarrier
计数器。和 CountDownLatch 类似,它可以理解为一个可以重复使用的循环计数器,CyclicBarrier 可以调用reset()
将自己重置到初始状态
4 代码
4.1 isTerminated()
@Slf4j
public class IsTerminatedDemo {private static final int BLOCKING_QUEUE_CAPACITY = 100;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");/*** 使用isTerminated判断线程池是否执行完任务,缺点是要关闭线程池** @param args*/public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,10,10 * 60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),new DefaultThreadFactory("complete_thread_pool"),new ThreadPoolExecutor.AbortPolicy());// 添加任务for (int i = 0; i < 5; i++) {int finalI = i;threadPool.submit(() -> {// 随机休眠int r = new Random().nextInt(5);try {Thread.sleep(r);} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("Task NO.{} finish.", finalI);});}threadPool.shutdown();// 判断线程池是否执行完所有任务,前提是要执行shutdownwhile (!threadPool.isTerminated()) {log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));}log.info("All tasks have been finished!");}
}
4.2 线程池的任务总数是否等于已执行的任务数
@Slf4j
public class GetCompletedTaskCountDemo {private static final int BLOCKING_QUEUE_CAPACITY = 100;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");/*** 判断线程池是否执行完所有任务,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了。* 优点是无需关闭线程池* 缺点是 getTaskCount() 和 getCompletedTaskCount() 返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值** @param args*/public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,10,10 * 60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),new DefaultThreadFactory("complete_thread_pool"),new ThreadPoolExecutor.AbortPolicy());// 添加任务for (int i = 0; i < 5; i++) {int finalI = i;threadPool.submit(() -> {// 随机休眠int r = new Random().nextInt(5);try {Thread.sleep(r);} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("Task NO.{} finish.", finalI);});}// 判断线程池是否执行完所有任务,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了。// 优点是无需关闭线程池// 缺点是 getTaskCount() 和 getCompletedTaskCount() 返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) {log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));}log.info("All tasks have been finished!");}
}
4.3 CountDownLatch
计数器
@Slf4j
public class CountDownLatchDemo {private static final int BLOCKING_QUEUE_CAPACITY = 100;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");/*** 写法很优雅,且无需关闭线程池,但它的缺点是只能使用一次,CountDownLatch 创建之后不能被重复使用,* 也就是说 CountDownLatch 可以理解为只能使用一次的计数器** @param args*/public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,10,10 * 60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),new DefaultThreadFactory("complete_thread_pool"),new ThreadPoolExecutor.AbortPolicy());int taskCount = 5;CountDownLatch cdl = new CountDownLatch(taskCount);// 添加任务for (int i = 0; i < taskCount; i++) {int finalI = i;threadPool.submit(() -> {// 随机休眠int r = new Random().nextInt(5);try {Thread.sleep(r);} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("Task NO.{} finish.", finalI);// 线程执行完,计数器减1cdl.countDown();});}log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));try {// 阻塞等待所有线程执行完任务cdl.await();} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("All tasks have been finished!");}
}
4.4 CyclicBarrier
计数器
@Slf4j
public class CyclicBarrierDemo {private static final int BLOCKING_QUEUE_CAPACITY = 100;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");/***和 CountDownLatch 类似,它可以理解为一个可以重复使用的循环计数器,CyclicBarrier 可以调用 reset 方法将自己重置到初始状态** @param args*/public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,10,10 * 60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),new DefaultThreadFactory("complete_thread_pool"),new ThreadPoolExecutor.AbortPolicy());int taskCount = 5;CyclicBarrier cb = new CyclicBarrier(taskCount, () -> log.info("log from CyclicBarrier, all tasks of ThreadPool have been finished"));// 添加任务for (int i = 0; i < taskCount; i++) {int finalI = i;threadPool.submit(() -> {// 随机休眠int r = new Random().nextInt(5);try {Thread.sleep(r);} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("Task NO.{} finish.", finalI);// 阻塞等待try {cb.await();} catch (Exception e) {throw new RuntimeException(e);}});}log.info("All tasks have been finished!");}
}