Fork/Join框架

什么是 Fork/Join

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

分治

我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架,Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算 1+2+。。+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。

是不是和mapreduce有点像

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

为什么需要使用工作窃取算法

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

原理

在了解原理前,其实可以思考下,自己来设计个 fork/join 框架应该怎么设计。

  1. 因为需要切分成很多的小任务,所以任务之间不能有依赖,独立的执行功能
  2. 事先分配好固定的双端队列
  3. 每个线程执行对应的队列的任务
  4. 当前线程做完队列的任务后,查看其他队列有没有多余的任务,有的话,从头取出来执行
  5. 执行完的任务,结果放到一个队列里面,启动线程从队列里面拿数据,然后合并这些数据

实现

ForkJoinTask

在这里插入图片描述

我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask :用于有返回结果的任务。

ForkJoinPool

在这里插入图片描述

ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

构造函数

public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode)
  • parallelism

可并行级别,Fork/Join框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成Fork/Join框架中最多存在的线程数量,也不要将这个属性和ThreadPoolExecutor线程池中的corePoolSize、maximumPoolSize属性进行比较,因为ForkJoinPool的组织结构和工作方式与后者完全不一样。而后续的讨论中,读者还可以发现Fork/Join框架中可存在的线程数量和这个参数值的关系并不是绝对的关联(有依据但并不全由它决定)。

  • factory

当Fork/Join框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread的方法。在Fork/Join框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:DefaultForkJoinWorkerThreadFactory。

  • handler

异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。

  • asyncMode

这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说Fork/Join框架是采用同步模式还是采用异步模式工作。Fork/Join框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。

当asyncMode设置为ture的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false

ForkJoinPool还有另外两个构造函数,一个构造函数只带有parallelism参数,既是可以设定Fork/Join框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的CPU内核数量Runtime.getRuntime().availableProcessors()。实际上ForkJoinPool还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。

private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

如果你对Fork/Join框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的CPU内核数作为Fork/Join框架内最大并行任务数量,这样可以保证CPU在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个CPU内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。

ForkJoinWorkerThread

执行任务的线程
在这里插入图片描述

使用

例如我们现在要计算从 1 加到 100

使用 Fork/Join 框架首先要考虑到的是如何分割任务,如果我们希望每个子任务最多执行10个数的相加,那么我们设置分割的阈值是10

public class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 10;private int first;private int last;public CountTask(int first, int last) {this.first = first;this.last = last;}@Overrideprotected Integer compute() {int sum = 0;if (last - first <= THRESHOLD) {for (int i = first; i <= last; i++) {sum += i;}} else {int middle = (first + last) / 2;CountTask leftTask = new CountTask(first, middle);CountTask rightTask = new CountTask(middle + 1, last);leftTask.fork();rightTask.fork();sum = leftTask.join() + rightTask.join();/*int leftResult = leftTask.join();int rightResult = rightTask.join();sum = leftResult + rightResult;*/}return sum;}public static void main(String[] args) throws Exception {CountTask countTask = new CountTask(1, 100);ForkJoinPool forkJoinPool = new ForkJoinPool();Future result = forkJoinPool.submit(countTask);System.out.println(result.get());}
}

流程分析

fork()

public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;
}

fork方法用于将新创建的子任务放入当前线程的work queue队列中,Fork/Join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkerThread线程运行它,又或者是唤起其它正在等待任务的ForkJoinWorkerThread线程运行它。

这里面有几个元素概念需要注意,ForkJoinTask任务是一种能在Fork/Join框架中运行的特定任务,也只有这种类型的任务可以在Fork/Join框架中被拆分运行和合并运行。ForkJoinWorkerThread线程是一种在Fork/Join框架中运行的特性线程,它除了具有普通线程的特性外,最主要的特点是每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列(work queue),这个任务队列用于存储在本线程中被拆分的若干子任务。

ForkJoinWorkerThread 里的 workQueue 是这样的

final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:

final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) {    // ignore if queue removedint m = a.length - 1;     // fenced write for task visibilityU.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);}else if (n >= m)growArray();}
}

join()

join方法用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列(work queue)中,则取出这个子任务进行“递归”执行。其目的是尽快得到当前子任务的运行结果,然后继续执行。

public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
}

它首先调用doJoin方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有4种:

  1. 已完成(NORMAL)
  2. 被取消(CANCELLED)
  3. 信号(SIGNAL)
  4. 出现异常(EXCEPTIONAL)。

