【Java多线程案例】实现阻塞队列

1. 阻塞队列简介

1.1 阻塞队列概念

阻塞队列:是一种特殊的队列,具有队列"先进先出"的特性,同时相较于普通队列,阻塞队列是线程安全的,并且带有阻塞功能,表现形式如下:

  • 当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取出元素
  • 当队列空时,继续出队列就会阻塞,直到有其他线程往队列中插入元素

基于阻塞队列我们可以实现生产者消费者模型,这在后端开发场景中是相当重要的!

1.2 生产者-消费者模型优势

基于阻塞队列实现的 生产者消费者模型 具有以下两大优势:

  1. 解耦合:

image.png
以搜狗搜索的服务器举例,用户输入搜索关键字 **美容,**客户端的请求到达搜狗的"入口服务器"时,会将请求转发到 广告服务器大搜索服务器,此时广告服务器返回相关广告内容,大搜索服务器根据搜索算法匹配对应结果返回,如果按照这种方式通信,那么入口服务器需要编写两套代码分别同广告服务器和大搜索服务器进行交互,并且一个严重问题是如果其中广告服务器宕机了,会导致入口服务器无法正常工作进而影响大搜索服务器也无法正常工作!!
image.png
而引入阻塞队列后,入口服务器不需要知晓广告服务器和大搜索服务器的存在,只需要往阻塞队列中发送请求即可,而广告服务器和大搜索服务器也不需要知道入口服务器的存在,只需要从阻塞队列中取出请求处理完毕返回给阻塞队列即可,并且当其中大搜索服务器宕机时,不影响其他服务器以及入口服务器的正常运作!

  1. 削峰填谷:

image.png
如果没有阻塞队列,当遇到一些突发场景例如"双十一"大促等客户请求量激增的时候,入口服务器转发的请求量增多,压力就会变大,同理广告服务器和大搜索服务器处理过程复杂繁多,消耗的硬件资源就会激增,达到硬件瓶颈之后服务器就宕机了(直观现象就是客户端发送请求,服务器不会响应了)
image.png
而引入阻塞队列/消息队列之后,由于阻塞队列只负责存储相应的请求或者响应,无需额外的业务处理,因此抗压能力比广告服务器和大搜索服务器更强,当客户请求量激增的时候交由阻塞队列承受,而广告服务器和大搜索服务器只需要按照特定的速率进行读取并返回处理结果即可,就起到了 削峰填谷 的作用!

注意:此处的阻塞队列在现实场景中并不是一个单纯的数据结构,往往是一个基于阻塞队列的服务器程序,例如消息队列(MQ)

2. 标准库中的阻塞队列

2.1 基本介绍

Java标准库提供了现成的阻塞队列数据结构供开发者使用,即BlockingQueue接口
BlockingQueue:该接口具有以下实现类:

  1. ArrayBlockingQueue:基于数组实现的阻塞队列
  2. LinkedBlockingQueue:基于链表实现的阻塞队列
  3. PriorityBlockingQueue:带有优先级的阻塞队列

BlockingQueue方法:该接口具有以下常用方法

  1. 带有阻塞功能:
  • put:向队列中入元素,队列满则阻塞等待
  • take:向队列中取出元素,队列空则阻塞等待
  1. 不带有阻塞功能:
  • peek:返回队头元素(不取出)
  • poll:返回队头元素(取出)
  • offer:向队列中插入元素

2.2 代码示例

/*** 测试Java标准库提供的阻塞队列实现*/
public class TestStandardBlockingQueue {private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {// 生产者Thread t1 = new Thread(() -> {int i = 0;while (true) {try {queue.put(i);System.out.println("生产数据:" + i);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread t2 = new Thread(() -> {while (true) {try {Thread.sleep(1000);int ele = queue.take();System.out.println("消费数据:" + ele);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}

运行效果
image.png
我们在主线程中创建了两个线程,其中t1线程作为生产者不断循环生产元素,而线程t2作为消费者每隔1s消费一个数据,所以我们很快看到当生产数据个数达到容量capacity时就会继续生产就会阻塞等待,直到消费者线程消费数据后才可以继续入队列,这样就实现了一个 生产者-消费者模型

3. 自定义实现阻塞队列

首先我们需要明确实现一个阻塞队列需要哪些步骤?

