【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用

文章目录

  • 📄前言
  • 一. 阻塞队列初了解
    • 🍆1. 什么是阻塞队列?
    • 🍅2. 为什么使用阻塞队列?
    • 🥦3. Java标准库中阻塞队列的实现
  • 二. 阻塞队列的模拟实现
    • 🍚1. 实现普通队列
    • 🍥2. 实现队列的阻塞功能
    • 🧊3. 解除阻塞状态
  • 三. 使用模拟的阻塞队列验证生产者和消费者模型

📄前言

本文是对阻塞队列的应用场景的介绍,对阻塞队列的作用以及具体实现的讨论。


一. 阻塞队列初了解

🍆1. 什么是阻塞队列?

阻塞队列是一种带有阻塞功能的“先进先出”线性表。即在一个带有最大容量的队列中,在某时刻队列容量已满时继续入队 或 队列为空时继续出队,就会进入阻塞等待状态,直到队列变为 未满或非空 便解除阻塞状态,继续入队或出队。

🍅2. 为什么使用阻塞队列?

若存在以下简易的分布式系统:
在这里插入图片描述
上述分布式系统虽然能完成客户端与服务器端的交互需求,但可能存在以下问题:

  1. 在正常情况下,用户可以通过客户端想服务器发起请求并获取相应的服务,但假如在某刻服务器A突然出现了故障,与服务器A直接通信的服务器B也可能因此出现故障,导致整个服务瘫痪。
  2. 若未来想增加 更多的服务器 来处理服务器A发起的请求,则需求对 服务器A 的接口 进行一定的改动,付出一定的时间和人力成本。
  3. 当某个时刻,很多的客户端同时向 服务器A 发起请求,作为与用户直接交互的服务器,服务器A具备承载这些并发量的能力,但服务器集群中负责其他功能的服务器接收请求的承载能力可能较弱,此时可能造成其他服务器的崩溃。

造成上述现象的原因可以归结为以下两点:

  1. 模块间的耦合性较高(例如问题1和2)
  2. 承载能力较弱的模块不具备抗冲击能力。(例如问题3)

上述的解决方法是在服务器之间加入一个阻塞队列,利用生产者和消费者模型解决以上问题。
什么是生产者消费者模型呢?(如下图)
在这里插入图片描述

当服务器A接收来自客户端的请求时,不把请求直接发给服务器B,而是将请求数据加入到队列中,服务器B通过队列接收请求并把请求除了的结果返回给A。


当上述分布式系统引入阻塞队列后工作模式如下图所示:
在这里插入图片描述

引入阻塞队列的好处:

  1. 解耦合。当服务器A或服务器B出现问题时,就不会对其他服务器造成直接的影响;当需要添加新的服务器来处理这些请求时,新的服务器也同样只需从队列中取数据,无需对原有服务器的接口(代码)进行任何的改动。
  2. 削峰填谷”。当服务器A 瞬间接收客户端发来的大量请求时,由于服务器B处理请求的速度较慢,剩余的请求会在阻塞队列里面堆积,虽然客户端获取服务的时间相对增加了,但一定程度上缓解了其他承受并发量能力较弱的服务器的压力。

🥦3. Java标准库中阻塞队列的实现

在这里插入图片描述

BlockingQueue的主要方法:
在这里插入图片描述
方法演示如下:(使用普通入队方法入队4次,再使用带有阻塞的出队方法出队4次)

public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);System.out.println("数据 5 入队状态: " + q.offer(5));System.out.println("数据 6 入队状态: " + q.offer(6));System.out.println("数据 7 入队状态: " + q.offer(7));System.out.println("数据 8 入队状态: " + q.offer(8));System.out.print("队列中的数据: ");System.out.println(q);System.out.println("数据出队: ");for (int i = 0; i < 4; i++) {System.out.print(q.take() + " ");}System.out.println("程序结束 !");
}

在这里插入图片描述

可以发现,当调用 take()方法取出队列元素时,因为队列最终为空,程序进入了阻塞状态,没有打印“程序结束”。


