【Java EE初阶七】多线程案例(阻塞队列与生产者消费者模型)

1. 阻塞队列

        队列是先进先出的一种数据结构;

        阻塞队列,是基于队列,做了一些扩展,适用于多线程编程中;

阻塞队列特点如下:

        1、是线程安全的

        2、具有阻塞的特性

                2.1、当队列满了时,就不能往队列里放数据,就会阻塞等待,等队列中的数据出队列后,导致队列没满时,才能放数据。

                2.2、当当队列空了时,就不能从队列里拿数据,就会阻塞等待,等有数据进入队列后,导致队列不为空时,才能拿数据。

        由于阻塞队列的用处非常大,基于阻塞队列的功能,我们就可以实现多线程案例的第三种案例~ 生产消费者模型(其实描述的就是一种多线程编程的方法),引入生产者消费者模型(尤其是后端开发),生产者往队列中写入数据,消费者从队列中消费数据;

        阻塞队列总的来说就是由于前后执行顺序的线程由于一方面的速度过快,另外一方面的速度过慢,而导致整体的执行顺序出现不流畅的画面(快的线程为了使自己的产出能被另外一方面合理的消化),该方面线程不得不阻塞,等待另外一方面将产能消化之后,继续执行线程,制造产能;

2. 生产者消费者模型

        生产者消费者模型是一种很朴素的概念,描述的是一种多线程编程的方法。

 2.1 引入生产者消费者模型的意义

        2.1.1 解耦合

        引入该模型,就可以更好的做到“解耦合”(把代码的耦合程度,从高将到低-->就称为解耦合)

        在实际开发中,会涉及到 “分布式系统” ,服务器的整个功能不是由一个服务器实现的,而是由多个服务器组成,各自实现各自的一部分功能,再通过网络通信,把这些服务器联系起来,最终完成整个服务器的功能。典型分布式例子通过下图来进行简单的讲解:

        如上图所示,在该模型中入口服务器A与B、入口服务器A与C服务器的联系是密切相关的,请求要经过入口服务器A,才能传达给B、C服务器,即B、C服务器拿到想要的数据,再返回给入口服务器A,通过入口服务器A,再把响应传给客户端。

        但是如果请求突然骤升,这时超过入口服务器A接收请求的峰值,这时入口服务器A就挂了,入口服务器A挂了后,B、C服务器拿不到请求,也会挂掉,这就体现了入口服务器A和B、C服务器的耦合性比较高。

        当然如果B或C挂了的话,A大概率也会挂;

        当我们在入口服务器A和B、C服务器之间引入阻塞队列时,如下图所示:

        如上图所示,如果入口服务器A挂了,但是阻塞队列中还有请求的数据,至少不会因入口服务器挂A了,B、C服务器也挂了

        故此,入口服务器A和B、C、D服务器的耦合性也就降低了。

        上述描述的阻塞队列,并非是简单的数据结构,而是基于这个数据结构实现的服务器程序,且被部署到单独的主机上来;

2.1.2 削峰填谷

       如上图所示:当客户端这边的请求突然骤增时,入口服务器A一般来说是比较能抗压的,但是也是有极限的,这时我们引入阻塞队列,可以把这些请求数据都放进阻塞队列中,形成一个缓冲区,如此一来,即使外面的请求达到了峰值,也是由阻塞队列来承担,这样就形成了削峰填谷的效果。

        关于阻塞队列和消息队列的区别:

        阻塞队列:数据结构

        消息队列:基于阻塞队列实现服务器程序

3. 手敲代码模拟实现阻塞队列

3.1 了解阻塞队列

      java标准库提供了现成的阻塞队列这一数据结构,如下图所示:

        

        阻塞队列是基于队列扩展而来的,且在阻塞队列中,put是在具备阻塞功能的入队列操作,take方法是带阻塞功能的出队列操作,阻塞队列没有提供带有阻塞功能获取首元素的方法;

        java自带的阻塞队列的代码实现如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Main {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);blockingQueue.put("smallye");String s1 = blockingQueue.take();System.out.println("第一个打印:s1 = " + s1);s1 = blockingQueue.take();System.out.println("第二个打印:s1 = " + s1);}
}

        结果如下:

        问题分析:

        主要是线程卡住了,当进行第二次出队列时,由于当前阻塞队列是空的,所以要等进行阻塞等待,当有元素入队列时,我们才能进行出队列操作。

3.2 实现阻塞队列

        我们尝试实现一个阻塞队列,要求达到与标准库中的队列有着类似的效果;

        步骤如下:

        1、先实现普通队列

        2、再加上线程安全

        3、再加上阻塞功能

