Java实现生产消费模型的5种方式

**

前言

**

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

在这里插入图片描述
以下这些解法,其实本质上都是实现了一个阻塞队列。为空,则消费者阻塞,满了,则生产者阻塞。

**

1.使用wait()和notify()实现

**

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

public static void testProductConsumeByWaitAndNotify() {final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);final Object lock = new Object();Runnable producer = new Runnable() {public void run() {for(int i=0;i<30;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;//队列未满,一直往里放消息synchronized (lock) {while (size == queue.size()) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(msg);lock.notifyAll();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}synchronized (lock) {while (queue.size() == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}String msg = queue.poll();System.out.println(msg+"已消费");lock.notifyAll();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

2.可重入锁ReentrantLock的实现

**

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

ReentrantLock的Condition:

//阻塞当前线程,直到收到通知或者被中断(将当前线程加入到当前Condition对象的等待队列里)
//Block until signalled or interrupted
public final void await() throws InterruptedException;/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
* 把在当前Condition对象的等待队列里的等待最久的线程,转移到当前Lock的等待队列里
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
*         returns {@code false}
*/
public final void signal() ;

ReentrantLock实现生产消费模型:

public static void testProductConsumeByLock() {final Lock lock = new ReentrantLock();final Condition empty = lock.newCondition();final Condition full = lock.newCondition();final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);Runnable producer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for(int i=0;i<20;i++) {lock.lock();try {if(queue.size() == size) {try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}String msg = "生产消息:"+i;queue.add(msg);System.out.println(msg);empty.signal();} finally {lock.unlock();}}}};Runnable consumer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (true) {lock.lock();try {if(queue.isEmpty()) {try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}else {String msg = queue.remove();System.out.println(msg + "已消费");full.signal();}} finally {lock.unlock();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

3.阻塞队列BlockingQueue实现

**

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作

因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
从上可知,阻塞队列是线程安全的。

下面是BlockingQueue接口的一些方法:

操作抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove(o)poll(o)take(o)poll(timeout, timeunit)
检查element(o)peek(o)

这四类方法分别对应的是:

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产消费模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

/*** 生产者消费者* 使用阻塞队列实现* @throws InterruptedException */public static void testProductConsumeByBlockingQueue() throws InterruptedException {//因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
//		final BlockingQueue<String> queue = new SynchronousQueue<String>(true);//使用有界阻塞队列final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);Runnable producer = new Runnable() {public void run() {for(int i=0;i<100;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;try {queue.put(msg);} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}String msg = null;try {msg = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg+"已消费");}}};new Thread(producer).start();new Thread(consumer).start();}

**

4.信号量Semaphore的实现

**
信号量可以控制访问相应资源的线程的数量,从而实现生产消费模型

import java.util.concurrent.Semaphore;public class BySemaphore {int count = 0;final Semaphore put = new Semaphore(5);// 初始令牌个数final Semaphore get = new Semaphore(0);final Semaphore mutex = new Semaphore(1);   //该信号量相当于锁public static void main(String[] args) {BySemaphore bySemaphore = new BySemaphore();new Thread(bySemaphore.new Producer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Producer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}try {put.acquire();// 注意顺序mutex.acquire();count++;System.out.println("生产者" + Thread.currentThread().getName()+ "已生产完成,商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();get.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}try {get.acquire();// 注意顺序mutex.acquire();count--;System.out.println("消费者" + Thread.currentThread().getName()+ "已消费,剩余商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();put.release();}}}}
}

**

5.使用消息队列

**

这个是取巧的办法,直接使用现成的消息中间件服务(如RocketMq、RabbitMq、Kafka等),分分钟搞定。手动微笑~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313810.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode 86. 分隔链表

给定一个链表和一个特定值 x&#xff0c;对链表进行分隔&#xff0c;使得所有小于 x 的节点都在大于或等于 x 的节点之前。你应当保留两个分区中每个节点的初始相对位置。示例:输入: head 1->4->3->2->5->2, x 3输出: 1->2->2->4->3->5题目分析…

深入理解 JVM Class文件格式(一)

** 一、JVM体系结构 ** ** 二、class格式文件概述 ** class文件是一种8位字节的二进制流文件&#xff0c; 各个数据项按顺序紧密的从前向后排列&#xff0c; 相邻的项之间没有间隙&#xff0c; 这样可以使得class文件非常紧凑&#xff0c; 体积轻巧&#xff0c; 可以被J…

深入理解 JVM Class文件格式(二)

** class文件中的特殊字符串 ** 特殊字符串是常量池中符号引用的一部分&#xff0c;包括三种&#xff1a; 类的全限定名&#xff0c; 字段和方法的描述符&#xff0c; 特殊方法的方法名。 下面我们就分别介绍这三种特殊字符串。 &#xff08;1&#xff09; 类的全限定名 在…

.NET框架之“小马过河”

.NET框架之“小马过河”有许多流行的 .NET框架&#xff0c;大家都觉得挺“重”&#xff0c;认为很麻烦&#xff0c;重量级&#xff0c;不如其它“轻量级”框架&#xff0c;从而不愿意使用。面对形形色色的框架发愁&#xff0c;笔者也曾发愁。但我发现只要敢于尝试&#xff0c;这…

深入理解 JVM Class文件格式(三)

** JVM常量池中各数据项类型详解 ** 关于常量池的大概内容&#xff0c; 已经在 深入理解 JVM Class文件格式&#xff08;一&#xff09; 中讲解过了&#xff0c; 这篇文章中还介绍了常量池中的11种数据类型。 本文的任务是详细讲解这11种数据类型&#xff0c; 深度剖析源文件…

深入理解 JVM Class文件格式(四)

&#xff08;3&#xff09;CONSTANT_Integer_info 一个常量池中的CONSTANT_Integer_info数据项, 可以看做是CONSTANT_Integer类型的一个实例。 它存储的是源文件中出现的int型数据的值。 同样&#xff0c; 作为常量池中的一种数据类型&#xff0c; 它的第一个字节也是一个tag值…

.Net Core中使用Quartz.Net Vue开即用的UI管理

Quartz.NETQuartz.Net 定制UI维护了常用作业添加、删除、修改、停止、启动功能&#xff0c;直接使用cron表达式设置作业执行间隔&#xff0c;有完整的日志记录。Quartz.NET是一个功能齐全的开源作业调度系统&#xff0c;可用于从最小的应用程序到大型企业系统。Quartz.NET是一个…

深入理解 JVM Class文件格式(五)

&#xff08;8&#xff09; CONSTANT_Class_info 常量池中的一个CONSTANT_Class_info&#xff0c; 可以看做是CONSTANT_Class数据类型的一个实例。 他是对类或者接口的符号引用。 它描述的可以是当前类型的信息&#xff0c; 也可以描述对当前类的引用&#xff0c; 还可以描述对…

混沌工程详细介绍——Netflix持续交付实践探寻

内容来源&#xff1a;DevOps案例深度研究 – Netflix的文化与工程实践战队&#xff08;本文只展示部分案例PPT及研究成果&#xff0c;更多细节请关注案例分享活动&#xff0c;及本公众号&#xff09;。本案例内容贡献者&#xff1a;高金梅&#xff0c;李晓莉&#xff0c;潘雄鹰…

深入理解 JVM Class文件格式(六)

经过前几篇文章&#xff0c; 终于将常量池介绍完了&#xff0c; 之所以花这么大的功夫介绍常量池&#xff0c; 是因为对于理解class文件格式&#xff0c;常量池是必须要了解的&#xff0c; 因为class文件中其他地方&#xff0c;大量引用了常量池中的数据项。 对于还不了解常量池…

远程开发初探 - VS Code Remote Development

如果你是学生&#xff0c;你还在你的 windows 电脑上为各种环境配置头疼的时候&#xff0c;你应该了解一下 Remote Development。如果你喜欢 linux 的开发环境和舒适的 shell&#xff0c;但却不舍得抛弃 windows/macos 图形界面给你带来的用户体验和一些软件的兼容(QQ, 微信), …

深入理解 JVM Class文件格式(七)

本专栏列前面的一系列博客&#xff0c; 对Class文件中的一部分数据项进行了介绍。 本文将会继续介绍class文件中未讲解的信息。 先回顾一下上面一篇文章。 在上一篇博客中&#xff0c; 我们介绍了&#xff1a; this_class 对当前类的描述 super_class 对当前类的超类的描述 in…

微信小程序集成腾讯云 IM SDK

1、背景因业务功能需求需要接入IM&#xff08;即时聊天&#xff09;功能&#xff0c;一开始想到的是使用 WebSocket 来实现这个功能&#xff0c;然天意捉弄&#xff08;哈哈&#xff09;服务器版本太低不支持 wx 协议&#xff08;也就不支持 WebSocket了&#xff09;不得不寻找…

深入理解 JVM Class文件格式(八)

在本专栏的第一篇文章 深入理解Java虚拟机到底是什么 中&#xff0c; 我们主要讲解了什么是虚拟机&#xff0c; 这篇博客是对JVM的一个概述。 在随后的几篇文章中&#xff0c;一直在讲解class文件格式。 在今天这篇博客中&#xff0c; 将会继续讲解class文件中的其他信息。 在本…

深入理解 JVM Class文件格式(九)

经过前八篇关于class文件的博客&#xff0c; 关于class文件格式的内容也基本上讲完了。 本文是关于class文件格式的最后一篇。 在这篇博客中&#xff0c; 将会讲解关于方法的几个属性。 理解这篇博客的内容&#xff0c; 对于理解JVM执行引擎起着重要作用。 关于虚拟机执行引擎有…

MongoDB入门及 c# .netcore客户端MongoDB.Driver2.9.1使用

MongoDB 是一个基于分布式文件存储的数据库。由 C 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。什么场景下使用MongoDBMongoDB虽然是NoSQL(非关系型的数据库)&#xff0c;但是实际使用的时候可以当做关系型数据库来用&#xff0c;mysql等数据库中单表数据量…

《WTM送书活动:向更遥远的星辰大海起航~》

点击上方蓝字关注我们吧是的,没错~这一篇不是大老刘写的 哈哈~啥? 你想知道为啥? 大老刘为了你们不加班,熬夜改BUG,姑娘不乐意了...然后...后面请自行脑补~哎~生活还要继续鸭....那么,接下来由我陪大家唠一段儿~ 单口...各位看官老爷们:注意了!第一件事情呢我们的WTM框…

Java中的对象一定在堆上分配吗?

首先&#xff0c;为解释这个问题&#xff0c;需要的基本知识如下&#xff08;如果对以下概念不太熟悉&#xff0c; 可以先了解下&#xff09;&#xff1a; 1.JVM内存结构&#xff0c;传送门 2.即时编译&#xff08;JIT&#xff09;&#xff0c;传送门 3. 逃逸分析&#xff0c;…

最全的 netcore 3.0 升级实战方案

1、哈喽大家中秋节&#xff08;后&#xff09;好呀&#xff01;感觉已经好久没有写文章了&#xff0c;但是也没有偷懒哟&#xff0c;我的视频教程《系列一、NetCore 视频教程&#xff08;Blog.Core&#xff09;》也已经录制八期了&#xff0c;还在每周末同步更新中&#xff0c;…

微软发布.Net Core 3.0 RC1,最终版本定于9月23日

2019.9.17 微软 宣布推出.NET Core 3.0 Release Candidate 1。就像Preview 9一样&#xff0c;主要专注于为 .NET Core 3.0 发布最终版本 。现在变得非常非常接近。将在9月23日.NET Conf上发布最终版本。.NET Core 3.0是从仅支持Windows传统的 .NET框架向更现代化的开源实现过渡…