使用ThreadPoolExecutor并行化独立的单线程任务

Java SE 5.0中引入的任务执行框架是简化多线程应用程序的设计和开发的巨大飞跃。 该框架提供了用于管理任务概念,管理线程生命周期及其执行策略的工具。

在此博客文章中,我们将描述该框架的功能,灵活性和简单性,以展示一个简单的用例。

基础

执行程序框架引入了一个接口来管理任务执行: 执行程序。 Executor是用于提交任务的接口,表示为Runnable实例。 此接口还将任务提交与任务执行隔离开来 :具有不同执行策略的执行者都发布相同的提交接口:如果您更改执行策略,则提交逻辑将不受更改的影响。

如果您想提交一个Runnable实例来执行,它很简单:

Executor exec = …;
exec.execute(runnable);

线程池

如上一节所述,执行器合同未指定执行器如何执行可运行对象:这取决于您所使用的执行器的特定类型。 该框架提供了一些不同类型的执行器,每种执行器都有针对不同用例量身定制的特定执行策略。

您将要处理的最常见的执行程序类型是线程池执行程序 。,它们是ThreadPoolExecutor类(及其子类)的实例。 线程池执行程序管理一个线程池 (即将要执行任务的工作线程池)和一个工作队列

您肯定已经在其他技术中看到池的概念。 使用池的主要优点是减少了资源创建的开销,重用了使用后释放的结构(在这种情况下为线程)。 使用池的另一个隐式优势是可以调整资源使用量 :可以调整线程池大小以实现所需的负载,而不会损害系统资源。

该框架为线程池提供了一个工厂类,称为Executors 。 使用该工厂,您将能够创建具有不同特征的线程池。 通常,底层实现通常是相同的( ThreadPoolExecutor ),但是工厂类可帮助您快速配置线程池,而无需使用更复杂的构造函数。 出厂方法是:

  • newFixedThreadPool :此方法返回最大大小固定的线程池。 它将根据需要创建新线程,直到最大配置大小。 当线程数达到最大值时,线程池将保持大小不变​​。
  • newCachedThreadPool :此方法返回无限制的线程池,即没有最大大小的线程池。 但是,当负载减少时,这种线程池将拆除未使用的线程。
  • newSingleThreadedExecutor :此方法返回一个执行程序,该执行程序保证将在单个线程中执行任务。
  • newScheduledThreadPool :此方法返回固定大小的线程池,该线程池支持延迟和定时任务执行。

这仅仅是个开始。 执行器还提供了本教程中未涵盖的其他功能,我强烈建议您学习以下内容:

  • 生命周期管理方法,由ExecutorService接口声明(例如shutdown ()和awaitTermination ())。
  • 完成服务可轮询任务状态并检索其返回值(如果适用)。

该ExecutorService的接口就显得尤为重要,因为它提供了一种方法来关闭一个线程池,这是一件好事,你几乎肯定希望能够干净利落做。 幸运的是, ExecutorService接口非常简单且易于解释,我建议您彻底研究其JavaDoc。

基本上,您会向ExecutorService发送shutdown ()消息,此后它将不接受新提交的任务,但将继续处理已排队的作业。 您可以使用isTerminated ()来收集执行程序服务的终止状态,也可以使用awaitTermination (…)方法等待终止。 不过, awaitTermination方法不会永远等待:您必须将最大等待超时作为参数传递。

警告 :错误和混乱的根源是理解为什么JVM进程永不退出的原因。 如果不关闭执行程序服务,从而破坏基础线程,则JVM将永远不会退出: JVM在其最后一个非守护线程退出时退出。

配置ThreadPoolExecutor

如果决定手动创建ThreadPoolExecutor而不是使用Executors工厂类,则需要使用其构造函数之一来创建和配置ThreadPoolExecutor 。 此类的最广泛的构造方法是:

public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);

如您所见,您可以配置:

  • 核心池大小(线程池将尝试使用的大小)。
  • 最大池大小。
  • 保持活动时间,在该时间之后,空闲线程有资格被拆除。
  • 工作队列中包含等待执行的任务。
  • 拒绝任务提交时应用的策略。

限制排队的任务数

在可预测性和稳定性方面,限制正在执行的并发任务的数量,调整线程池的大小对您的应用程序及其执行环境具有巨大的好处:无限制的线程创建最终将耗尽运行时资源,结果您的应用程序可能会遇到严重的性能问题,甚至可能导致应用程序不稳定。

