浅谈 parallelStream和Stream 源码及其应用场景

上篇讲述了list.forEach()和list.stream().forEach() 异同点
谈到了并行流的概念,本篇则从源码出发,了解一下其原理。

一、流的初始操作流程

jdk8中 将Collection中加入了转换流的概念。

default Stream<E> stream() {return StreamSupport.stream(spliterator(), false);}default Stream<E> parallelStream() {return StreamSupport.stream(spliterator(), true);}

在这里插入图片描述
目前看到的两者是一个参数的区别。

//boolean parallel 是否为并行流
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {//用于检查传入的spliterator是否为空Objects.requireNonNull(spliterator);//ReferencePipeline.Head 表示流的开始,根据spliterator以及parallel创建对应的流操作链return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);}
//该构造方法用于初始化Head类的实例
Head(Spliterator<?> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}
ReferencePipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {//super关键字调用父类的构造方法,完成对父类的初始化工作super(source, sourceFlags, parallel);}
//按照给定的参数初始化AbstractPipeline类的实例
AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {this.previousStage = null;this.sourceSpliterator = source;  this.sourceStage = this;  //当前阶段作为源操作this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;// The following is an optimization of:// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;  //左移一位取反操作this.depth = 0;this.parallel = parallel; //当前流水线是否支持并行操作}

二、forEach操作

在这里插入图片描述

@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {//串行流执行sourceStageSpliterator().forEachRemaining(action);}else {//并行流执行super.forEach(action);}}

在这里插入图片描述

1)串行流

demo debug add操作,看为何会报错?

public static void main(String[] args) {List<String> list= new ArrayList<>();list.add("Sunday");list.add("Monday");list.add("Tuesday");list.add("Wednesday");list.add("Thursday");list.add("Friday");list.add("Saturday");list.stream().forEach(d->{System.out.println("value="+d);if (d.equals("Thursday")){list.add(d);}});}
//如果此管道截断是源阶段,则获取源阶段拆分器。调用此方法并成功返回后,将消耗管道
final Spliterator<E_OUT> sourceStageSpliterator() {if (this != sourceStage)throw new IllegalStateException();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (sourceStage.sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;return s;}else if (sourceStage.sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;return s;}else {throw new IllegalStateException(MSG_CONSUMED);}}

forEachRemaining实现方法
在这里插入图片描述
在这里插入图片描述
return未走,直接走了异常返回 throw new ConcurrentModificationException();

public void forEachRemaining(Consumer<? super E> action) {int i, hi, mc; // hoist accesses and checks from loopArrayList<E> lst; Object[] a;if (action == null)throw new NullPointerException();if ((lst = list) != null && (a = lst.elementData) != null) {if ((hi = fence) < 0) {mc = lst.modCount;hi = lst.size;}elsemc = expectedModCount;//i表示开始迭代的位置//i=index index表示上次迭代的位置,将上次迭代器正在迭代的位置复制给i//(i=index)>=0 保证当前迭代的下标大于等于0//表示最大迭代到hi,设置最大的hi=a.length//(index = hi) <= a.length保证数组不跨界if ((i = index) >= 0 && (index = hi) <= a.length) {for (; i < hi; ++i) {@SuppressWarnings("unchecked") E e = (E) a[i];//执行具体的迭代action.accept(e);}if (lst.modCount == mc)return;}}throw new ConcurrentModificationException();}

2)并行流

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

@Overridepublic void forEach(Consumer<? super P_OUT> action) {evaluate(ForEachOps.makeRef(action, false));}
//构造一个 {@code TerminalOp},用于对流的每个元素执行操作。
//action,接收流所有元素的 {@code Consumer} 
//ordered,是否请求有序遍历,因为是并行流,所以ordered未false
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,boolean ordered) {Objects.requireNonNull(action);return new ForEachOp.OfRef<>(action, ordered);}
//使用终端操作评估管道以产生结果。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

三、核心原理-ForkJoinPool

其核心原理则-ForkJoinPool

1)Diagrams

在这里插入图片描述

2)compute()

ForkJoin进行计算任务时,计算类是要继承ForkJoinTask并且重写compute方法的。
我们看一下ForkJoinTask内部类是如何重写compute()方法的。

