在此博客文章中,我们将描述该框架的功能,灵活性和简单性,以展示一个简单的用例。
基础
执行程序框架引入了一个接口来管理任务执行: 执行程序。 Executor是用于提交任务的接口,表示为Runnable实例。 此接口还将任务提交与任务执行隔离开来 :具有不同执行策略的执行者都发布相同的提交接口:如果您更改执行策略,则提交逻辑将不受更改的影响。
如果您想提交一个Runnable实例来执行,它很简单:
Executor exec = …;
exec.execute(runnable);
线程池
如上一节所述,执行器合同未指定执行器如何执行可运行对象:这取决于您所使用的执行器的特定类型。 该框架提供了一些不同类型的执行器,每种执行器都有针对不同用例量身定制的特定执行策略。
您将要处理的最常见的执行程序类型是线程池执行程序 。,它们是ThreadPoolExecutor类(及其子类)的实例。 线程池执行程序管理一个线程池 (即将要执行任务的工作线程池)和一个工作队列 。
您肯定已经在其他技术中看到池的概念。 使用池的主要优点是减少了资源创建的开销,重用了使用后释放的结构(在这种情况下为线程)。 使用池的另一个隐式优势是可以调整资源使用量 :可以调整线程池大小以实现所需的负载,而不会损害系统资源。
该框架为线程池提供了一个工厂类,称为Executors 。 使用该工厂,您将能够创建具有不同特征的线程池。 通常,底层实现通常是相同的( ThreadPoolExecutor ),但是工厂类可帮助您快速配置线程池,而无需使用更复杂的构造函数。 出厂方法是:
- newFixedThreadPool :此方法返回最大大小固定的线程池。 它将根据需要创建新线程,直到最大配置大小。 当线程数达到最大值时,线程池将保持大小不变。
- newCachedThreadPool :此方法返回无限制的线程池,即没有最大大小的线程池。 但是,当负载减少时,这种线程池将拆除未使用的线程。
- newSingleThreadedExecutor :此方法返回一个执行程序,该执行程序保证将在单个线程中执行任务。
- newScheduledThreadPool :此方法返回固定大小的线程池,该线程池支持延迟和定时任务执行。
这仅仅是个开始。 执行器还提供了本教程中未涵盖的其他功能,我强烈建议您学习以下内容:
- 生命周期管理方法,由ExecutorService接口声明(例如shutdown ()和awaitTermination ())。
- 完成服务可轮询任务状态并检索其返回值(如果适用)。
该ExecutorService的接口就显得尤为重要,因为它提供了一种方法来关闭一个线程池,这是一件好事,你几乎肯定希望能够干净利落做。 幸运的是, ExecutorService接口非常简单且易于解释,我建议您彻底研究其JavaDoc。
基本上,您会向ExecutorService发送shutdown ()消息,此后它将不接受新提交的任务,但将继续处理已排队的作业。 您可以使用isTerminated ()来收集执行程序服务的终止状态,也可以使用awaitTermination (…)方法等待终止。 不过, awaitTermination方法不会永远等待:您必须将最大等待超时作为参数传递。
警告 :错误和混乱的根源是理解为什么JVM进程永不退出的原因。 如果不关闭执行程序服务,从而破坏基础线程,则JVM将永远不会退出: JVM在其最后一个非守护线程退出时退出。
配置ThreadPoolExecutor
如果决定手动创建ThreadPoolExecutor而不是使用Executors工厂类,则需要使用其构造函数之一来创建和配置ThreadPoolExecutor 。 此类的最广泛的构造方法是:
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
如您所见,您可以配置:
- 核心池大小(线程池将尝试使用的大小)。
- 最大池大小。
- 保持活动时间,在该时间之后,空闲线程有资格被拆除。
- 工作队列中包含等待执行的任务。
- 拒绝任务提交时应用的策略。
限制排队的任务数
在可预测性和稳定性方面,限制正在执行的并发任务的数量,调整线程池的大小对您的应用程序及其执行环境具有巨大的好处:无限制的线程创建最终将耗尽运行时资源,结果您的应用程序可能会遇到严重的性能问题,甚至可能导致应用程序不稳定。
这只是解决部分问题的一种解决方案:您限制了正在执行的任务数量,但没有限制可以提交并排队供以后执行的作业数量。 该应用程序将在以后遇到资源短缺的问题,但是如果提交率始终超过执行率,它将最终遇到这种情况。
该问题的解决方案是:
- 向执行者提供阻塞队列以保留等待的任务。 如果队列已满,提交的任务将被“拒绝”。
- 当拒绝任务提交时,将调用RejectedExecutionHandler ,这就是为什么在上一项中引用了被拒绝的动词的原因。 您可以实施自己的拒绝策略,也可以使用框架提供的内置策略之一。
默认拒绝策略使执行程序抛出RejectedExecutionException 。 但是,其他内置策略可让您:
- 静默丢弃作业。
- 丢弃最旧的作业,然后尝试重新提交最后一份。
- 在调用者的线程上执行被拒绝的任务。
什么时候以及为什么要使用这样的线程池配置? 让我们来看一个例子。
一个示例:并行化独立的单线程任务
最近,有人打电话给我解决我的客户自很久以前就在运行的一项旧工作的问题。 基本上,作业由等待一组目录层次结构上的文件系统事件的组件组成。 每当触发事件时,都必须处理文件。 文件处理由专有的单线程进程执行。 说实话,就其本身的性质而言,即使我可以,但如果我可以并行化它,我就不会。 事件全天的到达率很高,不需要实时处理文件,而只需要在第二天之前进行处理即可。
当前的实现是技术的混合与匹配,包括UNIX shell脚本,该脚本负责扫描巨大的目录层次结构以检测应用更改的位置。 实施该实现后,执行环境中的核心数量也就只有两个。 同样,事件的发生率也很低:如今,它们的数量级约为数百万 ,总共要处理1到2 TB的原始数据。
如今,客户端正在运行这些进程的服务器是十二台核心计算机:这是并行化那些旧的单线程任务的巨大机会。 我们已经基本掌握了配方的所有成分,我们只需要决定如何构建和调整它即可。 在编写任何代码之前,需要进行一些思考以了解负载的性质,这些是我检测到的约束:
- 定期要扫描大量文件:每个目录包含一到两百万个文件。
- 扫描算法非常快,可以并行化。
- 处理文件至少需要1秒,甚至可能需要2或3秒的峰值。
- 处理文件时,除CPU外没有其他瓶颈。
- CPU使用率必须是可调的,以便根据一天中的时间使用不同的负载配置文件。
因此,我需要一个线程池,该线程池的大小由调用流程时活动的负载配置文件确定。 然后,我倾向于创建根据负载策略配置的固定大小的线程池执行程序。 由于处理线程仅受CPU限制,其核心使用率为100%,并且无需等待其他资源,因此负载策略非常容易计算:只需获取处理环境中可用的核心数量,然后使用负载按比例缩小当时处于活动状态的因素(并检查在峰值时刻至少使用了一个内核):
int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);
然后,我需要使用阻塞队列来创建ThreadPoolExecutor来限制提交的任务数。 为什么? 好吧:目录扫描算法非常快,并且会生成大量文件以非常快速地处理。 有多大? 很难预测,其可变性很高。 我不会让执行者的内部队列乱七八糟地用代表我的任务的对象(包括一个非常大的文件描述符)填充。 我宁愿让执行程序在队列填满时拒绝文件。
另外,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。 为什么? 好吧,因为当队列已满并且池中的线程正在忙于处理文件时,我将拥有正在提交执行该文件的任务的线程。 这样,扫描将停止处理文件,并在完成当前任务后立即恢复扫描。
这是创建执行程序的代码:
ExecutorService executorService =new ThreadPoolExecutor(maxThreads, // core thread pool sizemaxThreads, // maximum thread pool size1, // time to wait before resizing poolTimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxThreads, true),new ThreadPoolExecutor.CallerRunsPolicy());
代码的框架如下(已大大简化):
// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {File currentDir = dirsToProcess.pop();// listing childrenFile[] children = currentDir.listFiles();// processing childrenfor (final File currentFile : children) {// if it's a directory, defer processingif (currentFile.isDirectory()) {dirsToProcess.add(currentFile);continue;}executorService.submit(new Runnable() {@Overridepublic void run() {try {// if it's a file, process itnew ConvertTask(currentFile).perform();} catch (Exception ex) {// error management logic}}});
}// ...// wait for all of the executor threads to finish
executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the first tryexecutorService.shutdownNow();}if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the second try}
} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();
}
结论
如您所见,Java并发API非常易于使用,非常灵活并且功能强大。 几年前,我会花更多的精力编写这样一个简单的程序。 这样,我可以在几个小时内快速解决由遗留的单线程组件引起的可伸缩性问题。
参考: The Gray Blog中的 JCG合作伙伴 Enrico Crisostomo 使用ThreadPoolExecutor并行化独立的单线程任务 。
- 受限连接池的阻塞队列示例
- 更一般的等待/通知机制的CountDownLatch示例
- 任务运行器的重入锁示例
- 限制URL连接的信号量示例
相关文章 :
- Java并发教程–线程池
- 有益的CountDownLatch和棘手的Java死锁
- 并发优化–减少锁粒度
- Java并发教程– CountDownLatch
- JVM如何处理锁
- Java教程和Android教程列表
翻译自: https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html