日常工作中有用到线程池吗?什么是线程池?为什么要使用线程池?
作为 JUC 包下的门面担当,线程池是名副其实的 JUC 一哥,不了解线程池,那说明你对 JUC 包其他工具也了解的不咋样吧,对 JUC 没深入研究过,那就是没掌握到 Java 的精髓,给面试官这样一个印象,那结果可想而知了。
可以这样说:计算机发展到现在,摩尔定律
在现有工艺水平下已经遇到难易突破的物理瓶颈,通过多核 CPU 并行计算来提升服务器的性能已经成为主流,随之出现了多线程技术
。
线程作为操作系统宝贵的资源,对它的使用需要进行控制管理,线程池就是采用池化思想(类似连接池、常量池、对象池等)管理线程的工具。
JUC 提供了 ThreadPoolExecutor 体系类来帮助我们更方便的管理线程、并行执行任务。
下图是 Java 线程池继承体系:
顶级接口Executor提供了一种方式,解耦任务的提交和执行,只定义了一个 execute(Runnable command) 方法用来提交任务,至于具体任务怎么执行则交给他的实现者去自定义实现。
ExecutorService 接口继承 Executor,且扩展了生命周期管理的方法、返回 Futrue 的方法、批量提交任务的方法。
AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相关方法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务,交给 execute() 方法执行,然后可以从该 FutureTask 阻塞获取执行结果,并且对批量任务的提交做了编排。
ThreadPoolExecutor 继承 AbstractExecutorService,采用池化思想管理一定数量的线程来调度执行提交的任务,且定义了一套线程池的生命周期状态,用一个 ctl 变量来同时保存当前池状态(高3位)和当前池线程数(低29位)。看过源码的小伙伴会发现,ThreadPoolExecutor 类里的方法大量有同时需要获取或更新池状态和池当前线程数的场景,放一个原子变量里,可以很好的保证数据的一致性以及代码的简洁性,说到 ctl 了,可以顺便讲下几个状态之间的流转过程。
// 用此变量保存当前池状态(高3位)和当前线程数(低29位)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 可以接受新任务提交,也会处理任务队列中的任务// 结果:111跟29个0:111 00000000000000000000000000000private static final int RUNNING = -1 << COUNT_BITS;// 不接受新任务提交,但会处理任务队列中的任务// 结果:000 00000000000000000000000000000private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接受新任务,不执行队列中的任务,且会中断正在执行的任务// 结果:001 00000000000000000000000000000private static final int STOP = 1 << COUNT_BITS;// 任务队列为空,workerCount = 0,线程池的状态在转换为TIDYING状态时,会执行钩子方法terminated()// 结果:010 00000000000000000000000000000private static final int TIDYING = 2 << COUNT_BITS;// 调用terminated()钩子方法后进入TERMINATED状态// 结果:010 00000000000000000000000000000private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl// 低29位变为0,得到了线程池的状态private static int runStateOf(int c) { return c & ~CAPACITY; }// 高3位变为为0,得到了线程池中的线程数private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
使用线程池可以带来以下好处:
1、降低资源消耗。降低频繁创建、销毁线程带来的额外开销,复用已创建线程 ;
2、降低使用复杂度。将任务的提交和执行进行解耦,我们只需要创建一个线程池,然后往里面提交任务就行,3、具体执行流程由线程池自己管理,降低使用复杂度;
4、提高线程可管理性。能安全有效的管理线程资源,避免不加限制无限申请造成资源耗尽风险;
5、提高响应速度。任务到达后,直接复用已创建好的线程执行。
线程池的使用场景简单来说可以有:
1、快速响应用户请求,响应速度优先。比如一个用户请求,需要通过 RPC 调用好几个服务去获取数据然后聚合返回,此场景就可以用线程池并行调用,响应时间取决于响应最慢的那个 RPC 接口的耗时;又或者一个注册请求,注册完之后要发送短信、邮件通知,为了快速返回给用户,可以将该通知操作丢到线程池里异步去执行,然后直接返回客户端成功,提高用户体验;
2、单位时间处理更多请求,吞吐量优先。比如接受 MQ 消息,然后去调用第三方接口查询数据,此场景并不追求快速响应,主要利用有限的资源在单位时间内尽可能多的处理任务,可以利用队列进行任务的缓冲。
基于以上使用场景,可以套到自己项目中,说下为了提升系统性能,自己对负责的系统模块使用线程池做了哪些优化,优化前后对比 Qps 提升多少、Rt 降低多少、服务器数量减少多少等等。
ThreadPoolExecutor 都有哪些核心参数?
尽量的熟悉线程池执行流程。
不能仅仅掌握以下内容:
包含核心线程数(corePoolSize)、最大线程数(maximumPoolSize),空闲线程超时时间(keepAliveTime)、时间单位(unit)、阻塞队列(workQueue)、拒绝策略(handler)、线程工厂(ThreadFactory)这7个参数。
还需要深入理解:
会再主动描述下线程池的执行流程,也就是 execute() 方法执行流程。
execute()方法执行逻辑如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
可以总结出如下主要执行流程,当然看上述代码会有一些异常分支判断,可以自己梳理加到下述执行主流程里
1、判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略;
2、如果当前线程数 < 核心线程池,则新建一个线程来处理提交的任务;
3、如果当前线程数 > 核心线程数且任务队列没满,则将任务放入阻塞队列等待执行;
4、如果 核心线程池 < 当前线程池数 < 最大线程数,且任务队列已满,则创建新的线程执行提交的任务;
5、如果当前线程数 > 最大线程数,且队列已满,则执行拒绝策略拒绝该任务。
使用场景也要了解:
执行流程主要用在 CPU 密集型场景下。
像 Tomcat、Dubbo 这类框架,他们内部的线程池主要用来处理网络 IO 任务的,所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。
他们提供了阻塞队列 TaskQueue,该队列继承 LinkedBlockingQueue,重写了 offer() 方法来实现执行流程的调整。
@Overridepublic boolean offer(Runnable o) {// we can't do any checksif (parent==null) return super.offer(o);// we are maxed out on threads, simply queue the objectif (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);// we have idle threads, just add it to the queueif (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);// if we have less threads than maximum force creation of a new threadif (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;// if we reached here, we need to add it to the queuereturn super.offer(o);}
可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象。
1、如果 parent 为 null,直接调用父类 offer 方法入队;
2、如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队;
3、如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer() 入队后就马上有线程去执行它;
4、如果当前线程数小于最大线程数量,则直接返回 false,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢;
5、其他情况都直接入队。
可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。
如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,如果并发量很大,就会在队列里堆积大量任务,这样会降低请求的总体响应速度。
所以 Tomcat并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:
1、判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务;
2、如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务;
3、如果当前线程数等于最大线程数,则将任务放入任务队列等待执行;
4、如果队列已满,则执行拒绝策略。
而且 Tomcat 会做核心线程预热,在创建好线程池后接着会去创建核心线程并启动,服务启动后就可以直接接受客户端请求进行处理了,避免了冷启动问题。
然后再说下线程池的 Worker 线程模型,继承 AQS 实现了锁机制。线程启动后执行 runWorker() 方法,runWorker() 方法中调用 getTask() 方法从阻塞队列中获取任务,获取到任务后先执行 beforeExecute() 钩子函数,再执行任务,然后再执行 afterExecute() 钩子函数。若超时获取不到任务会调用 processWorkerExit() 方法执行 Worker 线程的清理工作。
runworker()、getTask()、addWorker()
刚说到了 Worker 继承 AQS 实现了锁机制,那ThreadPoolExecutor 都用到了哪些锁?为什么要用锁?
1)mainLock 锁
ThreadPoolExecutor 内部维护了 ReentrantLock 类型锁 mainLock,在访问 workers 成员变量以及进行相关数据统计记账(比如访问 largestPoolSize、completedTaskCount)时需要获取该重入锁。
为什么要有 mainLock?
private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSet<Worker> workers = new HashSet<Worker>();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;
可以看到 workers 变量用的 HashSet 是线程不安全的,是不能用于多线程环境的。largestPoolSize、completedTaskCount 也是没用 volatile 修饰,所以需要在锁的保护下进行访问。
为什么不直接用个线程安全容器呢?
其实 Doug 老爷子在 mainLock 变量的注释上解释了,意思就是说事实证明,相比于线程安全容器,此处更适合用 lock,主要原因之一就是串行化 interruptIdleWorkers() 方法,避免了不必要的中断风暴。
怎么理解这个中断风暴呢?
其实简单理解就是如果不加锁,interruptIdleWorkers() 方法在多线程访问下就会发生这种情况。一个线程调用interruptIdleWorkers() 方法对 Worker 进行中断,此时该 Worker 出于中断中状态,此时又来一个线程去中断正在中断中的 Worker 线程,这就是所谓的中断风暴。
那 largestPoolSize、completedTaskCount 变量加个 volatile 关键字修饰是不是就可以不用 mainLock 了?
其他一些内部变量能用 volatile 的都加了 volatile 修饰了,这两个没加主要就是为了保证这两个参数的准确性,在获取这两个值时,能保证获取到的一定是修改方法执行完成后的值。如果不加锁,可能在修改方法还没执行完成时,此时来获取该值,获取到的就是修改前的值,然后修改方法一提交,就会造成获取到的数据不准确了。
2)Worker 线程锁
刚也说了 Worker 线程继承 AQS,实现了 Runnable 接口,内部持有一个 Thread 变量,一个 firstTask,及 completedTasks 三个成员变量。
基于 AQS 的 acquire()、tryAcquire() 实现了 lock()、tryLock() 方法,类上也有注释,该锁主要是用来维护运行中线程的中断状态。在 runWorker() 方法中以及刚说的 interruptIdleWorkers() 方法中用到了。
这个维护运行中线程的中断状态怎么理解呢?
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }
在runWorker() 方法中获取到任务开始执行前,需要先调用 w.lock() 方法,lock() 方法会调用 tryAcquire() 方法,tryAcquire() 实现了一把非重入锁,通过 CAS 实现加锁。
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}
final void runWorker(ThreadPoolExecutor.Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock(); // 需要先调用 w.lock() 方法// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
interruptIdleWorkers() 方法会中断那些等待获取任务的线程,会调用 w.tryLock() 方法来加锁,如果一个线程已经在执行任务中,那么 tryLock() 就获取锁失败,就保证了不能中断运行中的线程了。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
在项目中是怎样使用线程池的?Executors 了解吗?
现在大多数公司都在遵循阿里巴巴 Java 开发规范,该规范里明确说明不允许使用 Executors 创建线程池,而是通过 ThreadPoolExecutor 显示指定参数去创建。
Executors 创建的线程池有发生 OOM 的风险。
Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM。
Executors.newCachedThreadPool 和 Executors.scheduledThreadPool 创建的线程池最大线程数是用的Integer.MAX_VALUE,可能会创建大量线程,导致 OOM。
自己在日常工作中也有封装类似的工具类,但是都是内存安全的,参数需要自己指定适当的值,也有基于 LinkedBlockingQueue 实现了内存安全阻塞队列 MemorySafeLinkedBlockingQueue,当系统内存达到设置的最大剩余阈值时,就不在往队列里添加任务了,避免发生 OOM。
package cn.com.codingce.juc.thread;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池工具类*/
public class ThreadPoolUtils {/*** 线程池*/private final ThreadPoolExecutor executor;/*** 线程工厂*/private CustomThreadFactory threadFactory;/*** 异步执行结果*/private final List<CompletableFuture<Void>> completableFutures;/*** 拒绝策略*/private final CustomAbortPolicy abortPolicy;/*** 失败数量*/private final AtomicInteger failedCount;public ThreadPoolUtils(int corePoolSize, int maximumPoolSize, int queueSize, String poolName) {this.failedCount = new AtomicInteger(0);this.abortPolicy = new CustomAbortPolicy();this.completableFutures = new ArrayList<>();this.threadFactory = new CustomThreadFactory(poolName);this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), this.threadFactory, abortPolicy);}/*** 执行任务*/public void execute(Runnable runnable) {CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor);// 设置好异常情况future.exceptionally(e -> {failedCount.incrementAndGet();System.out.println("Task Failed..." + e);e.printStackTrace();return null;});// 任务结果列表completableFutures.add(future);}/*** 执行自定义runnable接口(可省略,只是加了个获取taskName)*/public void execute(SimpleTask runnable) {CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor);// 设置好异常情况future.exceptionally(e -> {failedCount.incrementAndGet();System.out.println("Task [" + runnable.taskName + "] Failed..." + e);e.printStackTrace();return null;});// 任务结果列表completableFutures.add(future);}/*** 停止线程池*/public void shutdown() {executor.shutdown();System.out.println("************************停止线程池************************");System.out.println("** 活动线程数:" + executor.getActiveCount() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 等待任务数:" + executor.getQueue().size() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 完成任务数:" + executor.getCompletedTaskCount() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 全部任务数:" + executor.getTaskCount() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 拒绝任务数:" + abortPolicy.getRejectCount() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 成功任务数:" + (executor.getCompletedTaskCount() - failedCount.get()) + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("** 异常任务数:" + failedCount.get() + "\t\t\t\t\t\t\t\t\t\t**");System.out.println("**********************************************************");}/*** 获取任务执行情况* 之所以遍历taskCount数的CompletableFuture,是因为如果有拒绝的任务,相应的CompletableFuture也会放进列表,而这种CompletableFuture调用get方法,是会永远阻塞的。*/public boolean getExecuteResult() {// 任务数,不包含拒绝的任务long taskCount = executor.getTaskCount();for (int i = 0; i < taskCount; i++) {CompletableFuture<Void> future = completableFutures.get(i);try {// 获取结果,这个是同步的,目的是获取真实的任务完成情况future.get();} catch (InterruptedException | ExecutionException e) {System.out.println("java.util.concurrent.CompletableFuture.get() Failed ..." + e);return false;}// 出现异常,falseif (future.isCompletedExceptionally()) {return false;}}return true;}/*** 线程工厂*/private static class CustomThreadFactory implements ThreadFactory {private final String poolName;private final AtomicInteger count;private CustomThreadFactory(String poolName) {this.poolName = poolName;this.count = new AtomicInteger(0);}@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);// 线程名,利于排查thread.setName(poolName + "-[线程" + count.incrementAndGet() + "]");return thread;}}/*** 自定义拒绝策略*/private static class CustomAbortPolicy implements RejectedExecutionHandler {/*** 拒绝的任务数*/private final AtomicInteger rejectCount;private CustomAbortPolicy() {this.rejectCount = new AtomicInteger(0);}private AtomicInteger getRejectCount() {return rejectCount;}/*** 这个方法,如果不抛异常,则执行此任务的线程会一直阻塞*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {System.out.println("Task " + r.toString() + " rejected from " + e.toString() + " 累计:" + rejectCount.incrementAndGet());}}/*** 只是加了个taskName,可自行实现更加复杂的逻辑*/public abstract static class SimpleTask implements Runnable {/*** 任务名称*/private String taskName;public void setTaskName(String taskName) {this.taskName = taskName;}}}
测试:
package cn.com.codingce.juc.thread;import java.util.Random;/*** 线程池工具类*/
public class ThreadPoolUtilsMain {public static void main(String[] args) throws InterruptedException {test();}public static void test() throws InterruptedException {ThreadPoolUtils pool = new ThreadPoolUtils(5, 5, 10, "A业务线程池");// 14个正常任务for (int i = 0; i < 14; i++) {pool.execute(new ThreadPoolUtils.SimpleTask() {@Overridepublic void run() {try {// 模拟任务耗时Thread.sleep(600);} catch (InterruptedException e) {e.printStackTrace();}// 随机名称String taskName = randomName();super.setTaskName(taskName);System.out.println(Thread.currentThread().getName() + "-执行【" + taskName + "】");}});}// 1个异常任务pool.execute(new ThreadPoolUtils.SimpleTask() {@Overridepublic void run() {// 随机名称String taskName = randomName();super.setTaskName(taskName);throw new RuntimeException("执行【" + taskName + "】" + "异常");}});// 5个多余用来拒绝的任务for (int i = 0; i < 5; i++) {pool.execute(new ThreadPoolUtils.SimpleTask() {@Overridepublic void run() {// 随机名称String taskName = randomName();super.setTaskName(taskName);throw new RuntimeException("多余任务");}});}System.out.println("任务完成情况:" + pool.getExecuteResult());pool.shutdown();Thread.sleep(20000);}private static String randomName() {return "任务" + (char) (new Random().nextInt(60) + 65);}}
输出
Task java.util.concurrent.CompletableFuture$AsyncRun@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:1
Task java.util.concurrent.CompletableFuture$AsyncRun@2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:2
Task java.util.concurrent.CompletableFuture$AsyncRun@16b98e56 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:3
Task java.util.concurrent.CompletableFuture$AsyncRun@7ef20235 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:4
Task java.util.concurrent.CompletableFuture$AsyncRun@27d6c5e0 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:5
A业务线程池-[线程1]-执行【任务{】
A业务线程池-[线程4]-执行【任务H】
A业务线程池-[线程2]-执行【任务r】
A业务线程池-[线程5]-执行【任务N】
A业务线程池-[线程3]-执行【任务O】
A业务线程池-[线程1]-执行【任务[】
A业务线程池-[线程5]-执行【任务n】
A业务线程池-[线程4]-执行【任务M】
A业务线程池-[线程3]-执行【任务V】
A业务线程池-[线程2]-执行【任务n】
Task [任务J] Failed...java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务J】异常
java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务J】异常at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 执行【任务J】异常at cn.com.codingce.juc.thread.ThreadPoolUtilsMain$2.run(ThreadPoolUtilsMain.java:43)at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)... 3 more
A业务线程池-[线程1]-执行【任务Q】
A业务线程池-[线程3]-执行【任务C】
A业务线程池-[线程4]-执行【任务q】
A业务线程池-[线程5]-执行【任务t】
java.util.concurrent.CompletableFuture.get() Failed ...java.util.concurrent.ExecutionException: java.lang.RuntimeException: 执行【任务J】异常
任务完成情况:false
************************停止线程池************************
** 活动线程数:0 **
** 等待任务数:0 **
** 完成任务数:15 **
** 全部任务数:15 **
** 拒绝任务数:5 **
** 成功任务数:14 **
** 异常任务数:1 **
**********************************************************
我们一般都是在 Spring 环境中使用线程池的,直接使用 JUC 原生 ThreadPoolExecutor 有个问题,Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。
我们知道 Spring 中的 Bean 是有生命周期的,如果 Bean 实现了 Spring 相应的生命周期接口(InitializingBean、DisposableBean接口),在 Bean 初始化、容器关闭的时候会调用相应的方法来做相应处理。
所以最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor。
也会按业务类型进行线程池隔离,各任务执行互不影响,避免共享一个线程池,任务执行参差不齐,相互影响,高耗时任务会占满线程池资源,导致低耗时任务没机会执行;同时如果任务之间存在父子关系,可能会导致死锁的发生,进而引发 OOM。
使用线程池的常规操作是通过 @Bean 定义多个业务隔离的线程池实例。我们是参考美团线程池实践那篇文章做了一个动态可监控线程池的轮子,而且利用了 Spring 的一些特性,将线程池实例都配置在配置中心里,服务启动的时候会从配置中心拉取配置然后生成 BeanDefination 注册到 Spring 容器中,在 Spring 容器刷新时会生成线程池实例注册到 Spring 容器中。这样我们业务代码就不用显式用 @Bean 声明线程池了,可以直接通过依赖注入的方式使用线程池,而且也可以动态调整线程池的参数了。
@EnableAsync
@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor excelExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100000);executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());executor.setThreadNamePrefix("excel-");return executor;}
}
应用
@Resource
ThreadPoolTaskExecutor excelExecutor;// ...
List<CompletableFuture<List<SimplePeOnlineLabelDto>>> list = new ArrayList<>();List<List<String>> reportList = ListUtil.split(reportIds, 200);
reportList.forEach(ls -> {CompletableFuture<List<SimplePeOnlineLabelDto>> completableFuture = CompletableFuture.supplyAsync(() -> peOnlineDataService.getPeOnlineReptLabelData(ls),excelExecutor);list.add(completableFuture);
});for (CompletableFuture<List<SimplePeOnlineLabelDto>> listCompletableFuture : list) {log.info("========================读取数据=========================");try {List<SimplePeOnlineLabelDto> simplePeOnlineLabelDtoList = listCompletableFuture.get();peOnlineData.addAll(simplePeOnlineLabelDtoList);}catch (Exception e){log.error("========================读取数据失败=========================");}
}
// ...
通过 ThreadPoolExecutor 来创建线程池,核心参数设置多少合适呢?
可能很多人都看到过《Java 并发编程实践》这本书里介绍的一个线程数计算公式:
Ncpu = CPU 核数
Ucpu = 目标 CPU 利用率,0 <= Ucpu <= 1
W / C = 等待时间 / 计算时间
要程序跑到 CPU 的目标利用率,需要的线程数为:
Nthreads = Ncpu * Ucpu * (1 + W / C)
这公式太偏理论化了,很难实际落地下来,首先很难获取准确的等待时间和计算时间。再着一个服务中会运行着很多线程,比如 Tomcat 有自己的线程池、Dubbo 有自己的线程池、GC 也有自己的后台线程,我们引入的各种框架、中间件都有可能有自己的工作线程,这些线程都会占用 CPU 资源,所以通过此公式计算出来的误差一定很大。
所以说怎么确定线程池大小呢?
其实没有固定答案,需要通过压测不断的动态调整线程池参数
,观察 CPU 利用率、系统负载、GC、内存、RT、吞吐量
等各种综合指标数据
,来找到一个相对比较合理的值
。
所以不要再问设置多少线程合适了,这个问题没有标准答案,需要结合业务场景,设置一系列数据指标,排除可能的干扰因素,注意链路依赖(比如连接池限制、三方接口限流),然后通过不断动态调整线程数,测试找到一个相对合适的值。
线程池是如何监控的?
因为线程池的运行相对而言是个黑盒,它的运行我们感知不到,该问题主要考察怎么感知线程池的运行情况。
对线程池 ThreadPoolExecutor 做了一些增强,做了一个线程池管理框架。主要功能有监控告警、动态调参。主要利用了 ThreadPoolExecutor 类提供的一些 set、get方法以及一些钩子函数。
动态调参是基于配置中心实现的,核心参数配置在配置中心,可以随时调整、实时生效,利用了线程池提供的 set 方法。
监控,主要就是利用线程池提供的一些 get 方法来获取一些指标数据,然后采集数据上报到监控系统进行大盘展示。也提供了 Endpoint 实时查看线程池指标数据。
同时定义了5中告警规则。
线程池活跃度告警。活跃度 = activeCount / maximumPoolSize,当活跃度达到配置的阈值时,会进行事前告警;
队列容量告警。容量使用率 = queueSize / queueCapacity,当队列容量达到配置的阈值时,会进行事前告警;
拒绝策略告警。当触发拒绝策略时,会进行告警;
任务执行超时告警。重写 ThreadPoolExecutor 的 afterExecute() 和 beforeExecute(),根据当前时间和开始时间的差值算出任务执行时长,超过配置的阈值会触发告警;
任务排队超时告警。重写 ThreadPoolExecutor 的 beforeExecute(),记录提交任务时时间,根据当前时间和提交时间的差值算出任务排队时长,超过配置的阈值会触发告警。
通过监控 + 告警可以让我们及时感知到我们业务线程池的执行负载情况,第一时间做出调整,防止事故的发生。
execute() 提交任务和 submit() 提交任务有啥不同?
看到这个问题,是不是大多数人都觉得这个我行。execute() 无返回值,submit() 有返回值,会返回一个 FutureTask,然后可以调用 get() 方法阻塞获取返回值。
这个问题主要知道 FutureTask 的实现原理,FutureTask 继承体系如下:
调用 submit() 方法提交的任务(Runnable or Callable)会被包装成 FutureTask() 对象。FutureTask 类提供了 7 种任务状态和五个成员变量。
/** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 构造函数中 state 置为 NEW,初始态private static final int NEW = 0;// 瞬时态,表示完成中private static final int COMPLETING = 1;// 正常执行结束后的状态private static final int NORMAL = 2;// 异常执行结束后的状态private static final int EXCEPTIONAL = 3;// 调用 cancel 方法成功执行后的状态private static final int CANCELLED = 4;// 瞬时态,中断中private static final int INTERRUPTING = 5;// 正常执行中断后的状态private static final int INTERRUPTED = 6;// 任务状态,以上 7 种private volatile int state;/** 通过 submit() 提交的任务,执行完后置为 null*/private Callable<V> callable;/** 任务执行结果或者调用 get() 要抛出的异常*/private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程,会在 run() 方法中通过 cas 赋值*/private volatile Thread runner;/** 调用get()后由等待线程组成的无锁并发栈,通过 cas 实现无锁*/private volatile WaitNode waiters;
创建 FutureTask 对象时 state 置为 NEW,callable 赋值为我们传入的任务。
run() 方法中会去执行 callable 任务。执行之前先判断任务处于 NEW 状态并且通过 cas 设置 runner 为当前线程成功。然后去调用 call() 执行任务,执行成功后会调用 set() 方法将结果赋值给 outcome,任务执行抛出异常后会将异常信息调用 setException() 赋值给 outcome。至于为什么要先将状态变为 COMPLETING,再变为 NORMAL,主要是为了保证在 NORMAL 态时已经完成了 outcome 赋值。finishCompletion() 会去唤醒(通过 LockSupport.unpark())那些因调用 get() 而阻塞的线程(waiters)。
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
调用 get() 方法会阻塞获取结果(或异常),如果 state > COMPLETING,说明任务已经执行完成(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED),则直接通过 report() 方法返回结果或抛出异常。如果state <= COMPLETING,说明任务还在执行中或还没开始执行,则调用 awaitDone() 方法进行阻塞等待。
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}
awaitDone() 方法则通过 state 状态判断来决定直接返回还是将当前线程添加到 waiters 里,然后调用LockSupport.park() 方法挂起当前线程。
还有个重要的 cancel() 方法,因为 FutureTask 源码类注释的第一句就说了 FutureTask 是一个可取消的异步计算。代码也非常简单,如果 state 不是 NEW 或者通过 CAS 赋值为 INTERRUPTING / CANCELLED 失败则直接返回。反之如果 mayInterruptIfRunning = ture,表示可能中断在运行中线程,则中断线程,state 变为 INTERRUPTED,最后去唤醒等待的线程。
public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}
以上简单介绍了下 FutureTask 的执行流程,篇幅有限,源码解读的不是很仔细,后面可以考虑单独出一篇文章好好分析下 FutureTask 的源码。
什么是阻塞队列?阻塞队列有哪些?
阻塞队列 BlockingQueue 继承 Queue,是我们熟悉的基本数据结构队列的一种特殊类型。
当从阻塞队列中获取数据时,如果队列为空,则等待直到队列有元素存入。当向阻塞队列中存入元素时,如果队列已满,则等待直到队列中有元素被移除。提供 offer()、put()、take()、poll() 等常用方法。
JDK 提供的阻塞队列的实现有以下前 7 种:
1)ArrayBlockingQueue:由数组实现的有界阻塞队列,该队列按照 FIFO 对元素进行排序。维护两个整形变量,标识队列头尾在数组中的位置,在生产者放入和消费者获取数据共用一个锁对象,意味着两者无法真正的并行运行,性能较低;
2)LinkedBlockingQueue:由链表组成的有界阻塞队列,如果不指定大小,默认使用 Integer.MAX_VALUE 作为队列大小,该队列按照 FIFO 对元素进行排序,对生产者和消费者分别维护了独立的锁来控制数据同步,意味着该队列有着更高的并发性能;
3)SynchronousQueue:不存储元素的阻塞队列,无容量,可以设置公平或非公平模式,插入操作必须等待获取操作移除元素,反之亦然;
4)PriorityBlockingQueue:支持优先级排序的无界阻塞队列,默认情况下根据自然序排序,也可以指定 Comparator;
5)DelayQueue:支持延时获取元素的无界阻塞队列,创建元素时可以指定多久之后才能从队列中获取元素,常用于缓存系统或定时任务调度系统;
6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者;
7)LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素;
8)VariableLinkedBlockingQueue:说完以上 JDK 提供这几个阻塞队列后,还可以说下 LinkedBlockingQueue 是我们使用最广泛的阻塞队列,但是 LinkedBlockingQueue 一旦定义好后是不能修改容量 capacity 的。自己在使用线程池的过程中有动态去调整容量的需求,所以参考 RabbitMq 里的 VariableLinkedBlockingQueue,实现了一个可以调整容量的增强版 LinkedBlockingQueue,实现容量的动态调整;
9)MemorySafeLinkedBlockingQueue:而且 LinkedBlockingQueue 默认是使用 Integer.MAX_VALUE 作为容量的,也就是个无界队列,可能会有发生 OOM 的风险,所以自己实现了一个内存安全的 MemorySafeLinkedBlockingQueue,可以配置最大剩余内存,当内存达到该值的时候,再往队列放任务就会失败,很好的保证了不会发生令人头疼的 OOM 问题;
10)TaskQueue:上面讲 Tomcat 线程池时说过该阻塞队列,作为 LinkedBlockingQueue 的子类,覆写了 offer()、poll()、take() 等方法来调整线程池的执行流程。
重点说下 8、9 这两个自定义阻塞队列,来突出你对阻塞队列丰富的使用经验,这两队列源码可以自行搜索
MemorySafeLinkedBlockingQueue & VariableLinkedBlockingQueue 队列实现
线程池拒绝策略有哪些?适用场景是怎么样的?
当阻塞队列已满并且达到最大线程数时,再提交任务会走拒绝策略流程,JDK 提供了拒绝策略顶层接口 RejectedExecutionHandler,所有拒绝策略都需要继承该接口,JDK 内置了四种拒绝策略。
1)AbortPolicy:线程池默认的拒绝策略,触发时会抛出 RejectedExecutionException 异常。如果是一些比较重要的业务,可以使用该拒绝策略,在系统不能进一步支持更大并发量的情况下通过抛出异常及时发现问题并进行处理;
2)CallerRunsPolicy:在线程池没关闭的情况下,由调用者线程去处理任务,反之直接丢弃。此拒绝策略追求任务都能被执行,不丢失,比较适合并发量不大并且不允许丢失任务的场景场景,性能较低;
3)DiscardPolicy:丢弃任务,不抛出异常,一般无感知。建议一些无关紧要的任务可以使用此策略;
4)DiscardOldestPolicy:丢弃队列中最老的任务,然后重新提交被拒绝的任务。需要根据业务场景进行选择是否要用。
3、4 这两种拒绝策略都在会在无感知的情况下丢弃任务,需要根据业务场景决定是否要使用。
也可以根据自己需要自定义拒绝策略,比如 Dubbo 定义了拒绝策略 AbortPolicyWithReport,在抛出异常前会先进行线程堆栈信息的打印。
在使用线程池的过程中遇到过哪些坑或者需要注意的地方?
1)OOM 问题:
刚开始使用线程都是通过 Executors 创建的,前面说了,这种方式创建的线程池会有发生 OOM 的风险,可以举例说明;
2)任务执行异常丢失问题:
可以通过下述4种方式解决
在任务代码中增加 try、catch 异常处理;
如果使用的 Future 方式,则可通过 Future 对象的 get 方法接收抛出的异常;
为工作线程设置 setUncaughtExceptionHandler,在 uncaughtException 方法中处理异常;
可以重写 afterExecute(Runnable r, Throwable t) 方法,拿到异常 t。
3)共享线程池问题:
整个服务共享一个全局线程池,导致任务相互影响,耗时长的任务占满资源,短耗时任务得不到执行。同时父子线程间会导致死锁的发生,进而导致 OOM。
4)跟 ThreadLocal 配合使用,导致脏数据问题:
我们知道 Tomcat 利用线程池来处理收到的请求,会复用线程,如果我们代码中用到了 ThreadLocal,在请求处理完后没有去 remove,那每个请求就有可能获取到之前请求遗留的脏值。
5)ThreadLocal 在线程池场景下会失效,可以考虑用阿里开源的 TTL 来解决。
6)需要自定义线程工厂指定线程名称,不然发生问题都不知道咋定位。