这只是解决部分问题的一种解决方案:您限制了正在执行的任务数量,但没有限制可以提交并排队供以后执行的作业数量。 该应用程序将在以后遇到资源短缺的问题,但是如果提交率始终超过执行率,它将最终遇到这种情况。

该问题的解决方案是:

  • 向执行者提供阻塞队列以保留等待的任务。 如果队列已满,提交的任务将被“拒绝”。
  • 拒绝任务提交时,将调用RejectedExecutionHandler ,这就是为什么在上一项中引用了被拒绝的动词的原因。 您可以实施自己的拒绝策略,也可以使用框架提供的内置策略之一。

默认拒绝策略使执行程序抛出RejectedExecutionException 。 但是,其他内置策略可让您:

  • 静默丢弃作业。
  • 丢弃最旧的作业,然后尝试重新提交最后一份。
  • 在调用者的线程上执行被拒绝的任务。

什么时候以及为什么要使用这样的线程池配置? 让我们来看一个例子。

一个示例:并行化独立的单线程任务

最近,有人打电话给我解决我的客户自很久以前就在运行的一项旧工作的问题。 基本上,作业由等待一组目录层次结构上的文件系统事件的组件组成。 每当触发事件时,都必须处理文件。 文件处理由专有的单线程进程执行。 说实话,就其本身的性质而言,即使我可以,但如果我可以并行化它,我就不会。 事件全天的到达率很高,不需要实时处理文件,而只需要在第二天之前进行处理即可。

当前的实现是技术的混合与匹配,包括UNIX shell脚本,该脚本负责扫描巨大的目录层次结构以检测应用更改的位置。 实施该实现后,执行环境中的核心数量也就只有两个。 同样,事件的发生率也很低:如今,它们的数量级约为数百万 ,总共要处理1到2 TB的原始数据。

如今,客户端正在运行这些进程的服务器是十二台核心计算机:这是并行化那些旧的单线程任务的巨大机会。 我们已经基本掌握了配方的所有成分,我们只需要决定如何构建和调整它即可。 在编写任何代码之前,需要进行一些思考以了解负载的性质,这些是我检测到的约束:

  • 定期要扫描大量文件:每个目录包含一到两百万个文件。
  • 扫描算法非常快,可以并行化。
  • 处理文件至少需要1秒,甚至可能需要2或3秒的峰值。
  • 处理文件时,除CPU外没有其他瓶颈。
  • CPU使用率必须是可调的,以便根据一天中的时间使用不同的负载配置文件。

因此,我需要一个线程池,该线程池的大小由调用流程时活动的负载配置文件确定。 然后,我倾向于创建根据负载策略配置的固定大小的线程池执行程序。 由于处理线程仅受CPU限制,其核心使用率为100%,并且无需等待其他资源,因此负载策略非常容易计算:只需获取处理环境中可用的核心数量,然后使用负载按比例缩小当时处于活动状态的因素(并检查在峰值时刻至少使用了一个内核):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后,我需要使用阻塞队列来创建ThreadPoolExecutor来限制提交的任务数。 为什么? 好吧:目录扫描算法非常快,并且会生成大量文件以非常快速地处理。 有多大? 很难预测,其可变性很高。 我不会让执行者的内部队列乱七八糟地用代表我的任务的对象(包括一个非常大的文件描述符)填充。 我宁愿让执行程序在队列填满时拒绝文件。

另外,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。 为什么? 好吧,因为当队列已满并且池中的线程正在忙于处理文件时,我将拥有正在提交执行该文件的任务的线程。 这样,扫描将停止处理文件,并在完成当前任务后立即恢复扫描。

这是创建执行程序的代码:

ExecutorService executorService =new ThreadPoolExecutor(maxThreads, // core thread pool sizemaxThreads, // maximum thread pool size1, // time to wait before resizing poolTimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxThreads, true),new ThreadPoolExecutor.CallerRunsPolicy());

代码的框架如下(已大大简化):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {File currentDir = dirsToProcess.pop();// listing childrenFile[] children = currentDir.listFiles();// processing childrenfor (final File currentFile : children) {// if it's a directory, defer processingif (currentFile.isDirectory()) {dirsToProcess.add(currentFile);continue;}executorService.submit(new Runnable() {@Overridepublic void run() {try {// if it's a file, process itnew ConvertTask(currentFile).perform();} catch (Exception ex) {// error management logic}}});
}// ...// wait for all of the executor threads to finish
executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the first tryexecutorService.shutdownNow();}if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the second try}
} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();
}

结论

