🌈🌈🌈今天给大家分享的是——阻塞队列的自定义实现,通过自定义实现一个阻塞队列,可以帮助我们更清晰、更透彻的理解阻塞队列的底层原理。
清风的CSDN博客
🛩️🛩️🛩️希望我的文章能对你有所帮助,有不足的地方还请各位看官多多指教,大家一起学习交流!🛩️🛩️🛩️
✈️✈️✈️动动你们发财的小手,点点关注点点赞!在此谢过啦!哈哈哈!✈️✈️✈️
目录
一、阻塞队列的作用
二、阻塞队列实现
2.1 普通队列实现
2.1.1 构造方法
2.1.2 入队列
2.1.3 出队列
2.2 阻塞队列实现
2.2.1 保证线程安全
2.2.2 保证内存可见性
2.2.3 阻塞功能的实现
三、基于自定义阻塞队列,模拟生产者消费者模型
一、阻塞队列的作用
一个分布式系统中,会经常出现这样的情况:有的机器能承担的压力更大,有的能承担的压力更小:
如果按照生产者消费者模型,那就另当别论了。
假设此时通过队列来让A和B进行交互:
二、阻塞队列实现
2.1 普通队列实现
在实现阻塞队列之前,我们先把普通的队列(基于数组的循环队列)进行一个简单的实现,然后通过进一步的改进,把普通的队列改造成一个阻塞队列。
class MyBlockingQueue{private int[] items;private int head = 0;//队列头指针private int tail = 0;//队列尾指针private int size = 0;//队列当前元素个数public MyBlockingQueue(){}//入队列public void put(int elem){}//出队列public int take(){}}
2.1.1 构造方法
public MyBlockingQueue(){this.items = new int[100];}
2.1.2 入队列
//入队列public void put(int elem){if (size >= items.length){return;}items[tail] = elem;if (tail >= items.length){//判断尾指针是否到达末尾tail = 0;}tail++;size++;}
2.1.3 出队列
//出队列public int take(){if(size == 0){return -1;}int elem = items[head];if (head >= items.length){head = 0;}head++;size--;return elem;}
2.2 阻塞队列实现
现在,我们就把上面的队列改造成阻塞队列。
2.2.1 保证线程安全
在当前的代码下,如果是多线程的情况,调用put或者take,这两个方法中都涉及到了对变量的修改,这样就会出现线程安全问题。这就需要我们进行加锁。
//入队列public void put(int elem){synchronized (this){if (size >= items.length){return;}items[tail] = elem;if (tail >= items.length){tail = 0;}tail++;size++;}}
//出队列public int take(){synchronized (this){if(size == 0){return -1;}int elem = items[head];if (head >= items.length){head = 0;}head++;size--;return elem;}}
2.2.2 保证内存可见性
光加锁就够吗?我们可以看到,多线程的情况下,不光是对变量进行修改,还有读操作等等,那就有可能出现一个线程在读,另外一个线程在修改,这个读的线程没有读到。所以,此处除了加锁之外,还需要考虑内存可见性问题。也就是说,当其他线程进行修改的时候,我们要保证当前线程可以读到这个修改,所以我们把变量加上volatile关键字。
volatile private int head = 0;volatile private int tail = 0;volatile private int size = 0;
2.2.3 阻塞功能的实现
解决了上述问题后,我们就需要考虑一下如何实现阻塞功能了。
实现阻塞有两方面:
- 当队列满的时候,再进行put(入队),就会产生阻塞。阻塞到队列中元素出队后,就去唤醒当前因队列满而被阻塞的状态。
- 当队列空的时候,再进行take(出队),就会产生阻塞。阻塞到队列中有元素入队时,去唤醒当前因队列空而被阻塞的状态。
//入队列public void put(int elem) throws InterruptedException {synchronized (this){while (size >= items.length){//队列满了//return;this.wait();}items[tail] = elem;if (tail >= items.length){tail = 0;}tail++;size++;//成功入队this.notify();//唤醒因队列空而被阻塞的状态}}//出队列public int take() throws InterruptedException {synchronized (this){while (size == 0){//队列空//return -1;this.wait();}int elem = items[head];if (head >= items.length){head = 0;}head++;size--;this.notify();//使用这个notify唤醒队列满的阻塞状态return elem;}}}
好了,经过上面的改进,我们就已经实现了一个简单的阻塞队列,下面是改进后的完整代码:
class MyBlockingQueue{private int[] items;volatile private int head = 0;volatile private int tail = 0;volatile private int size = 0;public MyBlockingQueue(){this.items = new int[100];}//入队列public void put(int elem) throws InterruptedException {synchronized (this){while (size >= items.length){//队列满了//return;this.wait();}items[tail] = elem;if (tail >= items.length){tail = 0;}tail++;size++;//成功入队this.notify();//唤醒因队列空而被阻塞的状态}}//出队列public int take() throws InterruptedException {synchronized (this){while (size == 0){//队列空//return -1;this.wait();}int elem = items[head];if (head >= items.length){head = 0;}head++;size--;this.notify();//使用这个notify唤醒队列满的阻塞状态return elem;}}}
三、基于自定义阻塞队列,模拟生产者消费者模型
实现阻塞队列之后,我们利用阻塞队列简单模拟一下生产者消费者模型:
public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();//生产者线程Thread product = new Thread(()->{int count = 0;while (true){try {queue.put(count);System.out.println("生产元素:>"+count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});//消费者线程Thread consummer = new Thread(()->{while (true){try {int elem = queue.take();System.out.println("消费元素:>"+elem);} catch (InterruptedException e) {e.printStackTrace();}}});product.start();consummer.start();}
运行结果:
🌈🌈🌈好啦,今天的分享就到这里!
🛩️🛩️🛩️希望各位看官读完文章后,能够有所提升。
🎉🎉🎉创作不易,还希望各位大佬支持一下!
✈️✈️✈️点赞,你的认可是我创作的动力!
⭐⭐⭐收藏,你的青睐是我努力的方向!
✏️✏️✏️评论:你的意见是我进步的财富!