一、先说结论
得看线程池的实现,JUC 的线程池(ThreadPoolExecutor)的话
- 不会影响其他的线程
- 若是 submit 方法,或者任务为 future 任务,异常只有在 get 的时候才会抛出
- 若是 execute + runnable 任务,异常就会抛出,线程挂掉后,线程池移除该线程并创建一个新的线程
若通过 submit 提交任务,会将任务封装到 future 里面或者原任务本身就是 future 任务,而 future 的 run 方法执行的时候,抛出的异常在其内部会被捕获,等到 get 方法的时候才会抛出。
execute + runnable,并没有涉及 future,虽然异常会被捕获,但也以为被重新抛出,导致线程中断,线程池需要移除线程,并创建新的线程
二、submit、execute 方法
线程运行任务核心代码
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
三、异常处理
比较全面的处理,主要是,afterExecute 方法,这个哪怕抛出异常也会执行
new ThreadPoolExecutor(IO_CORE,IO_MAX,KEEP_ALIVE_SECOND,TimeUnit.SECONDS,// 任务队列存储超过核心线程数的任务new LinkedBlockingDeque<>(QUEUE_SIZE),r -> {Thread thread = new Thread(r);thread.setDaemon(Boolean.TRUE);thread.setName(String.format("%s, message-process-thread-%d", threadName, NUM.getAndIncrement()));return thread;}
) {@Overrideprotected void afterExecute(Runnable r, Throwable t) {// 若 t 不为 null,正常处理if (Objects.nonNull(t)) {log.error(t.getMessage());}// 特别注意的是 futureTask 在 run 的时候不会立即抛异常,而是吞掉,在调用 get 的时候才能抛出// 如果是 submit 提交,原本的任务被封装成 futureTask,异常不会在 t 里,而是在 futureTask 里(但原本的任务是 futureTask 的话,则应该是原本的任务 get 的时候才会抛异常)// 如果是 execute,则 r 还是原来的任务,但不排除 r 本来就是 futureTask,那么其错误信息本来就应该通过 get 获取,在这里处理一下也无妨,不影响原本的处理结果即可// 原任务为 futureTask 的时候,get 时一定要处理异常if (r instanceof Future<?> futureTask) {try {futureTask.get();} catch (Exception e) {log.error(e.getMessage());}}}
};
log.info("线程池已经初始化");
EXECUTOR.allowCoreThreadTimeOut(Boolean.TRUE);
// JVM 关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("IO 密集型任务线程池", (Callable<Void>) () -> {shutdownThreadPoolGracefully(EXECUTOR);return null;})
);