Java 通过引入结构化并发 API 简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单一工作单元,从而简化错误处理和取消操作、提高可靠性并增强可见性
结构化并发由 JEP 428 提出,并在 JDK 19 作为孵化API。它由 JEP 437 在 JDK 20 中重新孵化,并带有一个继承
Scoped Values
的版本更新 (JEP 429)
非结构化并发 ExecutorService
java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,可帮助开发人员同时执行子任务
在下列代码中,对于两个无关联性的IO操作
(如RPC、MySQL调用),通常使用线程池来并发执行findUser()
和fetchOrder()
两个子任务
Response handle() throws ExecutionException, InterruptedException {Future<String> user = esvc.submit(() -> findUser());Future<Integer> order = esvc.submit(() -> fetchOrder());String theUser = user.get(); // Join findUserint theOrder = order.get(); // Join fetchOrderreturn new Response(theUser, theOrder);
}
- 如果
findUser()
抛出异常,那么handle()
在调用user.get()
时会抛出异常,但是fetchOrder()
将继续在自己的线程中运行。这称为线程泄漏 - 如果
handle()
被中断,findUser()
和fetchOrder()
将继续执行,会导致线程泄露 - 如果
findUser()
需要很长时间执行,但fetchOrder()
失败了,handle()
主线程需要block在user.get()
后才能感知异常
这不仅会导致产生错误的可能性更大,而且会使诊断和排除此类错误变得更加困难。例如,线程转储等可视化工具无法对任务之间的堆栈进行关联和追踪
ExecutorService 和 Future 允许这种非结构化使用,所以它们不会强制执行甚至跟踪任务和子任务之间的关系,即使这种关系很常见并且有用
结构化并发
结构化并发(Structured Concurrency)是一种并发编程模型,旨在简化和规范并发代码的编写和管理。它提供了一种结构化的方式来组织和控制并发任务的执行,以减少常见的并发编程问题,如资源泄漏、死锁和竞态条件等
结构化并发的核心思想是将并发任务组织为层次结构,并通过一些基本原则来管理它们的执行:
- 嵌套关系:并发任务可以嵌套在其他任务中,形成层次结构。父任务可以控制子任务的执行,并等待子任务完成。
- 继承关系:子任务的生命周期受父任务的控制。当父任务完成或取消时,它会自动取消所有的子任务。
- 顺序执行:任务按照顺序执行,一个任务的完成是下一个任务的前提条件。这样可以确保任务之间的依赖关系和顺序执行的一致性。
- 取消机制:任务可以被取消,取消操作会向下传递,取消所有嵌套的子任务。
结构化并发模型的一个重要特性是异常传播。当一个任务抛出异常时,它会被传播到父任务,并取消整个任务层次结构的执行。这有助于避免未处理的异常导致的问题,并提供了一种可靠的错误处理机制
StructuredTaskScope
与ForkJoinPool
不同,ForkJoinPool
是为计算密集型任务设置的,StructuredTaskScope
默认使用虚线程,主要面向I/O密集型
计算密集型 ForkJoinPool
ForkJoinPool提供了一种结构化并发的机制,用于高效地执行并行任务,并在Java中广泛用于处理递归和分治算法等需要并行处理的场景。
ForkJoinPool适用于计算密集型任务
ForkJoinPool 是 Java 标准库中提供的一个用于并行执行任务的线程池实现。它基于工作窃取(work-stealing)算法,其中线程可以从其他线程的任务队列中窃取任务来执行
在 ForkJoinPool 中,任务被划分为更小的子任务,然后递归地执行这些子任务,直到达到某个终止条件。具体流程可见 parallelStream/ForkJoinPool 详解
IO密集型 StructuredTaskScope
StructuredTaskScope 允许开发人员将任务构建为一系列并发子任务,并将它们作为一个整体进行协调。子任务的成功的结果或异常由父任务聚合和处理
StructuredTaskScope适用于IO密集型任务
这是之前使用线程池的handle()
示例,改为使用StructuredTaskScope
而编写的
Response handle() throws ExecutionException, InterruptedException {try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {Supplier<String> user = scope.fork(() -> findUser());Supplier<Integer> order = scope.fork(() -> fetchOrder());scope.join() // Join both subtasks.throwIfFailed(); // ... and propagate errors// Here, both subtasks have succeeded, so compose their resultsreturn new Response(user.get(), order.get());}
}
与原始示例相比,理解涉及的线程的生命周期很容易:在所有情况下,它们的生命周期都限定在一个词法范围内,即try-with-resources
语句的主体部分。此外,使用StructuredTaskScope
确保了许多有价值的属性:
- 错误处理与短路: 如果任务中的任一子任务失败,如果另一任务尚未完成,则将其取消。(由
ShutdownOnFailure
实现的关闭策略来管理) - 取消传播: 如果运行
handle()
的线程在调用join()
之前或期间被中断,当线程退出作用域时,两个子任务将自动取消 - 清晰度: 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功还是失败
- 可见性: 线程转储(thread dump)清楚地显示了任务层次结构,其中运行子任务的线程显示为作用域的子级
StructuredTaskScope 是一个预览版 API,默认禁用。要启用需指定JVM参数
--enable-preview
参考资料:
- JEP 453: Structured Concurrency (Preview)
- parallelStream/ForkJoinPool 详解