java设置并行度
在掌握了这些新功能之后,随着Java 9的最新发布,我们有了许多新功能可以用来改进我们的解决方案。 Java 9的发布也是修改我们是否掌握Java 8功能的好时机。
在这篇文章中,我想解决关于Java并行流的最常见的误解。 人们通常说您不能以编程方式控制并行流的并行度,并行流始终在共享的ForkJoinPool.commonPool()上运行,您对此无能为力。 如果仅通过将parallel()调用添加到调用链来使流并行,就属于这种情况。 在某些情况下,这可能就足够了,例如,如果您仅对该流执行轻量级操作,但是,如果您需要对流的并行执行获得更多控制,则您需要做的不仅仅是调用parallel()。
让我们直接跳入自记录示例,而不是深入研究理论和技术。
在共享的ForkJoinPool.commonPool()上处理并行流:
Set<FormattedMessage> formatMessages(Set<RawMessage> messages) {return messages.stream().parallel().map(MessageFormatter::format).collect(toSet());
}
让我们将并行处理移到我们可以控制且不必共享的池中:
private static final int PARALLELISM_LEVEL = 8;Set<FormattedMessage> formatMessages(Set<RawMessage> messages) {ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_LEVEL);try {return forkJoinPool.submit(() -> formatMessagesInParallel(messages)).get();} catch (InterruptedException | ExecutionException e) {// handle exceptions} finally {forkJoinPool.shutdown();}
}private Set<FormattedMessage> formatMessagesInParallel(Set<RawMessage> messages) {return messages.stream().parallel().map(MessageFormatter::format).collect(toSet());
}
在此示例中,我们仅对ForkJoinPool的并行性级别感兴趣,尽管如果需要,我们也可以控制ThreadFactory和UncaughtExceptionHandler。
ForkJoinPool调度程序将在后台进行所有工作,包括合并工作窃取算法以提高并行处理效率。 值得一提的是,在某些情况下,例如,如果工作负载均匀地分布在工作线程上,使用ThreadPoolExecutor进行手动处理可能会更高效。
翻译自: https://www.javacodegeeks.com/2017/11/controlling-parallelism-level-java-parallel-streams.html
java设置并行度