多线程案例
- 一、设计模式(单例模式+工厂模式)
- 1、单例模式
- 2、工厂模式
- 二、阻塞式队列
- 1、生产者消费者模型
- 2、阻塞对列在生产者消费者之间的作用
- 3、用标准库阻塞队列实现生产者消费者模型
- 4、模拟实现阻塞队列
- 三、定时器
- 1、标准库中的定时器
- 2、模拟实现定时器
- 四、线程池
- 1、线程池概述
- 2、ThreadPoolExecutor 参数
- 3、模拟实现线程池
- 4、创建线程池的两种方式
- 5、拓展:实际开发中应该如何确定线程池中线程的数量?
一、设计模式(单例模式+工厂模式)
设计模式就是软件开发中的“棋谱”,软件开发中也有很多常见的 “问题场景”。针对这些问题场景,大佬们总结出了一些固定的套路。按照这些套路来实现代码可能不会很好,但至少不会很差。当前阶段我们需要掌握两种设计模式:(1)单例模式 (2)工厂模式
1、单例模式
概念/特征:单例模式能保证某个类在程序中只存在唯一一份实例, 而不会创建出多个实例。
Java中实现单例模式的方式有很多种,但他们本质上都是一样的,这里主要介绍两种,即 饿汉模式 和 懒汉模式。
在计算机中“懒”往往是褒义词,很多时候并不是实质意义上的懒,而是凸显一种“从容”。而“饿”凸显一种“急迫”。
比如计算机读取硬盘文件并显示,这个场景下对于饿汉模式:会把文件的所有内容全部都读取到内存中,一并显示。这种模式下,如果文件很大,可能导致内存不足或显示卡顿等问题。反而对于懒汉模式:不会一次读取完毕,而是每次只读取一部分,先将当前屏幕填充上,如果后续翻页在继续读取文件内容,这种模式下就大大提高了效率以及用户体验。
Java 中的多线程下的单例模式,可以借助 Java 语法,保证某个类,只能创建出一个实例,而不能 new 多次,具体实现如下:
(1)饿汉模式
代码实现:
// 饿汉模式实现单例
class Singleton {// 唯一实例的本体private static Singleton instance = new Singleton();// 获取到实例的方法public static Singleton getInstance() {return instance;}// 禁止外部 new 实例(将构造方法私有化:类内可以使用,类外不能使用)private Singleton() { }
}
public class Test {public static void main(String[] args) {// 使用Instance instance = Instance.getInstance();//由于设置了私有的构造方法,所以这样写会报错//Instance instance1 = new Instance();}
}
说明:
- private static Singleton instance = new Singleton();这里被
static
修饰,该属性是类的属性,在 jvm 中类的属性只有唯一一份,因此类对象性里这个成员自然也是唯一的了。- 为了防止类 Singleton 可以继续 new 对象,这里需要将构造方法私有化,就可以实现此处在类内部把实例创建好,同时禁止外部重新创建实例。这里需要特别注意的是:这里的
private
虽然将构造方法私有化了,但是还是能保证在类内部随便使用滴!- 由于当前变量 instance 是静态的,是在类的加载阶段就完成了赋值,因此在多线程下通过调用静态方法getInstance 获取唯一实例,只是读操作,本身就是线程安全的。
(2)懒汉模式
代码实现:
// 懒汉模式实现单例
class SingletonLazy {volatile private static SingletonLazy instance = null;public static SingletonLazy getInstance() {// 这个条件, 判定是否要加锁. 如果对象已经有了, 就不必加锁了, 此时本身就是线程安全的.if (instance == null) {synchronized (SingletonLazy.class) {if (instance == null) {instance = new SingletonLazy();}}}return instance;}// 构造方法私有化private SingletonLazy() { }
}
public class Test {public static void main(String[] args) {// 使用Instance2 I1 = Instance2.getInstance2();}
}
我们上面提到懒汉模式实现单例,在后续多线程使用单例时只涉及到“读”操作,因此本身是线程安全的。但是对于饿汉模式实现单例,由于是非必要不new对象,只有在使用到的时候才创建单例,因此就涉及到了读和写。在多线程下,这种模式就可能产生线程安全问题,因此可以在上述代码中看到一些保证线程安全的逻辑,下面就来详细说明一下,上述代码中使用到的保证线程安全的代码逻辑:
(1)synchronized 加锁保证 if 判断和 new
是一个原子操作
在代码不加锁的情况下,由于线程的随机调度,每个线程很可能在 if 判断之后就切走了,这就很可能导致创建出多个实例,且不说这样会创建出多个实例,不满足单例模式。我们知道实际上对象是需要占用内存空间的,如果每个对象都占用非常大的内存空间,那么N个线程就有可能会创建N个对象,这时程序就可能吃不消了。
(2)优化:使用双层 if 判断
上述加锁后的代码如下:
synchronized (SingletonLazy.class) {if (instance == null) {instance = new SingletonLazy();}
}
return instance;
虽然通过加锁保证了if判断和new的原子性,但是这种加锁还是存在缺陷,由于if判断操作放到了synchronized锁的内部,因此在任何时候,无论是否已经创建好了单例,调用getInstance都会触发锁竞争,造成线程阻塞,而且加锁是一个开销比较大的操作,反复加锁会降低程序执行效率。
经过分析我们很容易得出,上述所说的线程不安全,只出现在首次创建对象时,一旦对象new好啦,后续调用getInstance,就只是单纯的读操作,直接ruturn instance即可,就没必要加锁了。因此我们可以增加一层if条件判断:instance == null
if (instance == null) {synchronized (SingletonLazy.class) {if (instance == null) {instance = new SingletonLazy();}}
}
return instance;
如果我们在加锁的外层,设置了加锁条件 instance==null
,即在未创建实例时才进行加锁,一旦创建好了实例,后续线程遇到条件 instance==null 为false ,就直接返回创建好的单例,不会再次加锁。此时这个程序在多线程下就避免了多次不必要加锁,降低了程序开销。
(3)使用volatile 防止指令重排
volatile private static SingletonLazy instance = null;
如果不使用volatile关键字,那么在创建新对象时,会出现重排序(对象创建过程中的变量赋值、引用关系建立等操作的顺序被调整),这可能会导致某个线程看到了对象引用的非空值,但是实际上该对象还没有完全初始化完成。这会导致程序出现不可预期的行为和错误。
使用volatile关键字可以禁止指令重排序,保证instance在任何时刻都是唯一且符合预期的,从而避免了对象创建过程中出现的线程安全问题。
单例模式线程安全问题
- 饿汉模式:天然是安全的,只读操作
- 懒汉模式:线程不安全,有读有写
(1)加锁,把 if 和 new 变成原子操作
(2)双层 if ,减少不必要的加锁操作
(3)使用 volatile 禁止指令重排序,保证后续线程肯定拿到的是完整对象。
2、工厂模式
工场模式用来填补构造方法的缺陷的:因为构造方法要是想要实现多种不同角度的构造,只能依赖方法重载,而方法重载有些场景下受语法限制并不是很友好。
例如想要表示平面上的点,有两种表示方式,一种是平面坐标,一种是极坐标:
class Point { public Point(double x , double y) {}public Point(double r,double a) {}
}
发现如果通过构造方法的重载,由于上述两种方法完全一样,不能构成重载,因此引入工厂模式(新建一个工厂类对Point类进行封装):
class PointBuild {public static Point planar(double x,double y){...}public static Point polar(double r,double a){...}
}
此时由于方法名不同,因此解决了上述不能表示两种坐标的问题。
二、阻塞式队列
阻塞队列,即带有阻塞的队列,满足队列的基本性质:先进先出。并且具有以下特性:
- 当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素。
- 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素。
- 基于
1
、2
阻塞队列能是一种线程安全的数据结构
在写多线程代码的时候,多个线程之间进行数据交互,可以使用阻塞队列简化代码编写。更重要的是,它有一个典型应用场景就是 “生产者消费者模型”。 这是一种非常典型的开发模型。
1、生产者消费者模型
生产者和消费者之间,交互数据,据需要用到一个交易场所,这个交易场所就是“阻塞队列”。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
餐厅中的服务员与厨师:
在餐厅中,服务员与厨师就是一个生产者消费者模型的典型实例。服务员是消费者,负责从顾客处获取点餐信息,并将其传递给厨师。厨师是生产者,负责根据服务员提供的点餐信息制作食物,并将食物传递给服务员,并由服务员将食物送到顾客处。
在整个过程中,服务员与厨师之间通过一个共享的点餐单
来协调工作,服务员负责消费顾客的点餐需求,将其加入点餐单中,而厨师则负责生产食物,按照点餐单上的需求进行制作并将制作好的食物放在柜台上待服务员取走,这个点餐单即是类似于一个阻塞队列。
2、阻塞对列在生产者消费者之间的作用
(1)阻塞队列能使生产者和消费者之间 解耦合。
我们写代码,一般要求是“高内聚,低耦合”,所谓耦合,是指两个模块之间的关联程度。关联强就是高耦合,关联程度低就叫低耦合。写代码时追求低耦合,避免牵一发而动全身。内聚,是指一个模块内部各成分之间相关联程度,低内聚就是相关联的代码没有放到一起,杂乱无章。高内聚,相关联的代码,分门别类的规制起来。
在生产者消费者模型中,阻塞队列就可以降低使生产者和消费者之间的耦合度。例如当下有A、B、C三个服务器:其中A是入口服务器(接受处理简单服务,不容易挂),B、C是业务服务器(处理复杂业务,容易挂)
(2)阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,起到削峰填谷的作用。
比如在 "秒杀"场景下,服务器同一时刻可能会收到大量的支付请求。如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程)。这个时候就可以把这些请求都放到一个阻塞队列中(阻塞队列没有业务,代码稳定,不容易挂),然后再由消费者线程从阻塞队列里获取请求,按正常的速率来处理每个支付请求。这样做可以有效进行
“削峰”, 防止服务器被突然到来的一波请求直接冲垮。
假如“秒杀”场景过后,流量下降,达到谷值,消费者线程仍然可以按照原有的速率从阻塞队列中获取并处理之前挤压的请求,使整个处理过程更平稳。
3、用标准库阻塞队列实现生产者消费者模型
在 Java 标准库中内置了阻塞队列。如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可:
BlockingQueue
是一个接口,真正实现的类是 LinkedBlockingQueue。put
方法用于阻塞式的入队列,take
用于阻塞式的出队列。- BlockingQueue 也有
offer
,poll
,peek
等方法, 但是这些方法不带有阻塞特性。
public static void main(String[] args) {// 使用Java库中的阻塞队列:BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();//消费者Thread t1 = new Thread(()->{while (true) {try {int value = blockingQueue.take();System.out.println("消费元素:"+value);} catch (InterruptedException e) {e.printStackTrace();}}});//生产者Thread t2 = new Thread(()->{int value = 0;while (true) {try {blockingQueue.put(value);System.out.println("生产元素:"+value);value++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();// 上述代码, 让生产者, 每隔 1s 生产一个元素。// 让消费者则直接消费, 不受限制。// 所以可能会看到,生产者生产1个元素就消费者就消费1个元素,消费完就阻塞,等待1s生产出新的元素后再唤醒消费者。}
4、模拟实现阻塞队列
- 实现一个普通队列
- 加上线程安全
- 加上阻塞功能
public class MyBlockingQueue {private int[] items = new int[1000];// 约定 [head, tail) 队列 的有效元素volatile private int head = 0;volatile private int tail = 0;volatile private int size = 0;// 入队列synchronized public void put(int value) throws InterruptedException {while (size == items.length) {// 队列为满,阻塞等待this.wait();}// 以下是不为满的情况:// 插入元素items[tail] = value;tail++;// 判断 tail 大小,如果 tail 达到末尾,需要从头开始if (tail == items.length) {tail = 0;}size++;// 生产元素后就唤醒阻塞this.notify();}//出队列synchronized public int take() throws InterruptedException {while (size == 0) {// 队列为空,阻塞等待this.wait();}// 以下是不为空的情况:// 取元素int value = items[head];head++;// 如果 head 达到末尾,需要从头开始if (head == items.length) {head = 0;}size--;// 消费元素后就唤醒阻塞this.notify();return value;}}
代码说明:
- 上述代码底层使用循环队列,我们知道判断循环对队满、队空有两种方式:(1)舍弃一个空间(2)单独使用一个变量记录有效元素的个数。上述代码使用了记录有效元素个数的方式。
- 在处理 tail 或 head 达到了数组最大长度,此时采用 tail = 0; 而不使用 tail = tail % items.length;主要是因为后者相对来说开发效率(可读性、可维护性)和执行效率都不高,不建议使用。
- 上述
put
和take
操作均涉及数据的修改,因此为保证多线程安全,选择直接在方法上加锁。同时为了防止出现多线程下出现内存可见性、指令重拍序问题,在变量上添加了volatile修饰。- Java官方是不建议使用
wait
的,因为 wait 时有可能被其他方法(如interrupt)给中断。此时 wait 等待的条件可能还未成熟就被提前唤醒了,继续向下执行就可能出现各种问题。因此较稳妥的办法是,将 if 替换为 while,这样就可以保证,即使提前中断还是会在判断一下是否满足条件,满足条件就向下执行,否则继续 wait。
个人观点:阻塞队列不一定能提高执行效率,但是能保证并发。
三、定时器
定时器也是软件开发中的一个重要组件,类似于一个 “闹钟”。达到一个设定的时间之后,就执行某个指定好的任务。定时器是一种实际开发中非常常用的组件,比如网络通信中,如果对方 500ms 内没有返回数据,则断开连接尝试重连。
1、标准库中的定时器
- 标准库中提供了一个
Timer
类,表示定时器。- Timer 类的核心方法为
schedule
。用来为定时器安排任务。- schedule 包含两个参数,第一个参数指定即将要执行的任务
TimerTask
, 第二个参数指定多长时间之后执行 (单位为毫秒)。
TimerTask本质上是一个实现了Runnable的抽象类,需要重写run方法。
public abstract class TimerTask implements Runnable {...}
简单使用Timer定时器:
public class Test_Official_Timer {public static void main(String[] args) {Timer timer = new Timer();// 这里的TimerTask就相当于Runnabletimer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("world1");}},1000);System.out.println("hello!");}
}
运行之后发现线程未结束,原因是因为Timer里面内置了线程,并且是前台线程会阻止进程结束。并且定时任务的执行是靠,Timer内部的线程在时间到了之后执行的。
2、模拟实现定时器
定时器内部可以管理很多个任务,虽然任务很多,但是它们的触发时间是不同的,因此只需要一个或一组工作线程,每次找到这些任务中最先到达时间的任务,先执行最早的任务,再执行第二早的任务……
显然,当前场景下需要一个带有优先级的队列来管理这么多的任务,我们可以使用Java库提供的优先即队列 PriorityQueue
,这也是定时器所需要的核心数据结构。
最终实现代码:
import java.util.PriorityQueue;// MyTask表示一个任务
class MyTask implements Comparable<MyTask> {public Runnable runnable;public long time;// 任务构造public MyTask(Runnable runnable, long delay) {this.runnable = runnable;// 取当前时刻的时间戳 + delay,为了方便后续判定,使用绝对的时间戳.this.time = System.currentTimeMillis() + delay;}// 比较方式@Overridepublic int compareTo(MyTask o) {// [任务按照时间从小到大排序]这样的写法意味着每次取出的是时间最小的元素.return (int)(this.time - o.time);}
}class MyTimer {// 优先级队列是核心数据结构private PriorityQueue<MyTask> queue = new PriorityQueue<>();// 创建一个锁对象private Object locker = new Object();// 添加任务public void schedule(Runnable runnable, long delay) {// 根据参数, 构造 MyTask, 插入队列即可.synchronized (locker) {MyTask myTask = new MyTask(runnable, delay);queue.offer(myTask);locker.notify();}}// 在这里构造工作线程, 负责执行具体任务public MyTimer() {Thread work = new Thread(() -> {while (true) {try {synchronized (locker) {// 队列为空阻塞等待while (queue.isEmpty()) {locker.wait();}MyTask myTask = queue.peek();long curTime = System.currentTimeMillis();if (curTime >= myTask.time) {// 时间到了, 可以执行任务了queue.poll();myTask.runnable.run();} else {// 时间还没到,阻塞等待locker.wait(myTask.time - curTime);}}} catch (InterruptedException e) {e.printStackTrace();}}});// 执行工作线程work.start();}
}
代码说明:
- 由于上述使用了优先级队列,所以需要为
PriorityQueue
里的数据元素 MyTask 添加比较方式。- 代码中的
wait/notify
具有重要作用:
(1)wait解决了忙等的问题。如果当前队列最早任务时间未到,不使用wait,就会导致 while (true) 转的太快了,造成无意义的 CPU 浪费。例如当前队列中最早任务设定的是 1 min 之后执行某个逻辑。但是这里的 while (true) 会导致每秒钟访问队首元素几万次,而当前距离任务执行的时间还有很久。
(2)在 Timer 的 schedule 方法中,每次有新任务到来的时候就使用 notify 唤醒一下 worker 线程,因为新插入的任务可能是需要马上执行的。例如当前队列中最早任务执行时间是10:30,当前时间为10:00,因此 wait 等待30分钟,如果在等待过程中,队列中添加了一个新的任务是10:10分执行,那么就需要唤醒 wait ,工作线程重新取队首元素进行比较。- 上述代码中 synchronized 锁位置是经过深思熟虑的,不可乱加。
为什么不使用 PriorityBlockingQueue?
Java库还为我们提供了一个带有阻塞的优先级队列,这个队列本身就是线程安全的,但是为什么不直接使用PriorityBlockingQueue,而是使用 PriorityQueue 再手动添加 wait?其实博主也是在这里踩过坑,之前在实现阻塞队列版本的定时器后,由于它自带阻塞,在加上定时器需要 wait 本身需要加锁,因此很容易就形成了死锁。总之就是带有阻塞版本的定时器不太好控制,不如使用普通优先级队列再手动 wait 更稳健。
四、线程池
1、线程池概述
线程的创建虽然比进程轻量,但是在频繁创建的情况下,开销也是不可忽略的。而使用线程池可以通过事先准备好一定数量的线程,并让它们处于等待状态,避免了频繁地创建和销毁线程带来的开销。在任务到达时,线程池中的线程可以立即响应并执行任务,避免了因线程创建和启动所产生的延迟。同时,在任务执行结束后,线程也不是马上被销毁,而是重新加入到线程池中,等待下一次任务的到来。因此,线程池可以大大降低线程的创建和销毁销,提高系统的性能和稳定性。
线程池最大的好处就是减少每次启动、销毁线程的损耗
为什么在线程池里取线程比直接创建线程更高效?
从线程池拿线程,纯粹的用户态操作。从系统创建线程,涉及到用户态和内核态之间的切换。而纯用户态操作,时间是可控的,涉及到内核态操作时间就不太可控了。
在Java标准库中提供了现成的线程池 ExecutorService
。例如创建1个内含10个线程的线程池:
ExecutorService pool = Executors.newFixedThreadPool(10);
此处并非直接 new ExecutorService 对象,而是通过 Executors 类里面的静态方法完成对对象的构造。这里其实就使用到了“工厂模式”,Executors 类就相当于一个工厂类,本质上是对 ThreadPoolExecutor 类的封装。
2、ThreadPoolExecutor 参数
ThreadPoolExecutor 提供了更多的可选参数,可以进一步细化线程池行为的设定,我们可以找到Java官方文档,在 java.util.concurrent包下,找到 ThreadPoolExecutor 的构造方法:
下面详细讲解一下构造方法中的这些参数含义:
- corePoolSize-核心线程数
- maximumPoolSize-最大线程数(核心线程数+临时线程数)
- KeepAliveTime-临时线程保持存活的时间
- unit-单位s、ms、分钟
- workQueue-线程池要管理很多任务,这些任务是通过阻塞队列来组织的,submit 就是将任务放到队列
- threadFactory-工厂模式,创建线程的辅助类
注:如果当前任务比较多,系统会创建一些临时线程,如果当前任务比较少,比较空闲,线程池会把多出来的临时工线程销毁掉。当比较空闲时,临时线程不会马上被销毁,而是有一定的存活时间。
除了以上六个参数外,还有一个 RejectedExecutionHandler 类型的参数,这个参数表示线程池的拒绝策略,下面我们详细介绍一下 RejectedExecutionHandler:
注:线程池的拒绝策略(RejectedExecutionHandler)当线程池中的任务队列已满并且所有线程都在执行任务时,再有新的任务请求到达时,该如何处理这个请求的策略。拒绝策略并不是拒绝线程,而是拒绝任务(线程池并不希望满了阻塞,空了阻塞就行)。
(1)
AbortPolicy
: 超过负荷, 直接抛出异常
(2)CallerRunsPolicy
: 调用者负责处理。即谁添加的谁去执行
(3)DiscardOldestPolicy
:丢弃队列中最老的任务
(4)DiscardPolicy
:丢弃新来的任务
3、模拟实现线程池
public class MyThreadPool {// 管理任务的阻塞队列(本身就是多线程安全)private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();// 添加任务方法public void submit(Runnable runnable) throws InterruptedException {queue.put(runnable);}// 实现一个固定线程个数的线程池public MyThreadPool(int n) {for (int i = 0; i < n; i++) {Thread t = new Thread(()->{while (true) {try {Runnable runnable = queue.take();runnable.run();} catch (InterruptedException e) {e.printStackTrace();}}});// 启动线程t.start();}}
}
测试:构造 10 个线程 ,执行 1000 个任务。
public class TestMyThreadPool {public static void main(String[] args) throws InterruptedException {MyThreadPool pool = new MyThreadPool(10);for (int i = 0; i < 1000; i++) {int number = i;pool.submit(new Runnable() {@Overridepublic void run() {System.out.println("pool"+number);}});}}
}
注:上述测试类中,打印的是 number 而不是 i,使用 i 会出现语法错误,因为匿名内部类同 lambda 一样具有变量捕获,只能捕获 final 或 实际 final(整个过程中没有修改)。
4、创建线程池的两种方式
- 通过 Executors 工厂类创建,创建方式比较简单,但是定制能力有限。
- 通过 ThreadPoolExecutor 创建,创建方式比较复杂,但是定制能力强。
5、拓展:实际开发中应该如何确定线程池中线程的数量?
首先不同的程序,线程做的工作不同:
- CPU密集型任务,主要做一些计算机工作,要在CPU上运行
- IO密集型任务,主要是等待IO操作(等待读写硬盘,读写显卡等)不是很占用CPU
极端情况:
- 如果你的线程全是使用CPU,线程数就不应该超过CPU核心数(逻辑核心数)
- 如果你的线程全是使用IO,线程数就可以设置成很多,可以远远超过cup核心数
在实践中如何设置?
实践中很少有这么极端的情况,具体要通过测试的方式来确定,通过测试取一个执行效率和占用资源最优的线程数量。