什么是阻塞队列
相比于一般的队列,有两个特点
1.线程安全
2.带有阻塞功能
1)队伍为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止
2)队伍为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止
常用于生产者消费者模型
作用:
1.解耦合
2.削峰填谷
使用阻塞队列
public class Test12 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);queue.put("qqq");String elem = queue.take();System.out.println("elem = "+ elem);elem = queue.take();System.out.println("elem = " + elem);}
}
运行结果
不会结束运行,一直在等待。
使用put和offer一样的都是入队列,但是put是带有阻塞功能,offer没有带阻塞功能(队满了就会返回结果)
take方法用来出队列,也是带有阻塞功能
实现阻塞队列
1)实现普通队列
class MyBlockingQueue{private String[] elems = null;private int size = 0;private int head = 0;private int tail = 0;public MyBlockingQueue(int capacity){elems = new String[capacity];}public void put(String elem) throws InterruptedException {if (size > elems.length){//阻塞功能}elems[tail] = elem;tail++;if (tail >= elems.length){tail = 0;}size++;}public String take() throws InterruptedException {if (size == 0){//阻塞功能}String elem = null;elem = elems[head];head++;if (head >= elems.length){head = 0;}size--;return elem;}}}
2)加上线程安全
class MyBlockingQueue{private String[] elems = null;private int size = 0;private int head = 0;private int tail = 0;private Object locker = new Object();public MyBlockingQueue(int capacity){elems = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized(locker){if (size > elems.length){}elems[tail] = elem;tail++;if (tail >= elems.length){tail = 0;}size++;}}public String take() throws InterruptedException {synchronized(locker){if (size == 0){}String elem = null;elem = elems[head];head++;if (head >= elems.length){head = 0;}size--;return elem;}}}
3)加上阻塞功能
class MyBlockingQueue{private String[] elems = null;private int size = 0;private int head = 0;private int tail = 0;private Object locker = new Object();public MyBlockingQueue(int capacity){elems = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized(locker){while (size > elems.length){locker.wait();}elems[tail] = elem;tail++;if (tail >= elems.length){tail = 0;}size++;locker.notify();}}public String take() throws InterruptedException {synchronized(locker){while (size == 0){locker.wait();}String elem = null;elem = elems[head];head++;if (head >= elems.length){head = 0;}size--;locker.notify();return elem;}}}
代码解释:
最终代码将if改成了while,因为if只能判定一次,如果出现以下情况就会出bug(线程A,线程B都执行到了put中的wait,因为队列已满而停止运行,线程C出队列唤醒了线程A,线程A继续入队列,入队列后就会notify,导致唤醒了线程B,而此时队列已满,无法进行入队操作,就出现了bug),所以就使用while,wait之前判定一次,唤醒之后再进行一次判定,相当于多做一步确定操作
简单的生产者消费者模型
class MyBlockingQueue{private String[] elems = null;private int size = 0;private int head = 0;private int tail = 0;private Object locker = new Object();public MyBlockingQueue(int capacity){elems = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized(locker){while (size > elems.length){locker.wait();}elems[tail] = elem;tail++;if (tail >= elems.length){tail = 0;}size++;locker.notify();}}public String take() throws InterruptedException {synchronized(locker){while (size == 0){locker.wait();}String elem = null;elem = elems[head];head++;if (head >= elems.length){head = 0;}size--;locker.notify();return elem;}}}
public class Test11 {public static void main(String[] args) throws InterruptedException {MyBlockingQueue myBlockingQueue = new MyBlockingQueue(100);Thread t1 = new Thread(()->{int n = 1;while(true){try {myBlockingQueue.put(n + "");System.out.println("生产元素 "+n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{while(true){try {String n = myBlockingQueue.take();System.out.println("消费元素 " + n);Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}