目录
- 1. 什么是 parallelStream?
- 2. parallelStream 的优势
- 3. parallelStream 的使用
- 3.1 基本使用
- 3.2 计算总和示例
- 3.3 结合groupingByConcurrent实现线程安全的分组操作
- 4. parallelStream 的注意事项
- 4.1 适用场景
- 4.2 并行流的局限性
- 5. 控制并行流线程数
- 6. 总结
1. 什么是 parallelStream?
在 Java 8 中,Stream API
提供了 stream()
和 parallelStream()
两种流处理方式。
stream()
采用顺序流处理数据,每个元素按照流的顺序依次处理。parallelStream()
采用并行流处理数据,利用ForkJoinPool
并发执行,提高处理效率。
2. parallelStream 的优势
- 充分利用多核 CPU:并行流会将任务拆分为多个子任务,并分配到多个 CPU 核心执行。
- 提升处理速度:适用于计算密集型任务,能明显提高大数据量的处理效率。
- API 友好:
parallelStream()
的使用方式与stream()
类似,开发成本较低。
3. parallelStream 的使用
3.1 基本使用
import java.util.Arrays;
import java.util.List;public class ParallelStreamExample {public static void main(String[] args) {List<String> list = Arrays.asList("A", "B", "C", "D", "E", "F");// 使用并行流处理数据list.parallelStream().forEach(System.out::println);}
}
注意:并行流的输出顺序是无序的,因为多个线程并发执行。
3.2 计算总和示例
import java.util.stream.IntStream;public class ParallelSum {public static void main(String[] args) {int sum = IntStream.rangeClosed(1, 100).parallel().sum();System.out.println("Sum: " + sum);}
}
3.3 结合groupingByConcurrent实现线程安全的分组操作
- 使用parallelStream将分组过程并行化提升效率,使用
groupingByConcurrent
及ConcurrentMap
避免出现线程安全的问题
Map<String, List<CourseGrade>> collect = courseGrades.parallelStream().collect(Collectors.groupingByConcurrent(CourseGrade::getYear));
4. parallelStream 的注意事项
4.1 适用场景
适用场景 | 不适用场景 |
---|---|
数据量大 | 数据量小 |
计算密集型任务 | IO 密集型任务 |
不关心执行顺序 | 需要严格顺序执行 |
需要提升性能 | 线程切换成本高 |
4.2 并行流的局限性
- 线程开销:并行流会使用
ForkJoinPool
线程池,线程的创建和切换会带来一定的开销。 - 数据竞争:如果流操作涉及共享变量,可能会出现线程安全问题。
- 不适合小数据量:并行执行存在一定的启动成本,数据量太小可能会降低性能。
5. 控制并行流线程数
默认情况下,parallelStream()
使用 ForkJoinPool
的公共线程池,线程数与 CPU 核心数相同。
如果需要自定义线程池大小,可以使用以下方式:
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;public class CustomParallelStream {public static void main(String[] args) {ForkJoinPool customPool = new ForkJoinPool(4); // 设置 4 个线程customPool.submit(() ->System.out.println(IntStream.range(1, 100).parallel().sum())).join();customPool.shutdown();}
}
6. 总结
parallelStream()
能有效提高大数据量的处理效率。- 适用于计算密集型任务,但可能不适用于 IO 密集型任务。
- 默认使用
ForkJoinPool
,可以自定义线程池大小。 - 需谨慎使用,避免线程安全问题和不必要的开销。