Java实现生产消费模型的5种方式

**

前言

**

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

在这里插入图片描述
以下这些解法,其实本质上都是实现了一个阻塞队列。为空,则消费者阻塞,满了,则生产者阻塞。

**

1.使用wait()和notify()实现

**

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

public static void testProductConsumeByWaitAndNotify() {final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);final Object lock = new Object();Runnable producer = new Runnable() {public void run() {for(int i=0;i<30;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;//队列未满,一直往里放消息synchronized (lock) {while (size == queue.size()) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(msg);lock.notifyAll();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}synchronized (lock) {while (queue.size() == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}String msg = queue.poll();System.out.println(msg+"已消费");lock.notifyAll();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

2.可重入锁ReentrantLock的实现

**

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

ReentrantLock的Condition:

//阻塞当前线程,直到收到通知或者被中断(将当前线程加入到当前Condition对象的等待队列里)
//Block until signalled or interrupted
public final void await() throws InterruptedException;/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
* 把在当前Condition对象的等待队列里的等待最久的线程,转移到当前Lock的等待队列里
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
*         returns {@code false}
*/
public final void signal() ;

ReentrantLock实现生产消费模型:

public static void testProductConsumeByLock() {final Lock lock = new ReentrantLock();final Condition empty = lock.newCondition();final Condition full = lock.newCondition();final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);Runnable producer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for(int i=0;i<20;i++) {lock.lock();try {if(queue.size() == size) {try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}String msg = "生产消息:"+i;queue.add(msg);System.out.println(msg);empty.signal();} finally {lock.unlock();}}}};Runnable consumer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (true) {lock.lock();try {if(queue.isEmpty()) {try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}else {String msg = queue.remove();System.out.println(msg + "已消费");full.signal();}} finally {lock.unlock();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

3.阻塞队列BlockingQueue实现

**

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作

因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
从上可知,阻塞队列是线程安全的。

下面是BlockingQueue接口的一些方法:

操作抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove(o)poll(o)take(o)poll(timeout, timeunit)
检查element(o)peek(o)

这四类方法分别对应的是:

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产消费模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

/*** 生产者消费者* 使用阻塞队列实现* @throws InterruptedException */public static void testProductConsumeByBlockingQueue() throws InterruptedException {//因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
//		final BlockingQueue<String> queue = new SynchronousQueue<String>(true);//使用有界阻塞队列final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);Runnable producer = new Runnable() {public void run() {for(int i=0;i<100;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;try {queue.put(msg);} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}String msg = null;try {msg = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg+"已消费");}}};new Thread(producer).start();new Thread(consumer).start();}

**

4.信号量Semaphore的实现

**
信号量可以控制访问相应资源的线程的数量,从而实现生产消费模型

import java.util.concurrent.Semaphore;public class BySemaphore {int count = 0;final Semaphore put = new Semaphore(5);// 初始令牌个数final Semaphore get = new Semaphore(0);final Semaphore mutex = new Semaphore(1);   //该信号量相当于锁public static void main(String[] args) {BySemaphore bySemaphore = new BySemaphore();new Thread(bySemaphore.new Producer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Producer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}try {put.acquire();// 注意顺序mutex.acquire();count++;System.out.println("生产者" + Thread.currentThread().getName()+ "已生产完成,商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();get.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}try {get.acquire();// 注意顺序mutex.acquire();count--;System.out.println("消费者" + Thread.currentThread().getName()+ "已消费,剩余商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();put.release();}}}}
}

**

5.使用消息队列

**

这个是取巧的办法,直接使用现成的消息中间件服务(如RocketMq、RabbitMq、Kafka等),分分钟搞定。手动微笑~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313810.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#.NET 一颗璀璨的全能明星

C# 是微软推出的一种基于.NET框架的、面向对象的高级编程语言&#xff0c;她可以做什么呢&#xff1f;1.桌面开发&#xff0c;WinForm/GUI可视化编程&#xff1a;Windows开发中的葵花宝典&#xff0c;霸主地位至今无出其右&#xff0c;开发效率令人发指&#xff0c;大部分营销软…

#3328. PYXFIB(单位根反演)

#3328. PYXFIB ∑i0⌊nk⌋CnikFik∑i0nCniFi[i≡0(modk)]i≡0(modk)&#xff0c;单位根反演有1k∑j0k−1wkij1k∑i0nCniFi∑j0k−1wkij\sum_{i 0} ^{\lfloor \frac{n}{k} \rfloor} C_{n} ^{i \times k} \times F_{i \times k}\\ \sum_{i 0} ^{n} C_n ^{i} \times F_i \times …

Leetcode 86. 分隔链表

给定一个链表和一个特定值 x&#xff0c;对链表进行分隔&#xff0c;使得所有小于 x 的节点都在大于或等于 x 的节点之前。你应当保留两个分区中每个节点的初始相对位置。示例:输入: head 1->4->3->2->5->2, x 3输出: 1->2->2->4->3->5题目分析…

深入理解 JVM Class文件格式(一)

** 一、JVM体系结构 ** ** 二、class格式文件概述 ** class文件是一种8位字节的二进制流文件&#xff0c; 各个数据项按顺序紧密的从前向后排列&#xff0c; 相邻的项之间没有间隙&#xff0c; 这样可以使得class文件非常紧凑&#xff0c; 体积轻巧&#xff0c; 可以被J…

min_25 推导及例题总结

min_25 筛 一个亚线性筛&#xff0c;复杂度大概是O(n34log⁡n)O(\frac{n ^{\frac{3}{4}}}{ \log n})O(lognn43​​)。 使用min_25min\_25min_25求前缀和&#xff0c;有两个基本特征&#xff1a;① 积性函数&#xff0c;② 满足质数点为多项式。 算法思路 给定n≤1011n \leq…

asp.net core 使用 signalR(一)

asp.net core 使用 signalR&#xff08;一&#xff09;IntroSignalR 是什么&#xff1f;ASP.NET Core SignalR 是一个开源代码库&#xff0c;它简化了向应用添加实时 Web 功能的过程。实时 Web 功能使服务器端代码能够即时将内容推送到客户端。SignalR 的适用对象&#xff1a;需…

深入理解 JVM Class文件格式(二)

** class文件中的特殊字符串 ** 特殊字符串是常量池中符号引用的一部分&#xff0c;包括三种&#xff1a; 类的全限定名&#xff0c; 字段和方法的描述符&#xff0c; 特殊方法的方法名。 下面我们就分别介绍这三种特殊字符串。 &#xff08;1&#xff09; 类的全限定名 在…

P4211 [LNOI2014]LCA(离线 + 在线 做法)

P4211 [LNOI2014]LCA 有一棵根节点为111树&#xff0c;有mmm次询问&#xff0c;每次给定l,r,zl, r, zl,r,z&#xff0c;输出∑ilrdep[lca(i,z)]\sum\limits_{i l} ^{r} dep[lca(i, z)]il∑r​dep[lca(i,z)]。 乍一看这题好像无从下手&#xff0c;仔细想想lca(i,z)lca(i, z)l…

.NET框架之“小马过河”

.NET框架之“小马过河”有许多流行的 .NET框架&#xff0c;大家都觉得挺“重”&#xff0c;认为很麻烦&#xff0c;重量级&#xff0c;不如其它“轻量级”框架&#xff0c;从而不愿意使用。面对形形色色的框架发愁&#xff0c;笔者也曾发愁。但我发现只要敢于尝试&#xff0c;这…

深入理解 JVM Class文件格式(三)

** JVM常量池中各数据项类型详解 ** 关于常量池的大概内容&#xff0c; 已经在 深入理解 JVM Class文件格式&#xff08;一&#xff09; 中讲解过了&#xff0c; 这篇文章中还介绍了常量池中的11种数据类型。 本文的任务是详细讲解这11种数据类型&#xff0c; 深度剖析源文件…

#6073. 「2017 山东一轮集训 Day5」距离(树链剖分 + 永久标记主席树)

#6073. 「2017 山东一轮集训 Day5」距离 给定一颗有nnn个节点带边权的树&#xff0c;以及一个排列ppp&#xff0c;path(u,v)path(u, v)path(u,v)为u,vu, vu,v路径上的点集&#xff0c;dist(u,v)dist(u, v)dist(u,v)为u,vu, vu,v之间的最短路的长度。 有mmm次询问&#xff0c;…

ML.NET 示例:搜索引擎结果排名

ML.NET 示例中文版&#xff1a;https://github.com/feiyun0112/machinelearning-samples.zh-cn/edit/master/samples/csharp/getting-started/Ranking_Web英文原版请访问&#xff1a;https://github.com/dotnet/machinelearning-samples/tree/master/samples/csharp/getting-st…

深入理解 JVM Class文件格式(四)

&#xff08;3&#xff09;CONSTANT_Integer_info 一个常量池中的CONSTANT_Integer_info数据项, 可以看做是CONSTANT_Integer类型的一个实例。 它存储的是源文件中出现的int型数据的值。 同样&#xff0c; 作为常量池中的一种数据类型&#xff0c; 它的第一个字节也是一个tag值…

.Net Core中使用Quartz.Net Vue开即用的UI管理

Quartz.NETQuartz.Net 定制UI维护了常用作业添加、删除、修改、停止、启动功能&#xff0c;直接使用cron表达式设置作业执行间隔&#xff0c;有完整的日志记录。Quartz.NET是一个功能齐全的开源作业调度系统&#xff0c;可用于从最小的应用程序到大型企业系统。Quartz.NET是一个…

Ancient Distance(妙啊!!!) [2020牛客暑期多校训练营(第四场)]

Ancient Distance 给定一颗根为111有nnn个节点的树&#xff0c;每次可以选定树上kkk节点当作特殊节点&#xff0c; 定义dis(u)dis(u)dis(u)为&#xff0c;从u−>1u->1u−>1遇上的第一个特殊点的距离&#xff0c;如果遇不上特殊点则dis(u)dis(u)dis(u)无穷大。 有nn…

深入理解 JVM Class文件格式(五)

&#xff08;8&#xff09; CONSTANT_Class_info 常量池中的一个CONSTANT_Class_info&#xff0c; 可以看做是CONSTANT_Class数据类型的一个实例。 他是对类或者接口的符号引用。 它描述的可以是当前类型的信息&#xff0c; 也可以描述对当前类的引用&#xff0c; 还可以描述对…

混沌工程详细介绍——Netflix持续交付实践探寻

内容来源&#xff1a;DevOps案例深度研究 – Netflix的文化与工程实践战队&#xff08;本文只展示部分案例PPT及研究成果&#xff0c;更多细节请关注案例分享活动&#xff0c;及本公众号&#xff09;。本案例内容贡献者&#xff1a;高金梅&#xff0c;李晓莉&#xff0c;潘雄鹰…

P4175 [CTSC2008]网络管理(整体二分)

P4175 [CTSC2008]网络管理 给定一棵有nnn个节点的树&#xff0c;点有点权&#xff0c;有两种操作&#xff1a;① 修改某个点的点权&#xff0c;② 查询两点路径间的点权第kkk大。 给定u,vu, vu,v&#xff0c;选定111号节点为根节点&#xff0c;设inf(x)inf(x)inf(x)表示从根节…

深入理解 JVM Class文件格式(六)

经过前几篇文章&#xff0c; 终于将常量池介绍完了&#xff0c; 之所以花这么大的功夫介绍常量池&#xff0c; 是因为对于理解class文件格式&#xff0c;常量池是必须要了解的&#xff0c; 因为class文件中其他地方&#xff0c;大量引用了常量池中的数据项。 对于还不了解常量池…

远程开发初探 - VS Code Remote Development

如果你是学生&#xff0c;你还在你的 windows 电脑上为各种环境配置头疼的时候&#xff0c;你应该了解一下 Remote Development。如果你喜欢 linux 的开发环境和舒适的 shell&#xff0c;但却不舍得抛弃 windows/macos 图形界面给你带来的用户体验和一些软件的兼容(QQ, 微信), …