//类似于 AbstractTask,但不需要跟踪子任务
public void compute() {Spliterator<S> rightSplit = spliterator, leftSplit;//先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;if ((sizeThreshold = targetSize) == 0L)targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());boolean forkRight = false;Sink<S> taskSink = sink;ForEachTask<S, T> task = this;while (!isShortCircuit || !taskSink.cancellationRequested()) {if (sizeEstimate <= sizeThreshold ||(leftSplit = rightSplit.trySplit()) == null) {task.helper.copyInto(taskSink, rightSplit);break;}ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);ForEachTask<S, T> taskToFork;if (forkRight) {forkRight = false;rightSplit = leftSplit;taskToFork = task;task = leftTask;}else {forkRight = true;taskToFork = leftTask;}//根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork.fork方法中调用了ForkJoinPool线程池并行计算taskToFork.fork();//将任务划分成更小的数据块,进行求解sizeEstimate = rightSplit.estimateSize();}task.spliterator = null;task.propagateCompletion();}

重写的compute()方法,当进行fork方法时,实际就是调用了ForkJoinPool线程池进行计算了,那么线程池本身是无顺序的,谁先计算完谁展示。

3)ForkJoinPool核心算法

在这里插入图片描述

“工作窃取”(work-stealing)算法

ForkJoinPool的基本原理是基于“工作窃取”(work-stealing)算法。它维护着一个工作队列(WorkQueue)的数组,每个工作队列对应一个工作线程(WorkerThread)。当一个线程需要执行一个任务时,它会将任务添加到自己的工作队列中。当一个线程的工作队列为空时,它会从其他线程的工作队列中“窃取”一个任务来执行。这个“窃取”操作可以在不同的线程间实现任务的负载均衡。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

“分治法”(Divide-and-Conquer Algorithm)

分治法-典型的应用比如快速排序算法。使用分治法来实现任务的并行执行。分治法是一种将大问题划分成小问题,并通过递归地解决小问题来解决大问题的方法。

四、结果无顺序

在这里插入图片描述

1)若想并行且有顺序,用.forEachOrdered替代

在这里插入图片描述

2).forEachOrdered是如何保证有序的?

private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {Spliterator<S> rightSplit = task.spliterator, leftSplit;long sizeThreshold = task.targetSize;boolean forkRight = false;while (rightSplit.estimateSize() > sizeThreshold &&(leftSplit = rightSplit.trySplit()) != null) {ForEachOrderedTask<S, T> leftChild =new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);ForEachOrderedTask<S, T> rightChild =new ForEachOrderedTask<>(task, rightSplit, leftChild);// 分叉父任务 完成左右子项 “发生在”父项完成之前task.addToPendingCount(1);// 完成左边的孩子“发生在”完成右边的孩子之前rightChild.addToPendingCount(1);task.completionMap.put(leftChild, rightChild);// If task is not on the left spineif (task.leftPredecessor != null) {/** Completion of left-predecessor, or left subtree,* "happens-before" completion of left-most leaf node of* right subtree.* The left child's pending count needs to be updated before* it is associated in the completion map, otherwise the* left child can complete prematurely and violate the* "happens-before" constraint.*/leftChild.addToPendingCount(1);// Update association of left-predecessor to left-most// leaf node of right subtreeif (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {// If replaced, adjust the pending count of the parent// to complete when its children completetask.addToPendingCount(-1);} else {// Left-predecessor has already completed, parent's// pending count is adjusted by left-predecessor;// left child is ready to completeleftChild.addToPendingCount(-1);}}ForEachOrderedTask<S, T> taskToFork;if (forkRight) {forkRight = false;rightSplit = leftSplit;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork();}/** Task's pending count is either 0 or 1.  If 1 then the completion* map will contain a value that is task, and two calls to* tryComplete are required for completion, one below and one* triggered by the completion of task's left-predecessor in* onCompletion.  Therefore there is no data race within the if* block.*/if (task.getPendingCount() > 0) {// Cannot complete just yet so buffer elements into a Node// for use when completion occurs@SuppressWarnings("unchecked")IntFunction<T[]> generator = size -> (T[]) new Object[size];Node.Builder<T> nb = task.helper.makeNodeBuilder(task.helper.exactOutputSizeIfKnown(rightSplit),generator);task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();task.spliterator = null;}task.tryComplete();}

看代码会大概得理解,此采用了“happens-before”原则,左子树需右子树之前完成,通过计数策略保证前后顺序的完成,继而保证了其有序性。最终执行依旧是ForkJoinPool线程池执行。
在这里插入图片描述

五、应用场景

1)并行的前提是需要硬件支持

前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行

2)demo 测试性能(本机测试)

测试配置:16 GB 2667 MHz DDR4

