java forkjoin 简书_浅谈Java的Fork/Join并发框架

dbd0930a0a981. Fork/Join是什么

Oracle的官方给出的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。(我个人理解的工作窃取大意就是:由于线程池中的每个线程都有一个队列,而且线程间互不影响。那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样)

Fork/Join框架的核心是继承了AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。

下面是引用Oracle官方定义的原文:

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.

2. Fork/Join的基本用法 (1)Fork/Join基类

上文已经提到,Fork/Join就是要讲一个大的任务分割成若干小的任务,所以第一步当然是要做任务的分割,大致方式如下:

if (这个任务足够小){   执行要做的任务 } else {   将任务分割成两小部分   执行两小部分并等待执行结果 }

要实现FrokJoinTask我们需要一个继承了RecursiveTask或RecursiveAction的基类,并根据自身业务情况将上面的代码放入基类的coupute方法中。RecursiveTask和RecursiveAction都继承了FrokJoinTask,它俩的区别就是RecursiveTask有返回值而RecursiveAction没有。下面是我做的一个选出字符串列表中还有"a"的元素的Demo:

@Override protected List compute() {     // 当end与start之间的差小于阈值时,开始进行实际筛选     if (end - this.start  s.contains("a")).collect(Collectors.toList());     } else {         // 如果当end与start之间的差大于阈值时         // 将大任务分解成两个小任务。         int middle = (this.start + end) / 2;         ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold);         ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold);         // 并行执行两个“小任务”         left.fork();         right.fork();         // 把两个“小任务”的结果合并起来         List join = left.join();         join.addAll(right.join());         return join;     } }

(2)执行类

做好了基类就可以开始调用了,调用时首先我们需要Fork/Join线程池ForkJoinPool,然后向线程池中提交一个ForkJoinTask并得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,它提供一个get方法可以获取到执行结果。

代码如下:

ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的ForkJoinTask任务 ForkJoinTask> future = pool.submit(forkJoinService); System.out.println(future.get()); // 关闭线程池 pool.shutdown();

就这样我们就完成了一个简单的Fork/Join的开发。

提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封装的方法也都用到了Fork/Join。(细心的读者可能注意到我在Fork/Join中也有用到stream,所以其实这个Fork/Join是多余的,因为stream已经实现了Fork/Join,不过这只是一个Demo展示,没有任何实际用处也就无所谓了)

引用官方原文:

One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems.

Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release.

附完整代码以便以后参考: 1. 定义抽象类(用于拓展,此例中没有实际作用,可以不定义此类): import java.util.concurrent.RecursiveTask;  /**  * Description: ForkJoin接口  * Designer: jack  * Date: 2017/8/3  * Version: 1.0.0  */ public abstract class ForkJoinService extends RecursiveTask{     @Override     protected abstract T compute(); }

2. 定义基类 import java.util.List; import java.util.stream.Collectors;  /**  * Description: ForkJoin基类  * Designer: jack  * Date: 2017/8/3  * Version: 1.0.0  */ public class ForkJoinTest extends ForkJoinService> {      private static ForkJoinTest forkJoinTest;     private int threshold;  //阈值     private List list; //待拆分List      private ForkJoinTest(List list, int threshold) {         this.list = list;         this.threshold = threshold;     }      @Override     protected List compute() {         // 当end与start之间的差小于阈值时,开始进行实际筛选         if (list.size()  s.contains("a")).collect(Collectors.toList());         } else {             // 如果当end与start之间的差大于阈值时,将大任务分解成两个小任务。             int middle = list.size() / 2;             List leftList = list.subList(0, middle);             List rightList = list.subList(middle, list.size());             ForkJoinTest left = new ForkJoinTest(leftList, threshold);             ForkJoinTest right = new ForkJoinTest(rightList, threshold);             // 并行执行两个“小任务”             left.fork();             right.fork();             // 把两个“小任务”的结果合并起来             List join = left.join();             join.addAll(right.join());             return join;         }     }      /**      * 获取ForkJoinTest实例      * @param list  待处理List      * @param threshold 阈值      * @return ForkJoinTest例      */     public static ForkJoinService> getInstance(List list, int threshold) {         if (forkJoinTest == null) {             synchronized (ForkJoinTest.class) {                 if (forkJoinTest == null) {                     forkJoinTest = new ForkJoinTest(list, threshold);                 }             }         }         return forkJoinTest;     } }  3. 执行类  import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask;  /**  * Description: Fork/Join执行类  * Designer: jack  * Date: 2017/8/3  * Version: 1.0.0  */ public class Test {      public static void main(String args[]) throws ExecutionException, InterruptedException {          String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te",                 "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"};         List stringList = new ArrayList<>(Arrays.asList(strings));          ForkJoinPool pool = new ForkJoinPool();         ForkJoinService> forkJoinService = ForkJoinTest.getInstance(stringList, 20);         // 提交可分解的ForkJoinTask任务         ForkJoinTask> future = pool.submit(forkJoinService);         System.out.println(future.get());         // 关闭线程池         pool.shutdown();      }  }

===============================================================================

原文转载自网络,IT学习交流群QQ543427910

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/573347.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pycharm新建python file没有默认头_Pycharm 设置默认头的图文教程

Pycharm 设置默认头的图文教程1. 设置的路径是File->settings->Editor->File and Code Templates->Python Script内容见图&#xff1a;这样新建文件的时候就会默认头如下&#xff1a;这里说明下#!/usr/bin/evn python 和 #!/usr/bin/python的区别&#xff1a;第一种…

python不支持的数据类型有achar bint cfloat dlist_第1篇:Cython的数据类型(第二部分)

Cython的C指针与C一样&#xff0c;尽管指针性与变量而不是类型相关联&#xff0c;但可以在类型或变量附近声*号。%%cythoncdef int *acdef int *b但这样在变量a,b写在一行,cython编译器会发出警告的信息,因此建议每个变量单独声明%%cythoncdef int *a,*bCython中的指针的解引操…

【micropython】用python来进行BadUSB的USB-HID测试(含无线控制)

转载请注明&#xff1a;小五义http://www.cnblogs.com/xiaowuyi QQ群&#xff1a;64770604 本文以TPYBoardv101开发板为例讲解了利用micropython进行BadUSB的usb-HID设备测试的主要方法&#xff0c;使用mt7681模块进行了一个简单的实验&#xff0c;实现了手机摇控键盘输入的测…

php返回类中方法,php如何获取类中所有的方法名

php获取类中所有的方法名的方法&#xff1a;可以利用【get_class_methods()】函数来获取&#xff0c;【get_class_methods()】函数可以返回指定类中所有的方法名&#xff0c;并且会将方法名保存到数组中。【相关学习推荐&#xff1a;php编程(视频)】php获取类所有方法名的方法&…

如何安装php5.5,源码安装php5.5

centos6.6首先上传php-5.5.10至服务器安装依赖环境yum -y install gcc gcc-c autoconf libjpeg libjpeg-devel libpng libpng-devel freetype freetype-devel libxml2 libxml2-devel zlib zlib-devel glibc glibc-devel glib2 glib2-devel bzip2 bzip2-devel ncurses ncurses-d…

Fiddler中session的请求/响应类型与图标对照表

转载于:https://www.cnblogs.com/chengchengla1990/p/5681978.html

NOIP2014 uoj20解方程 数论(同余)

又是数论题 Q&A Q&#xff1a;你TM做数论上瘾了吗 A&#xff1a;没办法我数论太差了&#xff0c;得多练&#xff08;shui&#xff09;啊 题意 题目描述 已知多项式方程&#xff1a; a0a1xa2x^2..anx^n0 求这个方程在[1, m ] 内的整数解&#xff08;n 和m 均为正整数&#x…

大专学的pHp找什么工作,大专毕业能做什么工作 毕业都去干嘛了

大专毕业能做什么工作&#xff0c;大专毕业后都去干嘛了&#xff0c;我梳理了基本信息&#xff0c;看来一下&#xff01;大专毕业能做什么工作专科生毕业之后从业的职位基础包含了社会发展的各个方面。一般能够从业自身技术专业有关的工作中&#xff0c;如果有做生意的技能&…

SpringMVC启动过程详解(li)

通过对SpringMVC启动过程的深入研究&#xff0c;期望掌握Java Web容器启动过程&#xff1b;掌握SpringMVC启动过程&#xff1b;了解SpringMVC的配置文件如何配置&#xff0c;为什么要这样配置&#xff1b;掌握SpringMVC是如何工作的&#xff1b;掌握Spring源码的设计和增强阅读…

fusioncharts json java,FusionCharts使用教程:利用XML/JSON属性加载外部LOGO

在使用FusionCharts图表时&#xff0c;你可以在运行时加载外部Logo并显示于图表中。Logo可以是GIF/JPEG/PNG或SWF文件。你可以使用chart元素的logoURL属性来加载一个Logo。XMLJSON{"chart":{ "yaxisname":"Sales Figure", "caption":&…

matlab检测串口数据帧头,MATLAB 串口读取姿态数据及GUI实时动态显示设计

上一篇实现了Matlab 对串口数据的读取&#xff0c;数据可以读取并且保存到本地。本文主要设计GUI并且动态的显示曲线。可以更直观的观察实时的姿态数据和传感器数据。GUI设计效果&#xff1a;姿态GUi.png分别设置三个区域&#xff0c;分别为数据接收显示区域&#xff0c;串口设…

[转]面向对象的六大原则

现在编程的主流语言基本上都是面向对象的。如C#&#xff0c;C&#xff0c;JAVA。我们在使用时&#xff0c;已经构造了一个个的类。但是往往由于我们在类内部或外部的设计上存在种种问题&#xff0c;导致尽管是面向对象的语言&#xff0c;却是面向过程的逻辑&#xff0c;甚至维护…

上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy

一、上下文管理 import contextlib contextlib.contextmanager def work_state(state_list,worker_thread):state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread) free_list[] current_thread"alex" with work_state(free_list,curr…

JavaScript进阶(下)

指定分隔符连接数组元素join() join()方法用于把数组中的所有元素放入一个字符串。元素是通过指定的分隔符进行分隔的。 语法&#xff1a; arrayObject.join(分隔符) 参数说明: 注意&#xff1a;返回一个字符串&#xff0c;该字符串把数组中的各个元素串起来&#xff0c;用<…

ongl 表达式

struts.xml简单配置 <!-- &#xff08;默认false&#xff09;设置ognl表达式是否支持静态方法 --><constant name"struts.ognl.allowStaticMethodAccess" value"true"></constant><package name"ognl" namespace"/ogn…

Python开发-- Lesson 2--Python数据类型(2016/07/30)

1、文件操作 python中对文件、文件夹&#xff08;文件操作函数&#xff09;的操作需要涉及到os模块和shutil模块。 得到当前工作目录&#xff0c;即当前Python脚本工作的目录路径: os.getcwd() 返回指定目录下的所有文件和目录名:os.listdir() 函数用来删除一个文件:os.remove(…

oracle什么是重复组,规范化:“重复组”是什么意思?

扬帆大鱼英语的价值一次又一次地重复。这是重复组吗&#xff1f;不。在SUBJECT_MODULE中英语的多次出现不是重复组&#xff0c;甚至不是人们误认为重复组的两件事中的任何一个。它们也不是冗余或缺乏规范化的证据。这样的多个外观可能与冗余或规范化有关&#xff0c;但是在没有…

清除浮动php,CSS清除浮动

今天看到一篇文章关于清除浮动的&#xff0c;突然间脑袋短路了&#xff0c;咦&#xff1f;为什么要清除浮动&#xff1f;原谅我的无知&#xff0c;搜了下原来是这样&#xff0c;又倒腾出原来的笔记&#xff0c;唉&#xff0c;本来就有记录啊&#xff0c;而且也会经常用到&#…

Linux下使用Speedtest测试网速

导读Speedtest是用来测试网络性能的开源软件&#xff0c;在Linux下面安装Speedtest可以用来测试网络出口的上传和下载速度&#xff0c;帮助排查网络方面导致的故障。Speedtest介绍由于公司几个项目用户访问的时候响应较慢&#xff0c;项目本身没问题&#xff0c;服务及调用的接…

iOS开发ARC内存管理

本文的主要内容&#xff1a; ARC的本质ARC的开启与关闭ARC的修饰符ARC与BlockARC与Toll-Free BridgingARC的本质 ARC是编译器&#xff08;时&#xff09;特性&#xff0c;而不是运行时特性&#xff0c;更不是垃圾回收器(GC)。 Automatic Reference Counting (ARC) is a compile…