Java 并发编程 —— Fork/Join 框架的原理详解

目录

一. 前言

二. 并发和并行

2.1. 并发

2.2. 并行

2.3. 分治法

三. ForkJoin 并行处理框架的理论

3.1. ForkJoin 框架概述

3.2. ForkJoin 框架原理

3.3. 工作窃取算法

四. ForkJoin 并行处理框架的实现

4.1. ForkJoinPool 类

4.2. ForkJoinWorkerThread 类

4.3. ForkJoinTask 类

4.4. ForkJoin 示例

五. 总结


一. 前言

    在 JDK 中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop 中的 MapReduce。

    ForkJoin 是由 JDK1.7 之后提供的多线程并发处理框架。ForkJoin 框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin 将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。

二. 并发和并行

并发和并行在本质上还是有所区别的。

2.1. 并发

    并发指的是在同一时刻,只有一个线程能够获取到 CPU 执行任务,而多个线程被快速的轮换执行,这就使得在宏观上具有多个线程同时执行的效果,并发不是真正的同时执行,并发可以使用下图表示:

2.2. 并行

    并行指的是无论何时,多个线程都是在多个 CPU 核心上同时执行的,是真正的同时执行。

2.3. 分治法

    把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。

    步骤可分为:1. 分割原问题;2. 求解子问题;3. 合并子问题的解为原问题的解。我们可以使用如下伪代码来表示这个步骤:

if (任务很小){直接计算得到结果
} else {分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}

典型应用有:二分搜索、大整数乘法、Strassen矩阵乘法、棋盘覆盖、合并排序、快速排序、线性时间选择、汉诺塔。

三. ForkJoin 并行处理框架的理论

3.1. ForkJoin 框架概述

    Java 1.7 引入了一种新的并发框架 —— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。

    ForkJoin 框架的本质是一个用于并行执行任务的框架,能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在 Java 中,ForkJoin 框架与 ThreadPool 共存,并不是要替换 ThreadPool。

    其实,在 Java 8 中引入的并行流计算,内部就是采用的 ForkJoinPool 来实现的。例如,下面使用并行流实现打印数组元组的程序:

public class SumArray {public static void main(String[] args) {List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);numberList.parallelStream().forEach(System.out::println);}
}

说到这里,可能有读者会问:可以使用线程池的 ThreadPoolExecutor 来实现啊?为什么要使用ForkJoinPool? 接下来,我们就来回答这个问题。

3.2. ForkJoin 框架原理

    ForkJoin 框架是从 JDK1.7 中引入的新特性,它同 ThreadPoolExecutor 一样,也实现了Executor 和 ExecutorService 接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。

    ForkJoinPool 主要使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool 能够使用相对较少的线程来处理大量的任务。

    比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。

    比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

    所以当使用 ThreadPoolExecutor 时,使用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用 ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

那么使用 ThreadPoolExecutor 或者 ForkJoinPool,性能上会有什么差异呢?

    首先,使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用 ThreadPoolExecutor 时,是不可能完成的,因为 ThreadPoolExecutor 中的 Thread 无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!

3.3. 工作窃取算法

    假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点:
充分利用线程进行并行计算,并减少了线程间的竞争。

工作窃取算法的缺点:
在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

四. ForkJoin 并行处理框架的实现

ForkJoin 框架中一些重要的类如下所示:

Fork/Join 框架类图

4.1. ForkJoinPool 类

    既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool,它与其它的 ExecutorService 区别主要在于它使用“工作窃取”。由类图可以看出,ForkJoinPool 类实现了线程池的 Executor接口。我们还可以使用 Executors.newWorkStealPool() 方法来创建 ForkJoinPool。

ForkJoinPool 中提供了如下提交任务的方法:

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)

4.2. ForkJoinWorkerThread 类

实现 ForkJoin 框架中的线程。

4.3. ForkJoinTask<V> 类

    ForkJoinTask 封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask 比线程要轻量,ForkJoinPool 中少量工作线程能够运行大量的 ForkJoinTask。ForkJoinTask 类中主要包括两个方法 fork() 和 join(),分别实现任务的分拆与合并。

    fork() 方法类似于 Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join() 方法不同,ForkJoinTask 的 join() 方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用 join(),它将处理其他任务,直到注意到目标子任务已经完成。

我们可以使用下图来表示这个过程:

