解密Java线程池源码

一、线程池中的保活和回收源码分析

1、线程池中线程的创建时机

1、核心线程创建时机

在研究线程池的源码前首先想一个问题

public class Main {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0l, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("业务流程");throw new NullPointerException();}});executor.shutdown();executor.shutdownNow();}}

在以上代码中,当程序执行完成创建线程池实例,并且还没有走到 .execute()方法时,此时线程池中有多少个线程呢?是10个,20个还是0个呢?

答案是0个

也就是说线程池中创建的线程是类似懒加载的机制,在被实际调用前是不会创建实际线程的。

那么是怎么实现的懒加载机制呢?

咱们进入.execute()方法内部一探究竟

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

原来.execute()方法内部会进行一个判断当前的工作线程数是否小于核心线程数,如果小于核心线程数就会在.addWorker()方法中创建一个新的线程,并将任务交给新的线程来执行

2、最大线程创建时机

在线程池的执行方法 .execute() 中

int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);

可以发现,jdk的任务执行逻辑是,首先判断线程池工作中的任务数是否大于核心线程数,如果大于核心线程数,就会判断阻塞任务队列能否存入新的任务(阻塞队列是否达到上限),如果队列被填满了最后才会继续增加新的线程直到线程数达到设置的最大线程数。如果还有新的任务进来就会通过执置的拒绝策略来拒绝任务。

即任务在jdk的线程池存放的优先级是

核心线程-->阻塞队列-->最大线程

Dubbo的线程池逻辑就有一些区别,在Dubbo的线程池中,当线程池工作的任务数大于核心线程数后,会优先创建新的线程直到达到设置的最大线程数,最后才会把多余的任务存入阻塞队列中。

即任务在Dubbo的线程池存放的优先级是

核心线程-->最大线程-->阻塞队列

2、线程池中的线程是如何保活的源码分析

现在我们知道了线程池中的线程是何时被创建的,那么被创建的线程是如何运行的呢?

或者说在没有任务需要线程池中的线程处理时,此时线程池中的线程在做什么呢?

答案当然是会阻塞

那么线程池是如何阻塞池中的空闲线程的呢?

其实在我们创建ThreadPoolExecutor()实例时就已经设置了,那就是我们传入的 BlockingQueue<Runnable> workQueue 这个阻塞队列

现在线程池是在哪个地方如何使用这个阻塞队列的呢?

进入Worker的 .runWorker()方法,

   final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

首先会从Worker中取出待执行的任务 firstTask,然后将firstTask置为空,接下来就是一个没有跳出的while循环,首先判断从firstTask取出的任务是否为空,如果不是空就会执行具体的任务,然后会通过.getTask()方法继续获取任务。

然后进入.getTask()方法

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 从阻塞队列之获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

在该方法中就可以看到线程池是从工作队列(就是咱们创建线程池时传入的阻塞队列)中获取任务

Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();

这两种获取方式都是阻塞式的获取,区别就是一个是有等待时间的获取,另一个是没有等待时间的获取。

此时又会有一个疑问,那就是线程池中的线程如果没有从阻塞队列中获取任务,应该一直处于等待状态才对,为什么还要有一个这种带有等待时间获取任务的方式呢?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)        

这里就要提到创建线程池时,传入的最大线程数(maximumPoolSize)了,我们知道当任务数大于核心线程数时,线程池就会继续创建线程直到达到设置的最大线程数。但是超过核心线程数的线程是不应该保活的,也就是说在任务少于核心线程数时,线程池不应该让超过核心线程数的线程继续阻塞,而是应该让他们在没有任务的情况下阻塞一段时间后退出。所以就需要这种带有等待时间的阻塞获取了

.getTask()方法中设置的进入 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)  的时机

// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

只看 || 后的判断也可以看出 当工作线程数大于核心线程数时timed就会为true,此时就会就会走workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)  方法了

然后 程序继续往下走,此时因为r(需要执行的任务)为null就不会return null任务了,紧接着将tiemOut设置为 true,然后在下一次循环中 当程序走到这个判断时

if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}

