目录
阻塞队列
消息队列
阻塞队列用于生产者消费者模型
概念
实现原理
生产者消费者主要优势
缺陷
阻塞队列的实现
1.写一个普通队列
2.加上线程安全和阻塞等待
3.解决代码中的问题
阻塞队列
阻塞队列,是带有线程安全功能的队列,拥有队列先进先出的特性,并带有阻塞功能。
队列为空,尝试出队列,出队操作就会阻塞,一直阻塞到不为空为止。
队列为满,尝试入队列,入队操作也会阻塞,一直阻塞到队列不满为止。
过程如下:
消息队列
消息队列的先进先出不是普通的先进先出,而是把topic这样的数据结构作为参数对数据进行分类,而在出对列的时候,指定topic ,每个topic下的数据是先进先出的。消息队列一般也带有阻塞功能。消息队列能够起到的作用是实现"生产者消费者模型",消息队列这种数据结构,在实际开发中经常会被封装成单独的服务器程序,单独部署,这样的服务器程序也被称为消息队列。
阻塞队列用于生产者消费者模型
概念
生产者-消费者模式是一种通过缓冲区将生产者和消费者解耦的设计模式。 生产者线程负责生成数据,而消费者线程负责消费数据。 由于生产者和消费者的工作速度可能不同,因此缓冲区的存在使得它们可以独立运行。
实现原理
在一个进程内,直接可以使用阻塞队列实现。在分布式系统中,需要使用单独部署的消息队列服务器,实现生产者消费者模型。
生产者消费者主要优势
1.解耦合
两个程序A,B,让他们互相调用,意味着A代码种就要包含很多关于B相关的逻辑,B的代码中也会包含和A相关的逻辑,彼此之间就有了一定的耦合,一旦A程序做出修改,可能会影响B相关的逻辑,反之亦然,一旦A出现Bug,那么很容易使B受到牵连。在生产者消费者模型中,使用一个消息队列,将A和B解耦合:
站在A的视角,只和消息队列进行交互,站在B的视角,也之和消息队列进行交互。如果对A程序进行修改,不太容易影响到B程序,遇到Bug,对B也没有影响。如果未来引入C,D等,通过消息队列可以直接让A访问C,D,不需要修改A中的任何代码,直接让A从队列里读取数据即可。
2.削峰填谷
客户端发送的请求个数是没办法提前预知的,当客户端发送大量请求时,就会导致服务器遇到的请求激增,此时服务器内部有些复杂的程序就会消耗大量资源,导致崩溃。服务器每次处理一个请求就会消耗一定的系统资源,如果同一时刻要处理等待请求多了,消耗的总资源数目超出机器能提供的上限,那么就会出现机器卡死的情况。引入消息队列(mq):
此时,当A段收到一个请求,就会把请求传递给消息队列,通过消息队列把请求传递给B,无论A给队列请求有多块,B都可以按照固有的节奏来处理这些请求,提高系统的可用性。
缺陷
引入阻塞队列实现生产者消费者模型,效率不如直接访问更快,如上图,多了一次A想mq传递请求,多了一次周转,也多了一次网络通信,效率会有所折损,不适合用在响应熟读要求特别高的场景。
阻塞队列的实现
阻塞队列是线程安全并带有阻塞功能的队列。
1.写一个普通队列
使用数组实现普通的队列,队列的属性包含一个数组,队首下标,队尾下标,元素个数,实现入队和出队操作。
public class QueueBlock {private int[] array;private int head;private int last;private int size;void put(int elem) {//判断是否满了,队列满进入阻塞状态if(array.length >= size) {return;}array[last] = elem;last++;if(last >= array.length) {last = 0;}size++;}int take() {//判断是否为空,为空时进入 阻塞if(size == 0) {return 0;}int ret = array[head];head++;if(head >= array.length) {head = 0;}size--;return ret;}
}
2.加上线程安全和阻塞等待
当进行关键代码时,需要加锁,防止在多线程情况下,误判队列满或者空。队满进行wait操作阻塞执行,直到进行take操作入队时解除阻塞状态,队空时也进行wait操作阻塞执行,直到进行put操作出队时解除阻塞状态,也就是说,两个方法互相唤醒对方,由于在一个队列中不会存在既是满又是空的情况,,所以调用的两个方法不会同时进入阻塞状态。
void put(int elem) throws InterruptedException {//判断是否满了,队列满进入阻塞状态//判满时需要加锁,保证数据真实有效synchronized (this) {if(array.length >= size) {wait();//阻塞等待}}array[last] = elem;last++;if(last >= array.length) {last = 0;}size++;this.notify();}int take() throws InterruptedException {//判断是否为空,为空时进入阻塞//判空时需要加锁,保证数据真实有效synchronized (this) {if(size == 0) {wait();}}int ret = array[head];head++;if(head >= array.length) {head = 0;}size--;this.notify();return ret;}
解除阻塞图片描述如下:
3.解决代码中的问题
wait操作不仅仅会被notify唤醒,还有可能被其他操作唤醒,比如interrupt。也就是说,在进行等待操作时,可能被其他操作终止,然后继续向下执行,这时可以使用while循环搭配条件使用,在线程唤醒之后再次对条件进行判断,队列为空/满将再次进行阻塞,等待真正的唤醒操作。于是代码最终改进为:
public class QueueBlock {private int[] array;private int head;private int last;private int size;void put(int elem) throws InterruptedException {//判断是否满了,队列满进入阻塞状态//判满时需要加锁,保证数据真实有效synchronized (this) {while(array.length >= size) {wait();//阻塞等待}}array[last] = elem;last++;if(last >= array.length) {last = 0;}size++;this.notify();}int take() throws InterruptedException {//判断是否为空,为空时进入阻塞//判空时需要加锁,保证数据真实有效synchronized (this) {while(size == 0) {wait();}}int ret = array[head];head++;if(head >= array.length) {head = 0;}size--;this.notify();return ret;}
}