在日常的开发工作当中,线程池往往承载着一个应用中最重要的业务逻辑,因此我们有必要更多地去关注线程池的执行情况,包括异常的处理和分析等。本文主要聚焦在如何正确使用线程池上,以及提供一些实用的建议。文中会稍微涉及到一些线程池实现原理方面的知识,但是不会过多展开。
线程池的异常处理
UncaughtExceptionHandler
我们都知道Runnable接口中的run方法是不允许抛出异常的,因此派生出这个线程的主线程可能无法直接获得该线程在执行过程中的异常信息。如下例:
为什么会这样呢?其实我们看一下Thread中的源码就会发现,Thread在执行过程中如果遇到了异常,会先判断当前线程是否有设置UncaughtExceptionHandler,如果没有,则会从线程所在的ThreadGroup中获取。
注意:每个线程都有自己的ThreadGroup,即使你没有指定,并且它实现了UncaughtExceptionHandler接口。
我们看下ThreadGroup中默认的对UncaughtExceptionHandler接口的实现:
这个ThreadGroup如果有父ThreadGroup,则调用父ThreadGroup的uncaughtException,否则调用全局默认的Thread.DefaultUncaughtExceptionHandler,如果全局的handler也没有设置,则只是简单地将异常信息定位到System.err中,这就是为什么我们应当在创建线程的时候,去实现它的UncaughtExceptionHandler接口的原因,这么做可以让你更方便地去排查问题。
通过execute提交任务给线程池
回到线程池这个话题,如果我们向线程池提交的任务中,没有对异常进行try...catch处理,并且运行的时候出现了异常,那会对线程池造成什么影响呢?答案是没有影响,线程池依旧可以正常工作,但是异常却被吞掉了。这通常来说不是一个好事情,因为我们需要拿到原始的异常对象去分析问题。
那么怎样才能拿到原始的异常对象呢?我们从线程池的源码着手开始研究这个问题。当然网上关于线程池的源码解析文章有很多,这里限于篇幅,直接给出最相关的部分代码:
这个方法就是真正去执行提交给线程池的任务的代码。
这里我们略去其中不相关的逻辑,重点关注第19行到第32行的逻辑,其中第23行是真正开始执行提交给线程池的任务,那么第20行是干什么的呢?其实就是在执行提交给线程池的任务之前可以做一些前置工作,同样的,我们看到第31行,这个是在执行完提交的任务之后,可以做一些后置工作。
beforeExecute这个我们暂且不管,重点关注下afterExecute这个方法。我们可以看到,在执行任务过程中,一旦抛出任何类型的异常,都会提交给afterExecute这个方法,然而查看线程池的源代码我们可以发现,默认的afterExecute是个空实现,因此,我们有必要继承ThreadPoolExecutor去实现这个afterExecute方法。
看源码我们可以发现这个afterExecute方法是protected类型的,从官方注释上也可以看到,这个方法就是推荐子类去实现的。
当然,这个方法不能随意去实现,需要遵循一定的步骤,具体的官方注释也有讲,这里摘抄如下
那么通过这种方式,就可以将原先可能被线程池吞掉的异常成功捕获到,从而便于排查问题。
但是这里还有个小问题,我们注意到在runWorker方法中,执行task.run();语句之后,各种类型的异常都被抛出了,那这些被抛出的异常去了哪里?事实上这里的异常对象最终会被传入到Thread的dispatchUncaughtException方法中,源码如下:
可以看到它会去获取UncaughtExceptionHandler的实现类,然后调用其中的uncaughtException方法,这也就回到了我们上一小节所分析的UncaughtExceptionHandler实现的具体逻辑。那么为了拿到最原始的异常对象,除了实现UncaughtExceptionHandler接口之外,也可以考虑实现afterExecute方法。
通过submit提交任务到线程池
这个同样很简单,我们还是先回到submit方法的源码:
这里的execute方法调用的是ThreadPoolExecutor中的execute方法,执行逻辑跟通过execute提交任务到线程池是一样的。我们先重点关注这里的newTaskFor方法,其源码如下:
可以看到提交的Callable对象用FutureTask封装起来了。我们知道最终会执行到上述runWorker这个方法中,并且最核心的执行逻辑就是task.run();这行代码。我们知道这里的task其实是FutureTask类型,因此我们有必要看一下FutureTask中的run方法的实现:
可以看到这其中跟异常相关的最关键的代码就在第17行,也就是setException(ex);这个地方。我们看一下这个地方的实现:
这里最关键的地方就是将异常对象赋值给了outcome,outcome是FutureTask中的成员变量,我们通过调用submit方法,拿到一个Future对象之后,再调用它的get方法,其中最核心的方法就是report方法,下面给出每个方法的源码:
首先是get方法:
可以看到最终调用了report方法,其源码如下:
上面是一些状态判断,如果当前任务不是正常执行完毕,或者被取消的话,那么这里的x其实就是原始的异常对象,可以看到会被ExecutionException包装。因此在你调用get方法时,可能会抛出ExecutionException异常,那么调用它的getCause方法就可以拿到最原始的异常对象了。
综上所述,针对提交给线程池的任务可能会抛出异常这一问题,主要有以下两种处理思路:
- 在提交的任务当中自行try...catch,但这里有个不好的地方就是如果你会提交多种类型的任务到线程池中,每种类型的任务都需要自行将异常try...catch住,比较繁琐。而且如果你只是catch(Exception e),可能依然会漏掉一些包括Error类型的异常,那为了保险起见,可以考虑catch(Throwable t)。
- 自行实现线程池的afterExecute方法,或者实现Thread的UncaughtExceptionHandler接口。
下面给出我个人创建线程池的一个示例,供大家参考:
BlockingQueue queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, 60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder() .setThreadFactory(new ThreadFactory() { private int count = 0; private String prefix = "StatisticsTask"; @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + "-" + count++); } }).setUncaughtExceptionHandler((t, e) -> { String threadName = t.getName(); logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}