目录
1.Future
2.CompletableFuture
2.1.为什么会有CompletableFuture?
2.2.使用
2.2.1.提交任务+获取结果
2.2.2.回调函数
2.2.3.CompletableFuture嵌套问题
1.Future
Java中的Future接口代表一个异步计算。其提供了一组规范用来对异步计算任务进行管理控制。
-
V get(): 阻塞等待计算完成,然后返回结果。如果计算抛出了异常,则此方法将重新抛出该异常。它有两个重载版本,区别是是否允许设置阻塞超时的时间。
-
boolean isDone(): 返回true如果任务已完成(无论是否成功),否则返回false。这包括正常完成、被取消或执行时抛出异常的情况。
-
boolean cancel(boolean mayInterruptIfRunning): 尝试取消任务的执行。如果任务尚未开始,它将被取消;如果任务正在运行且mayInterruptIfRunning为true,则执行该任务的线程将被中断;如果任务已经完成,取消请求将被忽略。此方法返回true表示任务已被取消,无论是之前已取消还是由于此次调用而取消。
-
boolean isCancelled(): 如果任务在正常完成前被取消,则返回true。
代码示例:
下面为了演示的全面一点,会把经常用到的api都调用一遍,其实直接get就能去拿值了。isDone和isCancelled只是为了保险起见而已。
import java.util.concurrent.*;
public class FutureAPIDemo {
public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {Thread.sleep(2000); // 模拟耗时操作return "Hello from Future!";});
// 检查任务是否完成while (!future.isDone()) {System.out.println("Task is not done yet...");try {Thread.sleep(500); // 让主线程等待一段时间} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}
// 尝试获取结果if (!future.isCancelled()) {try {String result = future.get(); // 这里会阻塞直到获取到结果System.out.println("Result: " + result);} catch (InterruptedException | ExecutionException e) {System.err.println("Error getting result: " + e.getMessage());}} else {System.out.println("Task was cancelled.");}
// 尝试取消任务(实际在这个例子中不会改变状态,因为任务已经完成)boolean cancellationResult = future.cancel(true);System.out.println("Cancellation attempt result: " + cancellationResult);
executor.shutdown();}
}
Future接口只规定了规范,具体实现是什么样子的喃?Future怎么就能去控制异步任务了?我们具体选一个实现类来看看,可以看到JDK种带了很多Future的实现:
我们选FutureTask:
public class FutureTask<V> implements RunnableFuture<V>
可以看到FutureTask其实就是一条线程:
其实异步任务本质上就是一条线程脱离主线程另起炉灶去跑一个方法逻辑。
public interface RunnableFuture<V> extends Runnable, Future<V>
然后用一个函数式接口指向业务逻辑,有很多状态字段,通过状态去控制线程以及整个异步任务的退出和任务获取等,以get方法为例:
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;
private Callable<V> callable;private Object outcome; // non-volatile, protected by state reads/writesprivate volatile Thread runner;private volatile WaitNode waiters;
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}......
}
2.CompletableFuture
2.1.为什么会有CompletableFuture?
future只是实现了基础的异步编程而已,但是其性能仍然可以优化,其使用上还可以扩展能多能力,completableFuture可以理解为future的升级版本:
- CompletableFuture 提供了一系列方法,如thenApply, thenCompose, thenCombine等,允许你轻松地组合多个异步操作,构建复杂的异步工作流。相比之下,Future仅提供了获取结果或取消任务的基本功能。
- CompletableFuture 支持注册回调函数,当异步操作完成时自动执行这些函数,无需显式调用get()方法阻塞主线程。这使得代码更易于编写和理解,避免了“回调地狱”。
- CompletableFuture 内部使用了ForkJoinPool或其他线程池,这通常比手动管理线程更高效。此外,CompletableFuture的实现考虑了并发场景下的性能优化。
2.2.使用
2.2.1.提交任务+获取结果
CompletableFuture支持两种提交任务的方式:
-
runAsync
-
supplyAsync
两者的区别是前者没有返回值,后者有返回值。
不管是runAsync也好,还是supplyAsync也好,他们用来接收任务的参数都是一个函数式接口,这意味着什么喃?意味着可以直接通过lambda表达式来定义任务。
获取结果和Future是一样的,如果没有获取到任务的结果,就会一直阻塞,直到获取到为止。
以下以runAsync为例做一个代码演示:
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}
});
completableFuture.get();//这里会阻塞,直到任务执行完成
2.2.2.回调函数
当任务执行完成后,我们期待有后续的关联操作,就需要用上回调函数了。CompletableFuture比起Future来说用起来很方便的一点就是CompletableFuture支持回调函数。
CompletableFuture支持三种回调函数:
-
thenRun,无参无返回。
-
thenAccept,有参无返回。
-
thenApply,有参有返回。
以下是代码示例:
thenRun:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("CompletableFuture is done.");return null;}).thenRun(() -> System.out.println("Task completed."));future.get(); // 等待任务完成}
}
thenAccept:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "Hello, CompletableFuture!";}).thenAccept(result -> System.out.println("Result: " + result));future.get(); // 等待任务完成}
}
thenApply:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "Hello, CompletableFuture!";}).thenApply(result -> result.toUpperCase());System.out.println(future.get()); // 输出: HELLO, COMPLETABLEFUTURE!}
}
2.2.3.CompletableFuture嵌套问题
CompletableFuture存在嵌套问题,举个例:
以上两个函数的返回值都是一个CompletableFuture,链式调用它们就会现成一个CompletableFuture嵌套:
CompletableFuture提供了对CompletableFuture编排的API,支持对多个CompletableFuture做聚合操作,如下我们可以调用thenCompose来将多层嵌套展开:
CompletableFuture一共提供了四种对CompletableFuture进行编排的API:
-
thenCompose,对多个CompletableFuture进行链式编排。
-
thenCombine,对两个CompletableFuture进行map操作。
-
allof,将多个任务聚合成一个任务集,集合中全部任务完成后才能继续往下走。
-
anyof,将多个任务聚合成一个任务集,集合中任意一个任务完成后就能继续往下走。
以下是代码示例:
thenCompose:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureComposition {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "First Result";});CompletableFuture<String> secondFuture = firstFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return s + " Second Result";}));System.out.println(secondFuture.get());}
}
thenCombine:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureComposition {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "First Result";});CompletableFuture<String> secondFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "Second Result";});CompletableFuture<String> combinedFuture = firstFuture.thenCombine(secondFuture, (s1, s2) -> s1 + " " + s2);System.out.println(combinedFuture.get());}
}
allOf:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class CompletableFutureComposition {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> firstFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("First task done");});CompletableFuture<Void> secondFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("Second task done");});CompletableFuture<Void> allFutures = CompletableFuture.allOf(firstFuture, secondFuture);allFutures.get();System.out.println("All tasks are done");}
}
anyOf:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class CompletableFutureComposition {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> firstFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("First task done");});CompletableFuture<Void> secondFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("Second task done");});CompletableFuture<Void> anyFuture = CompletableFuture.anyOf(firstFuture, secondFuture);anyFuture.get();System.out.println("At least one task is done");}
}