我已经写了很多有关InterruptedException和中断线程的文章 。 简而言之,如果您没有Future.cancel()
调用Future.cancel()
那么Future
将终止待处理的get()
,但还将尝试中断基础线程。 这是一个非常重要的功能,可以更好地利用线程池。 我还写信总是比标准Future
更喜欢CompletableFuture
。 事实证明,功能更强大的Future
弟兄没有那么优雅地处理cancel()
。 考虑以下任务,我们稍后将在整个测试中使用以下任务:
class InterruptibleTask implements Runnable {private final CountDownLatch started = new CountDownLatch(1)private final CountDownLatch interrupted = new CountDownLatch(1)@Overridevoid run() {started.countDown()try {Thread.sleep(10_000)} catch (InterruptedException ignored) {interrupted.countDown()}}void blockUntilStarted() {started.await()}void blockUntilInterrupted() {assert interrupted.await(1, TimeUnit.SECONDS)}}
客户端线程可以检查InterruptibleTask
以查看其是否已开始或被中断。 首先,让我们看一下InterruptibleTask
对外部的cancel()
反应:
def "Future is cancelled without exception"() {given:def task = new InterruptibleTask()def future = myThreadPool.submit(task)task.blockUntilStarted()and:future.cancel(true)when:future.get()then:thrown(CancellationException)
}def "CompletableFuture is cancelled via CancellationException"() {given:def task = new InterruptibleTask()def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)task.blockUntilStarted()and:future.cancel(true)when:future.get()then:thrown(CancellationException)
}
到目前为止,一切都很好。 显然, Future
和CompletableFuture
工作方式几乎相同-在取消结果后检索结果会引发CancellationException
。 但是myThreadPool
线程呢? 我以为它会被游泳池打断,从而被回收,我怎么了!
def "should cancel Future"() {given:def task = new InterruptibleTask()def future = myThreadPool.submit(task)task.blockUntilStarted()when:future.cancel(true)then:task.blockUntilInterrupted()
}@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {given:def task = new InterruptibleTask()def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)task.blockUntilStarted()when:future.cancel(true)then:task.blockUntilInterrupted()
}
第一个测试将普通的Runnable
提交给ExecutorService
并等待其启动。 稍后,我们取消Future
并等待直到观察到InterruptedException
。 当基础线程被中断时, blockUntilInterrupted()
将返回。 但是,第二次测试失败。 CompletableFuture.cancel()
永远不会中断基础线程,因此尽管Future
看起来好像已被取消,但后备线程仍在运行,并且sleep()
不会抛出InterruptedException
。 错误或功能? 它已记录在案 ,因此很遗憾地提供一个功能:
参数:
mayInterruptIfRunning
–此值在此实现中无效,因为不使用中断来控制处理。
您说RTFM,但是为什么CompletableFuture
这样工作? 首先,让我们研究“旧的” Future
实现与CompletableFuture
有何不同。 从ExecutorService.submit()
返回的FutureTask
具有以下cancel()
实现(我使用类似的非线程安全Java代码删除了Unsafe
,因此仅将其视为伪代码):
public boolean cancel(boolean mayInterruptIfRunning) {if (state != NEW)return false;state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;try {if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final statestate = INTERRUPTED;}}} finally {finishCompletion();}return true;
}
FutureTask
具有一个遵循该状态图的state
变量:
在的情况下, cancel()
我们可以进入CANCELLED
状态或去INTERRUPTED
通过INTERRUPTING
。 核心部分是我们拿runner
线程(如果存在的,也就是说,如果当前正在执行的任务),我们尽量打断它。 该分支负责急切和强制中断已运行的线程。 最后,我们必须通知阻塞的所有线程Future.get()
中finishCompletion()
这里无关紧要)。 因此,很明显,多大的Future
取消已经运行的任务。 那CompletableFuture
呢? cancel()
伪代码:
public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = false;if (result == null) {result = new AltResult(new CancellationException());cancelled = true;}postComplete();return cancelled || isCancelled();
}
令人失望的是,我们几乎没有将result
设置为CancellationException
,而忽略了mayInterruptIfRunning
标志。 postComplete()
也有类似的作用finishCompletion()
-在通知未来注册的所有悬而未决的回调。 它的实现相当令人不快(使用非阻塞式Treiber stack ),但是它绝对不会中断任何底层线程。
原因和含义
在CompletableFuture
情况下,有限的cancel()
不是错误,而是设计决定。 CompletableFuture
并非固有地绑定到任何线程,而Future
几乎总是代表后台任务。 从零开始创建CompletableFuture
( new CompletableFuture<>()
)是完美的,其中根本没有要取消的底层线程。 我仍然不禁感到大多数CompletableFuture
都将具有关联的任务和后台线程。 在这种情况下, cancel()
可能会出现故障。 我不再建议用CompletableFuture
盲目地替换Future
,因为它可能会更改依赖cancel()
的应用程序的行为。 这意味着CompletableFuture
故意违反了Liskov替换原则 -这是需要考虑的严重问题。
翻译自: https://www.javacodegeeks.com/2015/03/completablefuture-cant-be-interrupted.html