- 👑专栏内容:Java
- ⛪个人主页:子夜的星的主页
- 💕座右铭:前路未远,步履不停
目录
- 一、阻塞队列
- 1、标准库阻塞队列
- 2、手动实现阻塞队列
- 二、生产者-消费者模型
- 1、使用标准库实现
- 2、手动阻塞队列实现
一、阻塞队列
阻塞队列是一种特殊的队列,也遵守“先进先出”的原则。 阻塞队列是一种线程安全的数据结构,并且具有以下特性:
- 当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素
- 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素
1、标准库阻塞队列
在 Java 标准库中内置了阻塞队列 BlockingQueue
接口,下面有七个实现类:
类名 | 特性 |
---|---|
ArrayBlockingQueue | 由数组结构组成的有界阻塞队列 |
LinkedBlockingQueue | 由链表结构组成的有界的阻塞队列(有界,默认大小 Integer.MAX_VALUE ,相当于无界) |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列,即单个元素的队列,生产一个,消费一个,不存储元素,不消费不生产 |
LinkedTransferQueue | 由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 由链表结构组成的双向阻塞队列 |
详细说明:
ArrayBlockingQueue
:使用数组实现,线程安全,有界,FIFO(先进先出)排序。适合生产者和消费者速度相近的场景。LinkedBlockingQueue
:使用链表实现,线程安全,有界(默认大小Integer.MAX_VALUE
,相当于无界),FIFO排序。适合生产者和消费者速度不一致的场景。PriorityBlockingQueue
:使用优先级队列实现,线程安全,无界,支持优先级排序。适合需要根据优先级处理任务的场景。DelayQueue
:使用优先级队列实现,线程安全,无界,支持延迟队列。适合需要延迟执行任务的场景。SynchronousQueue
:不存储元素,生产者和消费者必须同时存在才能成功操作队列。适合需要一对一交换数据的场景。LinkedTransferQueue
:使用链表实现,线程安全,无界,支持FIFO和LIFO(后进先出)排序。适合需要无锁操作的场景。LinkedBlockingDeque
:使用链表实现,线程安全,无界,支持双向操作。适合需要同时支持入队和出队的场景。
选择建议:
选择合适的阻塞队列取决具体需求。以下是一些建议:
- 如果需要一个有界队列,并且生产者和消费者速度相近,可以使用
ArrayBlockingQueue
。 - 如果需要一个无界队列,并且生产者和消费者速度不一致,可以使用
LinkedBlockingQueue
。 - 如果需要根据优先级处理任务,可以使用
PriorityBlockingQueue
。 - 如果需要延迟执行任务,可以使用
DelayQueue
。 - 如果需要一对一交换数据,可以使用
SynchronousQueue
。 - 如果需要无锁操作,可以使用
LinkedTransferQueue
。 - 如果需要同时支持入队和出队,可以使用
LinkedBlockingDeque
。
常用方法 | 描述 |
---|---|
add(E e) | 将指定的元素插入此队列中,如果没有可用的空间,则抛出异常。 |
offer(E e) | 将指定的元素插入此队列中,如果可以在不违反容量限制的情况下立即执行,则成功返回 true,否则返回 false。 |
put(E e) | 将指定的元素插入此队列中,如果队列已满,则一直等待直到有空间可用。 |
remove() | 检索并删除此队列的头部,如果队列为空,则抛出异常。 |
poll() | 检索并删除此队列的头部,如果队列为空,则返回 null。 |
take() | 检索并删除此队列的头部,如果队列为空,则一直等待直到有元素可用。 |
element() | 检索但不删除此队列的头部,如果队列为空,则抛出异常。 |
peek() | 检索但不删除此队列的头部,如果队列为空,则返回 null。 |
这个表格列出了 ArrayBlockingQueue
、LinkedBlockingQueue
和 PriorityBlockingQueue
这三个类的常用方法,它们分别表示由数组结构支持的有界阻塞队列、由链表结构支持的可选有界阻塞队列以及由堆实现支持优先级排序的无界阻塞队列。阻塞队列的一个典型应用场景就是 “生产者消费者模型”,这是一种非常典型的开发模型。
2、手动实现阻塞队列
我们使用循环队列逐步实现阻塞队列,先手动实现一下循环队列。
class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。private int head = 0; //指向队列头部private int tail = 0; //指向队列的尾部private int size = 0;//入队列public void put(String elem){if(size>=items.length){//队列满了return;}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;}//出队列public String take(){if(size == 0){return null;}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;return elem;}
}
把上面的队列改造为阻塞队列,因为是多线程,所以为了保证线程安全,就要给put
和take
进行加锁。
public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}
public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}
除了加锁之外,还需要考虑内存可见性问题。
volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;
此处的两个wait
不会同时出现,要么是这边的wait
,要么是另一边的wait
class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;//入队列public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}//出队列public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}
}
//实现阻塞队列(基于数组(循环队列))
public class Demo {public static void main(String[] args) throws InterruptedException {MyBlockingQueue queue = new MyBlockingQueue();queue.put("aaa");queue.put("bbb");queue.put("ccc");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}
二、生产者-消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
生产者: 负责生产数据的线程或者进程。生产者把生产出来的数据放入缓冲区。
消费者: 负责消费数据的线程或者进程。消费者从缓冲区取出数据进行消费。
缓冲区: 生产者和消费者之间共享的存储区域。缓冲区可以是有限的空间,也可以是阻塞队列。
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求,如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程)。这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求。这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮。
- 阻塞队列也能使生产者和消费者之间解耦。比如过年一家人一起包饺子,一般都是有明确分工。比如一个人负责擀饺子皮,其他人负责包。擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”。 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)。
1、使用标准库实现
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;//简单的生产者消费者模型
public class Demo10 {public static void main(String[] args) {//阻塞队列作为交易场所BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();Thread t1 = new Thread(()->{int count = 0;while(true){try {queue.put(count);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产元素:"+count);count++;try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});//负责消费元素Thread t2 = new Thread(()->{while (true){Integer n = null;try {n = queue.take();System.out.println("消费元素:" + n);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();}
}
2、手动阻塞队列实现
class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;//入队列public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}//出队列public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}
}
//实现阻塞队列(基于数组(循环队列))
public class Demo11 {public static void main(String[] args) throws InterruptedException {//创建两个线程表示生产者和消费者MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(()->{int count = 0;while (true){try {queue.put(count + "");System.out.println("生产元素:"+count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread t2 = new Thread(()->{while (true){try {String count = queue.take();System.out.println("消费元素:"+count);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();}
}