目录
- Day 10:多线程(8)
- 单例模式
- 阻塞队列
- 1. 生产者消费者模型
- 1.1 生产者消费者模型解耦合
- 1.2 生产者消费者模型削峰填谷
- 2. 生产者消费者代码
- 3. 阻塞队列实现
Day 10:多线程(8)
单例模式
单例模式:某个类在进程中只能有唯一实例,需要一定的编程技巧,作出限制,一旦代码写的有问题,创建了多个实例,直接编译报错
-
饿汉模式:程序运行的时候,就立即创建实例
-
懒汉模式:首次使用的时候,才创建实例
-
加锁:把if和new包裹起来
-
双重if
-
给变量上加上volatile
可能会涉及到内存可见性问题:t1线程修改了Instance引用,t2有可能读不到(概率应该比较小),加上volatile是为了万无一失,另一方面,加上volatile也能够解决指令重排序引起的线程安全问题
-
指令重排序:也是编译器的一种优化策略,编译器优化有很多种策略,比如把读内存优化到读寄存器、指令重排序、循环展开、条件分支预测等
写的代码最终编译成了一系列的二进制指令,正常来说,CPU是按照顺序,一条一条地执行,但是编译器比较智能,会根据实际情况,生成的二进制指令的执行顺序可能和最初写代码的顺序存在差别,调整顺序的最主要的目的就是为了提高效率(前提是保证逻辑是等价的)
- 指令重排序的前提一定是重新排序之后,逻辑和之前等价
- 单线程下,编译器进行指令重排序的操作,一般都是没有问题的,编译器可以准确地识别出,哪些操作可以重排序,而不会影响到逻辑
- 多线程下,判定就可能不准确了,可能出现重排序后,逻辑发生了改变
对于instance = new SingletonLazy();
可以大体上细分为三个步骤:
- 申请内存空间
- 调用构造方法(对内存空间进行初始化)
- 把此时内存空间的地址,赋值给instance引用
在指令重排序优化策略下,上述执行的过程,不一定是123,有可能是132(1一定是先执行的),这两种执行方式,单线程下都是可以的,但是如果是132,在多线程下,可能会引起bug
package thread;class SingletonLazy {private static SingletonLazy instance = null;private static Object locker = new Object();public static SingletonLazy getInstance(){if (instance == null){synchronized (locker){if (instance == null){instance = new SingletonLazy();}}}return instance;}private SingletonLazy() {}
}
public class Demo28 {public static void main(String[] args) {SingletonLazy s1 = SingletonLazy.getInstance();SingletonLazy s2 = SingletonLazy.getInstance();System.out.println(s1 == s2);}}
- t1线程判断
instance == null
成立,进行加锁,进一步判断instance == null
成立,进行instance = new SingletonLazy()
,在这一过程中完成了1申请内存和3把地址赋值给引用,一旦3执行完,意味着instance为非null,但是指向的对象其实是一个未初始化的对象(里面的成员都是默认值) - 此时t2线程判断
instance == null
不成立,直接返回instance这个未初始化完毕的对象 - 然后接下来t1线程才开始进行2调用构造函数
这种情况下,后续的对SingletonLazy s1 = SingletonLazy.getInstance();
操作,都是针对未初始化的对象进行操作,存在严重问题
要解决上述问题,就需要引入volatile
- volatile不仅仅能解决内存可见性问题,也能禁止针对这个变量读写操作的指令重排序问题
- 指令重排序在很多地方都可能发生,volatile特指的是针对某个对象的读写操作过程中,不会出现重排序
- 按照加上volatile之后,此时t2线程读到的数据,一定是t1已经构造完毕的完整对象了
上述谈到的指令重排序涉及到的问题很难进行验证,本身就是一个小概率的事件,即使不加volatile运行程序,运行几百次几千次,应该也是正确的,指不定啥时候会出现问题,加上volatile总是万无一失的做法,程序员也不确定是否在某个JVM这样版本中更好的处理这样的问题
面试中考察单例模式
- 先写最初的版本,即不考虑线程安全的版本
- 加上锁
- 加上双重if
- 最后加上volatile
关于单例模式的延伸
(1)单例模式要确保反射下安全,即使动用反射也无法破坏单例特性
(2)单例模式要确保序列化下安全,即使动用Java标准库的序列化机制,也无法破坏单例特性
阻塞队列
之前学习过的普通队列和优先级队列都是线程不安全的,阻塞队列是先进先出的、线程安全的并且带有阻塞功能
- 队列为空,尝试出队列,出队列操作就会阻塞,一直阻塞到队列不为空为止
- 队列为满,尝试入队列,入队列操作也会阻塞,一直阻塞到队列不满为止
BlockingQueue就是标准库提供的阻塞队列
除了阻塞队列之外,还有消息队列:不是普通的先进先出,而是通过topic这样的参数来对数据进行归类,出队列的时候,指定topic,每个topic下的数据是先进先出的,消息队列往往也会带有阻塞特性
由于消息队列这样的数据结构太好用了,因此实际开发中,经常会把这样的数据结构封装成单独的服务器程序,单独部署
消息队列能够起到的作用,就是实现“生产者消费者模型”
1. 生产者消费者模型
生产者消费者模型,在开发中主要有两方面的意义:
- 能够让程序进行解耦合
- 能够使程序削峰填谷
生产者消费者模型的实现:
- 需要在一个进程内实现,使用阻塞队列即可
- 需要在分布式系统中实现,需要使用单独部署的消息队列服务器
简单来说生产者消费者模型就是一些线程负责“生产产品”,另一些线程负责“消费产品”
如果“生产产品”速度较慢,那么“消费产品”就会阻塞等待
如果“消费产品”速度较慢,那么“生产产品”就会阻塞等待
也就是说生产者和消费者之间多了一个消息队列
1.1 生产者消费者模型解耦合
如果让A直接调用B,意味着A的代码中就要包含很多和B相关的逻辑,B的代码中也会包含和A相关的逻辑,彼此之间就有一定的耦合
- 一旦A做出了修改,可能就会影响到B,反之亦然
- 一旦A出现了BUG,也容易把B牵连到,反之亦然
然而在引入了消息队列之后:
- 站在A的视角,不知道B的存在,只关心和队列的交互
- 站在B的视角,不知道A的存在,只关心和队列的交互
- 此时,对A的修改就不太容易影响到B,A如果挂了,也不会影响到B,反之亦然
- 未来如果再引入C,也让A访问C,A不需要修改任何代码,直接让C从队列里读取数据即可,提升了程序的可扩展能力
1.2 生产者消费者模型削峰填谷
客户端发来的请求,个数多少,没办法提前预知,遇到某些突发情况,就可能会导致客户端给服务的请求激增
正常情况下,A收到一个客户端的请求,就同样要请求一次B,A收到的请求激增了,B的请求也会激增,但是由于A做的工作比较简单,消耗的资源少,B做的工作更复杂,消耗的资源多,一旦请求量大了,B就容易挂,所以引入消息队列
- 无论A给队列写的多快,B都可以按照固有的节奏来消费数据
- B的节奏,就不一定完全跟着A了,相当于队列把B保护起来了
- B要进行很多重量级操作,比如操作数据库之类的,要消耗很多系统资源花费一定的时间
- 消息队列没有什么业务逻辑,消耗的硬件资源少,本身就抗造,同时,实际开发中,部署消息队列的机器一般都会给配置比较高的机器/集群
引入消息队列来实现生产者消费者模型,效率是不如直接访问来得更快的,多了一次周转,也多了一次网络通信
2. 生产者消费者代码
package thread;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Demo29 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);queue.put("A");String elem = queue.take();System.out.println("elem = " + elem);elem = queue.take();System.out.println("elem = " + elem);}
}
package thread;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo30 {public static void main(String[] args) {BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);Thread t1 = new Thread(() ->{try {while (true){Integer value = queue.take();System.out.println("t1 消费:" + value);Thread.sleep(1000);}}catch (InterruptedException e){e.printStackTrace();}});Thread t2 = new Thread(() ->{try {int count = 1;while (true){queue.put(count);System.out.println("t2 生产:" + count);count++;}}catch (InterruptedException e){e.printStackTrace();}});t1.start();t2.start();}
}
3. 阻塞队列实现
package thread;class MyBlockingQueue {private String[] elems = null;//[head, tail)//head位置指向的是第一个元素,tail指向的是最后一个元素的下一个元素private volatile int head = 0;private volatile int tail = 0;private volatile int size = 0;public MyBlockingQueue(int capacity){elems = new String[capacity];}void put(String elem) throws InterruptedException {synchronized (this) {while (size >= elems.length){//队列满了,进行队列阻塞this.wait();}//把新的元素放到tail所在的位置上elems[tail] = elem;tail++;if (tail >= elems.length) {//到达末尾,就回到开头tail = 0;}//更新size的值size++;//唤醒下面 take 阻塞的waitthis.notify();}}String take() throws InterruptedException {synchronized (this) {while (size == 0) {//队列空了,进行阻塞this.wait();}//取出 head 指向的元素String result = elems[head];head++;if (head >= elems.length) {head = 0;}size--;//take 成功一个元素,就唤醒上面put中的wait操作this.notify();return result;}}
}public class Demo31 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread t1 = new Thread(() -> {try {int count = 1;while (true) {queue.put(count + "");System.out.println("生产" + count);count++;}} catch (InterruptedException e) {e.printStackTrace();}});Thread t2 = new Thread(() -> {try {while (true){String result = queue.take();System.out.println("消费" + result);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});t1.start();t2.start();}
}