如果任务状态是已完成,则直接返回任务结果。
如果任务状态是被取消,则直接抛出CancellationException
如果任务状态是抛出异常,则直接抛出对应的异常
让我们分析一下doJoin方法的实现

private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();
}final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;
}

在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。

异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了**isCompletedAbnormally()**方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:

if(task.isCompletedAbnormally()){System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

public final Throwable getException() {int s = status & DONE_MASK;return ((s >= NORMAL)    ? null :(s == CANCELLED) ? new CancellationException() :getThrowableException());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865318.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库设计规范(DOC文件)

1 编写目的 2 数据库策略 2.1 数据库对象长度策略 2.2 数据完整性策略 2.3 规范化设计与性能之间的权衡策略 2.4 字段类型的定义与使用策略 3 命名规范 3.1 数据库命名规则 3.2 数据库对象命名的一般原则 3.3 表空间(Tablespace)命名规则 3.4 表(Table)命名规则 3.5…

Java 虚拟机 一

运行时数据区 我们先看线程隔离的数据区 程序计数器 程序计数器&#xff08; Program Counter Register&#xff09; 是一块较小的内存空间&#xff0c; 它可以看作是当前线程所执行的字节码的行号指示器。 字节码解释器工作时就是通过改变这个计数器的值来选取下一条需要执…

提升用户体验之requestAnimationFrame实现前端动画

1)requestAnimationFrame是什么? 1.MDN官方解释 2.解析这段话&#xff1a; 1、那么浏览器重绘是指什么呢&#xff1f; ——大多数电脑的显示器刷新频率是60Hz&#xff0c;1000ms/6016.66666667ms的时间刷新一次 2、重绘之前调用指定的回调函数更新动画&#xff1f; ——requ…

六西格玛绿带培训ROI:你的投资究竟值不值?

近年来&#xff0c;企业对于员工培训的投入日益增加&#xff0c;六西格玛绿带培训更是作为提升企业运营效率和质量管理的利器&#xff0c;更是备受关注。然而&#xff0c;面对高昂的培训成本&#xff0c;企业如何评估六西格玛绿带培训的投资回报率&#xff08;ROI&#xff09;呢…

安装Intel Realsense D435i驱动与ROS包报错

1.下载安装realsense SDK 1.1 安装依赖 sudo apt install libudev-dev pkg-config libgtk-3-dev sudo apt install libusb-1.0-0-dev pkg-config sudo apt install libglfw3-dev sudo apt install libssl-dev1.2 权限 cd librealsense/ sudo cp config/99-realsense-libusb.…

一万年太久,只争朝夕 | Foundation model的进展仍不够快

编者按&#xff1a;如今根基模型&#xff08;Foundation Models&#xff09;的应用和相关创新正在快速涌现&#xff0c;但仍有很大的提升空间&#xff0c;目前还无法充分发挥根基模型的潜能、将其高效快速地应用于企业级AI应用中。 根基模型的加速应用和落地&#xff0c;带动了…

64、基于去噪卷积神经网络的彩色图像去噪(matlab)

1、基于去噪卷积神经网络的彩色图像去噪的原理及流程 基于去噪卷积神经网络的彩色图像去噪是一种基于深度学习的图像处理技术&#xff0c;可以有效地去除图像中的噪声&#xff0c;提高图像的质量。下面是在Matlab中实现基于去噪卷积神经网络的彩色图像去噪的原理及流程&#x…

移动端响应式布局开发的四大方案

移动端响应式布局开发的四大方案 media媒体查询remflexvh/vw media媒体查询 媒体查询通常会结合百分比实现自适应&#xff0c;它经常应用于pc端与移动端是一套项目代码的情况&#xff0c;依据媒体查询写多套不同的样式 rem pc端和移动端是两套代码的&#xff0c;通常pc端不做…

ChatGPT如何应用在谷歌seo?

ChatGPT在提升博客和创作效率方面非常有用。它可以帮助你快速生成吸引人的标题&#xff0c;确保内容第一眼就能抓住读者的注意力。不仅如此&#xff0c;ChatGPT还能根据你的主题生成详细的文章提纲&#xff0c;让你在写作时思路更加清晰。关键词优化也是它的强项&#xff0c;可…

300KG载重履带式无人车技术详解

一、动力系统 300KG载重履带式无人车采用了高效且稳定的动力系统&#xff0c;通常由电池组或燃油发动机作为动力源。电池组提供了较长的续航时间和较低的运行噪音&#xff0c;适用于需要静音作业的场合。而燃油发动机则能提供更高的功率和续航能力&#xff0c;适用于需要长时间…

STM32远程烧录程序

目录 简介 不同的程序下载方式 ICP&#xff1a;In-Circuit Programming ISP&#xff1a;In-System Programing IAP&#xff1a;In-Application Programming BootLoader Bootloader 是什么&#xff1f; STM32的启动方式 存储器组织 存储器映像 嵌入式SRAM 嵌入式FL…

不同行业如何选择适合自己行业的项目管理工具?

在当今的信息化时代&#xff0c;项目管理软件已成为各行各业不可或缺的工具。然而&#xff0c;由于各行业具有不同的特点和需求&#xff0c;因此选择合适的项目管理软件成为了一个重要问题。本文将探讨不同行业在选择项目管理软件时需要考虑的因素&#xff0c;希望能帮助大家更…

vue实现一个简单的审批绘制功能

1、vue代码 <div class"approval"><div class"approval_ul" v-for"(item,key) in approvalList" :key"key"><div><el-radio-group v-model"item.jointlySign"><el-radio label"1">…

【超全详解】耳机怎么清理?手把手教你清洁各种耳机!

平时听歌、听书、打电话耳机往往有时候几个小时都戴着&#xff0c;时间久了难免会堆积污垢堵孔。这可能就会造成耳机左右音量不一致、声音小、降噪效果变差、嗡嗡声等问题&#xff0c; 而且耳机用久了如果不及时清理&#xff0c;可能导致耳朵感染细菌&#xff0c;严重的话会影响…

亚马逊测评策略全攻略:详析各方案优势与局限,你精通了吗?

亚马逊测评&#xff0c;一个绕不开的话题。不管是对于新手卖家还是资深卖家来说&#xff0c;它都是提升产品销量和排名的有效手段之一。接下来&#xff0c;我将为大家详细解析亚马逊测评的各种方式和注意事项。 一、精准筛选真人测评资源 在寻找真人测评资源时&#xff0c;许多…

难道 Java 已经过时了?

当一门技术已经存在许多年了&#xff0c;它可能会失去竞争力&#xff0c;而后黯然退场&#xff0c;默默地离开&#xff0c;这对大部分的人来说就已经算是过时了。 Java 于 1995 年正式上线&#xff0c;至今已经走过了 27 个年头&#xff0c;在众多编程技术里算是年龄比较大的语…

数据结构----栈和队列之队列的实现

目录 1.基本概况 2.队列组成 3.队列的实现 &#xff08;1&#xff09;队列的初始化 &#xff08;2&#xff09;队列的销毁 &#xff08;3&#xff09;队列的尾插 &#xff08;4&#xff09;队列的头删 &#xff08;5&#xff09;队列的判空 &#xff08;6&#xff09;队…

外挂级OCR神器:免费文档解析、表格识别、手写识别、古籍识别、PDF转Word

智能文档解析&#xff1a;大模型友好的文档解析工具 PDF转Markdown 支持将任意格式的文件&#xff08;图片、PDF、Doc&#xff0f;Docx、网页等&#xff09;解析为Markdown或Json格式&#xff0c;以对LLM友好的方式呈现。 更高速度&#xff1a;100页PDF最快1.5s完成解析 更大…

SAR目标检测

Multi-Stage with Filter Augmentation 多阶段滤波器增强(MSFA) 对SAR合成孔径雷达目标检测性能的改善 MSFA ON SAR 传统方法: 预训练:传统方法开始于在通用数据集上预训练一个基础模型。 微调:这个预训练的模型会被微调以适应特定的SAR图像&#xff0c;试图缩小域间的差距 …

【JAVA多线程】JDK中的各种锁,看这一篇就够了

目录 1.概论 1.1.实现锁的要素 1.2.阻塞队列 1.3.Lock接口和Sync类 2.各种锁 2.1.互斥锁 2.1.1.概论 2.1.2.源码 1.lock() 2.unlock() 2.2.读写锁 2.3.Condition 2.3.1.概论 2.3.2.底层实现 1.概论 1.1.实现锁的要素 JAVA中的锁都是可重入的锁&#xff0c;因为…