线程池的基本参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
主要是
核心线程数:可以一直存活的工作线程
最大线程数:当线程队列满的时候增加线程数,超过核心线程数的新增的为非核心线程,超过固定时长没任务执行就进行销毁
非核心线程的空闲时长:非核心线程超过多久进行销毁
工作队列:当核心线程都在工作的时候,新来的任务放入工作队列,等待执行
拒绝策略:核心队列满了,最大线程数也达到了,新来的任务执行拒绝策略
还有可以自定义ThreadFactory,控制线程的创建,这里没特殊需求,使用的是默认的线程工厂
线程池的拒绝策略
可以自定义线程的拒绝策略,默认提供拒绝策略主要分为四种
- CallerRunsPolicy让主线程进行执行
- AbortPolicy拒绝任务,抛出异常RejectedExecutionException
- DiscardPolicy 直接丢掉这个任务
- DiscardOldestPolicy 直接丢掉最早的任务
一般情况下可以满足使用,如有特殊情况,可以进行自定义拒绝策略,实现接口RejectedExecutionHandler
线程池执行过程
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 这里的c 是定义的AtomicInteger的一个成员变量,使用29位来表示线程数,剩下3位表示线程池的//状态,所以最大支持的线程数(2^29)-1int c = ctl.get();// 如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {// 执行添加工作线程if (addWorker(command, true))return;c = ctl.get();}// 这里经过上面,证明没有添加核心线程,如果线程池在运行状态,添加到工作队列里面if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);// 这里判断如果工作线程数为0(因为核心线程数允许为0),是要添加一个线程的,else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果没添加到队列,应该队列满了,直接添加非核心线程,如果添加失败走拒绝策略else if (!addWorker(command, false))reject(command);}
线程池的几种状态
RUNNING: 接受新任务并处理排队的任务
SHUTDOWN: 不接受新任务,但是处理队列里面的人物
STOP: 不接受新任务,不处理排队的任务,并中断正在进行的任务
TIDYING: 所有任务都已终止,workerCount为零,转换到状态TIDYING的线程将运行terminated()
TERMINATED: terminated()钩子方法完成时
分析下addWorker方法做了什么
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// 判断能不能添加线程,是不是超过设置的核心线程,最大线程数if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 增加线程数量,跳出循环if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl// 继续添加if (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
简单总结下,判断下添加工作线程,线程池是不是运行状态,是不是达到了设置的核心线程和最大线程限制,不符合条件的直接返回添加失败,符合条件的进行添加,如果添加成功,跳出循环,没添加成功就继续循环cas添加。
检查通过并且增加数量之后,开始实际创建Worker,然后归入workers进行管理,处理完之后调用线程的start方法,线程的start方法最后会调用jni新开线程,执行到run()方法,那么就去看下run方法的执行逻辑,run里面比较简单,直接调用的 runWorker(this);
runWoker 是怎么执行的
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 获取task,如果fisrtTask为空,就执行getTask进行获取while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 判断下线程池状态和线程状态,是不是进行了打断操作if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务之前的扩展操作,beforeExecute(wt, task);try // 执行任务task.run();// 任务执行之后的扩展操作afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
总结下就是循环获取任务进行执行,执行判断下线程池的状态和线程的状态,然后可以执行下扩展的任务执行前后需要执行的操作,发生异常执行worker退出操作
核心线程和非核心线程是怎么区分管理的
这个主要就在getTask方法的管理了,task究竟是怎么进行获取的呢
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?// 死循环获取任务for (;;) {int c = ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// 判断是不是有超时时间,核心线程是不是有超时时间boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 判断线程是不是具备销毁条件,超过最大线程数,或者具备超时时间并且超过的// 减去线程数if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 这里就是区分是不是需要超时时间的,主要区分于核心线程和非核心线程,// 常驻的线程直接就是take()死等// 需要具备超时时间的线程就使用workQueue的poll来设置超时时间,如果超时了// 下次循环判断出来就执行上面的减去线程的操作了Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
总结下,就是线程是不是常驻线程,会不会超时,怎么进行控制的,主要就是通过getTask里面进行区别对待的,使用工作队列的take()和poll()来进行区别设置超时时间,超时的再执行消除,然后返回的task就是null,由上层执行销毁线程的操作