@Testpublic void dateTest() {System.out.println("数据汇总开始");Long startTime = System.currentTimeMillis();int count1 = adminTaskReceiveService.receiveCount();int count2 = adminTaskReceiveService.inspectCount();int count3=adminTaskReceiveService.constructCount();int count4=adminTaskReceiveService.appointmentCount();TestResult testResult = new TestResult();testResult.setReceiveCount(count1);testResult.setInspectCount(count2);testResult.setConstructCount(count3);testResult.setAppointmentCount(count4);int count11 = adminTaskReceiveService.receiveCount1();int count22 = adminTaskReceiveService.inspectCount1();int count33=adminTaskReceiveService.constructCount1();int count44=adminTaskReceiveService.appointmentCount1();testResult.setReceiveCount1(count11);testResult.setInspectCount1(count22);testResult.setConstructCount1(count33);testResult.setAppointmentCount1(count44);System.out.println("数据汇总结束,result=" + testResult);Long endTime = System.currentTimeMillis();System.out.println("time=" + (endTime - startTime) + "毫秒");}@Testpublic void dateTest1() {System.out.println("数据汇总开始");Long startTime = System.currentTimeMillis();TestResult testResult = new TestResult();List<Runnable> taskList = new ArrayList<Runnable>() {{add(() -> testResult.setReceiveCount(adminTaskReceiveService.receiveCount()));add(() -> testResult.setInspectCount(adminTaskReceiveService.inspectCount()));add(() -> testResult.setConstructCount(adminTaskReceiveService.constructCount()));add(() -> testResult.setAppointmentCount(adminTaskReceiveService.appointmentCount()));add(() -> testResult.setReceiveCount1(adminTaskReceiveService.receiveCount1()));add(() -> testResult.setInspectCount1(adminTaskReceiveService.inspectCount1()));add(() -> testResult.setConstructCount1(adminTaskReceiveService.constructCount1()));add(() -> testResult.setAppointmentCount1(adminTaskReceiveService.appointmentCount1()));}};taskList.parallelStream().forEach(Runnable::run);System.out.println("数据汇总结束,result=" + testResult);Long endTime = System.currentTimeMillis();System.out.println("time=" + (endTime - startTime) + "毫秒");}