二. 阻塞队列的模拟实现

🍚1. 实现普通队列

阻塞队列的关键方法是两个带有阻塞功能的 put() 和 take()方法,而这两个方法是在原有出入队方法上使用 Object类 带有wait()方法 和 notify() 方法让线程进入等待状态 或 唤醒线程。
因此,我们可以先把基础的队列进行实现,随后在原有基础上进行修改。队列可以使用数组(环形队列)或链表两种方式实现,这里我采用数组的方式实现队列。(由于队列的实现方法较为常见,这里直接给出实现代码)

class MyBlockingQueue<E> {private Object[] elem;private int defaultCapacity = 11;	// 阻塞队列默认容量private int front;	// 记录队头元素位置private int rear;	// 记录队尾元素位置private int size;   // 用于记录当前队列元素的实际个数public MyBlockingQueue(){this.elem = new Object[defaultCapacity + 1];}public MyBlockingQueue(int capacity) {defaultCapacity = capacity;this.elem = new Object[defaultCapacity + 1];}public boolean offer(E val) {// 判断队列是否已满if (size == defaultCapacity) {return false;}elem[rear] = val;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);return true;}public E poll() {// 判断队列是否为空if (front == rear) {return null;}Object ret = elem[front];size--;// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);return (E)ret;}
}

🍥2. 实现队列的阻塞功能

当阻塞队列容量已满时,调用 put() 方法会进入阻塞状态,因此在原先 offer()方法判断的基础上,我们需要使用 wait()方法 让线程进入阻塞等待状态,考虑到可能有多个线程同时调用 put()方法,可能会引起线程安全问题,因此我们应在 if()判断条件和整个修改操作上 加锁(或者直接在方法上加锁)。(代码如下)

public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);}
}

当队列为空时,调用 take() 方法会使线程进入阻塞状态,同理若判空条件成立,我们需要调用 wait() 方法使线程进入阻塞,为防止多个线程在队列即将为空时同时调用 take() 方法引发线程安全问题,我们需要在 if()判断语句 和 整个修改操作 进行加锁操作(或者直接在方法上加锁)。(代码如下)

public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;return (E)ret;}
}

🧊3. 解除阻塞状态

什么情况下队列会接触阻塞状态呢?

  1. 当队满时,某个线程从阻塞队列取出一个元素,即执行完出队操作后,需要使用 notify()方法 唤醒因执行 put()方法而阻塞的线程。
  2. 当队空时,某个线程向队列新增一个元素,即执行完入队操作后,需要使用 notify()方法唤醒因执行 take()方法而阻塞的线程。

对 put()方法和take()方法 修改后代码如下:

public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);// 此处的 notify 用来唤醒 队列为空时的 waitthis.notify();}
}public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;// 此处的 notify 用来唤醒 队列为满时的 waitthis.notify();return (E)ret;}
}

三. 使用模拟的阻塞队列验证生产者和消费者模型

为了方便看到效果,我们假设阻塞队列的容量为2,并将生产与消费的数据进行打印。
当生产者与消费者处理数据的频率一样,且生产速率为 次/1s、消费速率为 次/1s 时,程序的生产与消费数据应轮流打印:(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);// 生产者Thread producer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread consumer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述

当生产速率 > 消费速率,且生产速率为 次/1s、消费速率为 次/2s 时:可以预估到,当经过5s后程序会因队满进入阻塞状态,且后续每消费一次伴随着一次生产,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述

当生产速率 < 消费速率,且生产速率为 次/2s、消费速率为 次/1s 时:可以预估到,当经过2s后程序会因队满进入阻塞状态,且后续每生产一次伴随着一次消费,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述


以上就是本篇文章的全部内容了,如果这篇文章对你有些许帮助,你的点赞、收藏和评论就是对我最大的支持。
另外,文章可能存在许多不足之处,也希望你可以给我一点小小的建议,我会努力检查并改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/650432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

立创EDA学习:设计收尾工作

