一 线程池基础和使用
1.1 什么是线程池
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控。
1.2 线程池业务场景
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。那么什么时候适合使用线程池呢?
● 单个任务处理时间比较短
● 需要处理的任务数量很大
1.3 线程池的优势
● 重用存在的线程,减少线程创建,销毁的开销,提高性能。
● 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
● 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
1.4 Java线程池架构
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。下边是它的类继承关系图:
从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为:
● execute(Runnable command):履行Ruannable类型的任务。
● submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象.
● shutdown():在完成已提交的任务后封闭办事,不再接管新任务.
● shutdownNow():停止所有正在履行的任务并封闭办事。
● isTerminated():测试是否所有任务都履行完毕了
● isShutdown():测试该ExecutorService是否已被关闭
在Executor众多实现类中,我们在实际项目中使用的是ThreadPoolExecutor自定义创建线程池,我们的源码也是基于ThreadPoolExecutor来分析的,所以我们需要了解ThreadPoolExecutor类中重要的属性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
ctj变量操作的相关方法:
● runStateOf:获取运行状态。
● workerCountOf:获取活动线程数。
● ctlOf:获取运行状态和活动线程数的值。
除了线程的状态,我们的线程池也有5种状态:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
● RNNING:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
● SHUTDOWN:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务.
调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
● STOP:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
● TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数来实现。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的
任务为空时,就会由STOP -> TIDYING。
● TERMINATED:线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
进入TERMINATED的条件如下:
○ 线程池不是RUNNING状态。
○ 线程池状态不是TIDYING状态或TERMINATED状态。
○ 如果线程池状态是SHUTDOWN并且workerQueue为空。
○ workerCount为0。
○ 设置TIDYING状态成功。
1.5 线程池的创建和任务提交
线程创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
对创建线程的构造方法的七个参数分别讲解:
● corePoolSize:核心线程数,刚开始提交任务的时候,线程池都会创建一个工作线程执行提交的任务,直到达到核心线程数。
● maximumPoolSize:最大线程数,当我们的核心线程数达到最大值,且阻塞队列也满的时候,会创建一些救急线程来执行后边提交的任务,救急线程数最大值=最大线程数-核心线程数量。
● keepAliveTime:当救急线程执行完任务且没有新的任务的时候,这些救急线程就成为空闲线程,会被线程池回收掉,keepAliveTime就是这些救急线程的最大空闲时间。
● unit:救急线程最大空闲时间单位。
● workQueue:阻塞队列,存放我们任务的队列。
● threadFactory:线程工厂,指定线程池创建线程的方式,一般用于更改线程名称。
● handler:拒绝策略,当我们的核心线程达到最大值,队列达到最大值,救急线程达到最大值,这时候没有多余的线程和空闲的地方处理任务,就会触发拒绝策略。
Jdk自带的拒绝策略:
● AbortPolicy:直接抛出异常,默认策略;
● CallerRunsPolicy:用调用者所在的线程来执行任务;
● DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
● DiscardPolicy:直接丢弃任务
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如
记录日志或持久化存储不能处理的任务。
线程监控相关的方法:
public long getTaskCount();
public long getCompletedTaskCount();
public int getPoolSize();
public int getActiveCount();
● getTaskCount():线程池已执行与未执行的任务总数。
● getCompletedTaskCount():已完成的任务数。
● getPoolSize():线程池当前的线程数。
● getActiveCount():线程池中正在执行任务的线程数量
线程提交
线程提交的方法有两种:
public void execute(Runnable command);
public Future submit(Callable task)
public Future submit(Runnable task, T result)
● execute(Runnable command):提交一个Runnable类型的任务
● submit:提交一个Callable或者Runnable类型的任务。
二 线程池原理分析
线程池的原理图如下:
接下来我们从execute方法提交任务开始,一步步跟源码分析线程池是如何执行的。
execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//1.获取线程池状态变量
int c = ctl.get();
//2.查询工作线程数量是否小于核心线程数量
if (workerCountOf© < corePoolSize) {
//调用addWorker方法,成功直接return
if (addWorker(command, true))
return;
c = ctl.get();
}
//3.线程池是Running状态且任务添加到队列成功
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
//3.1如果线程池不是运行状态,把提交的任务移除队列
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//3.2如果线程池中核心线程数为0,创建一个work,但是任务已经加到队列中了,所以这里 的addWorker传入的任务为null,会在阻塞队列中获取任务。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//4.如果addWorker执行失败,说明队列满了,线程也达到了最大线程数量,则执行拒绝策 略。
else if (!addWorker(command, false))
reject(command);
}
大概流程:
1.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
2.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
3.如果workerCount >= corePoolSize&&workerCount<maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个救急线程来执行新提交的任务;
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
5.要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
exectue方法执行流程:
分析exectue方法的时候,可以看到很多地方调用了addWorker方法,接着我们就来分析addWorker方法都做了什么:
addWorker方法:
//firstTask 我们要提交的任务
//core true代表以核心线程为准 false代表以最大线程数为准
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//循环
for (;😉 {
int c = ctl.get();
//获取线程池运行状态
int rs = runStateOf©;
//四个判断//rs >= SHUTDOW 代表不再接受任务//rs == SHUTDOWN 表示线程池已经关闭,不在接受新的任务,但是会处理队列中的 任务//firstTask == null 当前work绑定的第一个任务为空//workQueue.isEmpty() 队列为空 这三个添加一起满足就说明不需要再创建线程 处理任务if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//又一个循环for (;;) {//获取活跃的线程数量int wc = workerCountOf(c);//如果线程数量超过阈值,或者达到核心线程数或最大线程数if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//直接返回falsereturn false;//cas对workCount加1if (compareAndIncrementWorkerCount(c))break retry;//cas失败,重新获取ctlc = ctl.get(); // Re-read ctl//如果ctl变化if (runStateOf(c) != rs)//跳出内存循环,重新判断continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建worker对象绑定第一个任务w = new Worker(firstTask);//获取worker绑定的线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//加锁,这里要操作共享变量mainLock.lock();try {//重新获取线程池运行状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加到worker集合workers.add(w);int s = workers.size();//largestPoolSize是线程池中出现过的最大线程数量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {//解锁mainLock.unlock();}if (workerAdded) {//启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
在这里我们先看下worker类是如何绑定任务和线程的
worker类:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
//绑定的线程final Thread thread;//绑定的第一个任务Runnable firstTask; volatile long completedTasks;//构造方法Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}
}
可以看出当我们new一个worker对象的时候,就会创建一个线程并且把任务绑定到firstTask属性中,而且Worker继承了AQS基类,这里可以留个疑问,Worker为什么要继承AQS同步器?
因为Worker实现了Runnable接口,且在构造方法中把自己传给了thread类中,那么thread调用start方法之后一定会调用worker方法的run方法:
worker.run():
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//拿到第一个任务
Runnable task = w.firstTask;
//第一个任务置为空
w.firstTask = null;
//先释放下锁
w.unlock();
boolean completedAbruptly = true;
try {
//循环,这里是线程复用的地方,如果第一个任务不为空或者从阻塞队列中可以拿到任务
while (task != null || (task = getTask()) != null) {
//加锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务执行之前做一些操作,空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行之后的一些操作,空实现
afterExecute(task, thrown);
}
} finally {
//任务执行完毕,task置为空
task = null;
//当前worker完成任务数量=1
w.completedTasks++;
//解锁
w.unlock();
}
}
//异常标志置位false
completedAbruptly = false;
} finally {
//线程任务执行完毕后的处理
processWorkerExit(w, completedAbruptly);
}
}
1.在这里会直接调用w.unlock(),先释放锁,至于原因后边在分析
2.beforeExecute,是线程池保留的扩展方法,在执行任务前可以做一些扩展操作。
3.执行任务,调用任务的run方法。
4.afterExecute,是线程池保留的扩展方法,在任务执行完毕后可以做一些扩展操作。
5.任务执行完毕后,把当前任务置为null,任务完成数量自增1,然后解锁
6.在最后,做一些任务完成之后的清理工作
在这里也有两个重要方法:getTask()和processWorkerExit()方法
getTask():从阻塞队列获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;😉 {
int c = ctl.get();
int rs = runStateOf©;
//获取活跃线程数量int wc = workerCountOf(c);//清理救急线程判断标志boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//这里就是阻塞队列为空,不需要那么多线程,把多于核心线程数量的线程销毁if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//这里就是救急线程到时情况的原理,根据阻塞队列的poll方法,如果阻塞队列为空,那么等到超时时间后就不会阻塞。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//走到这里说明救急线程空闲时间达到最大值timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
processWorkerExit():主要做一些任务执行完后的其他工作
completedAbruptly:当线程没有可执行的任务就会为true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted
//线程执行任务出现异常则活跃线程数量-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//统计worker完成数量completedTaskCount += w.completedTasks;//集合中清除当前worker,也就是队列中减少1个活跃线程workers.remove(w);} finally {mainLock.unlock();}//根据线程池状态判断是否关闭线程池tryTerminate();int c = ctl.get();//如果当前线程是RUNNING或SHUTDOWN状态if (runStateLessThan(c, STOP)) {//如果任务执行抛出异常if (!completedAbruptly) {//allowCoreThreadTimeOut=true且工作队列有任务,至少保留1个worker 执行任务int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
在这里再研究下worker这个类,因为他是我们线程池的核心
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
public void lock() { acquire(1); }
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;
}return false;}
Worker类继承AQS的主要目的:
● lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
● 如果正在执行任务,则不应该中断线程;
● 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
● 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
● 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。