ForkJoinTask有3个子类:

  1. RecursiveAction:无返回值的ForkJoinTask,并实现 Runnable。
  2. RecursiveTask:有返回值的ForkJoinTask,并实现 Callable。
  3. CountedCompleter:完成任务后将触发其他任务。

4.4. ForkJoin 示例

package com.lm.concurrency.example;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();// 生成一个计算任务,计算1+2+3+4ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);// 执行一个任务Future<Integer> result = forkjoinPool.submit(task);try {System.out.println("result: " + result.get());} catch (Exception e) {System.out.println(e);}}
}

五. 总结

Fork/Join 框架局限性:

对于 Fork/Join 框架而言,当一个任务正在等待它使用 Join 操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,通过这种方式,线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标,Fork/Join 框架执行的任务有一些局限性。

  1. 任务只能使用 Fork 和 Join 操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在 Fork/Join 框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了。
  2. 在 Fork/Join 框架中,所拆分的任务不应该去执行 IO 操作,比如:读写数据文件。
  3. 任务不能抛出检查异常,必须通过必要的代码来处理这些异常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MongoDB的原子操作findAndModify和findOneAndUpdate

本文主要介绍MongoDB的原子操作findAndModify和findOneAndUpdate。 目录 MongoDB的原子操作一、findAndModify二、findOneAndUpdate MongoDB的原子操作 MongoDB的原子操作指的是在单个操作中对数据库的数据进行读取和修改&#xff0c;并确保操作是原子的&#xff0c;即要么完全…

Swagger2之SpringBoot集成使用

前言&#xff1a; 我们对于Mybatis-Plus的分享较多&#xff0c;都是接触的一些数据库相关的知识&#xff0c;今天给大家带来的是Swagger2 Swagger2 1.介绍&#xff1a; Swagger2是一个规范和完整的框架&#xff0c;用于生成、描述、调用和可视化Restful风格的web服务&#xff…

【蓝桥杯】专题练习

前缀和 3956. 截断数组 - AcWing题库 一看到题目很容易想到的思路是对数组求前缀和&#xff0c;然后枚举两个分段点就好&#xff0c;时间复杂度是On^2&#xff0c;n是1e5会t&#xff0c;需要优化。 朴素的代码&#xff0c;会超时&#xff1a; #include <bits/stdc.h> u…

4-高可用-限流详情

在开发高并发系统时&#xff0c;有很多手段来保护系统&#xff0c;如缓存、降级和限流等。缓存目的是提升系统访问速度和增大系统处理能力&#xff0c;可谓是抗高并发流量的银弹。 而降级是当服务出问题或者影响到核心流程的性能&#xff0c;需要暂时屏蔽掉&#xff0c;待高峰…

亚马逊云科技-如何缩容/减小您的AWS EC2根卷大小-简明教程

一、背景 Amazon EBS提供了块级存储卷以用于 EC2 实例&#xff0c;EBS具备弹性的特点&#xff0c;可以动态的增加容量、更改卷类型以及修改预配置的IOPS值。但是EBS不能动态的减少容量&#xff0c;在实际使用中&#xff0c;用户也许会存在此类场景&#xff1a; 在创建AWS EC2…

【Python】循环语句

一、while循环的基础语法 二、while循环的嵌套应用 三、while循环的嵌套案例 四、for循环的基础语法 五、for循环的嵌套应用 六、循环中断 : break和continue 一、while循环的基础语法 使用while循环的基础应用 while循环语句 while循环注意点 while的条件需得到布尔类型&am…

高并发神经网络推理部署

高并发的神经网络推理框架部署 highport 是一款封装神经网络推理的高并发的软件架构&#xff0c;已在ESWEEK 2023年皮肤病检测比赛中获得第一名。 这里记录一下highport的软件架构和几个trick优化 软件架构图 解密模块&#xff1a;我们训练完的模型文件是带加密的&#xff0c;…

2023优秀开源项目获选榜名单(开放原子开源基金会)|JeecgBoot 成功入选

JeecgBoot 是一个开源的企业级低代码开发平台&#xff0c;它成功入选2023年度生态开源项目&#xff0c;这是对其十年坚持开源的认可。作为一个开源项目&#xff0c;JeecgBoot 在过去的十年里一直秉承着开放、共享、协作的理念&#xff0c;不断推动着开源社区的发展。 2023年开放…

在windows上如何干净的卸载一个软件及其快捷方式