布线整理 ShiftM&#xff0c;关闭铺铜显示 调整结束后再使用快捷键”ShiftM“打开铺铜 过孔 在空白区域加上一些GND过孔&#xff0c;连接顶层与底层的铺铜。放置好”过孔“后&#xff0c;隐藏铺铜&#xff0c;观察刚才放置的过孔有没有妨碍到其他器件 调整铺铜 先打开铺铜区&…

《Vue3 基础知识》 Vue2+ElementUI 自动转 Vue3+ElementPlus(GoGoCode)

前言 GoGoCode 一个基于 AST 的 JavaScript/Typescript/HTML 代码转换工具。 AST abstract syntax code 抽象语法树。 实现 第一步&#xff1a;安装 GoGoCode 插件 全局安装最新的 gogocode-cli 即可 npm i gogocode-cli -g查看版本 gogocode-cli -V相关插件说明 插件描述…

【多态】10分钟大白话讲解Java中的多态

1.多态的概念 多态(polymorphism)本来是生物学里的概念&#xff0c;表示地球上的生物在形态和状态方面的多样性。 而在java的面向对象中&#xff0c;多态通俗点说就是多种形态&#xff0c;具体点就是去完成某个行为&#xff0c;当不同的对象去完成时会产生不同的状态。也就是说…

先进计算产业促湖南数字经济规模突破1.7万亿元

中新网湖南新闻1月26日电 (于冬阳 周沁怡)“截至2023年底&#xff0c;集群汇聚了中电长城、飞腾、麒麟等1400余家企业&#xff0c;产值达1800亿元。”1月26日&#xff0c;国家新一代自主安全计算系统产业集年度工作推进会在长沙举行&#xff0c;国家新一代自主安全计算系统产业…

python-分享篇-使用海龟turtle模块实现幸福大转盘

文章目录 准备代码效果 准备 一、根目录下放图片 代码 from turtle import * import turtle from random import randint import sys #屏幕初始化 screen turtle.Screen() screen.title("幸运大转盘 转转转~") screen.setup(480,450) screen.bgpic("转盘.png…

Android底部导航栏创建——ViewPager + RadioGroup

Android底部导航栏有多种实现方式&#xff0c;本文详解其中的ViewPager RadioGroup方式的实现步骤。 我们先来看以下看一下最终做出的效果&#xff0c;使大家有个基本概念。 本结构特点&#xff1a; 1&#xff0c;ViewPager部分触摸左右滑动切换页面&#xff0c;RadioGroup部…

怎么获取二维码的链接?二维码转链接只需3步

怎么从二维码中提取内容呢&#xff1f;现在很多内容都会用二维码方式来存储&#xff0c;但是有些场景下二维码是无法使用的时候&#xff0c;想要查看二维码中的内容&#xff0c;就需要分解二维码成链接后使用。那么二维码分解成链接具体该怎么做呢&#xff1f;今天就将在线二维…

计算机组成原理--4.指令系统

一.指令格式 二.指令分类

【笔记】顺利通过EMC试验(16-41)-视频笔记

目录 视频链接 P1:电子设备中有哪些主要骚扰源 P2:怎样减小DC模块的骚扰 P3:PCB上的辐射源究竟在哪里 P4:怎样控制PCB板的电磁辐射 P5:多层线路板是解决电磁兼容问题的简单方法 P6:怎样处理地线上的裂缝 P7:怎样降低时钟信号的辐射 P8:为什么IO接口的处理特别重要 P9…

数据结构——用链表实现Map

目录 一、映射&#xff08;Map&#xff09; 二、代码实现 1.建立接口 2.方法实现 &#xff08;1&#xff09;映射的建立 键&#xff08;key&#xff09;和值&#xff08;val&#xff09;的建立 重写toString方法 &#xff08;2&#xff09;构造方法 &#xff08;3&…

