JAVA并发编程(二)_线程池

JAVA线程池

1.1Java 线程池之 Executor 框架

为了实现线程池和管理线程池,JDK 给我们提供了基于 Executor 接口的一系列接口、抽象类、实现类,我们把它称作线程池的 Executor 框架,Executor 框架本质上是一个线程池;

在这里插入图片描述

​ Java 线程(java.lang.Thread)被一对一映射为本地操作系统内核线程,Java 线程启动时会创建一个本地操作系统线程,操作系统会调度所有线程并将它们分配给可用的 CPU 执行,当该 Java 线程终止时,这个操作系统线程也会被回收;
实际上这是两层线程调度模型:
​ (1)上层 Java 线程的调度由 Executor 框架调度;
​ (2)下层操作系统的线程调度由操作系统调度;
​ Java 的线程是这么设计的,包含两部分:
​ 1、工作任务;(Runnable 和 Callable)
​ 2、执行机制;(Thread、Executor 框架)

1.2Executor 框架 的接口与类结构

  • java.util.concurrent (并发编程的工具) juc
  • java.util.concurrent.atomic (变量的线程安全的原子性操作)
  • java.util.concurrent.locks (用于锁定和条件等待同步等)
  • Executor [ɪɡˈzekjʊtə] 执行人、执行者

1.3线程池的七大参数

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
15,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);

构造方法最多是 7 个参数;

1)int corePoolSize,

​ 指定线程池中的核心线程数量(最少的线程个数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,它们也不会被销毁,除非设置了 allowCoreThreadTimeOut;默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程;在实际中如果需要线程池创建之后立即创建线程,可以通过以下两种方式:
​ prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程;
​ prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程;

2) BlockingQueue workQueue,

任务队列,当核心线程全部繁忙时,由 execute/submit 方法提交的 Runnable 任务存放到该任务队列中,等待被核心线程来执行;

3)int maximumPoolSize

指定线程池中允许的最大线程数,当核心线程全部繁忙且任务队列存满之后,线程池会临时追加线程,直到总线程数达到 maximumPoolSize 这个上限;

4)long keepAliveTime,

线程空闲超时时间,如果一个线程处于空闲状态,并且当前的线程数量大于 corePoolSize,那么在指定时间后,这个空闲线程会被销毁;

5) TimeUnit unit

​ keepAliveTime 的时间单位 (天、小时、分、秒…)

6) ThreadFactory threadFactory,

线程工厂,用于创建线程,一般采用默认的即可,也可以自定义实现;
Executors.defaultThreadFactory(),
Executors.privilegedThreadFactory(),

7) RejectedExecutionHandler handler,

拒绝策略(饱和策略),当任务太多来不及处理时,如何“拒绝”任务?
任务拒绝是线程池的保护措施,当核心线程 corePoolSize 正在执行任务、线程池的任务队列
workQueue 已满、并且线程池中的线程数达到 maximumPoolSize 时,就需要“拒绝”掉新提交
过来的任务;

示例:

package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test01 {public static void main(String[] args) {Thread t=new Thread(){@Overridepublic void run() {System.out.println("Runnable任务1");}};t.start();Thread t2=new Thread(new Runnable() {@Overridepublic void run() {System.out.println("Runnable任务2");}});t2.start();//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5,10,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(5),new ThreadPoolExecutor.CallerRunsPolicy());threadPoolExecutor.execute(()->{System.out.println("工作任务2");});//线程池关闭threadPoolExecutor.shutdown();}
}
package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test02 {//基于Executor框架实现线程池 (此时线程池中一个线程也没有)public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5,10,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());//当提交了一个工作任务,此时线程池中就有一个线程threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName());});//关闭线程池//threadPoolExecutor.shutdown();//当核心线程处于空闲状态时候,允许销毁这些空闲的核心线程,默认是不允许销毁核心线程的//threadPoolExecutor.allowCoreThreadTimeOut(true);//如果想创建线程池后,立刻就创建好线程,那么执行:threadPoolExecutor.prestartCoreThread();//初始化/创建一个核心线程threadPoolExecutor.prestartAllCoreThreads();//初始化/创建所有的核心线程for (int i=0;i<50;i++){threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName());});}threadPoolExecutor.shutdown();}
}
package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test03 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5,12,5,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(5),new MyThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName());});}/*** 自己实现线程工厂*/static class MyThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "my-thread");}}
}
package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** ThreadPoolExecutor线程池** @author Cat老师,关注我,抖音搜索:java512*/
public class Test04 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,1,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());//同时提交4个任务threadPoolExecutor.execute(new MyRunnable(1));threadPoolExecutor.execute(new MyRunnable(2));threadPoolExecutor.execute(new MyRunnable(3));threadPoolExecutor.execute(new MyRunnable(4));threadPoolExecutor.shutdown();}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}运行结果:
main: 4
pool-1-thread-1: 1
pool-1-thread-1: 2
pool-1-thread-1: 3Process finished with exit code 0

