此篇文章与大家分享多线程的第六篇文章,关于线程池
如果有不足的或者错误的请您指出!
目录
- 3.线程池
- 3.1标准库的线程池
- 3.2 标准库自己提供的几个工厂类
- 3.3自己实现一个线程池
- 完成大体框架
- 接下来完善构造方法
- 但是此时创建的线程相当于是"核心线程",我们还要设计在任务数量过多,线程忙不过来的时候,就要创建新的线程来缓解
3.线程池
池这个概念在计算机里面是很常见的概念.如常量池,线程池,进程池,内存池…
而普遍引入池的意义就是为了提高效率
我们在之前讲过,为了处理高并发场景,我们引入了线程池.是因为线程相对于进程在 创建和销毁 的时更占优势
但是随着时代的发展,对"频繁"这一概念又有了新的定义,越来越频繁了
在这种环境下,即使是线程创建和销毁的开销也不敢忽略,变得很明显了
那么我们就要进行优化:有两种优化方法
(1)线程池
(2)协程(也叫纤程)
注意:纤程是高版本java才引入的"虚拟线程" (本文不讲)
为什么引入多线程 / 纤程后就能够提高效率?
本质上就是因为 直接调用系统API去创建和销毁线程,需要用户态+内核 配合完成的工作
而通过 线程池 / 纤程 ,只需要通过用户态即可,不需要内核的配合
如果我们直接调用API,创建线程和销毁线程,这个过程需要内核来完成,而内核完成的工作很多时候是不可控的,会带来很大的开销
而使用线程池,就是提前将线程创建好了,放进用户态方面的数据结构里,后面要用的时候,直接从池子里面拿即可,用完了再放回到池子里面去,这个过程就完全是用户态代码,就不需要与内核进行交互了
3.1标准库的线程池
标准库提供的线程池是ThreadPoolExecutor
使用起来比较复杂,因为构造方法中包含了很多参数
其中第四个参数数量是最多的,我们理解完最后一个,就都能够理解了
(1)corePoolSize 和 maxmumPoolSize
第一个是核心线程库,第二个是最大线程数
标准库是这样设定的,将现线程分为核心线程(正式工)和非核心线程(临时工)
而maxmumPoolSize就是 核心线程数量 + 非核心线程数量
一个线程池被创建出来的时候,所包含的线程数量就有 核心线程数量那么多,此时线程池会提供一个submit方法,就是往线程池里面添加任务,一旦任务多了,已有的线程处理不过来(有很多任务在排队),那么此时就会自动创建额外线程来支撑更多的任务
但是即使创建了新的线程,总线程数量还是不能大于最大线程数量
过了一段时间后,任务没那么重了,线程相对清闲,此时就会将创建出来的非核心线程回收
而即使是回收,也只回收掉非核心线程,至少还是要保证,线程池里面的线程数量,不少于核心线程数
这样就能保证,在任务多的时候保证效率,在任务少的时候,系统开销小
在实际开发中,线程数量设置为多少合适?
实际上我们可能听说最多的是 根据计算机的cpu核心数来具体配置
但是与之相关的还有一个更为重要的,就是和你写的程序的时机特点有关
极端一点就可以分成两类:
(1)cpu密集型
在程序中涉及大量的cpu操作,比如
while(true) { count++; }
这种,代码逻辑都是在进行算术运算,逻辑判断,条件运算,函数调用等等
这种程序一跑,就能够立即吃满一个cpu核心
在这种情况下,你的最大线程数量是不能超过cpu核心数的
(2)IO密集型
在程序中涉及大量的等待IO操作(等待过程是不参与调度的)
IO密集型不只是标准输入输出,sleep,操作硬盘,操作网络…也属于IO密集型
那么此时最大线程数的瓶颈就不在与cpu上了,而是考虑其他方面
比如是网络操作程序,那么就要考虑网络带宽的瓶颈
例如你的网卡是 1Gbps,一个线程的读写速率是100Mbps,那么最多也就只能搞10个这样的线程了,硬盘也是同理,而至于标准输入输出这种,搞百八十个也没问题
但是实际上上述两种模型都太极端了,现实我们的程序应该是处于二者之间的,即逻辑中既包含cpu操作,又包含IO操作
最终的答案就是通过实验操作,找到一个合适的值
即对程序进行性能测试,测试的过程中设定不同的线程数量,最终根据实际程序的响应速度和系统开销,找到一个最合适的数目
(2)keepAliveTime,TimeUnit uiit
表示非核心线程,允许空闲的最大时间
非核心线程在线程池不忙的时候,不是立即回收的,通过我们设定的值,比如3s,那么在3s内,如果非核心线程没有任务执行了,此时就可以立即回收了
后面的unit实际上就是等待时间的单位,他是一个枚举类型
(3)workQueue
是线程池里面的任务队列
线程池会提供一个submit方法,让其他线程将任务提交到线程池里面,那么线程池里面就要有一个任务队列,把要执行的任务组织起来,实际上在线程池里面,线程消耗的任务就是任务队列里面的任务,来完成具体工作
具体来说就是其他线程会submit任务到这个队列中,这个队列里面存的元素就是一个Runnable对象,要执行的逻辑就是run方法里面的内容
而我们通过实际场景,就可以指定一个capacity的队列进去,也可以指定带有优先级的队列
(4)工厂模式
也是一种设计模式
主要是解决,基于构造方法创建对象太坑了的问题
举个例子:假设我们要描述一个点
但是我们可能要通过极坐标来表示一个点
但是构不成重载就会编译报错
此时就无法通过构造方法来表示不同构造点的方式了
而工厂模式的核心思路就在于不再使用构造方法创建对象,而是相当于给构造方法包装一层
这样的方法就叫做工厂方法 ,这种写法套路就是工厂模式
甚至,工厂方法被单独的类封装,此时这个类就是工厂类
而ThreadFactory就是标准库中用来创建线程的工厂类
这个工厂类主要就是用来批量给要创建的线程设置一些属性啥的,在工厂方法中就已经把线程的属性设置好了
平时我们一般也不会直接用这个,主要是搭配线程池来使用
(5)RejectedExcutionHandle handler
指的是拒绝策略,实际是是一个枚举类型
用于决定,如果当前任务队列满了,仍然要添加任务进来,要进行怎么样的处理
第一种是abortPolicy,表示直接抛出异常,让程序员快速知道,任务处理不过来了,代码罢工了
第二种是callerPolicy,表示该任务不是由线程池负责执行了,而是交给负责submit的对象去执行
第三种是discardOldestPolicy,表示删除掉原来任务列表里面最开始的任务,让新来的任务来队列中排队
第四种是discardPolicy,表示直接拒绝新来的任务.还是按照原来到节奏执行
3.2 标准库自己提供的几个工厂类
由于标准库自己也知道,ThreadPoolExecuter使用起来比较费劲,于是自己提供了几个工厂类,对于上述线程池又进行进一步封装了
上面这些就是标准库提供的创建线程池的工厂方法
public static void main(String[] args) {//创建一个最普通的线程池,能够根据任务的数目,自动进行线程扩容Executors.newCachedThreadPool();//创建固定线程数目的线程池Executors.newFixedThreadPool(4);//创建一个只包含单线程的线程池Executors.newSingleThreadExecutor();//创建一个固定线程个数,但是任务延时执行的线程池Executors.newScheduledThreadPool(4);}
此时我们就可以通过线程池提供的submit方法往线程池里面添加任务了
如果我们限制好线程池里面线程的数量,就能更好发现线程的复用
注意此时由于线程池内部的线程是前台线程,因此进程不会结束
3.3自己实现一个线程池
一个线程池里面要包含哪些东西??
(1)有若干个线程
(2)有任务队列
(3)提供submit方法
完成大体框架
public class MyThreadPool {//任务队列private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);public MyThreadPool(int nThread) {//这里初始化线程池}public void submit (Runnable runnable) throws InterruptedException {//提交任务队列this.blockingQueue.put(runnable);}
}
接下来完善构造方法
public class MyThreadPool {//任务队列private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);public MyThreadPool(int nThread) {//这里初始化线程池for(int i = 0; i < nThread; i++) {Thread t = new Thread(() -> {while(true) {try {blockingQueue.take().run();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();}}public void submit (Runnable runnable) throws InterruptedException {//提交任务队列this.blockingQueue.put(runnable);}
}
但是此时创建的线程相当于是"核心线程",我们还要设计在任务数量过多,线程忙不过来的时候,就要创建新的线程来缓解
public class MyThreadPool {//任务队列private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);private int maxPoolSize = 0;private List<Thread> threadList = new ArrayList<>();public MyThreadPool(int corePoolSize,int maxPoolSize) {this.maxPoolSize = maxPoolSize;//这里初始化线程池for (int i = 0; i < corePoolSize; i++) {Thread t = new Thread(() -> {while(true) {try {blockingQueue.take().run();} catch (InterruptedException e) {throw new RuntimeException(e);}}});threadList.add(t);t.start();}}public void submit (Runnable runnable) throws InterruptedException {//提交任务队列this.blockingQueue.put(runnable);//判断当前任务队列里面的任务是否比较多//如果是,那么就创建新的线程if(blockingQueue.size() >= 600 && threadList.size() <= maxPoolSize) {Thread t = new Thread(() -> {while(true) {try {blockingQueue.take().run();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();threadList.add(t);}}
}
比较麻烦的是要回收非核心线程,就需要引入更多的数据结构了,同时也需要引入"定时器"(后面讲)
关于拒绝策略
实现拒绝策略的核心就在于submit这里,
(1)构造方法里面提供参数,用参数来表示哪种拒绝策略,使用成员变量记录一下
(2)submit的时候,先要判断当前任务队列里面的元素是否已经满了(自己设定一个阈值)
(3)如果确实比较多了,就根据刚刚构造时指定的拒绝策略,执行不同的逻辑
a)直接抛出异常
b)直接在submit里面调用Runnable方法
c)删除队列的元素(可以一次删多个)
d)丢弃当前元素,不添加到队列里面