一个单线程,一个并行,看结果:测试demo我是需要统计8个数量,由结果可见性能并没什么大区别。
由结果可知:并行处理并不总是能提高性能,特别是当任务规模较小或者任务之间依赖性较强时。此外,在使用并行流时,应该避免使用会修改原始集合的操作,因为这些操作可能会导致不可预测的结果。由于’foreach`操作是终端操作,它会阻塞主线程直到所有元素都被处理完毕,因此即使操作是并行的,它们仍然是按照顺序完成的。
在这里插入图片描述
在这里插入图片描述

3)最后总结

在数据量比较大的情况下,CPU负载本身不是很高,不要求顺序执行的时候,可以使用并行流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/18839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第十三章 进程与线程

第十三章 进程与线程 程序与进程的概念 程序&#xff1a; 英文单词为Program&#xff0c;是指一系列有序指令的集合&#xff0c;使用编程语言所编写&#xff0c;用于实现一定的功能。 进程&#xff1a; 进程则是指启动后的程序&#xff0c;系统会为进程分配内存空间。 函数式…

【PingPong_注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

奶奶也能看懂的耦合协调度分析

不会计算&#xff1f;跟着文献学起来~ 案例数据连接&#xff08;复制链接后粘贴到浏览器中&#xff09;&#xff1a; 耦合协调度数据​spssau.com/spssaudata.html?shareDataF363000CD033FF15E557BB75B9B0D412 假如你有这样一组数据&#xff1a; 如何进行计算分析耦合协调度…

内网安全之证书模版的管理

证书模板 Certificate templates 是 CA 证书颁发机构的一个组成部分&#xff0c;是证书策略中的重要元素&#xff0c;是用于证书注册、使用和管理的一组规则和格式。当 CA 收到对证书的请求时&#xff0c;必须对该请求应用一组规则和设置&#xff0c;以执行所请求的功能&#x…

前端知识1-4:性能优化进阶

性能优化进阶 Navigation Timing API navigationStart / end 表示从上一个文档卸载结束时 > 如果没有上一个文档&#xff0c;这个值和fetchStart相等 unloadEventStart / end 标识前一个网页unload的时间点 redirectStart / end 第一个http重定向发生和结束的时间 fetch…

Hadoop3:HDFS中DataNode与NameNode的工作流程

一、DataNode中的数据情况 数据位置 /opt/module/hadoop-3.1.3/data/dfs/data/current/BP-823420375-192.168.31.102-1714395693863/current/finalized/subdir0/subdir0块信息 每个块信息&#xff0c;由两个文件保存&#xff0c;xxx.meta保存的是数据长度、校验和、时间戳&am…

芝加哥大学最新研究:GPT-4与财务预测,重塑财务分析的未来

最近&#xff0c;芝加哥大学的研究团队发表了一篇突破性的研究&#xff0c;展示了大型语言模型&#xff08;LLM&#xff09;&#xff0c;特别是 OpenAI 开发的 GPT-4&#xff0c;如何在财务报表分析领域取得了与专业分析师相匹配甚至超越的表现。这项研究不仅凸显了人工智能在高…

GDPU Java 天码行空13

&#xff08;一&#xff09;实验目的 1、掌握JAVA中与网络程序开发相关的知识点&#xff1b; 2、理解并掌握网络编程开发思想及方法&#xff1b; 3、熟悉项目开发的分包方法和依据&#xff1b; 4、实现聊天室中客服端和服务器端的实现方法&#xff1b; 5、熟悉多线程程序开发方…

Kinetix5700罗克韦尔AB伺服驱动器维修2198-D020-ERS3

Allen-Bradley罗克韦尔运动控制/伺服驱动器维修Kinetix 5700/Kinetix 6000/Kinetix 5500等系列电机驱动器/运动控制系统维修。 AB驱动器的控制接口有两种类型&#xff1a; 类型1&#xff1a;脉冲接口 类型2&#xff1a;模拟量接口 大部分小型PLC和伺服驱动器的链接方式都是开…

通过vlan实现同一网段下的网络隔离

现有两个电脑通过交换机直接连接在一起 pc1&#xff1a; pc2&#xff1a; 正常状态下是可以ping成功的 现在先进入交换机命令行界面&#xff0c;创建两个vlan <Huawei>system-view Enter system view, return user view with CtrlZ. [Huawei]vlan 10 [Huawei-vlan10…

2024年西安交通大学程序设计校赛

A题 签到题 代码如下 //A #include<iostream> #include<algorithm> #define int long long #define endl \n #define IOS ios::sync_with_stdio(0),cin.tie(0),cout.tie(0); using namespace std; signed main() {IOSint a,b,c,d;cin>>a>>b>>c…

二叉树介绍及堆

文章目录 树 概念及结构 二叉树 概念及结构 特殊的二叉树 完全二叉树 满二叉树 性质 储存 顺序存储 链式储存 堆 概念及结构 小堆 大堆 建堆 向上调整建堆 向下调整建堆 TOPK问题 法一&#xff1a; 法二&#xff1a; 树 概念及结构 树是一种非线性的数据…

解决word里加入mathtype公式后行间距变大

1.布局>页面设置>文档网格&#xff0c;网格栏选为无网格 2.固定间距

探索标准差与方差的奥秘

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、标准差与方差的基础理解 代码案例 二、标准差与方差的计算方法 方差的计算 标准差的…

QT——QSlider实现,QT滑动控件的使用

目录 简介滑动块调节两种方法滑动条触发信号量理想滑动块运用&#xff08;参考&#xff09; 简介 QT中滑动条的控件叫QSlider&#xff0c;继承自QAbstractSlider类。 主要用途是通过滑块的滑动的方式在一定范围内调节某个值。根据调节的后得到的结果去执行一些处理&#xff0c…

【AI基础】数据获取与整理、打标、增强方法、增强库imgaug

文章目录 常见的数据集网站爬虫工具使用搜索引起图片爬虫视频网站爬虫 数据整理数据检查和清洗数据去重数据集划分 数据标注数据标注工具 label studio 数据增强什么是数据增强单样本数据增强多样本数据增强样本生成方法数据增强imgaugimgaug 操作imgaug 使用 常见的数据集网站…

这款AI绘画软件,带你快速生成高质量产品效果图!

前言 随着人工智能技术的飞速发展&#xff0c;AI在设计领域的应用越来越广泛&#xff0c;。今天&#xff0c;介绍的一款能够自动生成高质量产品效果图的AI绘画软件——STARTAI。这款软件以其强大的功能和便捷的操作&#xff0c;正在重新定义电商产品效果图的制作流程。 AI局部…

RocketMQ .NET

RocketMQ 是一款由阿里巴巴集团开发并开源给Apache软件基金会的分布式消息及流处理平台。以其高吞吐量、低延迟、高可用性等特点而广受欢迎。支持Java&#xff0c;C, Python, Go, .NET等。 异步解耦&#xff1a;可以实现上游和下游业务系统的松耦合设计&#xff0c;使得服务部…

小红书图文笔记怎么做?纯干货!

小红书图文笔记的制作是一门艺术&#xff0c;它需要结合精美的图片和有价值的内容&#xff0c;以吸引和留住用户的注意力。伯乐网络传媒给大家分享制作小红书图文笔记的干货指南&#xff0c;包括准备、制作、发布和优化的各个环节。 一、准备阶段 确定目标受众&#xff1a;找到…

【NumPy】权威指南:使用NumPy的percentile函数进行百分位数计算

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…