转载自https://www.cnblogs.com/imyijie/p/4478074.html
Java8 提供了三个我们渴望的重要的功能:Lambdas 、 Stream API、以及接口的默认方法。不过我们很容易滥用它们甚至破坏自己的代码。
今天我们来看看Stream api,尤其是 parallel streams。这篇文章概述了其中的陷阱。
但是首先让我们看看Stream api备受称赞的原因——并行执行。它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。
Parallel Streams 的陷阱
以下是一个使用 parallel streams 完美特性的经典例子。在这个例子中,我们想同时查询多个搜索引擎并且获得第一个返回的结果。
1 public static String query(String question) {2 List<String> engines = new ArrayList<String>() {{3 add("http://www.google.com/?q=");4 add("http://duckduckgo.com/?q=");5 add("http://www.bing.com/search?q=");6 }};7 // get element as soon as it is available8 Optional<String> result = engines.stream().parallel().map((base) -> {9 String url = base + question; 10 // open connection and fetch the result 11 return WS.url(url).get(); 12 }).findAny(); 13 return result.get(); 14 }
是不是很棒?但是让我们细思深挖一下背后发生了什么。Parallel streams 被父线程执行并且使用JVM默认的 fork join pool: ForkJoinPool.common()
.(关于fork join 上面有链接)
然而,这里需要注意的一个重要方面是——查询搜索引擎是一个阻塞操作。所以在某时刻所有线程都会调用get()
方法并且在那里等待结果返回。
等等,这是我们一开始想要的吗?我们是在同一时间等待所有的结果,而不是遍历这个列表按顺序等待每个回答。
然而,由于ForkJoinPool workders的存在,这样平行的等待相对于使用主线程的等待会产生的一种副作用。现在ForkJoin pool 的实现是:它并不会因为产生了新的workers而抵消掉阻塞的workers。那么在某个时间所有ForkJoinPool.common()
的线程都会被用光。
也就是说,下一次你调用这个查询方法,就可能会在一个时间与其他的parallel stream同时运行,而导致第二个任务的性能大大受损。
不过也不要急着去吐槽ForkJoinPool的实现。在不同的情况下你可以给它一个ManagedBlocker实例并且确保它知道在一个阻塞调用中应该什么时候去移除掉卡住的workers。
现在有意思的一点是,在一个parallel stream处理中并不一定是阻塞调用会拖延程序的性能。任何被用于映射在一个集合上的长时间运行的函数都会产生同样的问题。
看下面这个例子
1 long a = IntStream.range(0, 100).mapToLong(x -> { 2 for (int i = 0; i < 100_000_000; i++) { 3 System.out.println("X:" + i); 4 } 5 return x; 6 }).sum();
这段代码同上面那个网络访问的代码遇到了相同的问题。每个lambda的执行并不是瞬间完成的,并且在执行过程中程序中的其他部分将无法访问这些workers。
这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。
那又怎么样?我不还是我程序的主人
确实,如果你正在写一个其他地方都是单线程的程序并且准确地知道什么时候你应该要使用parallel streams,这样的话你可能会觉得这个问题有一点肤浅。然而,我们很多人是在处理web应用、各种不同的框架以及重量级应用服务。
一个服务器是怎样被设计成一个可以支持多种独立应用的主机的?谁知道呢,给你一个可以并行的却不能控制输入的parallel stream?(offer you a predictable parallel stream performance if it doesn’t control the inputs)
一种方式是限制ForkJoinPool提供的并行数。可以通过使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=1 来限制线程池的大小为1。不再从并行化中得到好处可以杜绝错误的使用它。
另一种方式就是,一个被称为工作区的可以让ForkJoinPool平行放置的 parallelStream()
实现。不幸的是现在的JDK还没有实现。
总结
Parallel streams 是无法预测的,而且想要正确地使用它有些棘手。几乎任何parallel streams的使用都会影响程序中无关部分的性能,而且是一种无法预测的方式。我毫不怀疑有人能够设法去正确有效地使用它们。但是在打出stream.parallel()在我的代码里之前我仍然会仔细思考并且再三地审阅包含它的所有代码。