此篇文章与大家分享关于多线程的文章第五篇关于 多线程代码案例二 阻塞队列
如果有不足的或者错误的请您指出!
目录
- 2.阻塞队列
- 2.1常见队列
- 2.2 生产者消费者模型
- 有利于进行解耦合
- 程序进行削峰填谷
- 2.3通过代码看一下阻塞队列和生产者消费者模型
- 2.4自己实现阻塞队列
- 实现一个普通的循环队列
- 保证线程安全
- 加入阻塞
2.阻塞队列
2.1常见队列
关于队列,我们之前学过的队列是最普通的队列,但是实际上队列还有其他几个版本,总体可以分成一下几类:
(1)普通队列
(2)优先级队列
对于上述两种队列,都是线程不安全的
(3)阻塞队列
这种队列就是线程安全的,且实现了阻塞功能
所谓阻塞功能就是,当队满了的时候,此时如果要往队列里面插入元素,就会触发阻塞等待,直到队列不满为止
同理,如果队列为空,此时如果要往队列里面拿元素,也会触发阻塞等待,直到队列不为空为止
BlockingQueue就是标准库提供的阻塞队列
(4)消息队列
我们知道队列的性质就是先进先出
但是对于消息队列来说就不是普通的先进先出,而是通过topic这样的参数,来对不同的数据进行分类
在出队列的时候,是按照指定topic里面的参数来 进行 先进先出 的
举个例子就是
在医院做检查的时候,有时候一个检查室可能不只是检查一种器官
如果按照如图所示的排队顺序,如果医生指定检查的是哪种器官,那么对应的器官检查就相当于 一个 topic,那么此时的先后顺序是按照这个topic来的
而消息队列往往也是带有阻塞功能的
2.2 生产者消费者模型
我们在上面讲到的 阻塞队列 和 消息队列 都能实现生产者消费者模型
而实现所谓的生产者消费者模型,在实际开发中又有两方面的意义
(1)方便进行解耦合
(2)有利于程序进行削峰填谷
下面我们就来讲讲所谓的 “生产者消费者模型”
有利于进行解耦合
如果是在耦合比较高的情况下
此时A客户端要是想给B服务器发送请求,意味着A中的代码就要包含很多关于 B服务器的逻辑,
此时就有了一定的耦合
那么如果A中的代码逻辑一旦进行修改,那么在B中也需要对应的修改
同时如果A/ B 出现问题,另一方也很有可能被影响到
但是如果我们采用生产者消费者模型,引入的消息队列
此时站在A的视角是不知道B的存在的,只关心与消息队列的交互
同时,站在B的视角是不知道A的存在的,只关心与 消息队列的交互
那么此时A 和 B之间的耦合就很小了
更重要的是,如果此时引入 C服务器,那么也只是需要让C直接从消息队列里面拿数据即可
程序进行削峰填谷
此时A对于接受到的请求只是进行一些简单的操作,而B相对进行的是重量级操作
而某一时刻 A 收到的数据激增,此时B进行的操作也会激增,消耗的资源多,B就容易挂
当我们引入生产者消费者模型
那么此时无论A给队列写多块,B都可以按照自己固有的节奏来消费数据,B的节奏就不一定完全跟着A了,相当于把B保护起来了
但是此时效率是一定会有折损的,不太适合对于响应速度比较高的场景
2.3通过代码看一下阻塞队列和生产者消费者模型
BIockingQueue是一个接口
java提供了3个类供我们使用
阻塞队列只需要考虑,入队列和出队列即可,阻塞队列没有"取队首元素"操作(也不是完全没有,只不过这些操作没有阻塞功能)
阻塞队列也提供了offer和poll方法,这两个是不带有阻塞功能的,我们实际上用的是put和take方法
利用阻塞队列实现生产者消费者模型
public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);Thread t1 = new Thread(() -> {//生产int count = 0;while (true) {try {System.out.println("生产了" + count);queue.put(count++);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {//消费while (true) {try {System.out.println("消费了" + queue.take());Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
此时就能实现,当队列满了的时候,要往队列里面插入元素就需要 阻塞等待
2.4自己实现阻塞队列
实现一个普通的循环队列
public class MyBlockingQueue {private String[] elems;//[head,tail) head -> 位置指的是第一个元素 tail -> 指的是最后一个元素的下一个元素 以实现循环队列private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}public String take() {if(size == 0) {//后面实现阻塞return null;}String ret = elems[head];head++;if (head == elems.length) {head = 0;}this.size--;return ret;}public void put (String elem) {if(size >= elem.length()) {//后面实现阻塞return;}this.elems[tail++] = elem;if(tail == elems.length) {tail = 0;}this.size++;}
}
保证线程安全
public class MyBlockingQueue {private String[] elems;//[head,tail) head -> 位置指的是第一个元素 tail -> 指的是最后一个元素的下一个元素 以实现循环队列private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}public String take() {synchronized (this) {if(size == 0) {//后面实现阻塞return null;}String ret = elems[head];head++;if (head == elems.length) {head = 0;}this.size--;return ret;}}public void put (String elem) {synchronized (this) {if(szie >= elems.length) {//后面实现阻塞return;}this.elems[tail++] = elem;if(tail == elems.length) {tail = 0;}this.size++;}}
}
加入阻塞
public class MyBlockingQueue {private String[] elems = null;//[head,tail) head -> 位置指的是第一个元素 tail -> 指的是最后一个元素的下一个元素 以实现循环队列private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}public String take() throws InterruptedException {synchronized (this) {if(size == 0) {this.wait();//阻塞,直到有元素put的时候唤醒}String ret = elems[head];head++;if (head == elems.length) {head = 0;}this.size--;this.notify();return ret;}}public void put (String elem) throws InterruptedException {synchronized (this) {if(size >= elems.length) {this.wait();//阻塞,直到有元素take的时候唤醒}this.elems[tail++] = elem;if(tail == elems.length) {tail = 0;}this.size++;this.notify();}}
}
那么我们此时就可以利用我们自己的阻塞队列来实现生产者消费者模型
但是实际上还会存在一个问题
在java官方文档中
其实就是说,wait不只是能够被notify唤醒,比如interrupt也行,但是如果我们的程序用try-catch处理异常,程序就会继续往下允许,那么简单使用if判断的时候,如果不是我们设定的notify唤醒wait,程序往下运行就会出bug
因此我们最好搭配while使用,是最稳妥的做法,即被唤醒的时候,再次确认一下,看看条件是否成立
因此最后的版本就是:
public class MyBlockingQueue {private String[] elems = null;//[head,tail) head -> 位置指的是第一个元素 tail -> 指的是最后一个元素的下一个元素 以实现循环队列private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();//阻塞,直到有元素put的时候唤醒}String ret = elems[head];head++;if (head == elems.length) {head = 0;}this.size--;this.notify();return ret;}}public void put (String elem) throws InterruptedException {synchronized (this) {while(size >= elems.length) {this.wait();//阻塞,直到有元素take的时候唤醒}this.elems[tail++] = elem;if(tail == elems.length) {tail = 0;}this.size++;this.notify();}}
}