文章目录
- 阻塞队列
- 1.生产者-消费者模式
- 生产者消费者模型的意义:
- 1.解耦合
- 2.削峰填谷:
- 2.阻塞队列的使用
- BlockingQueue
- 3.实现阻塞队列
- 唤醒:
- 使用阻塞队列实现生产者消费者模型
阻塞队列
阻塞队列是一种特殊的队列:
-
1.是线程安全的。
-
2.带有阻塞特性
如果队列为空,继续出队列,就会发生阻塞。直到其他线程往队列中添加队列为止
如果队列为满,继续入队列, 也会发生阻塞,直到其他线程从队列中取走元素为止
阻塞队列可以来实现生产者-消费者模型。
1.生产者-消费者模式
生产者:把生产出来的内容,放到阻塞队列中。
消费者:从阻塞队列中获取内容。
生产者消费者模型的意义:
1.解耦合
两个模块联系越紧密,耦合就越高。对于分布式系统来说,更加有意义。
可以使用生产者-消费者模型,实现解耦合的效果。
2.削峰填谷:
峰:短时间内,请求量比较多时。
谷:请求量比较少时。
在这种情况下:高峰时段,一旦客户端发起的请求量非常多时,每个A收到的请求,都会立即发给B。此时,A和B的访问量是相同的。但是在实际上,由于不同的服务器,上面跑的业务不同。虽然访问量一样,单个访问,消耗的硬件资源是不一样的。可能服务器A可以承担这些并发量,但是服务器B承担不了,就会挂掉。
在引入了生产者-消费者模型之后,就会解决这类问题。
- 当服务器A收到了大量请求之后,A会把对应的请求写入到队列中。B仍然按照之前的节奏来处理请求。(削峰)
- 一般情况下,峰值不会持续存在,峰值过后,A的请求量就会恢复正常、甚至减低。服务器B就可以在此时,逐渐把积压的请求给处理掉。(填谷)。
2.阻塞队列的使用
BlockingQueue
-
BlockingQueue是一个具体的接口,所以需要new一个具体的实现。
-
同时BlockingQueue继承自Queue。也可以使用Queue的方法(没有阻塞属性)
1.基于数组实现
2.基于链表实现
-
BlockingQueue带有阻塞的方法:
put:阻塞式入队列
take:阻塞式出队列
没有提供阻塞式获取队首元素的方法。
public static void main(String[] args) {// BlockingQueue<String> queue = new ArrayBlockingQueue<>();BlockingQueue<String>queue = new LinkedBlockingQueue<>();queue.put("111");queue.put("222");queue.put("333");queue.put("444");String elem = queue.take();System.out.println(elem);elem = queue.take();System.out.println(elem);elem = queue.take();System.out.println(elem);elem = queue.take();System.out.println(elem);}
3.实现阻塞队列
给一个普通的队列加上线性安全和阻塞
对入队和出队的方法进行加锁。对数据的修改实现原子性操作,保证线程安全
- put入队的时候,如果队列满了,就进行阻塞(wait)
在出队的时候,当size–后,队列中有位置了,调用notify()方法,对阻塞的put方法进行唤醒。
- 同样的,如果take出队列时,队列为空的话,也需要进行阻塞。
在入队时,当size++后,队列不为空了,调用notify()方法,对阻塞的take方法进行唤醒。
-
一个队列的阻塞情况,要么为空、要么为满。
put和take只有一边能阻塞。如果put阻塞了,其他线程继续调用put,也会进行阻塞。只有靠take来唤醒。
take阻塞,其他线程继续调用take也会进行阻塞,只能靠put来唤醒。
唤醒:
wait方法除了使用notify()方法进行唤醒,还可以通过interrupt()方法,来中断wait的状态。
使用interrupt方法唤醒的时候,会出现InterruptedException异常
public void put(String elem) throws InterruptedException {}
因为是throws抛出的异常,执行到interrupt()方法后,整个方法就会结束。
public void put(String elem) {synchronized (this) {if (size == data.length) {try {this.wait();}catch (InterruptedException e){ }}data[tail] = elem;tail++;if (tail == data.length) {tail = 0;}size++;this.notify();}
- 如果是try-catch来处理异常。如果出现异常,程序仍会继续执行下去。在满队列的情况下。强行修改,会覆盖掉tail的值,并且size会超出数组长度。
使用wait时,要考虑wait是notify唤醒的,还是通过Interrupt唤醒的。在wait返回时,还要进行判断wait执行的条件符不符合。可以直接将wait写在while循环中。循环的条件就是wait执行的条件。使wait在唤醒之后,再确定一下,条件是否满足。
while (size == data.length) {//队列满了,就会进行堵塞this.wait();}
- 最终再通过volatile修饰要频繁修改的变量,避免出现内存可见性问题。
class MyBlockingQueue {private String[] data = new String[1000];private volatile int head = 0;//队列起始位置private volatile int tail = 0;//队列结束位置的下一个元素。private volatile int size = 0;//队列中有效元素个数//入队public void put(String elem) throws InterruptedException {synchronized (this) {while (size == data.length) {//队列满了,就会进行堵塞this.wait();}//队列没满,向队列添加元素data[tail] = elem;tail++;if (tail == data.length) {//满了之后,环形队列要回到开头。tail = 0;}size++;this.notify();//唤醒take中的wait}}//出队public String take() throws InterruptedException {synchronized (this) {while (size == 0) {//队列为空时this.wait();}//队列不空时,把队首head位置删除String ret = data[head];head++;if (head == data.length) {head = 0;}size--;this.notify();//唤醒put中的wait.return ret;}}
}
使用阻塞队列实现生产者消费者模型
public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();//消费者Thread t1 = new Thread(() -> {while (true){try {String res = queue.take();System.out.println("消费元素: "+res);Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});//生产者Thread t2 = new Thread(() -> {int num = 1;while (true){try {queue.put(num+" ");System.out.println("生产元素:"+num);num++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}生产元素:1001
生产元素:1002
消费元素: 2
消费元素: 3
生产元素:1003
消费元素: 4
生产元素:1004
- 生产者快速生产了1000多个,消费者才消耗几个。队列填满之后,生产者进入了阻塞。直到消费者消费了之后,才会进行生产。消费一个生产一个。
点击移步博客主页,欢迎光临~