1.4线程池的拒绝策略

JDK 提供了 4 种内置的拒绝策略:AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy 和DiscardPolicy;
1、AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常,这是线程池默认、的拒绝策略,在任务不能再提交的时候抛出异常,让开发人员及时知道程序运行状态,这样能在系统不能承载更大的并发量时,及时通过异常信息发现;

package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test05 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),new ThreadPoolExecutor.AbortPolicy()//new MyRejectedExecutionHandler());for (int i = 0; i < 40000; i++) {threadPoolExecutor.execute(new MyRunnable(i));}threadPoolExecutor.shutdown();}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}
运行结果
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lisus.threadpool.Test05$MyRunnable@61bbe9ba rejected from java.util.concurrent.ThreadPoolExecutor@610455d6[Running, pool size = 16, active threads = 16, queued tasks = 10, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at com.lisus.threadpool.Test05.main(Test05.java:24)
pool-1-thread-1: 0
pool-1-thread-1: 8
pool-1-thread-1: 9
pool-1-thread-1: 10
pool-1-thread-1: 11
pool-1-thread-1: 12
pool-1-thread-1: 13
pool-1-thread-1: 14
pool-1-thread-1: 15
pool-1-thread-1: 16
pool-1-thread-1: 17
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-4: 3
pool-1-thread-5: 4
pool-1-thread-6: 5
pool-1-thread-7: 6
pool-1-thread-8: 7
pool-1-thread-9: 18
pool-1-thread-10: 19
pool-1-thread-11: 20
pool-1-thread-12: 21
pool-1-thread-13: 22
pool-1-thread-14: 23
pool-1-thread-15: 24
pool-1-thread-16: 25    

2、DiscardPolicy:直接丢弃任务,不抛出异常,使用此策略可能会使我们无法发现系统的异、常状态,建议一些无关紧要的业务采用此策略;

package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test05 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()//new MyRejectedExecutionHandler());for (int i = 0; i < 40000; i++) {threadPoolExecutor.execute(new MyRunnable(i));}threadPoolExecutor.shutdown();}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}
运行结果
pool-1-thread-1: 0
pool-1-thread-1: 8
pool-1-thread-1: 9
pool-1-thread-1: 10
pool-1-thread-1: 11
pool-1-thread-1: 12
pool-1-thread-1: 13
pool-1-thread-1: 14
pool-1-thread-1: 15
pool-1-thread-1: 16
pool-1-thread-1: 17
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-4: 3
pool-1-thread-5: 4
pool-1-thread-6: 5
pool-1-thread-7: 6
pool-1-thread-8: 7
pool-1-thread-9: 18
pool-1-thread-10: 19
pool-1-thread-11: 20
pool-1-thread-12: 21
pool-1-thread-13: 22
pool-1-thread-14: 23
pool-1-thread-15: 24
pool-1-thread-16: 25Process finished with exit code 0

3、DiscardOldestPolicy:丢弃任务队列中靠最前的任务,并执行当前任务,是否要采用此拒绝策略,根据实际业务是否允许丢弃老任务来评估和衡量;

package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test05 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()//new MyRejectedExecutionHandler());for (int i = 0; i < 40000; i++) {threadPoolExecutor.execute(new MyRunnable(i));}threadPoolExecutor.shutdown();}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}

4、CallerRunsPolicy: 交由任务的调用线程(提交任务的线程)来执行当前任务;这种拒绝策略会让所有任务都能得到执行,适合大量计算类型的任务执行,使用这种策略的最终目标是要、让每个任务都能执行完毕,而使用多线程执行计算任务只是作为增大吞吐量的手段;
新来的任务可以用 main 线程去执行,不用线程池里面的线程执行;

package com.lisus.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class Test05 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()//new MyRejectedExecutionHandler());for (int i = 0; i < 40000; i++) {threadPoolExecutor.execute(new MyRunnable(i));}threadPoolExecutor.shutdown();}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}

除了上面的四种拒绝策略,还可以通过实现 RejectedExecutionHandler 接口,实现自定义的拒绝策略;

package com.lisus.threadpool;import java.util.concurrent.*;public class Test05 {public static void main(String[] args) {//基于Executor框架实现线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,15,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),//Executors.privilegedThreadFactory(),//new ThreadPoolExecutor.CallerRunsPolicy()new MyRejectedExecutionHandler());for (int i = 0; i < 40000; i++) {threadPoolExecutor.execute(new MyRunnable(i));}threadPoolExecutor.shutdown();}static class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {//如果任务队列满了,就超时等待,可以设置一个时间try {executor.getQueue().offer(r, 60, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}}}static class MyRunnable implements Runnable {private int i;public MyRunnable(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": " + this.i);}}
}

AbortPolicy 异常中止策略:异常中止,无特殊场景;
DiscardPolicy 丢弃策略:无关紧要的任务(文章点击量、商品浏览量等);
DiscardOldestPolicy 弃老策略:允许丢掉老数据的场景;
CallerRunsPolicy 调用者运行策略:不允许失败场景(对性能要求不高、并发量较小的场景);

1.5线程池的原理

在这里插入图片描述

1.6线程池底层源码实现

1.6.1线程池构造方法

在这里插入图片描述

1.6.2线程池源码-控制变量

在这里插入图片描述

COUNT_BITS = 29
CAPACITY = (1 << COUNT_BITS) - 1
int 类型的数是占用 4 字节,32 位,所以前面填了一堆 0;
原码:00000000 00000000 00000000 00000001
左移:00100000 00000000 00000000 00000000
减一:00011111 11111111 11111111 11111111 (536870911 = 5 亿多)

1.6.3线程池源码-线程池状态值

在这里插入图片描述

1.6.7线程池源码- 核心源码解读-execute
    public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取 clt 控制变量的值,clt 控制变量记录着 runState 和 workerCount 的值;int c = ctl.get();/*** workerCountOf方法获取控制变量ctl低29位值* 如果当前活动线程小于核心线程corePoolSize,,则新建一个线程放入线程池中,并把任务添加到该线程中运行*/if (workerCountOf(c) < corePoolSize) {/*** addWorker 方法* 第一参数是要提交的工作任务* 第二个参数:* 如果是true,根据corePoolSize来判断表示添加核心线程;(保持稳定的线程数来处理任务)*  如果是 false,根据 maximumPoolSize 来判断,表示添加非核心线程;(应对突发的任务处理)*///addWorker()方法会检查运行状态和工作线程数,如果返回 false 则说明线程没有创建成功;if (addWorker(command, true))//添加成功则返回;return;//如果添加失败,则重新获取控制变量 ctl 的值;c = ctl.get();}//到这里了,说明 workerCountOf(c) >= corePoolSize,并且如果当前线程池是运行状态并且工作任务添加到任务队列成功if (isRunning(c) && workQueue.offer(command)) {// 重新获取 ctl 值int recheck = ctl.get();//再次判断线程池是否是运行状态,如果不是运行状态,由于之前已经把 command 添加到 workQueue 中了,此时需要移除该 command;if (! isRunning(recheck) && remove(command))reject(command);/*** 线程池是运行状态,获取一下线程池中的有效线程数,如果是 0,则执行 addWorker()方法;* addWorker()方法:* 第一个参数为 null,表示在线程池中创建一个线程,但不启动;* 第二个参数为 false,表示是非核心线程;** 接下来这里没有写 else,表示如果判断 workerCount 大于 0,则不需要做什么处理,直接返回,* 加入到 workQueue 中的 command 会在将来的某个时刻被执行;*/else if (workerCountOf(recheck) == 0)//此处是创建一个线程,但并没有传入任务,因为任务已经被添加到 workQueue 中了,到时候线程会从从 workQueue 中获取任务来执行;//所以当 workerCountOf(recheck) == 0 时执行 addWorker(null, false);//是为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务;addWorker(null, false);}/*** 如果执行到这里,有两种情况:* 1. 线程池已经不是 RUNNING 状态;* 2. 线程池是 RUNNING 状态,但往 workQueue 已经放不进去,即 workerCount >= corePoolSize,并且 workQueue 已满;* 此时再次调用 addWorker()方法,第二个参数为 false,表示非核心线程,如果失败则拒绝该任务;*/else if (!addWorker(command, false))reject(command);}
1.6.8线程池源码-核心源码解读-addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {// 获取线程池控制变量的值int c = ctl.get();//线程运行状态int rs = runStateOf(c);//  if判断,如果rs>=SHUTDOWN,并且(判断3个条件,只要有1个不满足)返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//获取线程数int wc = workerCountOf(c);// 如果 wc 超过 CAPACITY,也就是 ctl 的低 29 位的最大值(二进制是 29 个 1),返回 false;if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 尝试增加 workerCount,如果成功,则跳出外层 for 循环if (compareAndIncrementWorkerCount(c))break retry;// 如果增加 workerCount 失败,则重新获取控制变量 ctl 的值c = ctl.get();  // Re-read ctl// 如果当前线程池的运行状态不等于 rs,说明线程池运行状态已被改变,返回外层 for 循环继续执行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// Worker 线程是否启动boolean workerStarted = false;// Worker 线程是否添加boolean workerAdded = false;Worker w = null;try {// 根据 firstTask 来创建 Worker 对象w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查线程池运行状态int rs = runStateOf(ctl.get());// rs < SHUTDOWN 表示是 RUNNING 状态;// 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程。// 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程已经是运行状态,抛出非法线程状态异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers 是一个 HashSetworkers.add(w);// largestPoolSize 记录着线程池中出现过的最大线程数量int s = workers.size();if (s > largestPoolSize)// 把历史上出现过的最大线程数的值更新一下largestPoolSize = s;// Worker 线程添加成功workerAdded = true;}} finally {// 释放 ReentrantLock 锁mainLock.unlock();}if (workerAdded) {t.start();// Worker 线程已经启动workerStarted = true;}}} finally {if (! workerStarted)// Worker 线程没有启动成功addWorkerFailed(w);}// 返回 Worker 线程是否启动成功return workerStarted;}

在这里插入图片描述

1.6.9线程池源码-核心源码解读-runWorker 方法
  final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//允许响应中断w.unlock(); // allow interrupts// 线程退出的原因,true 是任务导致,false 是线程正常退出boolean completedAbruptly = true;try {// 当前任务为空,且当前任务队列为空,停止循环while (task != null || (task = getTask()) != null) {// 上锁处理并发问题,防止在 shutdown()时终止正在运行的 workerw.lock();// 如果线程池是 stop 状态,并且线程没有被中断,就要确保线程被中断,如果线程池不是,确保线程池没有被中断;// 清除当前线程的中断标志,做一个 recheck 来应对 shutdownNow 方法if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行前(空方法,由子类重写实现)beforeExecute(wt, task);Throwable thrown = null;try {// 执行 Runnable 类的 run()方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行后(空方法,由子类重写实现)afterExecute(task, thrown);}} finally {task = null;// 完成的任务数+1w.completedTasks++;// 释放锁w.unlock();}}// 到此,线程是正常退出completedAbruptly = false;} finally {// 处理 worker 的退出processWorkerExit(w, completedAbruptly);}}
1.6.10线程池源码-核心源码解读-getTask 方法
private Runnable getTask() {// 表示上一次从任务队列中取任务时是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary./**如果线程池为`SHUTDOWN`状态且任务队列为空(线程池 shutdown 状态可以处理任务队列中的任务,不再接受新任务)或者线程池状态>=STOP,则意味着线程池不必再获取任务了,将当前工作线程数量-1 并返回 null;*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);/**timed 变量用于判断是否需要进行超时控制;allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;表示对于超过核心线程数量的这些线程,需要进行超时控制(默认情况)*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 两个条件全部为 true,则通过 CAS 使工作线程数-1,即去除工作线程:* 条件 1:工作线程数大于 maximumPoolSize,或(工作线程需要超时控制且上次在任务队列拉取任务超时)* 条件 2:wc > 1 或任务队列为空* 如果减 1 失败,则返回重试;*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {/*** 执行到这里,说明已经经过前面的校验,开始真正获取 task;* 根据 timed 来判断,如果工作线程有超时时间,则通过任务队列的 poll 方法进行超时等待方式获取任务 ,* 如果在 keepAliveTime 时间内没有获取到任务,则返回 null,否则通过 take 方法;* take 方法表示如果这时任务队列为空,则会阻塞直到任务队列不为空;* 一般 poll()用于普通线程、take()用于核心线程*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// 如果 r == null,说明已经超时得不到任务,timedOut 设置为 truetimedOut = true;} catch (InterruptedException retry) {// 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试timedOut = false;}}}
1.6.11线程池源码-核心源码解读- 线程池复用
1、threadPoolExecutor.execute(runnable)
2addWorker(command, boolean)
3Worker w = new Worker(firstTask); //已经创建了 Thread
4HashSet workers.add(w);
5、t.start(); //w.thread.start();
6、worker.run();
7runWorker(this)
8、task = w.firstTask 或者 task = getTask()
9、task.run();
1.6.12线程池源码-核心源码解读- 线程池大小变化
    private void processWorkerExit(Worker w, boolean completedAbruptly) {//completedAbruptly 为 true 表示线程异常执行结束//completedAbruptly 为 false 表示线程正常执行结束if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//从线程 set 集合中移除工作线程,该过程需要加锁,因为 HashSet 是线程不安全的集合final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//统计完成的任务数:将该 worker 已完成的任务数追加到线程池已完成的任务数completedTaskCount += w.completedTasks;//从 HashSet<Worker>中移除该 workerworkers.remove(w);} finally {//释放锁mainLock.unlock();}//根据线程池状态进行判断是否结束线程池tryTerminate();int c = ctl.get();//当线程池是 RUNNING 或 SHUTDOWN 状态时if (runStateLessThan(c, STOP)) {//如果 worker 不是异常结束if (!completedAbruptly) {//如果 allowCoreThreadTimeOut=true,最小线程个数就可以变为 0;int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//但是,如果等待队列有任务,至少保留一个 worker 来处理任务if (min == 0 && ! workQueue.isEmpty())min = 1;//如果工作线程大于等于核心线程,直接 return 就行了,否则就需要添加一个线程;if (workerCountOf(c) >= min)return; // replacement not needed}//是异常执行结束的,添加一个线程去执行任务addWorker(null, false);}}
    try {//统计完成的任务数:将该 worker 已完成的任务数追加到线程池已完成的任务数completedTaskCount += w.completedTasks;//从 HashSet<Worker>中移除该 workerworkers.remove(w);} finally {//释放锁mainLock.unlock();}//根据线程池状态进行判断是否结束线程池tryTerminate();int c = ctl.get();//当线程池是 RUNNING 或 SHUTDOWN 状态时if (runStateLessThan(c, STOP)) {//如果 worker 不是异常结束if (!completedAbruptly) {//如果 allowCoreThreadTimeOut=true,最小线程个数就可以变为 0;int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//但是,如果等待队列有任务,至少保留一个 worker 来处理任务if (min == 0 && ! workQueue.isEmpty())min = 1;//如果工作线程大于等于核心线程,直接 return 就行了,否则就需要添加一个线程;if (workerCountOf(c) >= min)return; // replacement not needed}//是异常执行结束的,添加一个线程去执行任务addWorker(null, false);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python的基础知识学习路线1—python简介与环境配置(最全路线,每部分附有代码操作结果)

一、Python简介 &#xff08;1&#xff09;发展史 Python是由创始人贵铎范罗萨姆&#xff08;Guido van Rossum&#xff09;在阿姆斯特丹于1989年圣诞节期间&#xff0c;为了打发圣诞节的无趣&#xff0c;开发的一个新的解释型脚本语言。之所以选中Python&#xff08;大蟒蛇的…

免费SSL证书跟付费SSL证书有什么区别?

免费SSL证书与付费SSL证书的主要区别如下&#xff1a; 1. 类型与验证级别&#xff1a; - 免费SSL证书通常仅提供域名验证&#xff08;DV&#xff09;&#xff0c;这是一种最基本的验证级别&#xff0c;仅验证域名的所有权&#xff0c;确认申请者对所申请域名的有效控制。 - 付费…

怎样买国债逆回购最划算,国债逆回购手续费是多少?1折

国债逆回购是一种以国债作为抵押物的贷款&#xff0c;是一种能为投资者提高闲置资金增值能力的金融品种。国债逆回购的优势有&#xff1a; 安全性高&#xff0c;因为国债是信用等级最高、违约风险最低的金融资产&#xff0c;而且国债逆回购是在证券交易所进行的受监管的交易 …

深度学习500问——Chapter06: 循环神经网络(RNN)(2)

文章目录 6.4 CNN和RNN的区别 6.5 RNNs与FNNs有什么区别 6.6 RNNs训练和传统ANN训练异同点 6.7 为什么RNN训练的时候Loss波动很大 6.8 标准RNN前向输出流程 6.9 BPTT算法推导 6.9 RNN中为什么会出现梯度消失 6.10 如何解决RNN中的梯度消失问题 6.4 CNN和RNN的区别 类别特点描述…

博客系统实现

一.准备工作 1.创建项目&#xff0c;把前端写好的博客静态页面拷贝到webapp目录中 2.引入依赖&#xff0c;这里主要用到servlet&#xff0c;mysql5.1.47&#xff0c;jacson2.15.0 3.找到右上角的edit configurations->smartTomcat->进行配置 4.数据库设计&#xff1a…

吴恩达深度学习 (week1,2)

文章目录 1、神经网络监督学习2、深度学习兴起原因3、深度学习二元分类4、深度学习Logistic 回归5、Logistic 回归损失函数6、深度学习梯度下降法7、深度学习向量法8、Python 中的广播9、上述学习总结10、大作业实现:rocket::rocket:&#xff08;1&#xff09;训练初始数据&…

Matlab进阶绘图第49期—气泡堆叠图

气泡堆叠图是堆叠图与气泡图的组合—在堆叠图每根柱子上方添加大小不同的气泡&#xff0c;用于表示另外一个数据变量&#xff08;如每根柱子各组分的平均值&#xff09;的大小。 本文利用自己制作的BarBubble工具&#xff0c;进行气泡堆叠图的绘制&#xff0c;先来看一下成品效…

从数据采集到可视化展示Node-Red二次开发4G模块

环境监测正逐步迈入数字化、智能化时代。Node-Red作为一种开源流式编程工具&#xff0c;以其强大的数据处理能力和设备集成便捷性&#xff0c;在构建环境监测数据站中发挥着至关重要的作用。钡铼技术支持Node-Red编程开发&#xff0c;支持BLIoTLink软网关和自定义开发非标协议。…

为什么 MySQL 采用 B+ 树作为索引?

资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) 「为什么 MySQL 采用 B 树作为索引&#xff1f;」这句话&#xff0c;是不是在面试时经常出现。 要解释这个问题&#xff0c;其实不单单要从数据结构的角度出发&#xff0c;还要考虑磁盘 I/O 操作次数&am…

【六 (3)机器学习-机器学习建模步骤/kaggle房价回归实战】

目录 文章导航一、确定问题和目标&#xff1a;1、业务需求分析&#xff1a;2、问题定义&#xff1a;3、目标设定&#xff1a;4、数据可行性评估&#xff1a;5、资源评估&#xff1a;6、风险评估&#xff1a; 二、数据收集&#xff1a;1、明确数据需求2、选择数据来源3、考虑数据…

SpringCloud Alibaba Seata 处理分布式事务

一、前言 接下来是开展一系列的 SpringCloud 的学习之旅&#xff0c;从传统的模块之间调用&#xff0c;一步步的升级为 SpringCloud 模块之间的调用&#xff0c;此篇文章为第十八篇&#xff0c;即使用 Seata 处理分布式事务。 二、分布式事务问题 当单体应用被拆分成微服务应用…

【超简单】基于PaddleSpeech搭建个人语音听写服务

一、【超简单】之基于PaddleSpeech搭建个人语音听写服务 1.需求分析 亲们,你们要写会议纪要嘛?亲们,你们要写会议纪要嘛?亲们,你们要写会议纪要嘛?当您面对成吨的会议录音,着急写会议纪要而不得不愚公移山、人海战术?听的头晕眼花,听的漏洞百出,听的怀疑人生,那么你…

代码随想录算法训练营Day48|LC198 打家劫舍LC213 打家劫舍IILC337 打家劫舍III

一句话总结&#xff1a;前两题白给&#xff0c;第三题树形DP有点难。 原题链接&#xff1a;198 打家劫舍 滚动数组直接秒了。 class Solution {public int rob(int[] nums) {int n nums.length;int first 0, second nums[0];for (int i 2; i < n; i) {int tmp Math.m…

如何开始用 C++ 写一个光栅化渲染器?

光栅化渲染器是计算机图形学中最基础且广泛应用的一种渲染技术&#xff0c;它将三维模型转化为二维图像。下面我们将逐步介绍如何使用C语言从零开始构建一个简单的光栅化渲染器。 一、理解光栅化渲染原理 光栅化是一种将几何数据&#xff08;如点、线、三角形&#xff09;转换…

电商选品难?那是因为你不会用大数据选品工具…

电商选品之所以难&#xff0c;主要有以下几个方面的原因。电商市场更新换代非常快&#xff0c;新的产品不断涌现&#xff0c;旧的产品可能很快就被淘汰。电商选品紧跟市场趋势&#xff0c;不断调整和更新&#xff0c;这对电商运营市场敏感度和反应速度提出了很高的要求。 电商…

110V降9V1A非隔离降压恒压WT5112

110V降9V1A非隔离降压恒压WT5112 嘿&#xff0c;让我来给你说说这个WT5112控制芯片。这可是个厉害的东西&#xff0c;特别适合用在充电器啊、适配器啊还有LED灯这些地方。它最牛的地方就是能稳稳地控制电压和电流&#xff0c;而且还有个什么原边反馈技术让控制得更准。更酷的是…

MySQL_00001_00000

数据准备 员工表&#xff1a;emp Oracle: create table emp ( empno number(4) not null, ename varchar2(10), job varchar2(9), mgr number(4), hiredate date, sal number(7, 2), comm number(7, 2), deptno number(2) ); insert into em…

数据库讲解---(SQL语句--表的使用)【MySQL版本】

零.前言 数据库讲解&#xff08;MySQL版&#xff09;&#xff08;超详细&#xff09;【第一章】-CSDN博客 数据库-ER图教程_e-r图数据库-CSDN博客 数据库讲解&#xff08;MySQL版&#xff09;&#xff08;超详细&#xff09;【第二章】【上】-CSDN博客 一.SQL概述 1.1SQL简…

组合逻辑电路中的竞争与冒险

竞争与冒险 进行理想的组合逻辑电路分析与设计时&#xff0c;没有考虑逻辑门的延迟时间&#xff08;原因&#xff09;对电路产生的影响&#xff0c;且认为电路的输入和输出均处于稳定的逻辑电平。 实际上&#xff0c;信号经过逻辑门需要一定的时间。不同路径上门电路数目不同…

【Qt】文件与音视频

目录 一、输入输出设备类 二、文件读写类 三、文件和目录信息类 四、音视频 4.1 音频 4.2 视频 文件操作是应用程序必不可少的部分。Qt作为一个通用开发库&#xff0c;提供了跨平台的文件操作能力。Qt提供了很多关于文件的类&#xff0c;通过这些类能够对文件系统进行操作…