Java多线程:生产者-消费者模型

在这里插入图片描述

  • 👑专栏内容:Java
  • ⛪个人主页:子夜的星的主页
  • 💕座右铭:前路未远,步履不停

目录

  • 一、阻塞队列
    • 1、标准库阻塞队列
    • 2、手动实现阻塞队列
  • 二、生产者-消费者模型
    • 1、使用标准库实现
    • 2、手动阻塞队列实现


一、阻塞队列

阻塞队列是一种特殊的队列,也遵守“先进先出”的原则。 阻塞队列是一种线程安全的数据结构,并且具有以下特性:

  • 当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素
  • 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素

1、标准库阻塞队列

在 Java 标准库中内置了阻塞队列 BlockingQueue接口,下面有七个实现类:

类名特性
ArrayBlockingQueue由数组结构组成的有界阻塞队列
LinkedBlockingQueue由链表结构组成的有界的阻塞队列(有界,默认大小 Integer.MAX_VALUE,相当于无界)
PriorityBlockingQueue支持优先级排序的无界阻塞队列
DelayQueue使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue不存储元素的阻塞队列,即单个元素的队列,生产一个,消费一个,不存储元素,不消费不生产
LinkedTransferQueue由链表结构组成的无界阻塞队列
LinkedBlockingDeque由链表结构组成的双向阻塞队列

详细说明:

  • ArrayBlockingQueue:使用数组实现,线程安全,有界,FIFO(先进先出)排序。适合生产者和消费者速度相近的场景。
  • LinkedBlockingQueue:使用链表实现,线程安全,有界(默认大小 Integer.MAX_VALUE,相当于无界),FIFO排序。适合生产者和消费者速度不一致的场景。
  • PriorityBlockingQueue:使用优先级队列实现,线程安全,无界,支持优先级排序。适合需要根据优先级处理任务的场景。
  • DelayQueue:使用优先级队列实现,线程安全,无界,支持延迟队列。适合需要延迟执行任务的场景。
  • SynchronousQueue:不存储元素,生产者和消费者必须同时存在才能成功操作队列。适合需要一对一交换数据的场景。
  • LinkedTransferQueue:使用链表实现,线程安全,无界,支持FIFO和LIFO(后进先出)排序。适合需要无锁操作的场景。
  • LinkedBlockingDeque:使用链表实现,线程安全,无界,支持双向操作。适合需要同时支持入队和出队的场景。

选择建议:

选择合适的阻塞队列取决具体需求。以下是一些建议:

  • 如果需要一个有界队列,并且生产者和消费者速度相近,可以使用 ArrayBlockingQueue
  • 如果需要一个无界队列,并且生产者和消费者速度不一致,可以使用 LinkedBlockingQueue
  • 如果需要根据优先级处理任务,可以使用 PriorityBlockingQueue
  • 如果需要延迟执行任务,可以使用 DelayQueue
  • 如果需要一对一交换数据,可以使用 SynchronousQueue
  • 如果需要无锁操作,可以使用 LinkedTransferQueue
  • 如果需要同时支持入队和出队,可以使用 LinkedBlockingDeque
常用方法描述
add(E e)将指定的元素插入此队列中,如果没有可用的空间,则抛出异常。
offer(E e)将指定的元素插入此队列中,如果可以在不违反容量限制的情况下立即执行,则成功返回 true,否则返回 false。
put(E e)将指定的元素插入此队列中,如果队列已满,则一直等待直到有空间可用。
remove()检索并删除此队列的头部,如果队列为空,则抛出异常。
poll()检索并删除此队列的头部,如果队列为空,则返回 null。
take()检索并删除此队列的头部,如果队列为空,则一直等待直到有元素可用。
element()检索但不删除此队列的头部,如果队列为空,则抛出异常。
peek()检索但不删除此队列的头部,如果队列为空,则返回 null。

这个表格列出了 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue 这三个类的常用方法,它们分别表示由数组结构支持的有界阻塞队列、由链表结构支持的可选有界阻塞队列以及由堆实现支持优先级排序的无界阻塞队列。阻塞队列的一个典型应用场景就是 “生产者消费者模型”,这是一种非常典型的开发模型。

2、手动实现阻塞队列

我们使用循环队列逐步实现阻塞队列,先手动实现一下循环队列。

class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。private int head = 0; //指向队列头部private int tail = 0; //指向队列的尾部private int size = 0;//入队列public void put(String elem){if(size>=items.length){//队列满了return;}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;}//出队列public String take(){if(size == 0){return null;}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;return elem;}
}

把上面的队列改造为阻塞队列,因为是多线程,所以为了保证线程安全,就要给puttake进行加锁。

    public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}
    public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}

除了加锁之外,还需要考虑内存可见性问题。

    volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;

在这里插入图片描述
此处的两个wait不会同时出现,要么是这边的wait,要么是另一边的wait