如您所见,Java并发API非常易于使用,非常灵活并且功能强大。 几年前,我会花更多的精力编写这样一个简单的程序。 这样,我可以在几个小时内快速解决由遗留的单线程组件引起的可伸缩性问题。

参考: The Gray Blog中的 JCG合作伙伴 Enrico Crisostomo 使用ThreadPoolExecutor并行化独立的单线程任务 。

相关片段:
  • 受限连接池的阻塞队列示例
  • 更一般的等待/通知机制的CountDownLatch示例
  • 任务运行器的重入锁示例
  • 限制URL连接的信号量示例

相关文章 :

  • Java并发教程–线程池
  • 有益的CountDownLatch和棘手的Java死锁
  • 并发优化–减少锁粒度
  • Java并发教程– CountDownLatch
  • JVM如何处理锁
  • Java教程和Android教程列表

翻译自: https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/374431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python定义一个圆_Python-矩形和圆形

原博文 2019-11-11 12:34 − Exercise 15.1. 定义一个叫做Circle 类&#xff0c;类的属性是圆心 (center) 和半径 (radius) , 其中&#xff0c;圆心 (center) 是一个 Point 类&#xff0c;而半径 (radius) 是一个数字。 实例化一个圆心 (center) 为 (150, 100) &#xff0c;半…

C语言代码规范(四)命名规则

一、宏定义全部字母大写&#xff0c;单词间下划线间隔 #define FLASH_PAGE_SIZE 256 #define FLASH_SECTOR_SIZE (4 * 1024) #define FLASH_BLOCK_SIZE (64 * 1024) #define FLASH_SIZE (16 * 1024 * 1024) 二、const修饰的常量全部字母大写&#xff0c;单词间…

Forbidden You don't have permission to access / on this server PHP

Forbidden You dont have permission to access / on this server PHP 在新安装的谷歌游览器里&#xff0c;打不了PHP网站了&#xff0c;错误显示&#xff1a; Forbidden You dont have permission to access / on this server. 原因还是配置权限问题 解决办法&#xff1a; wa…

Spring 3.1和JPA的持久层

1.概述 本教程显示了如何使用Hibernate作为持久性提供程序使用JPA设置Spring 。 有关使用基于Java的配置和项目的基本Maven pom设置Spring上下文的分步介绍&#xff0c;请参阅本文 。 2. Java的JPA Spring配置 要在Spring项目中使用JPA&#xff0c; 需要设置EntityManager 。…

150928错误认识

1. $arr array(); foreach ($re as $k>$v){  $arr[] $v[updatetime];} $arr的返回结果为&#xff1a; Array ([0] > 2014-09[1] > 2015-04[2] > 2015-09 )$arr array(); foreach ($re as $k>$v){  $arr[$k] $v[updatetime];} $arr的返回结果为&#xff…

STM32F1笔记(一)GPIO输出

GPIO&#xff1a;General Purpose Input Output &#xff08;通用输入/输出&#xff09;。 GPIO最经典应用&#xff1a;LED灯。 先看电路。声明&#xff1a;参考正点原子战舰开发板。 与LED串联的电阻称为限流电阻。 限流电阻计算公式&#xff1a;R(U-LED压降)/20ma。 U为LE…

dataframe转化为array_【Python专栏】12 种高效 Numpy 和 Pandas 函数为你加速分析

来源&#xff1a;机器之心编译&#xff1a;Jamin、杜伟、张倩我们都知道&#xff0c;Numpy 是 Python 环境下的扩展程序库&#xff0c;支持大量的维度数组和矩阵运算&#xff1b;Pandas 也是 Python 环境下的数据操作和分析软件包&#xff0c;以及强大的数据分析库。二者在日常…

具有GlassFish和一致性的高性能JPA –第1部分

您以前听说过连贯性吗&#xff1f; 大概是。 它是那些著名的内存网格解决方案之一&#xff0c;该解决方案承诺了超快的数据访问速度和对经常使用的数据的无限空间。 一些众所周知的竞争对手是Infinispan &#xff0c; Memcached和Terracotta Ehcache 。 它们都很棒&#xff0c;…

如何在自己的代码中实现分享视频文件或者是图片文件到微信 QQ微博 新浪微博等!!!...

首先在文档第一句我先自嘲下 &#xff0c; 我是大傻逼&#xff0c; 弄了两天微信是视频分享&#xff0c;一直被说为啥跟系统的相册分享的不一样&#xff0c;尼玛&#xff01;&#xff01;&#xff01; 这里来说正文&#xff0c;我这里不像多少太多&#xff0c;大家都是程序猿&a…

