目录
阻塞队列
阻塞队列是什么?
生产者消费者模型
生产者消费者模型的两个重要优势
1.解耦合(不一定是两个线程之间,也可以是两个服务器之间)
2.阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力(削峰填谷)
标准库中的阻塞队列
生产者消费者模型
阻塞队列实现
阻塞队列
阻塞队列是什么?
阻塞队列是一种特殊的队列
也遵循“先进先出”的原则
阻塞队列是一种线程安全的数据结构,并具有以下特性
1.队列为空,尝试出队列,出队列操作就会阻塞,阻塞到其他线程添加元素为止
2.队列为满,尝试入队列,入队列操作就会阻塞,阻塞到其他线程取走元素为止
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合
生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯
生产者消费者模型的两个重要优势
1.解耦合(不一定是两个线程之间,也可以是两个服务器之间)
a和b不再直接交互了
a的代码中看不见b
b的代码也看不见a
a的代码中和b的代码中只能看到队列
2.阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力(削峰填谷)
理解成服务器收到的请求量的曲线图
b这边可以不关心数据量多少按照自己的节奏慢慢处理队列中的请求数据即可
趁着峰值过去任然继续消费数据
利用波谷的时间来赶紧消费之前积压的数据
三峡大坝起到的效果同样是削峰填谷
生产者消费者模型付出的代价
1.引入队列后,整体的结构会更复杂
此时需要更多的机器进行部署,生产环境的结构会更复杂,管理起来更麻烦
2.效率会有影响
标准库中的阻塞队列
Java标准库中,提供了现成的阻塞队列
BlockingQueue 是一个接口,真正实现的类是LinkedBlockingQueue
put方法用于阻塞式的入队列,take用于阻塞式的出队列
BlockingQueue也有offer,poll,peek等方法,但是则这些方法不带有阻塞特性
put和take才带有阻塞功能
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
queue.put("abc");//入队列
String elem = queue.take();//出队列,如果没有put直接take,就会阻塞
//容量,最大能容纳多少元素
BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
如果不设置capacity,默认是一个非常大的数值
生产者消费者模型
public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();Thread customer = new Thread(()->{while (true){try{int value = blockingQueue.take();System.out.println("消费元素"+value);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"消费者");Thread producer = new Thread(()->{Random random = new Random();while (true){try{int num = random.nextInt(1000);System.out.println("生产元素"+num);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"生产者");producer.start();customer.join();producer.join();}
阻塞队列实现
通过“循环队列”的方式来实现
使用synchronized进行加锁控制
put插入元素的时候判定如果队列满了就进行wait(注意,要在循环中进行wait,被唤醒时不一定队列就不满了,因为同时可能是唤醒了多个线程)
take取出的时候,判定如果队列为空就进行wait(也是循环wait)
class MyBlockingQueue{private String[] data = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized (this){//此处最好使用while//否则notifyAll的时候,该线程从wait中被唤醒//但是紧接者并未抢占到锁,当锁被抢占的时候,可能又已经队列满了//就只能继续等待while (size >= data.length){//队列满了,需要阻塞//return;this.wait();}data[tail] = elem;tail++;if(tail >= data.length){tail = 0;}size++;this.notify();}}public String take() throws InterruptedException {synchronized (this){while (size == 0){//队列空了,需要阻塞//return null;this.wait();}String ret = data[head];head++;if(head>= data.length){head = 0;}size--;this.notify();return ret;}}
}
这里的wait是用来确保接下来的操作是有意义的
wait不一定只是被notify唤醒,还可能被Interrupt这样的方法给中断
如果使用if作为wait的判定条件
此时就存在wait被提前唤醒的风险
这里的循环的目的是为了“二次验证”
判定当前这里的条件是否成立
wait之前先判定一次
wait唤醒也判定一次(再确认一下,队列是否不空)
wait设计的时候本身就是搭配while用的