  1. 首先我们需要实现一个普通队列
  2. 使用锁机制将普通队列变成线程安全的
  3. 通过特殊机制让该队列能够带有"阻塞"功能

3.1 实现普通队列

相信大家如果学过 数据结构与算法 相关课程,应该对队列这种数据结构的实现并不陌生!实现队列有基于数组的也有基于链表的,我们此处采用基于数组实现的,基于数组实现的循环队列也有以下两种方式:

  1. 腾出一个空间用来判断队列空或者满
  2. 使用额外的变量size用来记录当前元素的个数

我们使用第二种方式实现,实现代码如下:

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {if (size == capacity) {// 队列已经满了return;}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;}/*** 出队列方法*/public String take() {// 判断队列是否为空if (size == 0) {return null;}String topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(3);queue.put("11");queue.put("22");queue.put("33");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

3.2 引入锁机制实现线程安全

引入synchronized关键字在原有队列实现的基础上实现线程安全,代码如下:

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量private Object locker = new Object(); // 锁对象public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {synchronized (locker) {if (size == capacity) {// 队列已经满了return;}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;}}/*** 出队列方法*/public String take() {String topElem = "";synchronized (locker) {// 判断队列是否为空if (size == 0) {return null;}topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;}return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(3);queue.put("11");queue.put("22");queue.put("33");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

我们在puttake等关键方法上将 多个线程修改同一个变量 部分的操作进行加锁处理,实现线程安全!

3.3 加入阻塞功能

在普通队列的实现中,如果队列满或者空我们直接使用return关键字返回,但是在多线程环境下我们希望实现阻塞等待的功能,这就可以使用Object类提供的wait/notify这组方法实现阻塞与唤醒机制了!我们就需要考虑阻塞与唤醒的时机了!
何时阻塞:这个问题非常简单,当队列满时入队列操作就应该阻塞等待,而当队列为空时出队列操作就需要阻塞等待
何时唤醒:想必大家都可以想到,对于入队列操作来说,只要队列不满就可以被唤醒,而对于出队列操作来说,队列不为空就可以被唤醒,因此,只要有线程调用take操作出队列,那么入队列的线程就可以被唤醒,而只要有线程调用put操作入队列,那么出队列的线程就可以被唤醒

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量private Object locker = new Object(); // 锁对象public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {synchronized (locker) {while (size == capacity) {// 队列已经满了(进行阻塞)try {locker.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;locker.notifyAll();}}/*** 出队列方法*/public String take() {String topElem = "";synchronized (locker) {// 判断队列是否为空while (size == 0) {try {locker.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;locker.notifyAll();}return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(10);// 生产者Thread producer = new Thread(() -> {int i = 0;while (true) {queue.put(i + "");System.out.println("生产元素:" + i);i++;}});// 消费者Thread consumer = new Thread(() -> {while (true) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}String elem = queue.take();System.out.println("消费元素" + elem);}});producer.start();consumer.start();}
}

我们使用wait/notify这组操作实现了阻塞/唤醒功能,并且满足必须使用在synchronized关键字内部的使用条件,这里有一个注意点

为什么我们将if判断条件改成了while循环呢???这是需要考虑清楚的!

image.png
如图所示:一开始由于队列满所以生产者1进入阻塞状态,释放锁,然后生产者2也进入阻塞状态释放锁,此时消费者消费一个元素后唤醒生产者1,然后生产者1生产一个元素后(记住此时队列已满)继续唤醒,但是此时唤醒的恰恰是 生产者2 ,生产者2继续执行生产元素,于是就出现问题,我们总结一下出现问题的原因:

  1. notifyAll是随机唤醒,无法指定唤醒线程,因此可能出现生产者唤醒生产者,消费者唤醒消费者的情况
  2. if判定条件一经执行就无法继续判定,所以生产者2被唤醒后没有再次判断当前队列是否满

于是我们的应对策略就是使用while循环,当线程被唤醒使重新判断,如果队列仍满,入队列操作继续阻塞,而队列仍空,出队列操作继续阻塞!Java标准也推荐我们使用 while 关键字和 wait 关键字一起使用!
image.png

4. 应用场景(实现生产者消费者模型)

我们继续基于我们自定义实现的阻塞队列再来实现 生产者-消费者模型
代码示例(主函数)

public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(10);// 生产者Thread producer = new Thread(() -> {int i = 0;while (true) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}queue.put(i + "");System.out.println("生产元素:" + i);i++;}});// 消费者Thread consumer = new Thread(() -> {while (true) {String elem = queue.take();System.out.println("消费元素" + elem);}});producer.start();consumer.start();
}

运行效果
image.png
此时我们创建两个两个线程,producer作为生产者线程每隔1s生产一个元素,consumer作为消费者线程不断消费元素,此时我们看到的就是消费者消费很快,当阻塞队列空时就进入阻塞状态,直到生产者线程生产元素后才被唤醒继续执行!此时我们真正模拟实现了 阻塞队列 这样的数据结构!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/677711.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

fast.ai 深度学习笔记(四)

深度学习 2&#xff1a;第 2 部分第 8 课 原文&#xff1a;medium.com/hiromi_suenaga/deep-learning-2-part-2-lesson-8-5ae195c49493 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 来自 fast.ai 课程的个人笔记。随着我继续复习课程以“真正”理解它&#xff0c;这…

4核8g服务器能支持多少人访问?- 腾讯云

腾讯云轻量4核8G12M轻量应用服务器支持多少人同时在线&#xff1f;通用型-4核8G-180G-2000G&#xff0c;2000GB月流量&#xff0c;系统盘为180GB SSD盘&#xff0c;12M公网带宽&#xff0c;下载速度峰值为1536KB/s&#xff0c;即1.5M/秒&#xff0c;假设网站内页平均大小为60KB…

HiveQL——不借助任何外表,产生连续数值

注&#xff1a;参考文章&#xff1a; HiveSql一天一个小技巧&#xff1a;如何不借助其他任何外表&#xff0c;产生连续数值_hive生成连续数字-CSDN博客文章浏览阅读1.3k次。0 需求描述输出结果如下所示&#xff1a;12345...1001 问题分析方法一&#xff1a;起始值&#xff08;…

【ASP.NET Core 基础知识】--部署和维护--日志记录和错误处理

一、日志记录(Logging) 1.1 日志记录的概念 日志记录是一种记录系统运行状态、活动和事件的重要机制。在软件开发和系统管理中&#xff0c;日志记录扮演着关键角色&#xff0c;用于追踪应用程序的执行过程、监视系统的健康状况、诊断问题和安全审计等。在ASP.NET Core等现代W…

C#在窗体正中输出文字以及输出文字的画刷使用

为了在窗体正中输出文字&#xff0c;需要获得输出文字区域的宽和高&#xff0c;这使用MeasureString方法&#xff0c;方法返回值为Size类型&#xff1b; 然后计算输出的起点的x和y坐标&#xff0c;就可以输出了&#xff1b; using System; using System.Collections.Generic; …

基于css-vars-ponyfill实现换肤

文章目录 一、换肤二、换肤调研2.1、ElementUI2.2、ant.design 三、换肤痛点和思考四、换肤架构五、换肤技术选型和实现5.1、该方案的亮点和规则5.2、核心原理5.3、色组 & 色值平台设计5.4、获取在当前主题自定义变量颜色 六、总结七、最后 一、换肤 网站或者应用一键切换…

最简单的基于 FFmpeg 的音频编码器(PCM 编码为 AAC)

最简单的基于 FFmpeg 的音频编码器&#xff08;PCM 编码为 AAC&#xff09; 最简单的基于 FFmpeg 的音频编码器&#xff08;PCM 编码为 AAC&#xff09;正文结果工程文件下载其他参考链接 最简单的基于 FFmpeg 的音频编码器&#xff08;PCM 编码为 AAC&#xff09; 参考雷霄骅…

Android SystemConfig相关

SystemConfig在哪里初始化 它声明在PackageManagerService类的静态方法main()中。在该方法中间定义Injector类对象时&#xff0c;作为它的构造参数。它是调用的SystemConfig.getInstance()实现初始化&#xff0c;之后能通过Injector类对象的getSystemConfig()得到SystemConfig类…

MongoDB聚合:$replaceWith

r e p l a c e W i t h ‘ 可以将输入文档替换为指定的文档。该操作可以替换输入文档的所有字段&#xff0c;包括 ‘ i d ‘ 字段。使用 ‘ replaceWith可以将输入文档替换为指定的文档。该操作可以替换输入文档的所有字段&#xff0c;包括_id字段。使用 replaceWith‘可以将输…

nginx添加lua模块

目录 已安装了nginx&#xff0c;后追加lua模块nginx 重新编译知识参考&#xff1a; 从零安装一、首先需要安装必要的库&#xff08;pcre、zlib、openssl&#xff09;二、安装LUA环境及相关库 &#xff08;LuaJIT、ngx_devel_kit、lua-nginx-module&#xff09;注意&#xff1a;…

Python和Java的区别(不断更新)

主要通过几个方面区分Python和Java&#xff0c;让大家有一个对比&#xff1a; 语言类型 Java是一种静态类型、编译型语言。 Python是一种动态类型、解释型语言&#xff0c;注重简洁和灵活的语法。 语法 在Java中&#xff0c;变量需要显式地声明&#xff0c;指定其类型。例如&am…

STM32——OLED(2)

目录 一、OLED显示屏介绍 引脚说明&#xff1a; 二、OLED驱动 1. 基本认识 2. OLED 驱动原理 及过程 三、SSD1306工作时序 (8080时序&#xff09; 1. 8080并口读/写过程 2. SSD1306工作时序 (8080时序) 四、屏幕显示 1. GRAM 补&#xff1a; 2. 画点原理 3. 显示字…

一周学会Django5 Python Web开发-Django5创建项目(用PyCharm工具)

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计11条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

【C++从0到王者】第四十一站:特殊类的设计

文章目录 一、设计一个类&#xff0c;不能被拷贝1.C98方法2.C11方法 二、设计一个类&#xff0c;只能在堆上创建对象1.析构函数私有化2.构造函数私有化 三、请设计一个类&#xff0c;只能在栈上创建对象四、设计一个类不能被继承1.C98方式2.C11方式 五、设计一个类&#xff0c;…

【JavaScript 漫游】【013】Date 对象知识点摘录

文章简介 本文为【JavaScript 漫游】专栏的第 013 篇文章&#xff0c;记录了 JS 语言中 Date 对象的重要知识点。 普通函数的用法构造函数的用法日期的运算静态方法&#xff0c;包括&#xff1a;Date.now()、Date.parse() 和 Date.UTC()实例方法&#xff0c;包括&#xff1a;…

H5/CSS 笔试面试考题(31-40)

简述CSS 样式&#xff0c;边距&#xff1a; 10px 20px 40px 30px &#xff1b;哪一个是底边距 ( ) A&#xff1a;10px B&#xff1a;20px C&#xff1a;40px D&#xff1a;30px 面试通过率&#xff1a;45.0% 推荐指数&#xff1a; ★★★ 试题难度&#xff1a; 初级 试题类型&a…

JavaScript的原型与继承

原型 原型 prototype&#xff0c;我们所创建的每一个实例&#xff0c;解析器都会向这个函数中添加一个prototype&#xff0c;属性&#xff0c;这个属性会对应这个一个对象&#xff0c;这个对象就是原型对象&#xff08;显式原型&#xff09;&#xff0c;原型对象就相当于一个公…

156基于Matlab的光纤陀螺随机噪声和信号

基于Matlab的光纤陀螺随机噪声和信号&#xff0c;利用固定步长和可调步长的LMS自适应滤波、最小二乘法、滑动均值三种方法进行降噪处理&#xff0c;最后用阿兰方差评价降噪效果。程序已调通&#xff0c;可直接运行。 156 信号处理 自适应滤波 降噪效果评估 (xiaohongshu.com)

YOLOv5改进 | 融合改进篇 | 华为VanillaNet + BiFPN突破涨点极限

一、本文介绍 本文给大家带来的改进机制是华为VanillaNet主干配合BiFPN实现融合涨点,这个主干是一种注重极简主义和效率的神经网络我也将其进行了实验, 其中的BiFPN不用介绍了从其发布到现在一直是比较热门的改进机制,其主要思想是通过多层级的特征金字塔和双向信息传递来提…

[office] Excel自带的编辑函数求和方法 #其他#媒体

Excel自带的编辑函数求和方法 今天小编为大家分享Excel自带的编辑函数求和方法&#xff0c;方法很简单的&#xff0c;对于不是很熟悉excel表格的朋友可以参考一下&#xff0c;希望能对大家有所帮助 很多同学以及上班族需要大量使用Excel这款表格编辑器&#xff0c;当表格中有大…