起因
一次开发过程中,送审之后向三方OA系统推送代办,其中由于优化的原因使用到线程池
ExecutorService todoMessageAsyncThread = ThreadPoolManager.getThreadPool("todoMessageAsyncThreadPool");todoMessageAsyncThread.submit(() -> {log.info("processKey:{}, processInstanceId:{}, 开启异步线程推送待办报文。", processKey, processInstanceId);//....实现具体业务log.info("processKey:{}, processInstanceId:{}, 结束异步线程推送待办报文。", processKey, processInstanceId);});
中间有一个判断,该不该推送,然后使用el表达式进行判断,但是测试环境中el表达式配置的不标准,导致现象就是没有推送,也没有日志,子线程就像停住了,没啥动静了。
原因分析
Executors线程池有两种提交线程的方式execute和submit方式,简单测试如下:
@Testpublic void submitTest()throws InterruptedException {Runnable runnable = () -> {int i = 1/0;};ExecutorService threadPool1 = Executors.newFixedThreadPool(5);System.out.println("execute开始执行");threadPool1.execute(runnable);Thread.sleep(1000);System.out.println("--------------------------");System.out.println("submit开始执行");Future<?> submit = threadPool1.submit(runnable);System.out.println("submit返回结果:"+submit);
/*
execute开始执行
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zeroat test.java.util.concurrent.ExecutorsTest.lambda$submitTest$1(ExecutorsTest.java:40)at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)at java.base/java.lang.Thread.run(Thread.java:834)
--------------------------
submit开始执行
submit返回结果:java.util.concurrent.FutureTask@22eeefeb[Completed exceptionally: java.lang.ArithmeticException: / by zero]*/}
从测试的结果中可以看出来,execute方法中对异常信息进行的打印,而submit方法中没有对异常信息进行打印,而是将异常信息存储在了返回的future中,只有通过future.get()才能阻塞式的获取异常。
先看看execute的源码中的实现:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。对addWorker的调用以原子方式检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。* 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为自上次检查以来已有的线程已经失效),或者池是否在进入该方法后关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。* 3. 如果我们无法对任务进行排队,那么我们将尝试添加一个新线程。如果它失败了,我们知道我们已经关闭或饱和了,所以拒绝执行任务*/int c = ctl.get(); //这里使用32位的int型数据,前3位代表状态,后29位代表线程数,在多线程环境下避免状态恶化线程数不一致if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) //当前线程数少于核心线程数,直接添加到worker中return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { //如果不能插入核心线程中,就放入到queue中int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) //重新检查状态reject(command);else if (workerCountOf(recheck) == 0) //queue满了addWorker(null, false);}else if (!addWorker(command, false)) //queue满了,放入最大线程中reject(command);}
其中最重要的启动子线程的方法是addWorker方法,将线程封装成Runable,传入execute方法中。
Worker也是一个线程,运行的时候调用worker的run方法:
public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run(); //自定义任务的run方法afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex; //执行后有异常抛异常}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
再来看submit方法的实现源码:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
submit方法在中间调用了execute方法,但是是将子线程封装成了FutureTask,然后调用的execute方法。
这样在执行这个子线程的时候会执行FutureTask的run方法,而在run方法中,callable.call()方法直接被catch,然后将异常信息使用setException方法获取,并将异常设置到outcome里,不会抛异常出去。源码如下:
public void run() {if (state != NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call(); //callable的接口ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex); //这里直接吃掉了}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
解决方案
- 如果没有返回值,建议直接使用execute,不要使用submit
- 自己在业务代码中try-catch-finally
- 重写Runnable的afterExecute
//1.创建一个自己定义的线程池ExecutorService executorService = new ThreadPoolExecutor(2,3,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10)) {//重写afterExecute方法@Overrideprotected void afterExecute(Runnable r, Throwable t) {//这个是excute提交的时候if (t != null) {System.out.println("afterExecute里面获取到excute提交的异常信息,处理异常" + t.getMessage());}//如果r的实际类型是FutureTask 那么是submit提交的,所以可以在里面get到异常if (r instanceof FutureTask) {try {Future<?> future = (Future<?>) r;//get获取异常future.get();} catch (Exception e) {System.out.println("afterExecute里面获取到submit提交的异常信息,处理异常" + e);}}}};//当线程池抛出异常后 executeexecutorService.execute(new task());//当线程池抛出异常后 submitexecutorService.submit(new task());}