sql 数据库中用创建好的视图修改表数据

只要满足下列条件&#xff0c;即可通过视图修改基础基表的数据&#xff1a; 1、任何修改&#xff08;包括 UPDATE、INSERT 和 DELETE 语句&#xff09;都只能引用一个基表的列。 2、视图中被修改的列必须直接引用表列中的基础数据。不能通过任何其他方式对这些列进行派生&#…

boost原理与sklearn源码_机器学习sklearn系列之决策树

一、 Sklearn库 Scikit learn 也简称 sklearn, 自2007年发布以来&#xff0c;scikit-learn已经成为Python重要的机器学习库了。支持包括分类、回归、降维和聚类四大机器学习算法。还包含了特征提取、数据处理和模型评估三大模块。sklearn是Scipy的扩展&#xff0c;建立在NumPy和…

STM32F1笔记(二)GPIO输入

STM32 GPIO输入的经典应用是按键。 先看电路。声明&#xff1a;参考正点原子战舰开发板。 在这里可以看到&#xff0c;KEY_UP按键是高电平有效的&#xff0c;即当按下该按键时&#xff0c;GPIO读到高电平。 KEY0/1/2是低电平有效的&#xff0c;即当按下该按键时&#xff0c;G…

Google Authenticator:将其与您自己的Java身份验证服务器配合使用

用于移动设备的Google Authenticator应用程序是一个非常方便的应用程序&#xff0c;它实现了TOTP算法&#xff08;在RFC 6238中指定&#xff09;。 使用Google Authenticator&#xff0c;您可以生成时间密码&#xff0c;该密码可用于在共享请求用户密钥的身份验证服务器中授权用…

[Week2 作业] 代码规范之争

这四个问题均是出自 http://goodmath.scientopia.org/2011/07/14/stuff-everyone-should-do-part-2-coding-standards/ 。 我对这四个问题均持反驳的看法&#xff0c;下面是我的理由~ Q1&#xff1a;这些规范都是官僚制度下产生的浪费大家的编程时间、影响人们开发效率, 浪费时…

STM32F1笔记(三)UART/USART

UART&#xff1a;Universal Asynchronous Receiver/Transmitter&#xff08;通用异步收/发器&#xff09; USART&#xff1a;Universal Synchronous/Asynchronous Receiver/Transmitter&#xff08;通用同步/异步串行收/发器&#xff09; 从命名即可看出USART就是UART的基础上…

python安装界面翻译_python环境搭建

如果想要运行python需要有解释器和编辑器。 什么是解释器 解释器我们可以把它理解成翻译官&#xff0c;它是将我们写的python代码翻译成计算机能够懂得机器语言。 然后计算机收到解释器的命令来干活&#xff0c;最终再将结果反馈在解释器中。 解释器推荐使用anaconda3 什么是an…

无需重新部署Eclipse和Tomcat即可进行更改

他们说&#xff0c;由于应用程序服务器过大&#xff0c;Java的开发速度很慢–您必须重新部署应用程序才能看到所做的更改。 使用PHP&#xff0c;Python等脚本语言时&#xff0c;可以“保存并刷新”。 这个法定问题总结了这个“神话”。 是的&#xff0c;这是一个神话。 您也可以…

进阶篇-用户界面:4.Android中常用组件

1.下拉菜单 在Web开发中&#xff0c;HTML提供了下拉列表的实现&#xff0c;就是使用<select>元素实现一个下拉列表&#xff0c;在其中每个下拉列表项使用<option>表示即可。这是在Web开发中一个必不可少的交互性组件&#xff0c;而在Android中的对应实现就是Spinne…

收款单单据编号不正确

问题现象:现在在应收&#xff0c;应付的收款单录入和付款单录入里点击增加的话&#xff0c;单据编号如果是出现2024呢&#xff0c;按保存的话&#xff0c;就会出现单据号重复&#xff1b;查到的最大的单据号是3034&#xff0c;在流水号里改成3038后再增回加的话还是出现2024。然…

STM32F1笔记(四)NVIC中断优先级管理

STM32将中断分为5个组&#xff0c;组0~4。配置代码如下&#xff1a; NVIC_PriorityGroupConfig(NVIC_PriorityGroup_2); 在标准库里&#xff0c;分组的定义如下&#xff1a; /** defgroup Preemption_Priority_Group * {*/#define NVIC_PriorityGroup_0 ((uint32_t…