3.2.1 先实现普通队列

        代码如下所示:

// 为了简单, 不写作泛型的形式. 考虑存储的元素就是单纯的 String
class MyBlockingQueue {private String elems[] = null;private int head = 0;//记录头结点private  int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) {//判断容量满了没,满了就不能入队列,要阻塞等待if(size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列return;}//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;}//出队列public String take() {String elem = null;//要判断队列是不是空的,空就不能出队列了,要阻塞等待if(size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充return null;}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;return elem;}
}

        测试代码及结果如下:

public class Main {public static void main(String[] args) {MyBlockingQueue blockingQueue = new MyBlockingQueue(10);blockingQueue.put("smallye");String s1 = blockingQueue.take();System.out.println("第一个打印:s1 = " + s1);}
}

3.2.2 再加上线程安全

        对于不线程安全的代码我们要进行加锁操作,首先针对的就是写操作,该部分的代码块肯定是要加锁的,因为多线程同时执行写操作,会导致线程不安全,如下图所示:

        下面,我们讨论一下这两个代码要不要加锁,以take为例,如下图所示:

        当前代码里面的队列为空,但是依旧执行出队列的逻辑,所以我们判断条件也应该加锁;

        以put为例,如下图所示:

        当前代码里面的队列已经满了,但是依旧执行入队列的逻辑;

        修改后代码如下:

class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private  int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待if(size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列return;}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;}}}//出队列public String take() {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待if(size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充return null;}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;return elem;}}
}

3.2.3 再加上阻塞功能

        我们给put要加上阻塞功能,就要在这条件判断上加上wait,我们用locker的对象给他wait,而且wait必须要在synchronized内使用,这里的locker正好能对应上;当这个队列满时,就阻塞等待,等take方法拿走一个数据时,才被唤醒,加上阻塞功能后的代码如下:

class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private  int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待if (size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待if (size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充synchronized (locker) {locker.wait();}}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;locker.notify();return elem;}}
}

        当我们进行阻塞wait时,一定要在适当的条件下notify,如下图所示:

        代码讲解:

        当put时,队列满了时就要阻塞等待,等take队列后,就会唤醒put操作,接着put就能入队列了;

        如果队列不满也不空时,每次put和take都会notify一次,其实不会有影响,因为就算没有其他线程在等待,唤醒也没有事,不会对程序造成啥影响。而且我们的代码,一定是要么满,要么空,要么不满也不空。

        但是,如果有两个线程同时put,现在队列是满的,A线程先阻塞,B线程也阻塞,这时有第三个线程take一次,把A线程的wait唤醒了,等A执行到下面的notify,A线程里put的notify就会唤醒B线程里的wait,但是因为A线程put了,和第三个线程的take一取一放抵消了,此时队列还是满的;因为A线程里的put把B线程里的wait唤醒了,这时已经是满了的队列还往里放元素,就造成了线程安全问题。

        解决方案:把条件判断if换成while循环语句,不是只判断一次,当有其他线程把wait唤醒后,还要再判断一次这个队列是不是满的或者是空的,如果不是满的或者不是空的,才释放这个wait,不然就要继续wait,如此该问题也就解决了。

最终代码:
 

class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private  int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待while (size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待while (size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充synchronized (locker) {locker.wait();}}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;locker.notify();return elem;}}
}

        在实际开发中,生产者消费者模型,往往是多个生产者,多个消费者;这里的生产者和消费者往往不仅仅是一个线程,也可能是一个独立的服务器,甚至是一组服务器程序。生产者消费者模型,最核心的部分还是阻塞队列,可以使用synchronized和wait / notify 达到线程安全与阻塞。

3.3 实现生产者消费者模型

        代码如下:

