第十二章 并行Stream流

目录

一、引言

二、获取并行Stream流的两种方式

三、并行和串行Stream流的效率对比

四、parallelStream线程安全问题

五、parallelStream背后的技术

5.1. Fork/Join框架介绍

5.2. Fork/Join原理-分治法

5.3. Fork/Join原理-工作窃取算法

5.4. Fork/Join案例


一、引言

串行的Stream流:前面章节我们使用的Stream流是串行的,就是在一个线程上执行。运行上图中的串行流,我们查看下图中的运行结果,可以看到运行的都是同一个主线程:那么JDK 8中的并行的Stream流,即parallelStream,其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。

二、获取并行Stream流的两种方式

1. 直接获取并行的流

2. 将串行流转成并行流 并行操作代码:

运行结果:

三、并行和串行Stream流的效率对比

使用for循环,串行Stream流,并行Stream流来对5亿个数字求和。看消耗的时间。

import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.stream.LongStream;public class StreamTest {private static long times = 500000000L;private long start;@Beforepublic void init() {start = System.currentTimeMillis();}@Afterpublic void destroy() {long end = System.currentTimeMillis();System.out.println("消耗时间:" + (end - start));}// 测试效率,parallelStream 120@Testpublic void parallelStream() {System.out.println("serialStream");LongStream.rangeClosed(0, times).parallel().reduce(0, Long::sum);}// 测试效率,普通Stream 342@Testpublic void serialStream() {System.out.println("serialStream");LongStream.rangeClosed(0, times).reduce(0, Long::sum);}// 测试效率,正常for循环 421@Testpublic void forAdd() {System.out.println("forAdd");long result = 0L;for (long i = 1L; i < times; i++) {result += i;}}
}

我们可以看到parallelStream的效率是最高的,Stream并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。

四、parallelStream线程安全问题

并行流为我们带来高性能的同时,也带来了线程安全性问题:

运行效果:

我们可以看到,往集合中添加1000个元素,而实际上只有917个元素,这就是并发处理时线程安全问题导致的。

解决方法: 加锁、使用线程安全的集合或者调用StreamtoArray() / collect() 操作就是满足线程安全的了。

