引入
功能和灵活性:
Future:是Java 5引入的接口,用于表示一个异步操作的未来结果。它提供了基本的异步操作支持,如检查是否完成、等待结果以及获取结果,但在处理结果、异常和组合等方面功能有限。
CompletableFuture:是Java 8引入的类,扩展了Future的功能,提供了更强大和灵活的异步编程支持。它允许更容易地处理异步操作的结果、异常和组合,以及在操作完成时执行回调等。
异步编程支持:
Future:仅用于异步操作的基本管理,如提交任务和获取结果。
CompletableFuture:提供了丰富的操作符、方法和组合功能,可以在异步操作之间进行更复杂的处理,如转换、组合、串联、并行等。
组合和链式操作:
Future:没有提供内置的操作符来对多个异步操作进行链式组合。
CompletableFuture:支持在操作完成后进行链式操作,使多个异步操作可以依次执行,以及在其中一个或多个操作完成后执行其他操作。
异常处理:
Future:异常处理相对有限,通常需要使用try-catch块来捕获操作过程中的异常。
CompletableFuture:具有更强大的异常处理机制,可以使用exceptionally()、handle()等方法来处理操作过程中的异常。
回调执行:
Future:不支持在操作完成时执行回调操作。
CompletableFuture:支持使用thenApply()、thenCompose()、thenCombine()等方法来在操作完成后执行回调。
并行执行:
Future:通常不直接支持并行执行。
CompletableFuture:通过fork-join池实现了并行执行操作,允许一次性同时处理多个异步任务。
总之:
Future使用局限性:
- Future的get方法会导致主线程阻塞
- 轮询获取结果会消耗cpu资源
- 多个Future任务不能按照顺序执行
- Future Api无异常处理
创建:
// 无返回值 使用ForkJoinPool线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 无返回值 可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值 使用ForkJoinPool线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 有返回值 可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
获取结果:
通过get、join、getNow获取返回值,区别如下:
join:返回结果或者抛出一个unchecked异常(CompletionException),不需要显示捕获异常。
get:返回结果或者一个具体的异常(ExecutionException,
InterruptedException),此方法继承至Future是堵塞的。
getNow:如果当前任务执行完成,返回执行结果,否则返回valueIfAbsent(默认值)。
get:get() 方法会阻塞当前线程,直到异步计算完成并返回结果。如果计算过程中抛出异常,该异常会在调用 get() 方法的线程中重新抛出。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello, World!";
}); try { // 阻塞直到结果可用 String result = future.get(); System.out.println(result);
} catch (InterruptedException | ExecutionException e) { e.printStackTrace();
}
join:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作并抛出异常 throw new RuntimeException("Simulated Exception");
}); try { // 阻塞直到结果可用,但异常将作为未检查的异常抛出 String result = future.join(); System.out.println(result); // 这行代码不会被执行,因为会抛出异常
} catch (CompletionException e) { // 处理 CompletionException,它包装了原始异常 e.getCause().printStackTrace(); // 输出: java.lang.RuntimeException: Simulated Exception
}
getnow:如果计算尚未完成,它将立即返回一个给定的默认值。这不会阻塞调用线程。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello, World!";
}); // 尝试获取结果,但由于计算尚未完成,将返回默认值
String result = future.getNow("Default Value");
System.out.println(result); // 输出: Default Value(如果计算尚未完成) // 在稍后某个时间点,当计算完成后,你可以再次调用 get() 来获取结果
try { result = future.get(); System.out.println(result); // 输出: Hello, World!
} catch (InterruptedException | ExecutionException e) { e.printStackTrace();
}
handle()
它允许你处理异步操作的结果,无论是正常结果还是异常。这个方法接受两个参数:一个 BiFunction 用于处理正常结果,一个Function 用于处理异常(但是通常我们只使用第一个参数,因为异常处理可以通过 exceptionally 方法来完成
handle 方法的目的是允许你在单个回调中同时处理正常结果和异常,并返回一个新的值(或者一个新的 CompletableFuture)。
返回值:
handle 方法返回一个新的 CompletableFuture,该 CompletableFuture 的结果是由提供的 BiFunction 产生的。
异常处理:
你可以在 handle 方法的 BiFunction 中检查异常参数(Throwable 类型),并据此决定如何处理异常。但是,请注意,handle 本身并不区分正常结果和异常,它只是提供了一个函数来处理两者。
非阻塞:
handle 方法本身是非阻塞的,它立即返回一个新的 CompletableFuture,而不需要等待原始 CompletableFuture 完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Random error"); } return "Success"; }); // 使用 handle 处理结果和异常 CompletableFuture<String> handledFuture = future.handle((result, throwable) -> { if (throwable != null) { // 处理异常 return "Error occurred: " + throwable.getMessage(); } else { // 处理正常结果 return "Handled result: " + result; } }); // 获取最终结果 System.out.println(handledFuture.get()); }
}
exceptionally
当你使用 CompletableFuture 进行异步计算时,如果在执行过程中发生异常,你可以通过调用 exceptionally 方法来指定一个函数,这个函数将在异常发生时被调用,并允许你提供一个替代结果或者进行其他的异常处理。
exceptionally 方法接收一个参数,它是一个函数式接口 Function<Throwable, ? extends U> 的实现,其中 Throwable 是抛出的异常类型,U 是 CompletableFuture 的结果类型或其子类型。这个函数接收异常作为输入,并返回一个值作为替代结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Random error"); } return "Success"; }).exceptionally(throwable -> "Error occurred: " + throwable.getMessage()); // 获取最终结果,无论是正常结果还是异常处理后的结果 System.out.println(future.get()); }
}
allof()
用于创建一个新的 CompletableFuture,这个新的 CompletableFuture 会在所有给定的 CompletableFuture 实例都完成时完成。这里的“完成”意味着无论是正常完成还是异常完成。allOf() 方法本身并不返回任何结果,它只是等待所有给定的 CompletableFuture 都完成。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
参数 cfs:一个 CompletableFuture 数组,代表要等待完成的所有任务。 返回值 一个新的
CompletableFuture,该 CompletableFuture 在所有给定的 CompletableFuture
都完成时完成。
假设我们有一组异步任务,每个任务都表示为一个 CompletableFuture。我们想要等待所有任务都完成后再进行下一步操作。这时,我们就可以使用 CompletableFuture.allOf() 方法。
// 创建并启动三个异步任务CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "CompletableFuture");CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Java");// 使用 allOf 等待所有任务完成,但不处理结果CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);// 分别处理每个任务的结果future1.thenAccept(result -> System.out.println("Result of future1: " + result));future2.thenAccept(result -> System.out.println("Result of future2: " + result));future3.thenAccept(result -> System.out.println("Result of future3: " + result));// 等待所有任务完成combinedFuture.get();System.out.println("All tasks have completed.");
thenApply()
CompletableFuture.thenApply() 方法用于在原始 CompletableFuture 完成时,对其结果进行某种转换,并返回一个新的 CompletableFuture,这个新的 CompletableFuture 会包含转换后的结果。如果原始 CompletableFuture 异常完成,那么新的 CompletableFuture 也会异常完成,并携带相同的异常。
thenApply()是线程的后续操作,可以拿到上一次线程执行的返回结果作为本次thenApply()的参数一直传递下去。 并且是有返回结果的。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; public class CompletableFutureThenApplyExample { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建一个异步任务,计算一个数的平方 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42); // 使用 thenApply 来对结果进行转换,这里我们将结果乘以2 CompletableFuture<Integer> doubledFuture = future.thenApply(result -> result * 2); // 等待转换后的任务完成,并打印结果 Integer doubledResult = doubledFuture.get(); System.out.println("The result doubled is: " + doubledResult); }
}
// 模拟 1 + 1 + 1CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1).thenApply(v -> v + 1).thenApply(v -> v + 1);System.out.println("执行结果:" + future.getNow(-1));
返回结果:3
thenAccept() 和 thenRun()方法
thenAccept() 和 thenRun() 是 CompletableFuture 中两个常用的方法,它们用于在异步操作完成时执行回调函数,但它们的用途和参数有所不同。
thenAccept() 方法接收一个 Consumer 类型的参数,当 CompletableFuture 的计算完成时,Consumer 会被调用,并且会传入 CompletableFuture 的结果。thenAccept() 方法不返回新的 CompletableFuture,因为它不关注结果的进一步处理。
thenRun() 方法接收一个 Runnable 类型的参数,当 CompletableFuture 的计算完成时,Runnable 会被调用。与 thenAccept() 不同,thenRun() 不接收任何参数,它不关心 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture; public class CompletableFutureExample { public static void main(String[] args) throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello, World!"); future.thenAccept(result -> { System.out.println("Received result: " + result); // 注意:这里不能返回一个新的值或者CompletableFuture }); // 注意:不要在这里添加Thread.sleep等待,因为thenAccept是异步的 // 可以通过其他方式(如等待另一个Future)来确保主线程不立即退出 // 例如,可以通过获取future的结果来阻塞主线程,但这不是thenAccept的目的 // future.get(); // 这会阻塞主线程直到future完成 }
}
import java.util.concurrent.CompletableFuture; public class CompletableFutureExample { public static void main(String[] args) throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 模拟长时间运行的任务 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Long running task finished!"); }); future.thenRun(() -> { System.out.println("ThenRun action executed!"); // 这里不会接收任何参数,因为它不关心前面任务的结果 }); // 等待future完成以确保thenRun能执行 future.get(); // 这会阻塞主线程直到future完成 }
}
thenCompose()
thenCompose()方法用于在原始CompletableFuture完成时,根据其结果启动一个新的CompletableFuture链。这允许你基于原始结果动态地创建新的异步操作。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<String> futureString = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> "The number is " + result * 2)
);
futureString.thenAccept(System.out::println); // Prints: The number is 2
complete():当前阶段异步任务执行完成
complete() 方法是用来手动完成一个 CompletableFuture 实例的,并设置其结果值。当你希望从外部(非异步任务本身)来设置 CompletableFuture 的结果时,这个方法就非常有用了。一旦一个 CompletableFuture 被完成(无论是通过 complete() 还是其他方式),它就不能再被完成第二次,并且任何后续的 complete() 或异常完成调用都会被忽略。
import java.util.concurrent.CompletableFuture; public class CompletableFutureCompleteDirectExample { public static void main(String[] args) { // 创建一个新的CompletableFuture实例 CompletableFuture<String> future = new CompletableFuture<>(); // 直接在主线程中调用complete()方法来完成这个CompletableFuture future.complete("这是直接设置的结果"); // 获取结果,因为complete()已经被调用,所以这里不会阻塞 String result = future.getNow(null); // getNow接受一个默认值,如果CompletableFuture没有完成则返回这个默认值 System.out.println("结果: " + result); // 输出 "结果: 这是直接设置的结果" }
}
链式:
import java.util.concurrent.CompletableFuture; public class CompletableFutureChainExample { public static void main(String[] args) throws Exception { // 假设我们有一个异步操作,它返回一个字符串 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Initial value"; }); // 使用 thenApply 链式操作,对结果进行转换 CompletableFuture<String> future2 = future1.thenApply(s -> s + " transformed"); // 不返回新的CompletableFuture future2.thenAccept(System.out::println); CompletableFuture<String> future3 = future2.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " composed") ); // 等待 future3 完成并打印结果 // 注意:这里我们使用了 future3.get() 来阻塞主线程,以便看到结果 System.out.println(future3.get()); // "Initial value transformed composed" }
}
thenCompose()和thenApplyAsync区别:
处理异步操作链的方式:
thenCompose():该方法用于链接两个异步操作,当第一个操作完成时,将其结果作为参数传递给一个返回CompletableFuture的函数。这使得你可以创建一个平坦的结果链,即只有一个级别的CompletableFuture,而不是嵌套的CompletableFuture<CompletableFuture>。
thenApplyAsync():该方法也用于在异步计算完成后执行一个函数,但它返回的是一个新的CompletableFuture,并且这个函数接收的参数是前一个异步操作的结果。然而,与thenCompose()不同,thenApplyAsync()只是简单地将结果应用于一个函数,并返回一个新的CompletableFuture,而不是再次链接一个异步操作。
返回结果的方式:
thenCompose():返回的是一个新的CompletableFuture,这个新的CompletableFuture是由提供的函数返回的CompletableFuture。这意味着你可以在这个新的CompletableFuture上继续链式调用其他方法。
thenApplyAsync():同样返回一个新的CompletableFuture,但这个新的CompletableFuture是由提供的函数直接返回的结果(经过转换后)所创建的。它没有再次链接一个异步操作,只是简单地对结果进行转换。
用途:
thenCompose():当你想要将多个异步操作链接在一起,并且每个操作都返回一个新的CompletableFuture时,应该使用thenCompose()。这样可以避免嵌套的CompletableFuture结构,使代码更加清晰和易于理解。
thenApplyAsync():当你只需要对前一个异步操作的结果进行简单的转换,并返回一个新的结果时,应该使用thenApplyAsync()。它不会再次链接一个异步操作,只是简单地对结果进行转换。
执行方式:
thenCompose():执行方式取决于你提供的函数内部的操作。如果函数内部的操作是异步的(例如,它返回一个新的CompletableFuture),那么thenCompose()会等待这个异步操作完成后再继续执行后续的操作。
thenApplyAsync():默认情况下,thenApplyAsync()是异步执行的,它会将提供的函数提交到某个执行器(Executor)进行异步处理。但是,你也可以通过提供一个自定义的执行器来控制它的执行方式。
thenCompose()和thenApplyAsync()都是用于处理异步操作链的方法,但它们在处理方式和返回结果的方式上有所不同。thenCompose()更适合用于链接多个异步操作,而thenApplyAsync()则更适合用于对结果进行简单的转换。
CompletableFuture综合使用
需求:
订单上有商品ID,通过商品ID可以查询到商品详细信息,图片信息存储在商品详细信息中。
那就需要查询完订单再查询商品最后查询图片信息,这3个异步任务需要串行执行。
CompletableFuture<Order> orderFuture = fetchOrder(orderId); // 查询订单的异步方法 CompletableFuture<Product> productFuture = orderFuture.thenCompose(order -> { // 使用订单中的商品ID来查询商品信息 List<Long> productIds = order.getProductIds(); // 订单有一个商品ID列表 // 简化只查询一个商品信息Long productId = productIds.get(0); return fetchProduct(productId); // 查询商品信息的异步方法
}); CompletableFuture<Image> imageFuture = productFuture.thenCompose(product -> { // 使用商品信息中的图片ID来查询图片信息 Long imageId = product.getImageId(); return fetchImage(imageId); //查询图片信息的异步方法
}); // 等待图片信息查询完成并处理结果
imageFuture.thenAccept(image -> { System.out.println("Image URL: " + image.getUrl());
}).exceptionally(throwable -> { // 处理异常 throwable.printStackTrace(); return null;
});