目录
一、阻塞队列
阻塞队列的概念
生产者消费者模型
二、自定义实现阻塞队列
一、阻塞队列
阻塞队列的概念
队列我们并不默认,一提起队列,我们立马就能想到 "先进先出"的特性。
今天我们就来学习一下特殊的队列: 阻塞队列,它具有下面两个特性:
- 如果队列为空,执行出队列操作时,就会阻塞,直到另一个线程往队列里添加元素(队列不为空时)
- 如果队列为满,执行入队列操作时,就会阻塞,直到另一个线程从队列里取走元素
(队列不为满时)
我们的JUC下为我们提供了一个泛型接口BlockingQueue.
同时也提供了一下具体的实现类:
ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列。
同时也为我们提供了一些操作阻塞队列的方法。
方法 | 作用 |
void put() | 带有阻塞特性的入队操作 |
E take() | 带有阻塞特性的出队特性 |
boolean contains(Object o) | 判断阻塞队列是否包含某元素 |
使用一下阻塞队列
public static void main(String[] args) throws InterruptedException {MyBlockingQueue queue = new MyBlockingQueue(3);queue.put(1);queue.put(2);queue.put(3);System.out.println("三个元素");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println("4");}
因为阻塞对列中put进了三个元素,但是有四个take()操作,当阻塞队列中三个元素全部take出去,阻塞队列为空,继续等待第四个元素的加入,从而进行第四个take操作。
生产者消费者模型
为什么会有生产者消费者模型?
在我们多线程编程种,为了解决生产者(生产数据的线程)和消费者(消费数据的线程)之间的执行速度的差异和解决生产者和消费者之间的强耦合关系。
通过在生产者和消费者之间增加一个队列来避免生产者和消费者之间的直接通信,生产者将数据直接添加到队列中,消费者直接从队列中取数据处理
通过在生产者和消费者之间增加一个队列来避免生产者和消费者之间的直接通信,生产者将数据直接添加到队列中,消费者直接从队列中取数据处理。
那么这样的模型有什么好处呢?
1.降低生产者和消费者之间的耦合度
2.增加一个阻塞队列,平衡生产者和消费者之间的处理能力
比如我们的双11,12的购物狂欢节,某一时刻的秒杀活动,我们的服务器A某一时刻会向服务器B发送巨量的强求数据,但是我们的服务器处理速度有限,一次性招架不住这么多请求,有可能服务器之间就崩了。
我们使用生产者消费者模型时,就不会受到影响,即使服务器的请求暴涨,但是他发送到阻塞队列中,我们的服务器B按照自己的处理速度去阻塞队列中取元素,所以不会产生实质性的影响。
二、自定义实现阻塞队列
public class MyBlockingQueue {//定义一个数组来存放数据private Integer[] elementData = null;//定义头尾下标private volatile int head=0;private volatile int tail=0;//定义数组中元素的个数private volatile int size=0;//构造public MyBlockingQueue(int capacity) {if (capacity<=0){throw new RuntimeException("队列容量必须大于等于0");}elementData=new Integer[capacity];}//插入数据的方法public void put(Integer value) throws InterruptedException {//判断队列是否已满synchronized (this) {while(size >= elementData.length) {//阻塞队列在队列满的时候应该阻塞等待//阻塞等待this.wait();}//插入元素的个数//在队列尾部插入元素elementData[tail] = value;tail++;if (tail >= elementData.length) {tail = 0;}size++;this.notifyAll();}}//获取数据的方法public synchronized Integer take() throws InterruptedException {//判断队列是否为空while (size == 0) {this.wait();}//获取出队的元素Integer value=elementData[head];//移动head下标head++;//处理下标if (head >= elementData.length) {head=0;}size--;this.notifyAll();return value;}
}