就会直接返回null,此时.runWorker()内的 while 判断就会不成立从而跳出循环,然后就会执行finally块中的  processWorkerExit(w, completedAbruptly);方法

finally {processWorkerExit(w, completedAbruptly);
}

 该方法的作用就是退出多余的线程,进入 processWorkerExit/() 方法内部

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)// 如果线程池中的当前线程数大于核心线程数 则直接返回不会在线程池中增加新的线程return; // replacement not needed}// 在线程池中增加新的线程addWorker(null, false);}}

可以看出 原来线程池会将线程池中的原有的线程全部退出,不过会在线程退出前进行判断,如果当前活跃线程数大于等于核心线程数那就直接退出,当小于核心线程数时,会启动一个新的线程进入线程池中。

二、线程池中5大状态变化流程源码分析

1、线程池的5大状态

我们知道线程池定义了线程的5大状态,分别是

  • RUNNING(= -1 << COUNT_BITS)
  • SHUTDOWN(= 0 << COUNT_BITS)
  • STOP(= 1 << COUNT_BITS)
  • TIDYING(=2 << COUNT_BITS)
  • TERMINATED(= 3 << COUNT_BITS)

5大状态的转换流程如图所示

我们知道  .shutdownNow() 和  .shutdown() 方法 都会退出线程池中的线程,区别就是  .shutdown() 会将线程池的状态设置为SHUTDOWN 先退出没有任务的线程,那些正在执行任务的线程会等他们执行完任务在退出。即SHUTDOW状态不会影响正在执行的任务。

而  .shutdownNow()会将线程池的状态设置为STOP 会退出所有线程,不管有没有正在执行的任务。

那么Java线程池是如何实现的呢?


2、.interrupt() 方法是如何打断线程的

在讲Java线程池是如何停止线程之前,需要先了解 .interrupt() 方法。

我们知道Java提供了一个用于打断线程的api  .interrupt() 方法,那么 .interrupt()到底是如何打断线程的呢?

现在有如下demo,3秒后t1线程会成功被打断吗?

Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {while (true){System.out.println("t1线程运行中...");}}});System.out.println("启动线程");t1.start();// 主线程休眠3秒后,调用打断操作Thread.sleep(3000);// 打断t1.interrupt();System.out.println("===中断了t1线程===");

通过控制台的打印结果可以发现在主线程调用了t1线程的打断操作后,t1线程并没有预想的那样被打断。那么为什么会出现这样的情况呢?

进入 .interrupt() 方法

  public void interrupt() {if (this != Thread.currentThread()) {checkAccess();// thread may be blocked in an I/O operationsynchronized (interruptLock) {Interruptible b = nioBlocker;if (b != null) {interrupted = true;interrupt0();  // inform VM of interruptb.interrupt(this);return;}}}interrupted = true;interrupt0();  // inform VM of interrupt}

可以看出其实 .interrupt() 方法只是把线程内一个名叫  interrupted 的变量标志置为了true,并不会真的去打断线程。

而我们想通过 .interrupt() 方法来打断线程就需要我们自己在线程实例中获取 interrupted 变量的状态,然后进行我们想要打断的操作。

将上面的代码进行修改,增加一个判断线程是否被打断,如果已被打断则结束循环。

Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {while (true){System.out.println("t1线程运行中...");if (Thread.interrupted()) {System.out.println("t1线程被打断了");break;}}}});System.out.println("启动线程");t1.start();// 主线程休眠3秒后,调用打断操作Thread.sleep(3000);// 打断t1.interrupt();System.out.println("===中断了t1线程===");

运行后可以发现,线程确实被终止了。

现在有了对 .interrupt() 方法的了解,可以继续线程池状态的研究了

3、STOP 和SHUTDOWN状态

前面提到过使用 .shutdownNow()会将线程池的状态设置为STOP 退出所有线程,不管线程有没有正在执行任务。

他是怎么实现的呢?进入 .shutdownNow()的源码

 public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}

只看关键代码,可以发现 会执行  interruptWorkers() 方法,接着进入 interruptWorkers() 方法

