文章目录
- 一、线程池
- 1.1 线程池相关概念
- 1.2 线程池标准类
- 1.3 线程池工厂类
- 1.4 实现自己的线程池
- 二、定时器
- 2.1 java标准库中的定时器使用
- 2.2 实现一个自己的定时器
- 2.2.1 定义任务类
- 2.2.2 定义定时器
一、线程池
1.1 线程池相关概念
池这个概念在计算机中比较常见,常量池、数据库连接池、线程池、进程池、内存池…思想都是类似的。为什么我们要在多线程这里引入线程池,且听我娓娓道来。
并发编程中我们嫌弃进程的创建销毁开销大,于是我们使用线程。但是随着时代的发展,频繁创建销毁线程的开销也越来越明显了,对于此种情况优化的方式之一就是线程池。为什么使用线程池之后能够提升效率,其中的关键点就在于直接创建/销毁线程的操作是内核态配合用户态的工作,而线程池中的线程的创建/销毁只涉及到用户态不需要内核态的配合,因此能够提升效率。这里需要明白一点就是系统内核是不可控的,他要完成的工作很多,所以在用户态和其配合的过程中很可能它去先忙别的工作,从而拖慢逻辑执行的时间,因此单独涉及用户态的操作效率要比用户态和内核态配合的工作效率高。
如果使用线程池,会提前把线程建好,然后将线程保存到用用户态代码编写的数据结构中,后面需要用到线程的时候直接从池子中取,不用就放回去,这个过程完全是用户态,不需要和内核进行交互。
1.2 线程池标准类
java中线程池标准类为ThreadPoolExecutor,参数比较复杂,相对的对线程池的创建的控制就比较精细。参数如下图:
首先看前两个参数,标准库的线程池是这样设定的,把线程分为两类分别为核心线程以及非核心线程。举个例子来说核心线程相当于公司里的老员工,非核心相当于公司里的实习生,平时负责业务的都是老员工,但是当人手不够的时候会让实习生帮忙。标准库的线程池中的corePoolSize以及maxinumPoolSize也是这样类似的思想。corePoolSize表示核心线程,线程池最开始创建时就带有这么多线程,maxinumPoolSize代表最大线程数,线程池有一个方法submit,通过这个方法可以提交任务到线程池让线程池处理,当线程池的任务多到它自己忙不过来时就会创建新的线程来帮助处理任务,新线程和核心线程加起来的数目不能超过这里指定的最大线程数。当任务没那么多的时候线程池就会释放这些建立的新线程,回收只会回收新建立的线程不会回收核心线程,最终的线程数量肯定是大于等于核心线程数量的。
在实际开发中需要设置多少的线程数不仅和你的配置有关,还跟你的程序特点有关。
程序一般分为两种,第一种就是cpu密集型,如下图你代码的逻辑都需要cpu来完成,一旦程序跑起来一下就能占满一个cpu核心,因此在这种情况下你线程数不能超过cpu的逻辑核心的数目。
第二种程序是IO密集型程序,你的代码大部分都是在等待IO。(等待IO时不占用cpu,不被调度)你的代码此时应该考虑的不是cpu而是其它的事情,例如如果你的代码是一个网络程序,那么就需要考虑网卡的带宽。硬盘IO也是类似。
上述两个模型都太理想了,真正开发时一般程序会在IO密集型和cpu密集型之间,此时就需要去写代码实验,从而确定多少的核心线程数的效果最好。
这里的两个参数是配合起来指定非核心线程在空闲时可以存在的时间,非核心线程空闲时不是立即回收,而是等线程池不忙的时候回收。unit是枚举类型指定时间的基本单位,keepAliveTime指定单位时间的数目,总时间就是非核心线程可以存在的时间。unit可以提供的时间单位如下图:
这里的参数指的就是线程池中的任务队列,线程池会提供submit方法让其它线程将任务提交给线程池。线程池中需要队列这样的数据结构,来将任务保留起来,后面线程池中的工作线程就会消费队列中的任务并且执行任务的具体内容。
上图中的参数看名字就知道是一个和工厂模式相关的参数,实际上这里就是标准库提供的用来设置线程池创建的线程的一些属性的工厂类,一般就是配合线程池使用,一般使用线程池标准类时这里可以使用默认的参数。然后工厂模式就是一种设计模式,工厂方法就是将构造函数进行一层包装并返回对象,专门用来包装构造方法的类就是工厂类。
最后一个参数最重要的参数,如下图。这个参数也是一个枚举类型,就是用来表面在向线程池提交任务时,线程池采用的是哪种拒绝策略。
每种拒绝策略对应的参数如下图:
第一个就是当提交任务被线程池拒绝时,要直接抛出异常“罢工”。第二个参数是当线程池拒绝时,将任务交给提交任务的线程去执行。第三个参数是当线程池拒绝提交的任务时,就将任务队列中的最老的任务丢弃,将这个任务加入线程池中的任务队列。第四个参数则是将任务队列中最新的任务给丢弃,将这个任务加入到线程池中的任务队列。
1.3 线程池工厂类
因为java标准库自己也知道ThreadPoolExecutor使用起来比较费劲,所以提供了创建线程池的工厂类Executors,这样使用起来简单很多,但是操作的精细度就不如标准的线程池的类。
如上图都是建立线程池的工厂方法,Executors.newCachedThreadPool()就是创建普通的线程池,根据任务的数目来对线程进行扩容。Executors.newFixedThreadPool(10)创建固定线程数的的线程池,线程数不能增加也不能减少。Executors.newScheduledThreadPool()这个也是创建固定线程数的线程池,但是其中的任务延时执行。Executors.newSingleThreadExecutor()是创建单个线程的线程池。注意这里设置的线程数都是最大线程数而并非是核心线程数。
下图给出工厂类建立线程池并使用的过程:
下图给出使用标准线程池类的过程:
需要注意的一点是线程池中的线程都是前台线程,当main线程执行结束时进程不会结束会等线程池中的线程全都执行完毕。
1.4 实现自己的线程池
我们要实现自己的线程池要考虑哪些东西?
(1)线程池中需要若干个线程。
(2)要有存放任务的队列。
(3)要提供提交任务的submit方法。
因为这里只是实现以下线程池的简单代码帮助理解线程池,所以代码中并未实现拒绝策略以及回收线程等操作,代码如下:
package Thread;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;class MyThreadPool {//注意细节 LinkedBlockingQueue时添加元素会自动扩容导致添加时不会堵塞 只有移除元素时可能会堵塞//ArrayBlocking不会自动扩容 添加和删除元素都有可能堵塞private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);private int threadMaxSize = 0;private List<Thread> list = new ArrayList<>();// 初始化线程池public MyThreadPool(int coreSize, int threadMaxSize) {this.threadMaxSize = threadMaxSize;for (int i = 0; i < coreSize; i++) {// 创建若干个线程Thread t = new Thread(new Runnable() {@Overridepublic void run() {try {while (true) {Runnable runnable = queue.take();runnable.run();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();list.add(t);}}// 把任务添加到线程池中public void submit(Runnable runnable) throws InterruptedException {// 此处进行判定, 判定说当前任务队列的元素个数, 是否比较长.// 如果队列元素比较长, 说明已有的线程, 不太能处理过来了. 创建新的线程即可.// 如果队列不是很长, 没必要创建新的线程.queue.put(runnable);if (list.size() < threadMaxSize && queue.size() >= 500) {//创建新线程Thread t = new Thread(new Runnable() {@Overridepublic void run() {try {while (true) {Runnable task = queue.take();task.run();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();list.add(t);}}
}public class Demo38 {public static void main(String[] args) throws InterruptedException {MyThreadPool pool = new MyThreadPool(10, 20);for (int i = 0; i < 10000; i++) {int temp = i;pool.submit(new Runnable() {@Overridepublic void run() {System.out.println("hello" + temp + "," + Thread.currentThread().getName());}});}}}
二、定时器
定时器就是“闹钟”的效果,指定一个时间,再指定一个任务,此时这个任务不会立即执行,而是等时间到达之后再去执行。定时器是日常开发中非常重要的组件。举个例子,短信验证码,验证码只在五分钟内是有效的,发送验证码时会将验证码保存起来。设定定时器,会在五分钟延时之后执行删除验证码的逻辑。定时器在未来开发中非常重要,被封装成服务器供整个分布式系统使用。
2.1 java标准库中的定时器使用
通过timer对象的schedule方法指定要执行的任务以及要延时的时间,其中的TimerTask就是类似于Runnable的抽象类,执行效果如下:
以上结果符合延时的效果,但是执行完进程并未结束,应该是因为定时器中的线程是前台线程。
2.2 实现一个自己的定时器
编写代码首先要确定两个需求,写一个定时器:
(1)能够延时执行任务/指定时间执行任务。
(2)能够管理多个任务。
2.2.1 定义任务类
模仿标准库定时器的模式,我们也定义一个任务类来表示要执行的任务。在定时器的实现中我们在定义的任务中设定任务需要执行的绝对时间,为了后续代码执行时可以方便的判断,该任务是否应该执行。如果保存相对时间的话就比较麻烦。这里可以举一个例子。就比如说领导叫你去汇报工作,第一种说法是叫你三十分钟后去,这种情况你还要根据当前的时间进行换算。第二种说法是直接叫你五点半去,就不需要换算了。
任务类代码编写如上图,实际上就是包装了Runnable接口,然后在类里面通过设定的参数确定了任务要执行时的时间,然后还重写了一个为了方便给后续优先级任务队列按时间从小到大排序的Comparable接口的compareTo方法。
2.2.2 定义定时器
定时器编写首先要关注的就是保存任务的数据结构,我们使用按时间排序的优先级队列,这样出队的队列就是最早的任务,其它任务都未到时间。如果不这样写数据结构的话,那么后续执行任务的线程就要通过循环来不断遍历这里的数据结构来找到满足执行时间的任务。
定时器中还需要线程去执行任务,在类的构造方法中来创建新的线程去执行队列中的任务,线程会先拿出队首任务,判断是否满足时间,如果满足则执行不满足则等待相差的时间之后再执行,这个过程是通过while循环实现的,另外当队列为空时线程也要等待,因为此时没有任务需要执行。
然后是schedule方法的编写,传入任务以及相对时间后建立任务对象然后将任务对象添加到队列中,还要对进入WAITING状态的线程进行一对一唤醒。对于陷入WAITING状态的线程有两种可能,第一种就是队列为空,此时添加了新任务到队列中当然应该唤醒。第二种就是新加入了任务,要唤醒线程进行一次判断有没有符合执行时间的任务,因为新添加的那个任务可能就是符合执行时间的任务。
注意:因为这里使用的队列不是线程安全的,并且有两个线程在修改队列,一个是使用schedule方法的线程,另一个是定时器构造方法中的线程,所以就像图中代码写的要给定时器构造方法以及schedule方法加锁。为什么不直接用PriorityBlockingQueue这样的队列?因为使用它只能处理队列为空的阻塞,对于一些执行时间相关的阻塞也无能为力,你要使用wait还要在外面写个锁,阻塞队列内部还有锁,两个锁很容易发生死锁,为了便于理解及编写就直接使用了普通的优先级队列。
在这里还是补一下定时器的代码:
package Thread;import java.util.PriorityQueue;class MyTimerTask implements Comparable<MyTimerTask> {private Runnable runnable;//这里的time是绝对时间private long time;public MyTimerTask(Runnable runnable, long delay) {this.runnable = runnable;//绝对时间,当前时间加上需要延迟的时间长度this.time = System.currentTimeMillis() + delay;}public void run() {runnable.run();}public long getTime() {return time;}@Overridepublic int compareTo(MyTimerTask o) {return (int) (this.time - o.time);}
}class MyTimer {//优先级队列存储任务可以根据时间的前后来输出从而避免让线程去遍历PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();public MyTimer() {Thread t = new Thread(() -> {try {while (true) {//因为这里的线程修改任务队列以及schedule方法的线程也会修改队列就会出现线程安全问题,故而加锁synchronized (this) {//当任务队列为空的时候,线程先等着任务加入再执行if (queue.size() == 0) {this.wait();}MyTimerTask task = queue.peek();long curTime = System.currentTimeMillis();//达到任务开始的时间直接执行并在队列中删除任务if (curTime >= task.getTime()) {task.run();queue.poll();} else {//未达到任务开始的时间线程等待从而避免执行多次循环占满cputhis.wait(task.getTime()-curTime);}}}} catch (InterruptedException e) {throw new RuntimeException(e);}});t.start();}public void schedule(Runnable runnable, Long delay) {//两个线程修改队列故而加锁synchronized (this) {MyTimerTask task = new MyTimerTask(runnable, delay);queue.offer(task);//加入新任务后要唤醒线程this.notify();}}}public class Demo40 {public static void main(String[] args) {MyTimer myTimer = new MyTimer();myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println(3000);}}, 3000L);myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println(2000);}}, 2000L);myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println(1000);}}, 1000L);}}