阻塞队列
- 🌴生产者消费者模型
- 🌸强耦合
- 🌸松耦合(解耦合)
- 🎍Java标准库中的阻塞队列
- 🌳阻塞队列的模拟实现
- ⭕总结
阻塞队列是什么?
- 阻塞队列是⼀种特殊的队列. 也遵守 “先进先出” 的
- 阻塞队列能是⼀种线程安全的数据结构, 并且具有以下特性:
• 当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素.
• 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素.
阻塞队列的⼀个典型应⽤场景就是 “⽣产者消费者模型”. 这是⼀种⾮常典型的开发模型.
🌴生产者消费者模型
什么是生产者消费者模型?
- ⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。
- ⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
什么是耦合与解耦?
- 耦合是两个或多个模块之间的相互关联。在软件工程中,两个模块之间的耦合度越高,维护成本越高。因此,在系统架构的设计过程中,应减少各个模块之间的耦合度,以提高应用的可维护性
耦合又分为紧耦合(强耦合)和 松耦合(解耦合)
🌸强耦合
紧耦合架构本质是Client/Server的模型,如下图所示
优点是:架构简单、设计简单、开发周期短、能够快速的开发、投入、部署、应用。
但随着集群规模的扩大,系统的稳定性逐渐变差,主要原因如下:
-
同步操作导致对网络资源消耗大。同步操作在数据发送和数据返回之间,有很大一段是空闲的,这种空闲占用是对网络资源的极大浪费。
-
安全控制力度差,因为服务器直接暴露给客户机,容易引发网络攻击行为。
-
程序代码之间关联度过高,不利于模块化处理。
🌸松耦合(解耦合)
松耦合架构本质上是在client/server模型之间加入一个代理,把CS模型变成CAS模型。 在新的架构下,客户机的角色不变,代理服务器承担起与客户机的通信,和对客户机的识别判断工作,服务器位于代理服务器后面,对客户机来说不可见,它只负责数据处理工作,另外我们也把CS模型的同步操作改为CAS的代理处理。 如下图所示:
优点如下:
-
多任务并行处理能力获得极大提升。
-
实现负载自适应机制(根据当时运行环境,松耦合架构分配并行工作任务,避免超载现象)。
-
基本杜绝了对Server服务端的网络攻击行为,由于代理服务器的隔绝和筛查作用, 同时结合其它安全管理手段,外部攻击在代理服务器处就被识别和过滤掉了,这样就保护了后面的服务器不受影响。
-
异步操作减少了网络资源消耗和操作关联。
-
提高了系统的可维护性。
了解了耦合之后,我们就可以通过一个阻塞队列来实现一个生产者消费者的模型
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
在这个模型里面
- 阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)
⽐如在 “秒杀” 场景下, 服务器同⼀时刻可能会收到⼤量的⽀付请求. 如果直接处理这些⽀付请求, 服务器可能扛不住(每个⽀付请求的处理都需要⽐较复杂的流程). 这个时候就可以把这些请求都放到⼀个阻塞队列中, 然后再由消费者线程慢慢的来处理每个⽀付请求.这样做可以有效进⾏ “削峰”, 防⽌服务器被突然到来的⼀波请求直接冲垮.
- 阻塞队列也能使⽣产者和消费者之间 解耦
⽐如过年⼀家⼈⼀起包饺⼦. ⼀般都是有明确分⼯, ⽐如⼀个⼈负责擀饺⼦⽪, 其他⼈负责包. 擀饺⼦⽪的⼈就是 “⽣产者”, 包饺⼦的⼈就是 “消费者”.擀饺⼦⽪的⼈不关⼼包饺⼦的⼈是谁(能包就⾏, ⽆论是⼿⼯包, 借助⼯具, 还是机器包), 包饺⼦的⼈也不关⼼擀饺⼦⽪的⼈是谁(有饺⼦⽪就⾏, ⽆论是⽤擀⾯杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
🎍Java标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的即可.
- BlockingQueue 是⼀个接⼝. 真正实现的类是 LinkedBlockingQueue.
- put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// ⼊队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
使用阻塞队列模拟生产者消费者模型
public class Demo {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();Thread customer = new Thread(() -> {while (true) {try {int value = blockingQueue.take();System.out.println("消费元素: " + value);} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者");customer.start();Thread producer = new Thread(() -> {Random random = new Random();while (true) {try {int num = random.nextInt(1000);System.out.println("⽣产元素: " + num);blockingQueue.put(num);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}, "⽣产者");producer.start();customer.join();producer.join();}}
🌳阻塞队列的模拟实现
思路:
• 通过 “循环队列” 的⽅式来实现.
• 使⽤ synchronized 进⾏加锁控制.
• put 插⼊元素的时候, 判定如果队列满了, 就进⾏ wait. (注意, 要在循环中进⾏ wait. 被唤醒时不⼀定
队列就不满了, 因为同时可能是唤醒了多个线程).
• take 取出元素的时候, 判定如果队列为空, 就进⾏ wait. (也是循环 wait)
这里博主选择的是用数组的形式进行模拟实现
首先我们创建一个 “循环队列” ,关于循环队列了解的小伙伴,可以去看看博主相应的博客栈和队列
public class BlockingQueue {private int[] items = new int[1000];private volatile int size = 0;private volatile int head = 0;private volatile int tail = 0;
上述代码只是一个简单的环形队列,如果在多线程中进行操作的话,会出现线程安全问题,所以接下来我们要做的是就是解决上述线程安全问题
-
首先呢。我们要保证同一个对象,在出队列时不能入队列,在入队列时不能出队列
所以我们使用 synchronized 进行加锁控制 -
其次。put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
所以我们在判断为满或者为空时,使用while循环进行判断。
代码实现如下:
public class MyBlockingQueue {private int[] items = new int[1000];private int head = 0;private int tail = 0;private int size = 0;// 入队列public void put(int value) throws InterruptedException {synchronized (this) {while (size == items.length) {// 队列满了, 此时要产生阻塞.// return;this.wait();}items[tail] = value;tail++;if (tail >= items.length) {tail = 0;}size++;// 这个 notify 唤醒 take 中的 waitthis.notify();}}// 出队列public Integer take() throws InterruptedException {int result = 0;synchronized (this) {while (size == 0) {//return null;// 队列空, 也应该阻塞.this.wait();}result = items[head];head++;if (head >= items.length) {head = 0;}size--;// 唤醒 put 中的 waitthis.notify();}return result;}
}
测试代码如下:
public class ThreadDemo2 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread customer = new Thread(() -> {while (true) {try {int result = queue.take();System.out.println("消费: " + result);} catch (InterruptedException e) {e.printStackTrace();}}});customer.start();Thread producer = new Thread(() -> {int count = 0;while (true) {try {System.out.println("生产: " + count);queue.put(count);count++;Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();}
}
⭕总结
关于《【多线程】阻塞队列详解及实现(模拟实现生产者消费者模)》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下!