3.3.5 ThreadPoolExecutor的Worker工作线程
Worker对象主要包含了两个内容
● 工作线程要执行任务
● 工作线程可能会被中断,控制中断
// Worker继承了AQS,目的就是为了控制工作线程的中断。
// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// =======================Worker管理任务================================
// 线程工厂构建的线程
final Thread thread;// 当前Worker要执行的任务
Runnable firstTask;
// 记录当前工作线程处理了多少个任务。
volatile long completedTasks;
// 有参构造
Worker(Runnable firstTask) {
// 将State设置为-1,代表当前不允许中断线程
setState(-1);
// 任务赋值
this.firstTask = firstTask;
// 基于线程工作构建Thread,并且传入的Runnable是Worker
this.thread = getThreadFactory().newThread(this);
}
// 当thread执行start方法时,调用的是Worker的run方法,
public void run() {
// 任务执行时,执行的是runWorker方法
runWorker(this);
}
// =======================Worker管理中断================================
// 当前方法是中断工作线程时,执行的方法
void interruptIfStarted() {Thread t;
// 只有Worker中的state >= 0的时候,可以中断工作线程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 如果状态正常,并且线程未中断,这边就中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
3.3.6 ThreadPoolExecutor的runWorker方法
runWorker就是让工作线程拿到任务去执行即可。
并且在内部也处理了在工作线程正常结束和异常结束时的处理方案
// 工作线程启动后执行的任务。
final void runWorker(Worker w) {
// 拿到当前线程
Thread wt = Thread.currentThread();
// 从worker对象中拿到任务
Runnable task = w.firstTask;
// 将Worker中的firstTask置位空
w.firstTask = null;
// 将Worker中的state置位0,代表当前线程可以中断的
w.unlock(); // allow interrupts
// 判断工作线程是否是异常结束,默认就是异常结束
boolean completedAbruptly = true;
try {
// 获取任务// 直接拿到第一个任务去执行
// 如果第一个任务为null,去阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
// 执行了Worker的lock方法,当前在lock时,shutdown操作不能中断当前线程,因为当前线程正在处理任务
w.lock();
// 比较ctl >= STOP,如果满足找个状态,说明线程池已经到了STOP状态甚至已经要凉凉了
// 线程池到STOP状态,并且当前线程还没有中断,确保线程是中断的,进到if内部执行中断方法
// if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中断线程}
// 如果线程池状态不是STOP,确保线程不是中断的。
// 如果发现线程中断标记位是true了,再次查看线程池状态是大于STOP了,再次中断线程
// 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了。
if (
(
runStateAtLeast(ctl.get(), STOP) ||
( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) )
)
&& !wt.isInterrupted())
wt.interrupt();
try {
// 勾子函数在线程池中没有做任何的实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写勾子函数
// 前置勾子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务。
task.run();
} catch (RuntimeException x) {thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 前后置勾子函数
afterExecute(task, thrown);
}
} finally {
// 任务执行完,丢掉任务
task = null;
// 当前工作线程处理的任务数+1
w.completedTasks++;
// 执行unlock方法,此时shutdown方法才可以中断当前线程
w.unlock();
}
}
// 如果while循环结束,正常走到这,说明是正常结束
// 正常结束的话,在getTask中就会做一个额外的处理,将ctl - 1,代表工作线程没一个。
completedAbruptly = false;
} finally {
// 考虑干掉工作线程
processWorkerExit(w, completedAbruptly);
}
}
// 工作线程结束前,要执行当前方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常结束
if (completedAbruptly)
// 将ctl - 1,扣掉一个工作线程
decrementWorkerCount();
// 操作Worker,为了线程安全,加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中
completedTaskCount += w.completedTasks;
// 将工作线程从hashSet中移除
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 只要工作线程凉了,查看是不是线程池状态改变了。
tryTerminate();
// 获取ctl
int c = ctl.get();
// 判断线程池状态,当前线程池要么是RUNNING,要么是SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果正常结束工作线程if (!completedAbruptly) {
// 如果核心线程允许超时,min = 0,否则就是核心线程个数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0,可能会出现没有工作线程,并且阻塞队列有任务没有线程处理
if (min == 0 && ! workQueue.isEmpty())
// 至少要有一个工作线程处理阻塞队列任务
min = 1;
// 如果工作线程个数 大于等于1,不怕没线程处理,正常return
if (workerCountOf(c) >= min)
return;
}
// 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
addWorker(null, false);
}
}
3.3.7 ThreadPoolExecutor的getTask方法
工作线程在去阻塞队列获取任务前,要先查看线程池状态
如果状态没问题,去阻塞队列take或者是poll任务
第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉
// 当前方法就在阻塞队列中获取任务
// 前面半部分是判断当前工作线程是否可以返回null,结束。
// 后半部分就是从阻塞队列中拿任务
private Runnable getTask() {
// timeOut默认值是false。
boolean timedOut = false;
// 死循环
for (;;) {
// 拿到ctl
int c = ctl.get();
// 拿到线程池的状态
int rs = runStateOf(c);
// 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
// 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
if (rs >= SHUTDOWN &&
(rs >= STOP || workQueue.isEmpty())) {
// 如果可以返回null,先扣减工作线程个数
decrementWorkerCount();
// 返回null,结束runWorker的while循环
return null;
}
// 基于ctl拿到工作线程个数
int wc = workerCountOf(c);
// 核心线程允许超时,timed为true
// 工作线程个数大于核心线程数,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (
// 如果工作线程个数,大于最大线程数。(一般情况不会满足),把他看成false
// 第二个判断代表,只要工作线程数小于等于核心线程数,必然为false
// 即便工作线程个数大于核心线程数了,此时第一次循环也不会为true,因为timedOut默认值是false
// 考虑第二次循环了,因为循环内部必然有修改timeOut的位置
(wc > maximumPoolSize || (timed && timedOut))
&&
// 要么工作线程还有,要么阻塞队列为空,并且满足上述条件后,工作线程才会走到if内部,结束工作线程
(wc > 1 || workQueue.isEmpty())
) {
// 第二次循环才有可能到这。
// 正常结束,工作线程 - 1,因为是CAS操作,如果失败了,重新走for循环
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 工作线程从阻塞队列拿任务
try {
// 如果是核心线程,timed是false,如果是非核心线程,timed就是true
Runnable r = timed ?
// 如果是非核心,走poll方法,拿任务,等待一会
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 如果是核心,走take方法,死等。
workQueue.take();
// 从阻塞队列拿到的任务不为null,这边就正常返回任务,去执行if (r != null)
return r;
// 说明当前线程没拿到任务,将timeOut设置为true,在上面就可以返回null退出了。
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}