CompletableFuture工具类可以帮助实现Java并发编程中的任务编排
以上除了join用于阻塞调用该方法的线程并且接受CompletableFuture的返回值以外其它方法皆有Async异步和Executor指定线程池选项
对于supply,run,apply,accept的区别在于函数式编程的接口类型不同:
- supply: Supplier<T> supplier => T get()
- run: Runnable runnable => void run()
- apply: Function<T, R> => apply(T t)
- accept: Consumer<T> => void accept(T t)
对于以上方法,根据接受参数不同分为两类,一类参数接受如supply,run,apply,accept接口的实现类,另一类参数接受新的CompletableFuture:
FunctionInterface | CompletableFuture |
---|---|
supplyAsync | thenCompose |
runAsync | thenCombine |
thenApply | thenAcceptBoth |
thenAccept | runAfterBoth |
thenRun | applyToEither |
acceptEither | |
runAfterEither |
thenCompose用于先执行A再执行B
thenCombine,thenAcceptBoth,runAfterBoth用于A,B同时执行
applyToEither,acceptEither,runAfterEither用于A,B谁先执行完就跳到C
exceptionally,handle,whenComplete用于在并发编程中处理结果和异常:
exceptionally:
该方法仅处理异常情况:发生异常时,不会把异常抛出,而是由exceptionally处理结果返回
public static CompletableFuture exceptionally(int a, int b){return CompletableFuture.supplyAsync(() -> a/b).exceptionally(ex -> {System.out.println("ex:\t"+ex.getMessage());return 0;});
}
handle:
处理上一阶段返回值和异常,不会把异常抛出,而是由handle处理结果返回
public class CreateThread_FutureTask {public static void main(String[] args) {CompletableFuture<String> ctf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);return "1";} catch (InterruptedException e) {throw new RuntimeException(e);}}).applyToEither(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(50);return String.valueOf(1/0);} catch (InterruptedException e) {throw new RuntimeException(e);}}), str -> {return str;}).handle((str,ex)->{if(ex!=null){return ex.getMessage();}else{return str;}});System.out.println(ctf.join());System.out.println("主线程能继续执行");}
}
whenComplete:
处理上一阶段返回值和异常,会把异常抛出
public static CompletableFuture whenComplete(int a, int b){return CompletableFuture.supplyAsync(() -> a/b).whenComplete((result, ex) -> {if (null != ex) {System.out.println("whenComplete error:\t"+ex.getMessage());}});
}
CompletableFuture.allOf和anyOf
allOf:接受CompletableFuture<?>...ctfs,等待全部执行完毕
anyOf:接受CompletableFuture<?>...ctfs,等待任意一个执行完毕
CompletableFuture[] dishes = IntStream.rangeClosed(1, 10).mapToObj(i -> new Dish("菜" + i, 1L)).map(dish -> CompletableFuture.runAsync(dish::make)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(dishes).join();
System.out.println("菜全都做好了");
使用@Async指定线程池+CompletableFuture.completedFuture实现异步
CompletableFuture.completedFuture用于在已经知道返回值时生成CompletableFuture对象,直接使用当前方法的线程,可以与SpringBoot的@Async配合实现异步
@Configuration
public class ExecutorConfig {@Beanpublic Executor myAsyncTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(2);executor.setQueueCapacity(500);executor.setThreadNamePrefix("GithubLookup-");executor.initialize();return executor;}
}
@Async("myAsyncTaskExecutor")
@Override
public CompletableFuture<String> completeTask(int i) throws InterruptedException {log.info("当前线程为",Thread.currentThread().getName());Thread.sleep(1000*i);String value=String.valueOf(i);return CompletableFuture.completedFuture(value);
}