CompletableFuture与CountDownLatch实现多任务并行执行
前言:CountDownLatch可以实现多任务并行,最后收集结果。业务中更广泛的使用CompletableFuture来实现多任务并行
一、 CompletableFuture实现多任务异步执行
1.1 描述
CompletableFuture.supplyAsync() 是 Java 中用于异步执行任务的一种方法。它返回一个CompletableFuture 对象,该对象表示一个异步计算的结果。supplyAsync() 方法接受一个 Supplier 函数式接口作为参数,该接口表示一个无参数、有返回值的函数。
1.2 使用
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor)
第一个方法使用默认的 ForkJoinPool.commonPool() 作为执行器。
第二个方法允许你指定一个自定义的 Executor
1.3 使用demo
public static void main(String[] args) {List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();long t1 =System.currentTimeMillis();CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {int time = 6;try {//处理业务TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}//返回业务结果return time;},threadPool);CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {int time = 8;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {int time = 10;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;});CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> {int time = 13;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> {int time = 20;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture6 = CompletableFuture.supplyAsync(() -> {int time = 25;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);completableFutureList.add(completableFuture1);completableFutureList.add(completableFuture2);completableFutureList.add(completableFuture3);completableFutureList.add(completableFuture4);completableFutureList.add(completableFuture5);completableFutureList.add(completableFuture6);//用于等待一组 CompletableFuture 全部完成的方法.join() 方法用于阻塞当前主线程,直到 CompletableFuture 完成CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();// 汇总所有结果List<Integer> dataList = new ArrayList<>();for (CompletableFuture<Integer> future : completableFutureList) {try {Integer time = future.get();dataList.addAll(Collections.singleton(time));} catch (InterruptedException | ExecutionException e) {}}long t2 =System.currentTimeMillis();System.out.println("6个任务总用时:"+(t2-t1)+"毫秒");System.out.println(dataList);}
线程池的定义
static Executor threadPool = new ThreadPoolExecutor(10,//核心活跃线程数,类比银行两个柜台一直保持营业30 ,//线程池最大大小,类比银行共25个柜台可以营业2L,//超时回收空闲的线程,类比有三个非活跃线程处于活跃状态,在一定时间还未接到任务就进入非活跃状态(就是不营业了)TimeUnit.SECONDS,//时间单位new ArrayBlockingQueue<>(3),//存放等待任务的队列,类比为银行的候客区,不指定大小的话就是最大整数Executors.defaultThreadFactory(),// 线程工厂,不修改!用来创建new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略,如果线程满了,线程池就会使用拒绝策略);
最终输出:
6个任务总用时:25029毫秒
[6, 8, 10, 13, 20, 25]
注意:
①join是为了让主线程等待所有子任务都执行完。例如:t1.join()用于让当前线程等待指定的线程(例如 t1)完成其执行。换句话说,t1.join() 方法会阻塞当前线程,直到 t1 线程结束
②线程池的定义要注意,核心线程太小的话,会在核心线程用完时,进入队列等待。除非队列装满才会唤醒非核心线程
二、CountDownLatch实现多任务并行执行
2.1 描述
CountDownLatch 是 Java 并发包 (java.util.concurrent)中的一个同步辅助类,它允许一个或多个线程等待其他线程完成操作。CountDownLatch 初始化时需要指定一个计数值(线程数量),当调用 countDown() 方法时,该计数器递减,直到计数器到达零,然后所有等待的线程被释放,继续执行后续的操作。
2.2 两个方法
CountDownLatch主要有2个方法,当一个或多个线程调用await方法时,调用线程会被阻塞,其他线程调用
countDown
方法时计数器减一(调用countDown方法不会阻塞线程),当计数器的值变为0时,此时调用方法被阻塞的线程会被唤醒,继续执行
countDown 方法时计数器减一await
尝试唤醒,当计数器的值变为0时,才会被唤醒
2.3 使用示例
CountDownLatch 使用
private static void countDownLatchDemo(){final CountDownLatch latch = new CountDownLatch(6);long t1 =System.currentTimeMillis();for (int i = 0; i < 6; i++) {threadPool.submit(new Worker(latch, i +" thread:"));}try {latch.await(); // 阻塞直到计数器为0} catch (InterruptedException e) {throw new RuntimeException(e);}long t2 =System.currentTimeMillis();System.out.println("countDownLatch执行完6个任务用时:"+(t2-t1)+"毫秒!");}
任务方法
class Worker implements Runnable {private final CountDownLatch latch;private final String taskName;public Worker(CountDownLatch latch, String taskName) {this.latch = latch;this.taskName = taskName;}@Overridepublic void run() {try {// 模拟任务处理时间Thread.sleep(10000);System.out.println(taskName + "完成了任务,线程id:"+ Thread.currentThread().getId());} catch (InterruptedException e) {e.printStackTrace();} finally {// 子任务完成,计数器减一latch.countDown();}}
}
线程池还是示例1中的线程池
输出结果:
0 thread:完成了任务,线程id:15
1 thread:完成了任务,线程id:16
3 thread:完成了任务,线程id:18
2 thread:完成了任务,线程id:17
4 thread:完成了任务,线程id:19
5 thread:完成了任务,线程id:20
countDownLatch执行完6个任务用时:10053毫秒!
由此可见他是并行,而非串行