1,概述
线程池是一种池化技术,本质是减少线程对象创建销毁的开销,同对象池、连接池一样,达到对象复用的效果。那么线程池怎么复用呢?即一个或多个Thread对象怎么执行更多的Task?这里面的关键就涉及到了阻塞队列。
其一,所谓阻塞队列,是借助锁实现的生产-消费模型,
以BlockLinkedList为例,其taskFirst方法,通过notEmpy,即Condition实现阻塞等待功能,而Condition内部使用了AQS来实现锁的功能,详情可见Java Lock源码解读-CSDN博客
、
其二,一个线程池有一个极限,即所有的线程对象都在被使用,且达到了最大池数量时,该怎么办呢?这时就产生了Reject策略;
其三,其他参数有
核心线程池数,懒加载式创建;
最大线程数,
线程存活时间,即非核心线程的阻塞等待时间,blockQueue.poll(timeout),
线程工厂,创建线程对象的工厂模式
2,ThreadPoolExecutor解读
以一个简单例子,即ThreadPoolExecutor类为例。
完成构造参数如上,
需要理清除池化技术的实现,
1,如何生产Command?
通过execute方法,
逻辑清晰,
1,如果小于核心线程数时,直接addWorker一个
2,如果线程池状态是RUNNING,并且提供阻塞队列workQueue成功,recheck下状态二重检查,如果check pass则什么都不用做。workQueue.offer提供的command,多个消费者会通过workQueue.take获取。
3,最后,如果添加到非核心线程失败,即addWorker(command,false)返回false,则使用拒绝策略,
addWorker,添加到线程池具体Worker职责。
笔者看下addWorker细节,
这个函数第一段式,
通过CAS操作count+1,如果当前线程池状态是SHUTDOWN或STOP,直接失败。
通过core参数判断是否达到对应最大值,如果达到直接失败。
只有CAS操作成功,才break retry,进入第二段式。
1,创建一个Worker,并且传入第一个task作为参数,并且通过线程工厂创建一个Thread对象,注意newThread时将worker自己传入,其woeker实现了Runnable接口,
另外,Worker实现了AQS,初始状态setState为-1,表示创建时阻止中断。
2,使用mainLock,将新创建的work加入wokers中,
3,添加workers成功后,直接start
4,跟进到Worker#run方法,
通过runWorker启动了一个线程,笔者看下此函数细节。
1,task赋值firstTask,第一次运行不用通过从workQueue中获取task,
2,w.unlock,前文创建Worker时值-1,unlock传release传入-1,state是0,可响应中断。
3,进入无限循环,第一个task不为null,则加锁后执行task。
4,后续的task通过从getTask中获取。
当线程池状态为SHUTDOWN且STOP且队列中没有task时,返回null,线程对象就能在runWorker中返回而被销毁。
否则,通过keepAliveTime参数从阻塞队列中获取task,如果超时,r取值null,timeOut为true,符合条件的线程会退出循环而销毁。
核心便是从workQueue中获取task。
至此,线程池原理完毕。
笔者再啰嗦下,阻塞队列的超时怎么实现的?
答案很简单,通过Condition#awaitNanos传入超时时间即可在超时时返回,更具体是内部通过AQS实现调度,详见Java Lock源码解读-CSDN博客#Condition相关。