文章目录
- 🌴线程池的概念
- 🎄标准库中的线程池
- 🍀ThreadPoolExecutor 类
- 🚩corePoolSize与maximumPoolSize
- 🚩keepAliveTime
- 🚩ThreadFactory
- 🚩workQueue
- 🚩RejectedExecutionHandler handler
- 🎍模拟实现线程池
- ⭕总结
🌴线程池的概念
线程池,是一种线程的使用模式,它为了降低线程使用中频繁的创建和销毁所带来的资源消耗与代价。
通过创建一定数量的线程,让他们时刻准备就绪等待新任务的到达,而任务执行结束之后再重新回来继续待命。
想象这么一个场景:
在学校附近新开了一家快递店,老板很精明,想到一个与众不同的办法来经营。店里没有雇人,而是每次有业务来了,就现场找一名同学过来把快递送了,然后解雇同学。这个类比我们平时来一个任务,起一个线程进行处理的模式。
很快老板发现问题来了,每次招聘 + 解雇同学的成本还是非常高的。老板还是很善于变通的,知道了为什么大家都要雇人了,所以指定了一个指标,公司业务人员会扩张到 3 个人,但还是随着业务逐步雇人。于是再有业务来了,老板就看,如果现在公司还没 3 个人,就雇一个人去送快递,否则只是把业务放到一个本本上,等着 3 个快递人员空闲的时候去处理。这个就是我们要带出的线程池的模式
线程池最核心的设计思路:复用线程,平摊线程的创建与销毁的开销代价
相比于来一个任务创建一个线程的方式,使用线程池的优势体现在如下几点:
- 避免了线程的重复创建与开销带来的资源消耗代价
- 提升了任务响应速度,任务来了直接选一个线程执行而无需等待线程的创建
- 线程的统一分配和管理,也方便统一的监控和调优
提升了响应速度是怎么体现的呢?
这里博主和大家详细说一下:
比如你去银行取钱
有两种方法:
方法一,我们自己可以在银行的取款机自己取,注意这时候我们是自主的,就像程序里的“用户态”。用户态执行的是程序员自己的代码,我想干嘛就干嘛,想在取款机里取钱、存钱、查询余额等都在我的掌控范围内
方法二,我们去柜台取钱,我们不能进入银行,只能交给柜员,让他执行你给的命令,间接完成,就像程序里的“内核态”。此时你你给银行内部人员发去了取钱的命令,注意此时她是立马给你取钱吗?她可能给同事闲聊几句、可能喝口水、锤锤背、或者领导叫她,这时候就会耽搁,你就只能等着,非常的被动,办理时间也变长了
此时呢,我们的线程池就像方法一,我们可以利用已经存在的线程自己完成操作,而不是去重新创建。
🎄标准库中的线程池
Executors 创建方式有以下几个
newFixedThreadPool | 创建固定线程数的线程池 |
---|---|
newCachedThreadPool | 创建线程数目动态增长的线程池. |
newSingleThreadExecutor | 创建只包含单个线程的线程池 |
newScheduledThreadPool | 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer |
注意上述方法是有返回值的
返回值类型为 ExecutorService
使用实例如下:
-
使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
-
返回值类型为 ExecutorService
-
通过 ExecutorService.submit 可以注册一个任务到线程池中
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class TestDemo1 {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(10);for(int i = 1; i < 100;i++) {int n = i;pool.submit(new Runnable() {@Overridepublic void run() {System.out.println("任务"+n);}});}}
}
结果展示:
🍀ThreadPoolExecutor 类
Executors 本质上是 ThreadPoolExecutor 类的封装.
我们也可以去
Java文档
进行查找
我们在图示区域找到java.util.concurrent包
这个包里面放的很多都与并法编程相关的类
接下来我们点击并找到ThreadPoolExecutor类
我们对这个类往下翻我们就可以看到它提供了一些构造方法
我们可以就看到ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定,接下来博主带大家看一下这些参数代表的意义(这里第四个构造方法参数最多,所以这里讲第四个构造方法的参数)
🚩corePoolSize与maximumPoolSize
-
corePoolSize代表的是核心线程数
-
maximumPoolSize代表的最大线程数
corePoolSize 指的是核心线程数,线程池初始化时线程数默认为 0,当有新的任务提交后,会创建新线程执行任务,如果不做特殊设置,此后线程数通常不会再小于 corePoolSize ,因为它们是核心线程,即便未来可能没有可执行的任务也不会被销毁。
随着任务量的增加,在任务队列满了之后,线程池会进一步创建新线程,最多可以达到 maximumPoolSize 来应对任务多的场景,如果未来线程有空闲,大于 corePoolSize 的线程会被合理回收。所以正常情况下,线程池中的线程数量会处在 corePoolSize 与 maximumPoolSize 的闭区间内。
就好比一个公司有很多员工,有正式员工,还有零时工
这两者的区别在于:
-
正式员工是一定会存在的,而零时工是可以随时被辞退的
-
公司忙的时候,需要员工就招募一些,活儿少了之后一些零时工可能就会被辞退
整体的策略为:正式员工打底,零时工动态调节
接下来我们讨论一个问题,实际开法当中,线程池中的线程数应该设置为多少合适?
答案为:不同的程序特点不同,此时设置的线程数也是不同的
这是为什么呢?
这里我们考虑两个极端情况:
-
CPU密集型:每个线程要执行的任务都是通过狂转CPU来实现,此时的线程数最多不超过CPU的核数,此时,你设置的再大也没有用。
-
IO密集型:每个线程干的工作就是等待IO(读写硬盘、读写网卡、等待用户输入·······),不吃CPU,此时这样的线程处于阻塞状态,不参与CPU调度,这时候多搞一些线程都无所谓,不再受制于CPU核数。理论上来说,可以设成无穷大(实际上当然是不行的)
然而,咱们再实际开放当中并没有程序符合这两种理想模型
真实的程序,往往一部分要吃CPU,一部分要等待IO
具体这个程序,几成工作量吃CPU,几成工作量等待IO,我们并不确定
那么我们实际开发中应该子怎么设置呢?
设置后进行测试,选择一个最优的选择
🚩keepAliveTime
当线程池中线程数量多于核心线程数时,而此时又没有任务可做,线程池就会检测线程的 keepAliveTime,如果超过规定的时间,无事可做的线程就会被销毁,以便减少内存的占用和资源消耗。
如果后期任务又多了起来,线程池也会根据规则重新创建线程,所以这是一个可伸缩的过程,比较灵活,我们也可以用setKeepAliveTime 方法动态改变 keepAliveTime 的参数值。
就相当于零时工如果闲下来太久,就会被辞退。setKeepAliveTime 可以设置空闲的时间
🚩ThreadFactory
ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名
🚩workQueue
workQueue是线程池的任务队列,作为一种缓冲机制,线程池会把当下没有处理的任务放入任务队列中,由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用
BlockingQueue 来保障线程安全。常用的队列主要有以下几种:
-
LinkedBlockingQueue
LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE。 -
SynchronousQueue
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。拥有公平(FIFO)和非公平(LIFO)策略,使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。 -
ArrayBlockingQueue
ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。 -
DelayedWorkQueue
DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。之所以线程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 选择 DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行。
🚩RejectedExecutionHandler handler
在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,那么这些溢出的任务,ThreadPoolExecutor为我们提供了拒绝任务的处理方式,以便在必要的时候按照我们的策略来拒绝任务
线程池拒绝任务的时机有以下两种:
-
第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。
-
第二种情况是线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。
线程池任务拒绝策略实现了 RejectedExecutionHandler 接口,JDK 中自带了四种任务拒绝策略。分别是AbortPolicy、DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy。其中AbortPolicy是ThreadPoolExecutor默认使用。
那他们分别代表是策略内容是什么呢?
-
AbortPolicy(默认)
这种拒绝策略在拒绝任务时,会直接抛出一个类RejectedExecutionException 的RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。 -
DiscardPolicy
这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。 -
DiscardOldestPolicy
如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。 -
CallerRunsPolicy
相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:
🎈第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
🎈第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
🎍模拟实现线程池
接下来我们简单模拟实现一个简单的线程池
-
创建MyThreadPool实现我们的线程池
-
使用阻塞队列组织所有任务
-
构造方法里创建相应大小的线程数
-
提供一个submit方法使用线程池里面的线程
代码实现如下:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class MyThreadPool {// 此处不涉及到 "时间" , 此处只有任务, 就直接使用 Runnable 即可~~private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();// n 表示线程的数量public MyThreadPool(int n) {// 在这里创建出线程.for (int i = 0; i < n; i++) {Thread t = new Thread(() -> {while (true) {try {Runnable runnable = queue.take();runnable.run();} catch (InterruptedException e) {e.printStackTrace();}}});t.start();}}// 注册任务给线程池.public void submit(Runnable runnable) {try {queue.put(runnable);} catch (InterruptedException e) {e.printStackTrace();}}
}
测试代码:
public class TestDemo2 {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(10);for(int i = 1; i < 100;i++) {int n = i;myThreadPool.submit(new Runnable() {@Overridepublic void run() {System.out.println("任务"+n);}});}}
}
测试结果:
⭕总结
关于《【JavaEE初阶】 线程池详解与实现》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下!