private void interruptWorkers() {// assert mainLock.isHeldByCurrentThread();for (Worker w : workers)w.interruptIfStarted();}

这里线程池会遍历所有工作线程(这里要注意workers的大小并不是静态地等于核心线程数,因为还有最大线程数会影响他的大小,但不会大于设置的最大线程数)。

进入 worker 的 interruptIfStarted() 方法

void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}

现在就找到了最终咱们想要的,其实 shutdownNow() 方法最终就是让每个工作线程都进行打断操作。然后由工作线程判断打断标记,进行后续的操作。

那么工作线程是在什么地方查看的打断标记呢?

回到  runWorker() 方法,前面提到过runWorker() 方法会一直循环调用 getTask() 从中获取新的任务执行。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

咱们主要看从阻塞队列中获取任务时的代码

 try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}

在从阻塞队列中获取任务时如果线程被打断,就会捕获打断异常,但并不会立刻退出循环,而是将 timeOut设置为 false,等待下次循环时判断线程池的状态是否为 STOP或者SHUTDOWN 如果是的话将会 return null 。

接下来runWorker就会退出 while 走到 finally 块的 processWorkerExit()方法

 private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();// 判断 当前线程池状态值是否小于 STOPif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}

可以看到有一个runStateLessThan() 方法判断当前状态是否小于STOP

private static boolean runStateLessThan(int c, int s) {return c < s;}

如果大于等于STOP就直接退出不会执行后面的 addWorker(null, false) 重新添加线程的操作了。

这样空闲中的阻塞线程就都退出了。

那么正在执行任务的线程又是如何退出的呢?

其实  .shutdownNow(); 方法只将所有的线程的打断标记都设置为了true。给了我们正在执行任务一个判断打断标记的机会。具体是否真的要打断正在执行任务的线程决定权在我们,是否在任务代码中有判断打断标记。

.shutdown() 方法 只打断空闲线程,并将线程池状态设置为SHUTDOWN状态。

 public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池状态设置为SHUTDOWNadvanceRunState(SHUTDOWN);// 打断空闲线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

已经提交到线程池中的任务(包括阻塞队列中的任务)会继续执行,直到所有任务都完成。

在  getTask() 的代码片段中会判断线程池当前状态是否为SHUTDOWN状态并且阻塞队列为空

            if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}

