原理概述
其实 Java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。
当用户向线程池提交一个任务时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行。
上图是一张线程池工作的精简图,实际的过程比这个要复杂的多,不过这些应该能够完全覆盖到线程池的整个工作流程了。
整个过程可以拆分成以下几个部分:
1、提交任务
当向线程池提交一个新的任务时,线程池有三种处理情况,分别是:创建一个工作线程来执行该任务、将任务加入阻塞队列、拒绝该任务。
提交任务的过程也可以拆分成以下几个部分:
- 当工作线程数小于核心线程数时,直接创建新的核心工作线程
- 当工作线程数不小于核心线程数时,就需要尝试将任务添加到阻塞队列中去
- 如果能够加入成功,说明队列还没有满,那么需要做以下的二次验证来保证添加进去的任务能够成功被执行
- 验证当前线程池的运行状态,如果是非RUNNING状态,则需要将任务从阻塞队列中移除,然后拒绝该任务
- 验证当前线程池中的工作线程的个数,如果为 0,则需要主动添加一个空工作线程来执行刚刚添加到阻塞队列中的任务
- 如果加入失败,则说明队列已经满了,那么这时就需要创建新的“临时”工作线程来执行任务
- 如果创建成功,则直接执行该任务
- 如果创建失败,则说明工作线程数已经等于最大线程数了,则只能拒绝该任务了
整个过程可以用下面这张图来表示:
2、创建工作线程
创建工作线程需要做一系列的判断,需要确保当前线程池可以创建新的线程之后,才能创建。
首先,当线程池的状态是 SHUTDOWN 或者 STOP 时,则不能创建新的线程。
另外,当线程工厂创建线程失败时,也不能创建新的线程。
还有就是当前工作线程的数量与核心线程数、最大线程数进行比较,如果前者大于后者的话,也不允许创建。
除此之外,会尝试通过 CAS 来自增工作线程的个数,如果自增成功了,则会创建新的工作线程,即 Worker 对象。
然后加锁进行二次验证是否能够创建工作线程,最后如果创建成功,则会启动该工作线程。
3、启动工作线程
当工作线程创建成功后,也就是 Worker 对象已经创建好了,这时就需要启动该工作线程,让线程开始干活了,Worker 对象中关联着一个 Thread,所以要启动工作线程的话,只要通过 worker.thread.start() 来启动该线程即可。
启动完了之后,就会执行 Worker 对象的 run 方法,因为 Worker 实现了 Runnable 接口,所以本质上 Worker 也是一个线程。
通过线程 start 开启之后就会调用到 Runnable 的 run 方法,在 run 方法中,调用了 runWorker(this) 方法,也就是把当前对象传递给了 runWorker 方法,让他来执行。
4、获取任务并执行
在 runWorker 方法被调用之后,就是执行具体的任务了,首先需要拿到一个可以执行的任务,而 Worker 对象中默认绑定了一个任务,如果该任务不为空的话,那么就是直接执行。
执行完了之后,就会去阻塞队列中获取任务来执行,而获取任务的过程,需要考虑当前工作线程的个数。
- 如果工作线程数大于核心线程数,那么就需要通过 poll 来获取,因为这时需要对闲置的线程进行回收;
- 如果工作线程数小于等于核心线程数,那么就可以通过 take 来获取了,因此这时所有的线程都是核心线程,不需要进行回收,前提是没有设置 allowCoreThreadTimeOut
线程池到底是如何复用线程的
在 Java 当中,每个 Thread 线程都有一个 start 方法,在程序调用 start 方法启动线程时,Java 虚拟机会调用该类的 run 方法。 在 Thread 类的 run 方法中其实调用了 Runnable 对象的 run 方法,因此可以继承 Thread 类,在 run 方法中不断循环调用传递进来的 Runnable 对象的 run 方法,程序就会不断地执行 run 方法中的代码。
Java 线程池可以把线程和任务进行解耦,线程归线程,任务归任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run 方法,把 run 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。
在 execute()
方法中,多次调用 addWorker()
方法把任务传入,addWorker
方法会添加并启动一个 Worker
。
这里的 Worker
是何方神圣呢?Worker
可以理解为是对 Thread
的包装,Worker
内部有一个 Thread
对象,它正是最终真正执行任务的线程,Worker
承担了任务执行者的角色,添加到任务队列中的每一个 Task
对象(Runnable
)的 run()
方法最终都是由 Worker
对象来执行的。实际上 Worker
对象本身既是一个 AQS 组件,同时也是一个Runnable
对象。
所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中,简化后的 runWorker 方法代码如下所示。
可以看出,实现线程复用的逻辑主要在一个不停循环的 while 循环体中。
- 通过取
Worker
的firstTask
或者通过getTask
方法从workQueue
中获取待执行的任务。 - 直接调用
task
的run
方法来执行具体的任务(而不是新建线程)。
在这里,我们找到了最终的实现,通过取 Worker
的 firstTask
或者 getTask
方法从 workQueue
中取出了新任务,并直接调用 Runnable
的 run
方法来执行任务,也就是如之前所说的,每个线程都始终在一个大循环中,反复获取任务,然后执行任务,从而实现了线程的复用。
所以总结一下,大概的整体执行结构图应当如下所示:
从整体上看,这是一种职责分离的设计思想,Worker 是 Worker,任务是任务,它俩没必要纠缠在一起。
参考:
- Java线程池的工作原理
- Java 线程池的工作原理
- 线程池实现“线程复用”的原理?