Java8实战-总结44
- CompletableFuture:组合式异步编程
- Future 接口
- Future 接口的局限性
- 使用 CompletableFuture 构建异步应用
CompletableFuture:组合式异步编程
最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关,尤其是它们之间如何交互。随着多核处理器的出现,提升应用程序处理速度最有效的方式是编写能充分发挥多核能力的软件。通过切分大型的任务,让每个子任务并行运行,这一目标是能够实现的;相对直接使用线程的方式,使用分支/合并框架(在Java 7
中引入)和并行流(在Java 8
中新引入)能以更简单、更有效的方式实现这一目标。
第二种趋势反映在公共API
日益增长的互联网服务应用。著名的互联网大鳄们纷纷提供了自己的公共API
服务,比如谷歌提供了地理信息服务,Facebook
提供了社交信息服务,Twitter
提供了新闻服务。现在,很少有网站或者网络应用会以完全隔离的方式工作。更多的时候,下一代网络应用都采用“混聚”(mash-up)的方式:它会使用来自多个来源的内容,将这些内容聚合在一起,方便用户的生活。
比如,你可能希望为你的法国客户提供指定主题的热点报道。为实现这一功能,你需要向谷歌或者Twitter
的API
请求所有语言中针对该主题最热门的评论,可能还需要依据你的内部算法对它们的相关性进行排序。之后,可能还需要使用谷歌的翻译服务把它们翻译成法语,甚至利用谷歌地图服务定位出评论作者的位置信息,最终将所有这些信息聚集起来,呈现在你的网站上。
当然,如果某些外部网络服务发生响应慢的情况,你希望依旧能为用户提供部分信息,比如提供带问号标记的通用地图,以文本的方式显示信息,而不是呆呆地显示一片空白屏幕,直到地图服务器返回结果或者超时退出。下图解释了这种典型的“混聚”应用如何与所需的远程服务交互。
要实现类似的服务,你需要与互联网上的多个Web
服务通信。可是,你并不希望因为等待某些服务的响应,阻塞应用程序的运行,浪费数十亿宝贵的CPU
时钟周期。比如,不要因为等待Facebook
的数据,暂停对来自Twitter
的数据处理。
这些场景体现了多任务程序设计的另一面。前面介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;它们将一个操作切分为多个子操作,在多个不同的核、CPU甚至是机器上并行地执行这些子操作。
与此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU
上执行几个松耦合的任务,充分利用CPU
的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。Future
接口,尤其是它的新版实现CompletableFuture
,是处理这种情况的利器。下图说明了并行和并发的区别。
Future 接口
Future
接口在Java 5
中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future
中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就是一个Future
事件)。衣服干洗的同时,你可以去做其他的事情。Future
的另一个优点是它比更底层的Thread
更易用。要使用Future
,通常你只需要将耗时的操作封装在一个Callable
对象中,再将它提交给ExecutorService
,就万事大吉了。下面这段代码展示了Java 8
之前使用Future
的一个例子。
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { public Double call() {return doSomeLongComputation(); }});
doSomethingElse(); try { Double result = future.get(1, TimeUnit.SECONDS); //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
} catch (ExecutionException ee) {// 计算抛出一个异常
} catch (InterruptedException ie) { // 当前线程在等待过程中被中断
} catch (TimeoutException te) { // 在Future对象完成之前超过已过期
}
正像上图介绍的那样,这种编程方式让你的线程可以在ExecutorService
以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get
方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。
这种场景存在怎样的问题?如果该长时间运行的操作永远不返回了会怎样?为了处理这种可能性,虽然Future
提供了一个无需任何参数的get
方法,还是推荐大家使用重载版本的get
方法,它接受一个超时的参数,通过它,可以定义线程等待Future
结果的最长时间,而不是像上述代码中那样永无止境地等待下去。
Future 接口的局限性
通过第一个例子,我们知道Future
接口提供了方法来检测异步计算是否已经结束(使用isDone
方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁的并发代码。比如,很难表述Future
结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。但是,使用Future
中提供的方法完成这样的操作又是另外一回事。这也是需要更具描述能力的特性的原因,比如下面这些:
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
- 等待
Future
集合中的所有任务都完成。 - 仅等待
Future
集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。 - 通过编程方式完成一个
Future
任务的执行(即以手工设定异步操作结果的方式)。 - 应对
Future
的完成事件(即当Future
的完成事件发生时会收到通知,并能使用Future
计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
使用 CompletableFuture 构建异步应用
为了展示CompletableFuture
的强大特性,创建一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能:
-
首先,你会学到如何为你的客户提供异步
API
(如果你拥有一间在线商店的话,这是非常有帮助的)。 -
其次,你会掌握如何让你使用了同步
API
的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。 -
你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。
同步API与异步API 同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。