package thread;// 为了简单, 不写作泛型的形式. 考虑存储的元素就是单纯的 String
class MyBlockingQueue {private String[] elems = null;private int head = 0;private int tail = 0;private int size = 0;// 准备锁对象, 如果使用 this 也可以.private Object locker = new Object();public MyBlockingQueue(int capacity) {elems = new String[capacity];}public void put(String elem) throws InterruptedException {// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.synchronized (locker) {while (size >= elems.length) {// 队列满了.// 后续需要让这个代码能够阻塞.locker.wait();}// 新的元素要放到 tail 指向的位置上elems[tail] = elem;tail++;if (tail >= elems.length) {tail = 0;}size++;// 入队列成功之后唤醒locker.notify();}}public String take() throws InterruptedException {String elem = null;synchronized (locker) {while (size == 0) {// 队列空了.// 后续也需要让这个代码阻塞locker.wait();}// 取出 head 位置的元素并返回elem = elems[head];head++;if (head >= elems.length) {head = 0;}// 这个代码不要遗漏.size--;// 元素出队列成功之后, 加上唤醒locker.notify();}return elem;}
}public class ThreadDemo28 {public static void main(String[] args) throws InterruptedException {MyBlockingQueue queue = new MyBlockingQueue(1000);// 生产者Thread t1 = new Thread(() -> {int n = 1;while (true) {try {queue.put(n + "");System.out.println("生产元素 " + n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread t2 = new Thread(() -> {while (true) {try {String n = queue.take();System.out.println("消费元素 " + n);Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}

        结果如下:

        如图所示,生产者消费者模型大抵是生产一个,消费一个,主要是生产之后消费者再消费;

ps:关于阻塞队列和生产着消费者模型的内容就到这里了,如果对你有帮助的话就请一键三连哦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/593165.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

串口通信要点解析

目录 简介&#xff1a; UART 协议解析&#xff1a; 串口协议工作过程&#xff1a; 简介&#xff1a; 串行通信协议包括&#xff1a; UART通用异步收发传输器 (Universal Asynchronous ReceiverTransmitter) 是一种串行异步收发协议 (异步是指通信双方使用各自的时钟控制数据…

fineBI web组件传参

1、fineBI web组件传参 1.1、 Web组件- FineBI帮助文档 FineBI帮助文档1. 概述1.1 版本FineBI 版本HTML5移动端展现功能变动6.0--V11.0.83web组件适配移动端效果优化6.0.13-web组件支持传递参数 ${过滤组件https://help.fanruan.com/finebi/doc-view-143.html 1.2、自己做的例…

分布式(6)

目录 26.雪花算法如何实现的&#xff1f; 27.雪花算法有什么问题&#xff1f;有哪些解决思路&#xff1f; 28.有哪些方案实现分布式锁&#xff1f; 29.基于数据库如何实现分布式锁&#xff1f;有什么缺陷&#xff1f; 30.基于Redis如何实现分布式锁&#xff1f;有什么缺陷&…

VS 2022 控制台程序运行时不显示控制台

Visual Studio 2022&#xff0c;C#控制台程序运行时不显示控制台。此外&#xff0c;C#程序修改运行时的程序名。 文章目录 不显示控制台修改运行时的程序名打包成.exe 文件 不显示控制台 1 选中需要项目&#xff0c;右击属性&#xff0c;选中常规。 2 将输出类型从控制台改为…

微服务-@FeignClient 与 Feign 隔离

FeignClient 扫描 FeignClientsRegistrar#registerBeanDefinitions public void registerBeanDefinitions(AnnotationMetadata metadata,BeanDefinitionRegistry registry) { // 注册默认配置 registerDefaultConfiguration(metadata, registry); registerFeignClients(metada…

服务异步通讯---RabbitMQ实用篇

目录 一、初识MQ 一、同步调用 1、同步通讯和异步通讯 2、同步调用的问题 3.同步调用总结 二、异步调用 1、优势&#xff1a; ​编辑 2、异步总结 二、什么是MQ 一、RabbitMQ快速入门 1、RabbitMQ的结构和概念 2、常见消息模型 2.1、基础消息队列模型 2.2、总结 二…

Sharding-JDBC快速使用【笔记】

1 引言 最近在使用Sharding-JDBC实现项目中数据分片、读写分离需求&#xff0c;参考官方文档&#xff08;Sharding官方文档&#xff09;感觉内容庞杂不够有条理&#xff0c;重复内容比较多&#xff1b;现结合项目应用整理笔记如下供大家参考和自己回忆使用&#xff1b; 在…

为什么要太空探索?未来万亿人口 人类移居太空是不可避免的。大语言模型是发现 贝索斯

管理的思考 「最以客户为中心」「果断」「如何决策」 贝索斯给亚马逊的使命是「世上最以客户为中心的公司」(以客户需求为起点&#xff0c;反向推动工作)。贝索斯给蓝色起源的使命是「世上最果断的公司」(我们将变得非常擅长在技术上恰当地冒险&#xff0c;并快速地作出那些决…

一篇了解springboot3请求参数种类及接口测试

SpringBoot3数据请求&#xff1a; 原始数据请求&#xff1a; //原始方式RequestMapping("/simpleParam")public String simpleParam(HttpServletRequest request){//获取请求参数String name request.getParameter("name");String age request.getParame…

钉钉-蓝牙打卡和平台打卡的区别

钉钉的群是部门概念。 你的账号归属到哪个群&#xff0c;就是哪个群的员工。 -------------------------------------------------------------------- 蓝牙打卡是对账号归属进行打卡的。 平台打卡是只对属于自己平台内的账号打卡的。 ----------------------------------…

【MATLAB】PSO粒子群优化BiLSTM(PSO_BiLSTM)的时间序列预测

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 基于PSO粒子群优化的BiLSTM的时间序列预测算法的基本原理如下&#xff1a; 「双向长短时记忆&#xff08;BiLSTM&#xff09;模型」&#xff1a;这是一种深度学习模型&#xff0c;特别适用…

感恩客户相伴23载,泛微2024持续向上!

2023年&#xff0c;国家大力推动数字经济发展&#xff0c;各行各业在加速数字化转型&#xff0c;在这一年&#xff0c;泛微保持持续增长&#xff0c;引领行业发展&#xff0c;为组织的数字化转型助力。感恩客户与伙伴朋友的支持与信任&#xff01; 01.泛微中大客户总量突破8万余…

Unity中Shader的Reversed-Z(DirectX平台)

文章目录 前言一、在对裁剪坐标归一化设置NDC时&#xff0c;DirectX平台Z的特殊二、在图形计算器中&#xff0c;看一下Z值反转前后变化1、在图形计算器创建两个变量 n 和 f 分别 控制近裁剪面 和 远裁剪面2、带入公式得到齐次裁剪空间下Z值3、进行透视除法4、用 1 - Z 得出Z值反…

VS+QT五子棋游戏开发

1、首先安装好VS软件和QT库&#xff0c;将其配置好&#xff0c;具体不在此展开说明。 2、文件结构如下图&#xff1a; 3、绘制棋盘代码&#xff0c;如下&#xff1a; void Qwzq::paintEvent(QPaintEvent* event) {QPainter painter(this);painter.setRenderHint(QPainter::An…

【响应式编程-01】Lambda表达式初体验

一、简要描述 Lambda初体验Lambda表达式的语法格式Lambda表达式应用举例Lambda表达式底层实现 二、什么是Lambda表达式 Java8新特性&#xff0c;来源于数学中的λ[l:mdə]演算 是一套关于函数(f(x))定义、输入量、输出量的计算方案 Lambda表达式 -> 函数 使代码变得简洁…

维生素B5和琥珀酰辅酶A可以改善SF3B1基因突变引起无效造血过程

今天给同学们分享一篇实验文章“Vitamin B5 and succinyl-CoA improve ineffective erythropoiesis in SF3B1-mutated myelodysplasia”&#xff0c;这篇文章发表在Sci Transl Med期刊上&#xff0c;影响因子为17.1。 结果解读&#xff1a; SF3B1突变导致MDS-RS患者COASY异构体…

短视频账号矩阵系统源码/技术交付3年开发源头

账号矩阵3年技术独立开发打造是一个非常有挑战性和前景的项目。以下是一些建议&#xff0c;帮助你成功打造一个成功的短视频账号矩阵&#xff1a; 1. 确定目标受众&#xff1a;首先需要明确你的目标受众是谁&#xff0c;了解他们的兴趣爱好、年龄、性别等&#xff0c;以便为他们…

八大算法排序@选择排序(C语言版本)

目录 选择排序概念算法思想示例步骤1步骤2步骤...n最后一步 代码实现时间复杂度空间复杂度特性总结 选择排序 概念 选择排序&#xff08;Selection Sort&#xff09;是一种简单直观的排序算法。基本思想是在未排序的序列中找到最小&#xff08;或最大&#xff09;元素&#xf…

RS485数据采集模块,如何一次采集多个modbus设备数据?

在工业数据采集中&#xff0c;RS485是一种常见的数据通信协议&#xff0c;而Modbus则是其上的常用设备协议。那么&#xff0c;如何用一个模块高效采集多个Modbus设备的数据呢&#xff1f;这就是我们今天要探讨的话题&#xff01; 什么是RS485数据采集模块&#xff1f; 首先&a…

ALSA学习(5)——设备中的alsa

参考博客&#xff1a; https://blog.csdn.net/DroidPhone/article/details/7165482 &#xff08;一下内容基本是原博主的博客转载&#xff09; 文章目录 一、ASOC的由来二、硬件架构三、软件架构四、数据结构五、内核对ASoC的改进 一、ASOC的由来 ASoC–ALSA System on Chip …