从Java 5开始就已经存在ExecutorService
抽象。在这里我们谈论的是2004。 提醒一下:Java 5和6不再受支持,Java 7 将不在半年之内 。 之所以提出这一点,是因为许多Java程序员仍然不完全了解ExecutorService
工作方式。 有很多地方可以学习,今天,我想分享一些鲜为人知的功能和做法。 但是,本文仍然针对中级程序员,没有什么特别高级的。
1.名称池线程
我不能强调这一点。 转储正在运行的JVM的线程时或在调试过程中,默认的线程池命名方案为pool-N-thread-M
,其中N
代表池序列号(每次创建新的线程池时,全局N
计数器都会递增),而M
是池中的线程序列号。 例如, pool-2-thread-3
表示在JVM生命周期中创建的第二个池中的第三个线程。 请参阅: Executors.defaultThreadFactory()
。 描述性不强。 由于命名策略隐藏在ThreadFactory
,因此JDK使得正确命名线程变得有些复杂。 幸运的是,番石榴为此提供了一个帮助器类:
import com.google.common.util.concurrent.ThreadFactoryBuilder;final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Orders-%d").setDaemon(true).build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
默认情况下,线程池会创建非守护线程,并确定是否适合您。
2.根据上下文切换名称
这是我从Supercharged jstack学到的技巧:如何以100mph的速度调试服务器 。 一旦我们记住了线程名称,我们就可以在运行时随时更改它们! 这是有道理的,因为线程转储显示类和方法名称,而不显示参数和局部变量。 通过调整线程名称以保留一些必要的事务标识符,我们可以轻松跟踪哪个消息/记录/查询/等。 缓慢或导致死锁。 例:
private void process(String messageId) {executorService.submit(() -> {final Thread currentThread = Thread.currentThread();final String oldName = currentThread.getName();currentThread.setName("Processing-" + messageId);try {//real logic here...} finally {currentThread.setName(oldName);}});
}
在try
内部- finally
阻止当前线程被命名为Processing-WHATEVER-MESSAGE-ID-IS
。 在跟踪通过系统的消息流时,这可能会派上用场。
3.明确安全关闭
在客户端线程和线程池之间有一个任务队列。 当您的应用程序关闭时,您必须注意两件事:排队任务正在发生的事情以及已运行的任务的行为方式(稍后会详细介绍)。 令人惊讶的是,许多开发人员没有正确或有意识地关闭线程池。 有两种技术:让所有排队的任务执行( shutdown()
)或删除它们( shutdownNow()
)–这完全取决于您的用例。 例如,如果我们提交了一堆任务,并希望所有任务完成后立即返回,请使用shutdown()
:
private void sendAllEmails(List<String> emails) throws InterruptedException {emails.forEach(email ->executorService.submit(() ->sendEmail(email)));executorService.shutdown();final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);log.debug("All e-mails were sent so far? {}", done);
}
在这种情况下,我们发送了一堆电子邮件,每个电子邮件都是线程池中的一个单独任务。 提交这些任务后,我们将关闭池,以使其不再接受任何新任务。 然后,我们最多等待一分钟,直到所有这些任务完成。 但是,如果某些任务仍未完成,则awaitTermination()
将仅返回false
。 此外,待处理的任务将继续处理。 我知道赶时髦的人会去:
emails.parallelStream().forEach(this::sendEmail);
称我为老式,但我喜欢控制并行线程的数量。 没关系,优雅的shutdown()
的替代方法是shutdownNow()
:
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
这次所有排队的任务都将被丢弃并返回。 允许已运行的作业继续。
4.小心处理中断
Future
接口鲜为人知的功能是取消。 与其重复自己,不如查看我的较早文章: InterruptedException和中断线程说明
5.监视队列长度并使其有界
大小不正确的线程池可能会导致运行缓慢,不稳定和内存泄漏。 如果配置的线程太少,则会建立队列,从而消耗大量内存。 另一方面,由于上下文切换过多,线程过多会减慢整个系统的速度,并导致相同的症状。 重要的是要查看队列的深度并使其有界,以便过载的线程池只是暂时拒绝新任务:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,queue);
上面的代码等效于Executors.newFixedThreadPool(n)
,但是我们使用固定容量为100
ArrayBlockingQueue
代替了默认的无限LinkedBlockingQueue
。 这意味着,如果已经有100个任务排队(并且正在执行n
个任务),则新任务将被RejectedExecutionException
。 另外,由于queue
现在可以从外部使用,因此我们可以定期调用size()
并将其放入日志/ JMX /您使用的任何监视机制中。
6.记住关于异常处理
以下代码段将产生什么结果?
executorService.submit(() -> {System.out.println(1 / 0);
});
我被那太多次咬伤:它不会打印出任何东西 。 没有java.lang.ArithmeticException: / by zero
符号java.lang.ArithmeticException: / by zero
,没有。 线程池只是吞没了这个异常,就好像它从未发生过一样。 如果这是一个很好的从头开始创建的java.lang.Thread
, UncaughtExceptionHandler
可以工作。 但是对于线程池,您必须更加小心。 如果您要提交Runnable
(没有任何结果,如上所示),则必须用try
– catch
至少将其记录下来。 如果要提交Callable<Integer>
,请确保始终使用阻塞get()
取消引用它以重新引发异常:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
有趣的是,即使是Spring框架也使用@Async
造成了此错误,请参阅: SPR-8995和SPR-12090 。
7.监视队列中的等待时间
监视工作队列深度是一方面。 但是,在对单个事务/任务进行故障排除时,值得一看的是在提交任务和实际执行之间经过了多少时间。 此持续时间最好应接近0(当池中有一些空闲线程时),但是当必须将任务排队时,它将持续增长。 此外,如果池中没有固定数量的线程,则运行新任务可能需要生成线程,这也消耗了很短的时间。 为了干净地监视此指标,请使用类似于以下内容的东西包装原始ExecutorService
:
public class WaitTimeMonitoringExecutorService implements ExecutorService {private final ExecutorService target;public WaitTimeMonitoringExecutorService(ExecutorService target) {this.target = target;}@Overridepublic <T> Future<T> submit(Callable<T> task) {final long startTime = System.currentTimeMillis();return target.submit(() -> {final long queueDuration = System.currentTimeMillis() - startTime;log.debug("Task {} spent {}ms in queue", task, queueDuration);return task.call();});}@Overridepublic <T> Future<T> submit(Runnable task, T result) {return submit(() -> {task.run();return result;});}@Overridepublic Future<?> submit(Runnable task) {return submit(new Callable<Void>() {@Overridepublic Void call() throws Exception {task.run();return null;}});}//...}
这不是一个完整的实现,但是您可以了解基本思想。 当我们向线程池提交任务时,我们立即开始计算时间。 我们一接到任务就立即停止并开始执行。 不要被源代码中的startTime
和queueDuration
紧密联系着。 实际上,这两行是在不同的线程中求值的,可能相隔数毫秒甚至数秒,例如:
Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
8.保留客户端堆栈跟踪
最近,反应式编程似乎引起了很多关注。 反应性清单 , 反应性流 , RxJava (刚刚发布1.0!), Clojure代理 , scala.rx …它们都很好用 ,但是堆栈跟踪不再是您的朋友,它们最多没有用。 以提交给线程池的任务中发生的异常为例:
java.lang.NullPointerException: nullat com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我们很容易发现MyTask
在第76行处抛出了NPE。但是我们不知道谁提交了此任务,因为堆栈跟踪仅显示Thread
和ThreadPoolExecutor
。 从技术上讲,我们可以浏览源代码,以期仅找到创建MyTask
地方。 但是如果没有线程(更不用说事件驱动,反应式,演员忍者编程),我们将立即看到完整的画面。 如果我们可以保留客户端代码(提交任务的代码)的堆栈跟踪并显示出来(例如在失败的情况下)怎么办? 这个想法并不新鲜,例如Hazelcast将异常从所有者节点传播到客户端代码 。 这看起来可能是天真的支持,以便在发生故障时保持客户端堆栈跟踪:
public class ExecutorServiceWithClientTrace implements ExecutorService {protected final ExecutorService target;public ExecutorServiceWithClientTrace(ExecutorService target) {this.target = target;}@Overridepublic <T> Future<T> submit(Callable<T> task) {return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));}private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {return () -> {try {return task.call();} catch (Exception e) {log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);throw e;}};}private Exception clientTrace() {return new Exception("Client stack trace");}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {return tasks.stream().map(this::submit).collect(toList());}//...}
这次如果发生故障,我们将检索提交任务的地方的完整堆栈跟踪和线程名称。 与之前看到的标准异常相比,它具有更大的价值:
Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack traceat com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9.首选CompletableFuture
在Java 8中,引入了更强大的CompletableFuture
。 请尽可能使用它。 没有扩展ExecutorService
来支持这种增强的抽象,因此您必须自己照顾它。 代替:
final Future<BigDecimal> future = executorService.submit(this::calculate);
做:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture
扩展了Future
因此一切都CompletableFuture
运行。 但是,API的更高级的使用者将真正欣赏CompletableFuture
提供的扩展功能。
10.同步队列
SynchronousQueue
是一个有趣的BlockingQueue
,它实际上不是队列。 它本身并不是一个数据结构。 最好将其解释为容量为0的队列。引用JavaDoc:
每个
insert
操作必须等待另一个线程进行相应的remove
操作,反之亦然。 同步队列没有任何内部容量,甚至没有一个容量。 您无法窥视同步队列,因为仅当您尝试删除它时,该元素才存在。 您不能插入元素(使用任何方法),除非另一个线程试图将其删除; 您无法迭代,因为没有要迭代的内容。 […]同步队列类似于CSP和Ada中使用的集合通道。
这与线程池有什么关系? 尝试将SynchronousQueue
与ThreadPoolExecutor
:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,queue);
我们创建了一个线程池,该线程池具有两个线程和一个在其前面的SynchronousQueue
。 由于SynchronousQueue
本质上是一个容量为0的队列,因此,如果有可用的空闲线程,则此类ExecutorService
将仅接受新任务。 如果所有线程都忙,则新任务将立即被拒绝并且永远不会等待。 当后台处理必须立即开始或被丢弃时,此行为可能是理想的。
就是这样,希望您发现至少一个有趣的功能!
翻译自: https://www.javacodegeeks.com/2014/11/executorservice-10-tips-and-tricks.html