一、线程池
提前创建一系列的线程,保存在这个线程池中,有任务要执行的时候,从线程池中取出线程来执行。没有任务的时候,线程池放回去。
二、为什么要使用线程池
线程使用上的问题:
-
线程的频繁创建 和 销毁
-
线程的数量过多,会造成CPU资源的开销
-
上下文切换(消耗CPU资源)
-
那么如何实现线程的复用呢? 池化技术
三、线程池的设计猜想
3.1线程池的设计思考?
需求: 实现线程的重复使用
分解:
-
如何使用线程的复用?
-
让线程实现复用的唯一方法,就是让线程不结束
-
那如何让线程执行新的任务呢?也就是说,任务怎么给他执行?
-
[共享内存]-> List.add()
-
-
线程一直处于运行状态,合理吗?
-
有任务来的时候,执行,没有任务的时候,阻塞
-
-
-
结论: 通过阻塞队列的方式,来实现线程池中线程的复用
思考: 通过阻塞队列的方式,如果队列满了,可以阻塞主线程/生产者线程吗?显然不能,那么怎么办:
-
1.增加消费者线程的数量(扩容) 也就是增加线程去执行任务,消费者多了,阻塞队列自然被消费的也快了,就不容易阻塞主线程了
-
2.如果扩容解决不了问题,那只能采用拒绝策略
-
报错
-
直接丢弃这个任务
-
直接普通线程调用task.run(直接重新开启一个线程)
-
队列中头部的等待最久的任务丢弃,然后把当前任务添加到阻塞队列
-
存储起来,后续等待空闲之后重试(自定义去完成)
-
结束的方法:让线程执行结束(run方法执行结束),也就是跳出while循环
3.2 线程池的核心参数
a.线程数量(核心线程数) [初始化时创建]
b. 最大线程数 [还能够扩容多少个线程 与核心线程数的差]
c. 存活时间 [扩容的线程要有一个存活时间]
d. 存活时间单位
e.阻塞队列(使用哪一种阻塞队列)
f.线程工厂(生产线程的工厂)(有默认值)
h.阻绝策略(有默认值 默认抛出异常)
四、Java中提供的线程池
Exectors
-
newScheduledThreadPool 提供周期的线程池
-
newFixedThreadPool 固定线程数量
-
newSingleThreadExecutor 只有一个线程的线程池
-
newCachedThreadPool 可以缓存的线程池->理论上来说,有多少情况,就构建多少个线程
五、自定义线程池源码分析
线程池中的核心线程是延迟初始化的
-
先初始化核心线程
-
调用阻塞队列的方法,把task存进去
-
如果true,说明当前的请求量不大,核心线程就可以搞定
-
false,增加工作线程(非核心线程)
-
如果添加失败,说明当前的工作线程数量达到了最大的线程数,直接调用拒绝策略
-
-
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 前3位记录运行状态 后29位记录线程数int c = ctl.get();// 1.判断当前工作线程数是否小于核心线程数(延迟初始化)if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 添加工作线程,并执行任务return;c = ctl.get();}// 2.添加到阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 增加工作线程else if (!addWorker(command, false))reject(command); // 4. 增加失败,则执行拒绝策略}
5.1 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {// 1. 修改工作线程记录数retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) //1.1 CAS操作进行修改break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 2. 创建线程并启动boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {// 3. 添加失败 则回滚if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
分析: addWorker主要做3件事,.a.使用CAS操作修改原子值中的工作线程数 b.将任务放到Worker对象中,并启动线程 c.如果线程启动失败,则回滚。线程启动后,则自动调用Worker对象中的run方法。
5.2 runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 1. while循环保证当前线程不结束,直到task为空while (task != null || (task = getTask()) != null) {// 2.这里是因为表示当前线程正在运行一个任务,其它地方要执行shutdown 你要等我执行结束w.lock(); // worker继承了AQS 实现了互斥锁if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt(); // 是否应该中断 在gettask中处理try {// 空实现,方便扩展beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 空实现,方便扩展afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
5.3 getTask
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);//1. 判断如果线程池已经结束,直接返回Null,需要清理掉所有的工作线程if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 2. 是否允许超时 allowCoreThreadTimeOut 为true 也就是说设置为true 核心线程也 // 可以被销毁// 或者工作线程数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c)) // cas操作减少工作线程return null;continue;}try {//* 执行任务Runnable r = timed ?// 超时阻塞workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 如果阻塞队列为空,则会阻塞在这个地方workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
分析: getTask中从队列中取出任务并执行。注意:allowCoreThreadTimeOut 设置为true,则核心线程也可以被销毁
5.4 拒绝策略
a. AbortPolicy(抛出异常)
// 报错 抛出异常public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
b. CallerRunsPolicy (创建一个普通线程,并执行)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
c. DiscardOldestPolicy (从队列头部抛弃一个,执行当前的)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
d. DiscardPolicy (什么也不做)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}