可以在控制面板里面卸载&#xff0c;可以卸载掉文件夹及其快捷方式&#xff0c;具体操作如下&#xff1a; 找到-》控制面板\程序\程序和功能 然后右键某一项&#xff0c;即可出现卸载功能项。 卸载不干净的方法&#xff1a;利用软件商店卸载&#xff0c;有可能卸载失败&#x…

maven学习和maven聚合工程搭建

1.学习maven maven的概念 项目管理工具 &#xff0c;对jar进行依赖管理&#xff0c;编译&#xff0c;打包&#xff0c;单元测试&#xff0c;安装&#xff0c;部署&#xff0c;贯穿整个项目 为什么要学maven 要解决的问题&#xff1a; 不同的开发工具开发出来的项目目录结构…

网络通信day5作业

1> 使用select完成TCP客户端程序 客户端: #include<myhead.h>#define FPORT 9999 #define FIP "192.168.125.130"#define KPORT 6666 #define KIP "192.168.125.130"int main(int argc, const char *argv[]) {//创建套接字文件描述符int cfd…

Android: Ubuntu下交叉环境编译常用调试工具demo for lspci命令(ARM设备)

lspci命令交叉环境编译(ARM设备) 交叉编译工具下载&#xff1a; https://releases.linaro.org/components/toolchain/binaries https://releases.linaro.org/components/toolchain/binaries/6.3-2017.05/aarch64-linux-gnu/ lspci命令交叉环境编译(ARM设备)&#xff1a; 1&a…

智能优化算法应用:基于梯度算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于梯度算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于梯度算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.梯度算法4.实验参数设定5.算法结果6.参考文献7.MA…

IDEA中如何创建各种类型的java工程

如果你的工程下面的module没有互相依赖&#xff0c;就相当于是一个小的项目&#xff0c;idea版本不同&#xff0c;细节可能不同 1、普通的Java 工程 在工程上&#xff0c;右键- New - Module&#xff0c;如下&#xff1a; 指明Java工程的名称及使用的JDK版本&#xff1a; 创建…

基于扩散的模糊文本图像超分辨率技术

恢复低分辨率文本图像具有挑战性&#xff0c;特别是对于在现实场景中具有复杂笔画和严重降质的中文文本图像。确保文本的准确性和样式的真实性对于高质量的文本图像超分辨率至关重要。最近&#xff0c;由于扩散模型强大的数据分布建模能力和数据生成能力&#xff0c;在自然图像…

Python---IP 地址的介绍

1. IP 地址的概念 IP 地址就是标识网络中设备的一个地址&#xff0c;好比现实生活中的家庭地址。 网络中的设备效果图: 2. IP 地址的表现形式 说明: IP 地址分为两类&#xff1a; IPv4 和 IPv6 IPv4 是目前使用的ip地址 IPv6 是未来使用的ip地址 IPv4 是由点分十进制组成 …

Pycharm 关闭控制台多余窗口详解(console)

文章目录 1 问题描述2 解决办法2.1 步骤1&#xff1a;编辑配置2.2 步骤2&#xff1a;使用 Python 控制台运行&#xff08;取消勾选&#xff09;2.3 验证&#xff1a;再次运行&#xff0c;多余窗口消失 1 问题描述 2 解决办法 2.1 步骤1&#xff1a;编辑配置 菜单路径&#xf…

anconda常用命令

一、基础指令说明 1、查看anconda版本号 conda --version 2、查看当前已有虚拟环境 conda env list 3、创建新环境 conda create -n classify python3.9 创建一个叫做classify的虚拟环境&#xff0c;其中python等于3.9 4、进入虚拟环境 activate classify 5、安装包 接下来…

5213A 综合数据通信分析仪

5213A 综合数据通信分析仪 数字通信测量仪器 5213A 综合数据通信分析仪是符合标准 PXI/CPCI 总线的模块化便携式仪器&#xff0c;用户可以 根据测试需要选配相应的模块&#xff0c;可选模块包括双端口 RapidIO 模块、双端口 2G FC 模块、双 端口 4G FC 模块、双端口 8G FC 模…

HP服务器idrac设置以及系统安装

HP服务器idrac设置以及系统安装 一、设置管理口的地址和密码1、HP服务器重新界面选择"F9"进入BIOS&#xff0c;设置iLo5(idrac)的IP和用户名密码。2、选择"系统配置"。3、选择"iLO 4"配置程序。4、网络选项是设置idrac管理口的地址&#xff0c;设…