此时就会将工作线程数量减一,并返回null ,这样在积存的任务都执行完后线程池中的线程就会跳出  runWorker(Worker w) while 循环。走到  processWorkerExit(w, completedAbruptly);

        try {while (task != null || (task = getTask()) != null) {// 循环执行线程池中的任务}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}

之前讲 STOP 状态时有提到该方法,他会在线程池状态大于等于STOP就直接退出不会执行后面的 addWorker(null, false) 重新添加线程的操作了。

也就是说当状态是SHUTDOWN 时程序依然会进入addWorker(null, false) ,进入addWorker(null, false) 可以看到在刚开始就会判断线程池当前状态是否为SHUTDOWN状态并且阻塞队列为空

那此时判断当然成立,也就不会走接下来的创建新的线程部分的代码了。线程就会一个一个退出,然后关闭。

4、TIDYING和TERMINATED状态

当工作线程清零以后线程池状态就会变为 TIDYING 我们以 shutdown()方法为例,进入 shutdown()方法

  public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

可以看到在将线程池状态设置为SHUTDOWN 并打断空闲工作线程后,会执行 onShutdown() 方法,进入onShutdown() 会发现在ThreadPoolExecutor中是一个空方法。作用是留给子类增强的预留钩子函数(类似生命周期),它的子类ScheduledThreadPoolExecutor 就通过这个钩子来取消不应运行的任务队列。

接下来就走到了 tryTerminate() 该方法就是用来改变线程池状态为TIDYING的,进入 tryTerminate()

final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();container.close();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}

可以看到程序会先判断任务是否都执行完成并且线程池中的线程是否都退出了,做完前面的准备工作,就会将线程池状态置为TIDYING,紧接着就会执行  terminated() 方法作用与onShutdown() 相同都是留给子类增强的预留钩子函数(类似生命周期)。最后将线程池状态设置为TERMINATED。到这里线程池的使命完成了。

三、线程池ctl状态控制源码分析

前面提到了线程池通过 AtomicInteger 类型的 ctl  来存储线程池状态和线程池中的线程数量。

线程池是如何将两个数据都存在  ctl  中的呢,先看一下Java线程池中的5大状态是如何表示的

private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

我们知道Java中 Integer 的大小占4个字节即32位,所以 COUNT_BITS = 29,

将-1~3左移29位后,得到最终各个状态对应的二进制数就是

RUNNING:            11100000 00000000 00000000 00000000

SHUTDOWN:       00000000 00000000 00000000 00000000

STOP:                  00100000 00000000 00000000 00000000

TIDYING:             01000000 00000000 00000000 00000000

TERMINATED:    01100000 00000000 00000000 00000000

(可能有人会有疑问,为什么-1左移29位后是前3位是111(补码),而不是001(原码)或者110(反码)。那是因为在Java中数字的二进制是通过补码表示的。至于原码、反码和补码之间的关系,不理解的同学可以看附录)

这样Java线程池就使用ctl  的高三位来表示线程池状态。而线程池中的线程数量就由后面的29位来表示(在这里也可以看出Java线程池的最大线程数为2^29(2的29次方)个),就这样Java线程池就通过一个ctl  变量同时存储了线程池状态和线程池中线程数这两个参数。

四、线程池新增和关闭流程源码分析

1、增加线程时的并发问题

假设我们设置的Java线程池核心线程数为10,线程池中已经有了9个线程,此时如果同时向线程池中增加两个任务,线程池是如何保证线程数依然为10个的呢?

进入 .execute() 方法

这一块并没有做线程安全的考虑,在并发的情况下线程都会进入 addWorker(command, true) 

可以看到,addWorker(command, true) 通过compareAndIncrementWorkerCount(c) cas原子操作来保证现程安全,只有一个线程能够正常跳出循环新增线程

 for (;;) { // 自旋if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c)) // CASbreak retry;c = ctl.get();  // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}

这一段代码可以说是JUC非常经典 使用(这样好的代码值得好好体会),不管是 AtomicInteger 还是ConcurrentHashMap 都是使用这种自旋 CAS 的方式来处理的并发(可以说在看线程池的源码过程中能够深刻体会 juc的精髓 ,在工作中要能想到使用自旋 CAS 的方式,而不是只能想到使用 synchronized和ReentrantLock()来处理并发问题)

而其他的线程就会在下一次循环时返回false,将任务放入阻塞队列中。

如果队列也满了,就会走下一个判断,下一个判断又会调用addWorker() 方法,不过此时的core参数是false。当corefalse时,判断的就是最大线程数了,

线程池就可以继续增加线程,直到线程数等于最大线程数(当然在当前案例中队列是不会满的,因为使用的是 LinkedBlockingQueue 一个链表 形式的队列,是不会满的无界队列。如果使用的是有界队列,才会存在队列满了的情况)

2、新增线程

2.1、新增线程流程

前面我们讲到了新增线程的并发问题,解决了并发问题,接下来就到了增加线程的时候了。

找到addWorker 中增加线程的部分,这部分的主要逻辑很好理解

第一步,创建Worker对象

第二步,将worker存入池中

第三步,启动worker中的线程

        boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {container.start(t);workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

首先创建Worker对象,将 firstTask(就是当前待执行的任务)传入Worker 中,

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}

首先设置AQS状态为-1,禁止中断,直到执行 runWorker 方法,然后将待执行的任务赋给Worker自己的成员变量firstTask中,最后就是通过线程工长创建一个线程,但是这里可以看到,传入的并不是firstTask而是Worker本身,在Worker的类定义上也可以发现其实Worker本身也是一个Runnable。

那为什么不直接传firstTask而是传Worker本身呢?

 因为咱们要使用Worker中定义的runWorker 方法,我们知道Thread调用.start()方法后,会运行Runnable中的run()方法,而Worker的run()方法会调用runWorker 方法,然后又将本身传入,

   public void run() {runWorker(this);}

这样就执行runWorker 方法中的逻辑了。

runWorker 方法中去执行任务了,可以看到在真正执行任务之前,还调用了 beforeExecute(wt, task)

在 ThreadPoolExecutor beforeExecute(wt, task)是一个空方法,他的作用就是就类似Aop(面向切面),因为但凡要执行任务之前都要执行这个方法,我们就可以通过继承ThreadPoolExecutor并重写beforeExecute(wt, task)方法就可以增强功能。

除了执行之前在执行之后有一个afterExecute(task, null) 它的功能和 beforeExecute(wt, task)相同,在任务执行后做一些增强。

这样新增线程的整体流程就说完了。

2.2、新增线程时加锁的目的

接下来还有一些重要细节,比如在addWorker 中增加线程时加锁的目的

我们发现整个新增线程的逻辑中唯一有增加操作的只有  workers.add(w)

workers.add(w);

可以肯定加锁的目的就是保证 workers.add(w)的线程安全的;那么为什么要保证这个的线程安全呢?

答案就在线程池的停止操作上,我们知道想要停止线程池中的线程就要,对将线程池中的所有线程都执行打断操作。如果如果在打断线程的时候,此时有新的线程加进来而没有被设置打断标记,那么就会出现线程池中的线程无法全部打断退出的情况。所以必须要保证线程打断和新增时的线程安全问题。

查看 线程池的 shutdownNow()方法,如前面所说的那样,在打断之前也要加锁保证线程安全。

3、关闭线程

3.1关闭线程流程

在上一节对runWorker时可以看到有一个rty-catch块,这个的作用就是用来抛出异常的。

在自定义的任务中很可能有各种想不到的问题出现,出现问题线程池并不知道该如何解决,也不能让线程池吞掉异常,所有线程池必须要将异常抛出。那此时线程池中的异常被抛出后代码接下来该如何走呢?因为一旦执行了 throw 抛出这个异常后,程序就会返回跳出while循环,也就是说当前线程就会被释放掉,那线程池又是如何保证池中的线程保持在核心线程数的呢?

进入finally 中的 processWorkerExit 方法这个方法在前面讲线程池的5大状态时有讲过,他的作用就是给线程池兜底,如果线程池小于核心线程数那么就会在新添加一个线程,如果不小于那就返回不会新增加线程。

其他通过.shutdown()和 .shutdownNow()的关闭流程在前面线程池的STOP 和SHUTDOWN状态有详细的讲解,这里就不再赘述了。

3.2、allowCoreThreadTimeOut 变量

前面呢只是讲了具体的执行逻辑有一些细节并没有说,比如 allowCoreThreadTimeOut 变量的作用,这个变量是用来线程池中设置的核心线程是不是要和核心线程以外的线程一样,在一段时间没有任务时就退出;我们可以对线程池设置如下参数。

executor.allowCoreThreadTimeOut(true);

如果这么做了的话,那么线程池就不再维护池中的线程数量保持在设置的核心线程数了。

附录

原码、反码和补码

原码、反码和补码是计算机中对数字的二进制表示方法,主要用于有符号整数的表示。

  • 原码:是最直接的表示方法。对于正数,其原码就是其二进制表示;对于负数,其原码是将最高位(符号位)设为1,其余位表示该数的绝对值的二进制形式。例如,8位二进制数中,+1的原码为00000001,-1的原码为10000001。
  • 反码:正数的反码与其原码相同;负数的反码是其原码符号位不变,其余各位取反。例如,+1的反码为00000001,-1的反码为11111110。
  • 补码:正数的补码与其原码相同;负数的补码是其反码加1。补码在计算机中广泛使用,因为它可以使加法和减法操作使用相同的电路,从而简化了计算过程。例如,+1的补码为00000001,-1的补码为11111111。

需要注意的是,这三种编码方式主要针对有符号整数,并且主要用于计算机内部的运算和存储。在实际应用中,如系统环境设置,如linux平台的目录和文件的默认权限的设置umask,就是使用反码原理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/3251.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从Linux角度具体理解程序翻译过程-----预处理、编译、汇编、链接

目录 前言&#xff1a; 翻译过程 1.预处理 2.编译 3.汇编 4.链接 Linux下对其理解&#xff1a; 1.预处理 拓展&#xff1a; Linux下文件信息&#xff1a; 文件类型&#xff1a; 硬链接数&#xff1a; 文件拥有者&#xff1a; 文件所属组&#xff1a; other&#x…

区块链安全应用-------压力测试

基于已有的链进行测试&#xff08;build_chain默认建的链 四个节 点&#xff09;&#xff1a; 第一步&#xff1a;搭链 1. 安装依赖 在ubuntu操作系统中&#xff0c;操作步骤如下&#xff1a; sudo apt install -y openssl curl 2. 创建操作目录, 下载安装脚本 ## 创建操作…

3个比较不错的Linux云音乐应用程序整理

在现代音乐流媒体时代&#xff0c;基于云的音乐应用程序因其便利性和可访问性而变得非常流行。Linux 用户尤其寻求可靠且功能丰富的音乐播放器来无缝地享受他们喜爱的音乐。 在这里&#xff0c;我们探讨了三个最好的基于云的音乐应用程序&#xff0c;每个应用程序都提供专为 L…

Java Web 网页设计(1)

不要让追求之舟停泊在幻想的港湾 而应扬起奋斗的风帆 驶向现实生活的大海 网页设计 1.首先 添加框架支持 找到目录右键添加 找到Web Application选中 点击OK 然后 编辑设置 找到Tomcat--local 选中 点击OK 名称可以自己设置 找到对应文件夹路径 把Tomcat添加到项目里面 因为…

【Hadoop】-HDFS的Shell操作[3]

目录 前言 一、HDFS集群启停命令 1.一键启停脚本可用 2.独立进程启停可用 二、文件系统操作命令 1、创建文件夹 2、查看指定目录下内容 3、上传文件到HDFS指定目录下 4、查看HDFS文件内容 5、下载HDFS文件 6、拷贝HDFS文件 7、追加数据到HDFS文件中 8、HDFS数据移…

哪吒汽车把最后的翻身筹码,全压在了这辆新车上

正如比亚迪王传福所说&#xff0c;新能源车市场已进入惨烈淘汰赛环节。 近几年国内新能源车销量增长势头迅猛&#xff0c;仅过去的 2023 年产销便分别达 958.7 万辆和 949.5 万辆&#xff0c;同比增长 35.8% 和 37.9%。 销量高速增长背后自然也带来了越来越激烈的竞争。 过去…

Footprint Analytics 与 GalaChain 达成战略合作

​ Footprint Analytics 宣布与 GalaChain 达成战略合作。GalaChain 是 Gala 旗下的 Layer 1 区块链。此次合作标志着双方在游戏&#xff08;包括 Gala Games) 、娱乐和金融等多个行业的区块链生态系统革新方面迈出了重要的一步。 GalaChain 致力于满足企业级项目的广泛需求&…

算法-栈操作

1047. 删除字符串中的所有相邻重复项 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:string removeDuplicates(string s) {string stack;for(char& ch:s){if(stack.size()>0&&chstack.back()){stack.pop_back();}else{stack.push_back(ch);}…

AI大模型实现软件智能化落地实践

1、什么是大模型 大型语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff1b;Large Language Models&#xff0c;LLMs)。 大语言模型是一种深度学习模型&#xff0c;特别是属于自然语言处理&#xff08;NLP&#xff09;的领域&#xff0c;一般是指包含数干亿&…

Pandas 模块-操纵数据(11)-二元运算--超级add、sub、mul、div、mod、pow等等

目录 1. DataFrame.add 1.1 DataFrame.add 语法结构 1.2 DataFrame.add 参数说明 1.3 DataFrame.add 用法示例 1.3.1 正常的使用 1.3.2 需要注意类型相符合 2. DataFrame.sub 2.1 DataFrame.sub 语法结构 2.2 DataFrame.sub 参数说明 2.3 DataFrame.sub 用法示例 3.…

传媒论坛编辑部传媒论坛杂志社传媒论坛杂志2024年第7期目录

专题│场景传播研究 场景传播&#xff1a;一场遮盖自我与寻找自我的博弈 胡沈明; 3 基于CiteSpace的中国场景传播研究热点分析 管倩;粟银慧; 4-610《传媒论坛》投稿&#xff1a;cnqikantg126.com 数字世界的美与危&#xff1a;场景传播的失范与应对之举 王依晗;章洁…

分布式-知识体系

分布式系统 本质就是一堆机器的协同&#xff0c;要做的就是用各种手段来让机器的运行达到预期 分布式业务场景 分布式四纵四横说 基于 MSA&#xff08;微服务架构&#xff09;的分布式知识体系 相关概念 – 【摘自网络原文】 节点与网络 节点 传统的节点也就是一台单体的物…

MySQL数据类型:字符串类型详解

MySQL数据类型&#xff1a;字符串类型详解 在MySQL数据库中&#xff0c;字符串数据类型用于存储各种文本信息。这些数据类型主要包括CHAR、VARCHAR、TEXT和BLOB等。 CHAR与VARCHAR CHAR CHAR类型用于存储固定长度的字符串。它的长度在创建表时就已确定&#xff0c;长度范围…

QJ71C24N-R2 三菱Q系列串行通信模块

三菱Q系列串行通信模块是通过串行通信用的RS-232、RS-422/485线路将对方设备与Q系列可编程控制器CPU相连接,以实现如下所示的数据通信的模块。通过使用调制解调器/终端适配器,可以利用公共线路(模拟/数字)实现与远程设备间的数据通信。 QJ71C24N-R2参数说明&#xff1a;串行RS-…

为什么36KbRAM会配置为32K×1,少的那4Kb去哪了?

首先我们需要了解BRAM的相关知识&#xff0c;可以参考下面两篇文章&#xff1a; Xinlinx FPGA内的存储器BRAM全解-CSDN博客 为何有时简单双口RAM是真双口RAM资源的一半-CSDN博客 本问题的背景是&#xff1a; 每个36Kb块RAM也可以配置成深度宽度为64K 1(当与相邻的36KB块RA…

淘宝新店没有流量和访客怎么办

淘宝新店没有流量和访客时&#xff0c;可以采取以下措施来提升店铺的流量和吸引更多的访客&#xff1a; 3an推客是给商家提供的营销工具&#xff0c;3an推客CPS推广模式由商家自主设置佣金比例&#xff0c;以及设置商品优惠券&#xff0c;激励推广者去帮助商家推广商品链接&…

SVG 绘制微信订阅号icon

效果 代码 <!DOCTYPE html> <html> <body><svg xmlns"http://www.w3.org/2000/svg" version"1.1" width"600" height"600"><rect x"0" y"0" rx"0" ry"0" width&…

JavaEE 初阶篇-深入了解 UDP 通信与 TCP 通信(综合案例:实现 TCP 通信群聊)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 UDP 通信 1.1 DatagramSocket 类 1.2 DatagramPacket 类 1.3 实现 UDP 通信&#xff08;一发一收&#xff09; 1.3.1 客户端的开发 1.3.2 服务端的开发 1.4 实现 …

Arm功耗管理精讲与实战

安全之安全(security)博客目录导读 思考 1、为什么要功耗管理&#xff1f;SOC架构中功耗管理示例&#xff1f;功耗管理挑战&#xff1f; 2、从单核->多核->big.LITTLE->DynamIQ&#xff0c;功耗管理架构演进? 3、什么是电压域&#xff1f;什么是电源域&#xff1f…

C++高级特性:异常概念与处理机制(十四)

1、异常的基本概念 异常&#xff1a;是指在程序运行的过程中发生的一些异常事件&#xff08;如&#xff1a;除数为0&#xff0c;数组下标越界&#xff0c;栈溢出&#xff0c;访问非法内存等&#xff09; C的异常机制相比C语言的异常处理&#xff1a; 函数的返回值可以忽略&…