102.乐理基础-五线谱-高音谱号

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;五线谱的构造、谱号是什么-CSDN博客 谱号一共需要学习和了解四种&#xff0c;如下图&#xff1a;要牢牢掌握的是高音谱号和低音谱号这两种&#xff0c;如图1所示 首先高音谱号&#xff1a; 它大致范围&#xff0c;…

Vue3中的ref和shallowRef、reactive和shallowReactive

一&#xff1a;ref、reactive简介 ref和reactive是Vue3中定义响应式数据的一种方式。ref通常用来定义基础类型数据。reactive通常用来定义复杂类型数据。 二、shallowRef、shallowReactive简介 shallowRef和shallowReactive是Vue3中定义浅层次响应式数据的方式 三、Api使用对比…

Mac中java jdk、android sdk、flutter sdk目录

1、Java JDK 目录 &#xff08;1&#xff09;官网下载的 Java JDK Java JDK下载官网 /Library/Java/JavaVirtualMachines&#xff08;2&#xff09;Android Studio下载的 Java JDK /Users/用户名/Library/Java/JavaVirtualMachines2、Android SDK 目录 /Users/用户名/Libr…

ansible处理多台机器部署基础环境

本次以多台机器需部署zabbix客户端为例&#xff1a; 机器先做免密互信&#xff0c;ansible主机上执行ssh-keygen,一路回车&#xff0c;然后将公钥发送给需管理的主机&#xff1a; ssh-copy-id rootIP 1、编辑hosts文件&#xff0c;添加需配置的主机IP&#xff0c;并测试连通…

Opencv(C++)学习 TBB与OPENMP的加速效果实验与ARM上的实践

背景&#xff1a;在某个嵌入式上的图像处理项目功能开发告一段落&#xff0c;进入性能优化阶段。尝试从多线程上对图像处理过程进行加速。经过初步调研后&#xff0c;可以从OPENMP&#xff0c;TBB这两块进行加速&#xff0c;当前项目中有些算法已采用多线程加速&#xff0c;这次…

【蓝桥杯冲冲冲】[NOIP2000 提高组] 方格取数

蓝桥杯备赛 | 洛谷做题打卡day19 文章目录 蓝桥杯备赛 | 洛谷做题打卡day19[NOIP2000 提高组] 方格取数题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示题解代码我的一些话 [NOIP2000 提高组] 方格取数 题目背景 NOIP 2000 提高组 T4 题目描述 设有 N N…

如何用甘特图跟踪项目进度

甘特图是一个简单但是极其强大的项目管理工具,能够清晰可视化复杂项目的进度,在项目跟踪和控制上发挥重要作用。任何一个严肃的项目组织者都会使用甘特图来规划和管理项目中的任务。 甘特图的纵坐标表示项目的各项活动或任务,横坐标表示项目的时间进度。每个任务用一条横条表示…

使用vs2022将.net8的应用程序发布为一个单独文件

在使用.NetCore3.1时&#xff0c;可以通过设置以下工程配置文本来将项目发布为一个单独的应用程序文件&#xff1a; <Project Sdk"Microsoft.NET.Sdk.WindowsDesktop"><PropertyGroup><TargetFramework>netcoreapp3.1</TargetFramework><…

LLM应用开发与落地:基于上下文的文本信息检测与提取

最近一直用LLM解决各种各样的问题&#xff0c;感觉已经脱离不了LLM了。每次使用LLM解决一个之前解决不了的问题&#xff0c;或者大大提升我的工作效率的时候&#xff0c;我内心都小小会激动一下。我想这是只通过看文章或只是研究AI理论感受不到的小确幸。我也因此更加确信LLM是…

Kotlin MultiPlatform:构建跨平台应用的未来

Kotlin MultiPlatform&#xff1a;构建跨平台应用的未来 1 引言 1.1 Kotlin MultiPlatform简介 Kotlin MultiPlatform&#xff08;简称KMP&#xff09;是一种由JetBrains开发的跨平台开发解决方案&#xff0c;它建立在Kotlin语言之上。KMP允许开发者使用一套Kotlin代码来构建…