package com.wzx;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class Test {public static void main(String[] args) {ArrayList<Integer> list = new ArrayList<>();for (int i = 0; i < 1000; i++) {list.add(i);}// 解决parallelStream线程安全问题方案一:使用同步代码块// Object obj = new Object();// List<Integer> newList = new ArrayList<>();// 使用并行的流往集合中添加数据// list.parallelStream()//     .forEach(s -> {//        synchronized (obj) {//            newList.add(s);//        }//    });// 解决parallelStream线程安全问题方案二:使用线程安全的集合// Vector<Integer> newList = new Vector<>();// 1. 使用并行的流往集合中添加数据// list.parallelStream()//    .forEach(s -> {//        newList.add(s);//    });// 2. 使用Collections.synchronizedList()// List<Integer> newList = new ArrayList<>();// List<Integer> synchronizedList = Collections.synchronizedList(newList);// 使用并行的流往集合中添加数据// list.parallelStream()//        .forEach(s -> {//            synchronizedList.add(s);//        });// 解决parallelStream线程安全问题方案三:调用Stream流的collect/toArrayList<Integer> newList = IntStream.rangeClosed(1, 1000).parallel().boxed().collect(Collectors.toList());System.out.println("newList = " + newList.size());}
}

五、parallelStream背后的技术

5.1. Fork/Join框架介绍

parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小任务来异步执行。 Fork/Join框架主要包含三个模块:

1. 线程池:ForkJoinPool

2. 任务对象:ForkJoinTask

3. 执行任务的线程:ForkJoinWorkerThread

5.2. Fork/Join原理-分治法

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

5.3. Fork/Join原理-工作窃取算法

Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念,Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPoolParallelStream

对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism=N N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。

5.4. Fork/Join案例

需求:使用Fork/Join计算1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。

package com.wzx;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Test {public static void main(String[] args) {long start = System.currentTimeMillis();ForkJoinPool pool = new ForkJoinPool();SumRecursiveTask task = new SumRecursiveTask(1, 10000L);Long result = pool.invoke(task);System.out.println("result = " + result);long end = System.currentTimeMillis();System.out.println("消耗的时间为: " + (end - start));}
}
class SumRecursiveTask extends RecursiveTask<Long> {private static final long THRESHOLD = 3000L;private final long start;private final long end;public SumRecursiveTask(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {long length = end - start;// 任务不用再拆分了.可以计算了if (length <= THRESHOLD) {long sum = 0;for (long i = start; i <= end; i++) {sum += i;}System.out.println("计算: " + start + " -> " + end + ",结果为: " + sum);return sum;// 数量大于预定的数量,任务还需要再拆分} else {long middle = (start + end) / 2;System.out.println("拆分: 左边 " + start + " -> " + middle + ", 右边 " + (middle +1) + " -> " + end);SumRecursiveTask left = new SumRecursiveTask(start, middle);left.fork();SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);right.fork();return left.join() + right.join();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61230.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gitlab cicd搭建及使用笔记(二)

cicd之gitlab-runner使用要点 官方链接&#xff1a;https://docs.gitlab.com/runner/ 附历史文章链接 https://blog.csdn.net/qq_42936727/article/details/143624523?spm1001.2014.3001.5501 gitlab-runner常用命令及解释 gitlab-runner verify 容器内&#xff0c;检查注…

2411rust,1.81,1.82

1.81.0稳定版 core::error::错误 1.81稳定了核心中的Error特征,允许在#![no_std]库中使用特征.这样在相同错误特征上,可标准化更广泛的Rust生态系统,而不管库的环境. 新的排序实现 都已按新算法更新了标准库中的稳定和不稳定排序实现,从而改进了它们的运行时性能和编译时间…

【EasyExcel】复杂导出操作-自定义颜色样式等(版本3.1.x)

文章目录 前言一、自定义拦截器二、自定义操作1.自定义颜色1.1.样式未生效原因&#xff1a;1.2.解决方法&#xff1a; 2.合并单元格 三、复杂操作示例1.实体(使用了注解式样式)&#xff1a;2.自定义拦截器3.代码4.最终效果 前言 本文简单介绍阿里的EasyExcel的复杂导出操作&…

Excel单元格中自适应填充多图

实例需求&#xff1a;在Excel插入图片时&#xff0c;由于图片尺寸各不相同&#xff0c;如果希望多个图片填充指定单元格&#xff0c;依靠用户手工调整&#xff0c;不仅费时费力&#xff0c;而且很难实现完全填充。如下图中的产品图册&#xff0c;有三个图片&#xff0c;如下图所…

SQL面试题——间隔连续问题

间隔连续问题 某游戏公司记录的用户每日登录数据如下 +----+----------+ | id| date| +----+----------+ |1001|2021-12-12| |1001|2021-12-13| |1001|2021-12-14| |1001|2021-12-16| |1001|2021-12-19| |1001|2021-12-20| |1002|2021-12-12| |1002|2021-12-16| |1002|…

【C++笔记】vector使用详解及模拟实现

前言 各位读者朋友们&#xff0c;大家好&#xff01;上期我们讲了string类的模拟实现&#xff0c;这期我们开启vector的讲解。 一.vector的介绍及使用 1.1 vector的介绍 vector的文档 使用STL的三个境界&#xff1a;能用、明理、能扩展&#xff0c;下面学习vector&#xff…

LLM评测范式与方法

文章目录 基础大语言模型的评测微调大语言模型的评测不同评测方法的利弊为了有效地评估大语言模型的性能,一种主流的途径就是选择不同的能力维度并且构建对应的评测任务,进而使用这些能力维度的评测任务对模型的性能进行测试与对比。可供选择的能力维度包括但不限于本书所介绍…

3D Gaussian Splatting的全面理解

1.概述 高斯泼溅是一种表示 3D 场景和渲染新视图的方法,在“用于实时辐射场渲染的 3D 高斯泼溅3d-gaussian-splatting”这篇论文中被首先提出。它可以被认为是类似 NeRF模型型的替代品,就像过去的 NeRF 一样,高斯泼溅衍生出了许多新的研究工作,研究人员选择将其用作各种用…

Layui的select控件的onchange事件 无效的解决方法

举例&#xff1a; <select id"UserID" class"my-css" lay-filter"onchange"><option value"">请选择</option><option value"117">张三</option><option value"92">李四<…

《生成式 AI》课程 第3講 CODE TASK 任务3:自定义任务的机器人

课程 《生成式 AI》课程 第3講&#xff1a;訓練不了人工智慧嗎&#xff1f;你可以訓練你自己-CSDN博客 我们希望你创建一个定制的服务机器人。 您可以想出任何您希望机器人执行的任务&#xff0c;例如&#xff0c;一个可以解决简单的数学问题的机器人0 一个机器人&#xff0c…

vue包含二维码、背景图片、Logo图片和文本说明的图片生成及下载功能

要使用npm安装vue-qr和html2canvas这两个库 npm install vue-qr html2canvas 完整代码 可以根据实际项目需求调整&#xff0c;改成调用接口展示 <template><div><div ref"qrContainer" class"qr-container"><img class"back…

使用ajax-hook修改http请求响应数据,篡改后再返回给正常的程序

import { proxy } from "ajax-hook";//正经的项目这样用 proxy({ //代理response&#xff0c; onResponse: (response, handler) > { console.log(response.config.url)//这里判断是不是自己想要监听的url console.log(response.response)//这里查看响应数据 //r…

SpringBoot服务多环境配置

一个项目的的环境一般有三个&#xff1a;开发(dev)、测试(test)、生产(proc)&#xff0c;一般对应三套环境&#xff0c;三套配置文件。 像下面这样直接写两个配置文件是不行的。 application.ymlserver:port: 8080application-dev.ymlspring:datasource:driver-class-name: co…

Oracle ADB 导入 BANK_GRAPH 的学习数据

Oracle ADB 导入 BANK_GRAPH 的学习数据 1. 下载数据2. 导入数据运行 setconstraints.sql 1. 下载数据 访问 https://github.com/oracle-quickstart/oci-arch-graph/tree/main/terraform/scripts&#xff0c;下载&#xff0c; bank_accounts.csvbank_txns.csvsetconstraints.…

html数据类型

数据类型是字面含义&#xff0c;表示各种数据的类型。在任何语言中都存在数据类型&#xff0c;因为数据是各式各样。 1.数值类型 number let a 1; let num 1.1; // 整数小数都是数字值 ​ // 数字肯定有个范围 正无穷大和负无穷大 // Infinity 正无穷大 // -Infinity 负…

问:Spring MVC DispatcherServlet流程步骤梳理

DispatcherServlet是Spring MVC框架中的核心组件&#xff0c;负责接收客户端请求并将其分发到相应的控制器进行处理。作为前端控制器&#xff08;Front Controller&#xff09;的实现&#xff0c;DispatcherServlet在整个请求处理流程中扮演着至关重要的角色。本文将探讨Dispat…

【Jmeter相关】

Jmeter 可以作为接口测试问题&#xff0c;也会涉及到性能相关的问题 一、JMeter中用户定义的变量(User Defined Variables&#xff09;和用户参 数&#xff08;User Parameters&#xff09;的区别是什么? 在JMeter中都是用于定义和存储测试数据的方法&#xff0c;但它们有一…

【GNU】gcc -O编译选项 -Og -O0 -O1 -O2 -O3 -Os

1、gcc -O的作用 GCC 提供的 -O 系列选项用于优化代码。这些选项可以控制编译器对代码进行优化的程度和类型&#xff0c;从而提高代码的性能、减小代码体积或优化其他特性。 2、gcc -Og -O0 -O1 -O2 -O3 -Os 2.1 gcc -Og 启用调试友好的优化&#xff0c;平衡调试器功能与性…

基于深度学习的文本信息提取方法研究(pytorch python textcnn框架)

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

leetcode400第N位数字

代码 class Solution {public int findNthDigit(int n) {int base 1;//位数int weight 9;//权重while(n>(long)base*weight){//300n-base*weight;base;weight*10;}//n111 base3 weight900;n--;int res (int)Math.pow(10,base-1)n/base;int index n%base;return String…