class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;//入队列public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}//出队列public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}
}
//实现阻塞队列(基于数组(循环队列))
public class Demo {public static void main(String[] args) throws InterruptedException {MyBlockingQueue queue = new MyBlockingQueue();queue.put("aaa");queue.put("bbb");queue.put("ccc");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

二、生产者-消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。

生产者: 负责生产数据的线程或者进程。生产者把生产出来的数据放入缓冲区。
消费者: 负责消费数据的线程或者进程。消费者从缓冲区取出数据进行消费。
缓冲区: 生产者和消费者之间共享的存储区域。缓冲区可以是有限的空间,也可以是阻塞队列。

  • 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求,如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程)。这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求。这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮。
  • 阻塞队列也能使生产者和消费者之间解耦。比如过年一家人一起包饺子,一般都是有明确分工。比如一个人负责擀饺子皮,其他人负责包。擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”。 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)。
    在这里插入图片描述

1、使用标准库实现

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;//简单的生产者消费者模型
public class Demo10 {public static void main(String[] args) {//阻塞队列作为交易场所BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();Thread t1 = new Thread(()->{int count = 0;while(true){try {queue.put(count);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产元素:"+count);count++;try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});//负责消费元素Thread t2 = new Thread(()->{while (true){Integer n = null;try {n = queue.take();System.out.println("消费元素:" + n);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();}
}

在这里插入图片描述

2、手动阻塞队列实现

class MyBlockingQueue{//使用String数组保存元素private String[] items = new String[1000];//有效范围:[head,tail),当heda和tai相等,相当于空队列。volatile private int head = 0; //指向队列头部volatile private int tail = 0; //指向队列的尾部volatile private int size = 0;//入队列public void put(String elem) throws InterruptedException {synchronized (this){while(size>=items.length){ //使用while代替if,进行二次确认。//队列满了this.wait();}items[tail] = elem;tail++;if(tail>=items.length){tail = 0;}size++;this.notify(); // 用来唤醒队列为空的阻塞情况。}}//出队列public String take() throws InterruptedException {synchronized (this){if(size == 0){//队列为空this.wait();}String elem = items[head];head++;if(head>=items.length){head = 0;}size--;this.notify(); // 使用这个notify来唤醒队列满return elem;}}
}
//实现阻塞队列(基于数组(循环队列))
public class Demo11 {public static void main(String[] args) throws InterruptedException {//创建两个线程表示生产者和消费者MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(()->{int count = 0;while (true){try {queue.put(count + "");System.out.println("生产元素:"+count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread t2 = new Thread(()->{while (true){try {String count = queue.take();System.out.println("消费元素:"+count);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/678459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode:LCP 30. 魔塔游戏(贪心 Java)

目录 LCP 30. 魔塔游戏 题目描述&#xff1a; 实现代码与解析&#xff1a; 贪心 原理思路&#xff1a; LCP 30. 魔塔游戏 题目描述&#xff1a; 小扣当前位于魔塔游戏第一层&#xff0c;共有 N 个房间&#xff0c;编号为 0 ~ N-1。每个房间的补血道具/怪物对于血量影响记于…

P8756 [蓝桥杯 2021 省 AB2] 国际象棋 状压dp统计情况数的一些小理解

目录 建议有状压基础再食用&#xff1a;本题的状态转移方程是 dp代码片:参考代码 建议有状压基础再食用&#xff1a; n行m列 等价 n列m行 &#xff0c;因为n比较小&#xff0c;int是32位足够了&#xff0c;我们用比特位统计每一行的状态。 本题的状态转移方程是 dp[h][i][j]…

HTML世界之第一重天

一、HTML 元素 注&#xff1a;HTML 文档由 HTML 元素定义。 1.HTML 元素 开始标签 * 元素内容 结束标签 * <p> 这是一个段落 </p> <a href"default.htm"> 这是一个链接 </a> <br> 换行 开始标签常被称为起始标签&…

【OpenHarmony硬件操作】led灯和key的操作

文章目录 前言一、GPIO2.1 GPIO是什么?2.2 GPIO的工作模式2.3 点灯操作GPIO初始化设置引脚功能设置引脚的方向输出高低电平2.4 示例代码三、key的操作3.1 中断3.2 中断的触发方式3.3 相关函数设置上下拉电阻设置中断和触发模式

DataX源码分析 reader

系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言Reader组件如何处理…

ARP欺骗攻击利用之抓取https协议的用户名与密码

1.首先安装sslstrip 命令执行&#xff1a;apt-get install sslstrip 2.启动arp欺骗 arpspoof -i ech0 -t 192.168.159.148 192.168.159.2 arpspoof -i ech0(网卡) -t 目标机ip 本地局域网关 3.命令行输入: vim /etc/ettercap/etter.conf进入配置文件 找到下红框的内容&a…

【Linux】学习-深入了解文件的读与写

深入了解语言级别(C语言)文件操作的"读"与"写" 在学习前&#xff0c;我们先要知道在Linux下的一个原则&#xff1a;一切皆是文件 如何理解呢&#xff1f;举个外设的例子&#xff0c;比如键盘和显示器&#xff0c;这两个外设也可以其实本质上也是文件&…

Qt Windows和Android使用MuPDF预览PDF文件

文章目录 1. Windows MuPDF编译2. Android MuPDF编译3. 引用 MuPDF 库4. 解析本地PDF文件 1. Windows MuPDF编译 使用如下命令将MuPDF的源码克隆到本地 git clone --recursive git://git.ghostscript.com/mupdf.git直接用VS&#xff0c;打开 mupdf/platform/win32/mupdf.sln …

pandas 按相同站号重新整合出一个dataframe

情况1&#xff1a; 如果两个DataFrame都有一个共同的列&#xff08;不是索引&#xff09;&#xff0c;你可以使用merge或join来整合它们。 import pandas as pd # 创建两个示例DataFrame df1 pd.DataFrame({ ID: [001, 002, 003], A: [foo, bar, baz] }) df2 pd.Dat…

docker 部署 mongodb 集群【建议收藏】

一、简洁搭建mognodb副本集 环境说明 我都是在云服务器上搭建的&#xff0c;CentOS7&#xff0c;Docker环境&#xff0c;版本忘记了。我就直接在同一台服务器上搭建三个mongodb即可。 1、基本信息如下 服务器地址 www.it307.top 副本集名称 rs 容器节点及端口映射 ​ m0…

数据结构——6.1 图的基本概念

第六章 图 6.1 图的基本概念 概念 图的概念&#xff1a;G由点集V和边集E构成&#xff0c;记为G(V,E)&#xff0c;边集可以为空&#xff0c;但是点集不能为空 注意&#xff1a;线性表可以是空表&#xff0c;树可以是空树&#xff0c;但图不可以是空&#xff0c;即V一定是非空集…

leetcode:63.不同路径二

dp数组含义&#xff1a;由初始位置到最终位置路径个数 递推公式&#xff1a;如果没有障碍再进行递推公式 初始化&#xff1a;1.若起始位置和终止位置有障碍路径个数为0 2.dp[i][0] 1和dp[0][j] 1的for循环条件都需要加上一个and dp[i][0] 0和and dp[0][j] 0. 3.遍历顺序…

三维形体投影面积(c++题解)

题目描述 在 n x n 的网格 grid 中&#xff0c;我们放置了一些与 x&#xff0c;y&#xff0c;z 三轴对齐的 1 x 1 x 1 立方体。 每个值 v grid[i][j] 表示 v 个正方体叠放在单元格 (i, j) 上。 现在&#xff0c;我们查看这些立方体在 xy 、yz 和 zx 平面上的投影。 投影 就…

案例:三台主机实现 级联复制

介绍&#xff1a;级联复制架构 级联复制架构 是一种特殊的主从结构&#xff0c;之前聊到的几种主从结构都只有两层&#xff0c;但级联复制架构中会有三层&#xff0c;关系如下&#xff1a; 也就是在级联复制架构中&#xff0c;存在两层从库&#xff0c;这实际上属于一主多从架…

Deepin基本环境查看(九)【被封印的创世神】

文章目录 - 相关文章目录1、概述2、Deepin中的创世神和管理员1&#xff09;创世神root2&#xff09;root被封印原因3&#xff09;其他的神灵【管理员】 3、神殿管理【su与sudo】1&#xff09;su&#xff08;Switch User&#xff09;2&#xff09;sudo&#xff08;Superuser Do&…

Open CASCADE学习|环形弹簧建模

目录 Draw Test Harness&#xff1a; C&#xff1a; 环形弹簧&#xff0c;也称为弓簧&#xff0c;是由拉伸弹簧和连接弹簧构成的。在结构上&#xff0c;环形弹簧通常包括端环、外环和内环&#xff0c;其主要参数包括弹簧的内径、外径和自由高度。环形弹簧的一个显著特点是&am…

计算机毕业设计SSM基于的冷链食品物流信息管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; vue mybatis Maven mysql5.7或8.0等等组成&#xff0c;B…

prometheus之redis_exporter部署

下载解压压缩包 mkdir /opt/redis_exporter/ cd /opt/redis_exporter/ wget http://soft.download/soft/linux/prometheus/redis_exporter/redis_exporter-v1.50.0.linux-amd64.tar.gz tar -zxvf redis_exporter-v1.50.0.linux-amd64.tar.gz ln -s /opt/redis_exporter/redis_…

网络原理(一)

&#x1f495;"Echo"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;网络原理(一) 一. 应用层 应用层是和程序员联系最密切的一层,对于应用层来说,程序员可以自定义应用层协议,应用层的协议一般要约定好以下两部分内容: 根据需求,明确要传输哪些信…

[算法学习]

矩阵乘法 只有当左矩阵列数等于右矩阵行数&#xff0c;才能相乘N*M的矩阵和M*K的矩阵做乘法后矩阵大小为N*k矩阵乘法规则&#xff1a;第一个矩阵A的第 i 行与第二个矩阵的第 j 列的各M个元素对应相乘再相加得到新矩阵C[i][j]的值 整除 同余 同余的性质 线性运算&#xff0c;…