Java并发编程实战~生产者-消费者模式

前面我们在《Worker Thread 模式》中讲到,Worker Thread 模式类比的是工厂里车间工人的工作模式。但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者 - 消费者模式。

生产者 - 消费者模式在编程领域的应用也非常广泛,前面我们曾经提到,Java 线程池本质上就是用生产者 - 消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者 - 消费者模式。

当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者 - 消费者模式,例如 Log4j2 中异步 Appender 内部也用到了生产者 - 消费者模式。所以今天我们就来深入地聊聊生产者 - 消费者模式,看看它具体有哪些优点,以及如何提升系统的性能。

生产者 - 消费者模式的优点

生产者 - 消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者 - 消费者模式的一个示意图,你可以结合它来理解。

11

从架构设计的角度来看,生产者 - 消费者模式有一个很重要的优点,就是解耦。解耦对于大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。在生产者 - 消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者 - 消费者模式是一个不错的解耦方案。

除了架构设计上的优点之外,生产者 - 消费者模式还有一个重要的优点就是支持异步,并且能够平衡生产者和消费者的速度差异。在生产者 - 消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费是异步的,这是与传统的方法之间调用的本质区别,传统的方法之间调用是同步的。

你或许会有这样的疑问,异步化处理最简单的方式就是创建一个新的线程去处理,那中间增加一个“任务队列”究竟有什么用呢?我觉得主要还是用于平衡生产者和消费者的速度差异。我们假设生产者的速率很慢,而消费者的速率很高,比如是 1:3,如果生产者有 3 个线程,采用创建新的线程的方式,那么会创建 3 个子线程,而采用生产者 - 消费者模式,消费线程只需要 1 个就可以了。Java 语言里,Java 线程和操作系统线程是一一对应的,线程创建得太多,会增加上下文切换的成本,所以 Java 线程不是越多越好,适量即可。生产者 - 消费者模式恰好能支持你用适量的线程

支持批量执行以提升性能

前面我们在《Thread-Per-Message 模式》中讲过轻量级的线程,如果使用轻量级线程,就没有必要平衡生产者和消费者的速度差异了,因为轻量级线程本身就是廉价的,那是否意味着生产者 - 消费者模式在性能优化方面就无用武之地了呢?当然不是,有一类并发场景应用生产者 - 消费者模式就有奇效,那就是批量执行任务。

例如,我们要在数据库里 INSERT 1000 条数据,有两种方案:第一种方案是用 1000 个线程并发执行,每个线程 INSERT 一条数据;第二种方案是用 1 个线程,执行一个批量的 SQL,一次性把 1000 条数据 INSERT 进去。这两种方案,显然是第二种方案效率更高,其实这样的应用场景就是我们上面提到的批量执行场景。

在《两阶段终止模式》文章中,我们提到一个监控系统动态采集的案例,其实最终回传的监控数据还是要存入数据库的(如下图)。但被监控系统往往有很多,如果每一条回传数据都直接 INSERT 到数据库,那么这个方案就是上面提到的第一种方案:每个线程 INSERT 一条数据。很显然,更好的方案是批量执行 SQL,那如何实现呢?这就要用到生产者 - 消费者模式了。

11

利用生产者 - 消费者模式实现批量执行 SQL 非常简单:将原来直接 INSERT 数据到数据库的线程作为生产者线程,生产者线程只需将数据添加到任务队列,然后消费者线程负责将任务从任务队列中批量取出并批量执行。

在下面的示例代码中,我们创建了 5 个消费者线程负责批量执行 SQL,这 5 个消费者线程以 while(true){} 循环方式批量地获取任务并批量地执行。需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

//任务队列
BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);//启动5个消费者线程
//执行批量任务  
public void start() {ExecutorService es=executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {es.execute(()->{try {while (true) {//获取批量任务List<Task> ts = pollTasks();//执行批量任务execTasks(ts);}} catch (Exception e) {e.printStackTrace();}});}
}//从任务队列中获取批量任务
public List<Task> pollTasks() throws InterruptedException {List<Task> ts = new LinkedList<>();//阻塞式获取一条任务Task t = bq.take();while (t != null) {ts.add(t);//非阻塞式获取一条任务t = bq.poll();}return ts;
}//批量执行任务
public execTasks(List<Task> ts) {//省略具体代码无数
}

利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。我曾经参与过一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:

  • ERROR 级别的日志需要立即刷盘;
  • 数据积累到 500 条需要立即刷盘;
  • 存在未刷盘数据,且 5 秒钟内未曾刷盘,需要立即刷盘。

这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者 - 消费者模式如何实现。在下面的示例代码中,可以通过调用 info()和error() 方法写入日志,这两个方法都是创建了一个日志任务 LogMsg,并添加到阻塞队列中,调用 info()和error() 方法的线程是生产者;而真正将日志写入文件的是消费者线程,在 Logger 这个类中,我们只创建了 1 个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。

class Logger {//任务队列  final BlockingQueue<LogMsg> bq = new BlockingQueue<>();//flush批量  static final int batchSize=500;//只需要一个线程写日志ExecutorService es = Executors.newFixedThreadPool(1);//启动写日志线程public void start(){File file = File.createTempFile("foo", ".log");final FileWriter writer = new FileWriter(file);this.es.execute(()->{try {//未刷盘日志数量int curIdx = 0;long preFT = System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//写日志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盘数据,则无需刷盘if (curIdx <= 0) {continue;}//根据规则刷盘if (log!=null && log.level==LEVEL.ERROR ||curIdx == batchSize ||System.currentTimeMillis()-preFT>5000){writer.flush();curIdx = 0;preFT=System.currentTimeMillis();}}}catch(Exception e){e.printStackTrace();} finally {try {writer.flush();writer.close();}catch(IOException e){e.printStackTrace();}}});  }//写INFO级别日志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//写ERROR级别日志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));}
}
//日志级别
enum LEVEL {INFO, ERROR
}
class LogMsg {LEVEL level;String msg;//省略构造函数实现LogMsg(LEVEL lvl, String msg){}//省略toString()实现String toString(){}
}

总结

Java 语言提供的线程池本身就是一种生产者 - 消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。

生产者 - 消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,你可以借助分布式消息队列(MQ)来实现生产者 - 消费者模式。MQ 一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,你可以结合观察者模式实现广播功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/496668.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI界的七大未解之谜:OpenAI丢出一组AI研究课题

来源&#xff1a;三体智讯今天&#xff0c;OpenAI在官方博客上丢出了7个研究过程中发现的未解决问题。OpenAI希望这些问题能够成为新手入坑AI的一种有趣而有意义的方式&#xff0c;也帮助从业者提升技能。OpenAI版AI界七大未解之谜&#xff0c;现在正式揭晓——丨1. Slitherin难…

vue 前端商城框架_前端工程师要掌握几个Vue框架

vue是一套用于构建用户界面的渐进式JavaScript框架&#xff0c;简单说Vue是类似于view的前端框架。vue开发核心是关注视图层&#xff0c;同时它更加容易与第三方库结合&#xff0c;再者我们在现有的项目中可以直接整合一起。目前vue技术社区在英文或中文都非常丰富&#xff0c;…

Python 模块 requests 模拟登录豆瓣 并 发表动态

如何抓取 WEB 页面&#xff1a;http://blog.csdn.net/chenguolinblog/article/details/45024643github 上一个关于模拟登录的项目&#xff1a;https://github.com/xchaoinfo/fuck-login Python爬虫之模拟登录总结&#xff1a;http://blog.csdn.net/churximi/article/details/50…

华为云BU总裁:如何把AI从噱头变为生产力?

来源&#xff1a;亿欧网 作者&#xff1a;张之颖“别跟着喊口号&#xff0c;少看朋友圈。…人工智能在中国被过分炒作了&#xff0c;现在国内人工智能已被娱乐化。不是做两个刷脸应用、搞一个APP就叫做人工智能。”华为云BU总裁郑叶来接受环球网记者的采访时表示&#xff0c;华…

Python 爬虫框架 - PySpider

Python爬虫进阶四之PySpider的用法&#xff1a;http://cuiqingcai.com/2652.html 网络爬虫剖析&#xff0c;以Pyspider为例&#xff1a;http://python.jobbole.com/81109 Python爬虫利器六之PyQuery的用法&#xff1a;https://cuiqingcai.com/2636.html 爬虫框架pyspider个人总…

AI技术加持,让协作机器人更安全

来源&#xff1a;机器人创新生态丨公众号来自众家新创公司与实验室的碰撞侦测与追踪技术&#xff0c;将使得在人类与其他移动物体周边的协作机器人更安全。一个美国圣地亚哥大学&#xff08;University of San Diego&#xff09;的团队便开发了一种更快速的算法&#xff0c;能协…

捕获异常_Recover捕获异常

“ 本文来源于《The Go Programming Language》”5.10. Recover捕获异常通常来说&#xff0c;不应该对panic异常做任何处理&#xff0c;但有时&#xff0c;也许我们可以从异常中恢复&#xff0c;至少我们可以在程序崩溃前&#xff0c;做一些操作。举个例子&#xff0c;当web服务…

仿msn弹出窗口

msnMessage.js文件代码&#xff1a; Code1 /** 2 ** 3 ** 类名&#xff1a;msnMessage 4 ** 功能&#xff1a;提供类似MSN消息框 5 ** 示例&#xff1a; 6 --------------------------------------------------------------------------------…

CPU诞生记|CPU制造全过程详解

来源&#xff1a;电子产品世界CPU(Centralprocessingunit)是现代计算机的核心部件&#xff0c;又称为“微处理器”。对于PC而言&#xff0c;CPU的规格与频率常常被用来作为衡量一台电脑性能强弱重要指标。Intelx86架构已经经历了二十多个年头&#xff0c;而x86架构的CPU对我们大…

二维数组 类型_Java第六章 | 二维数组的创建及使用、数组排序算法

二维数组的创建及使用1、二维数组的创建2、二维数组初始化3、使用二维数组二维数组的创建声明二维数组的方法有两种&#xff0c;语法如下所示&#xff1a;数组元素类型 数组名字[ ][ ];数组元素类型[ ][ ] 数组名字;数组元素类型&#xff1a;决定了数组的数据类型&#xff0c;它…

Semaphore及其用法

1、Semaphore 是什么 Semaphore 通常我们叫它信号量&#xff0c; 可以用来控制同时访问特定资源的线程数量&#xff0c;通过协调各个线程&#xff0c;以保证合理的使用资源。 比如&#xff1a;停车场入口立着的那个显示屏&#xff0c;每有一辆车进入停车场显示屏就会显示剩余…

sklearn 逻辑回归Demo

逻辑回归案例 假设表示 基于上述情况&#xff0c;要使分类器的输出在[0,1]之间&#xff0c;可以采用假设表示的方法。 设 h θ ( x ) g ( θ T x ) h_θ (x)g(θ^T x) hθ​(x)g(θTx)&#xff0c; 其中 g ( z ) 1 ( 1 e − z ) g(z)\frac{1}{(1e^{−z} )} g(z)(1e−z)1​…

URL原理、URL编码、URL特殊字符、输入URL到页面显示

​From&#xff1a;http://blog.csdn.net/zmx729618/article/details/51381655 From&#xff1a;http://www.cnblogs.com/coco1s/p/5038412.html HTML URL 编码参考手册&#xff1a;https://www.w3cschool.cn/htmltags/html-urlencode.html http://www.w3school.com.cn/t…

记忆模糊、记忆泛化的关键分子开关被发现

来源&#xff1a;brainnews2018年3月12日&#xff0c;Nature Medicine杂志在线刊登了麻省总医院Amar Sahay研究组的最新重要工作&#xff0c;他们发现了一种细胞骨架蛋白Actin-binding LIM protein 3 (ABLIM3)&#xff0c;降低该蛋白的表达水平可以增强海马齿状回细胞&#xff…

240多个jQuery插件 (转)

概述 jQuery 是继 prototype 之后又一个优秀的 Javascript 框架。其宗旨是—写更少的代码,做更多的事情。它是轻量级的 js 库(压缩后只有21k) &#xff0c;这是其它的 js 库所不及的&#xff0c;它兼容 CSS3&#xff0c;还兼容各种浏览器&#xff08;IE 6.0, FF 1.5, Safari 2.…

Exchanger及其用法

01 Exchanger 作用 使两个线程之间进行数据传递。&#xff08;对是两个之间而不是三个或者更多个线程之间&#xff09; 02 常用方法 exchange&#xff08;&#xff09; 阻塞当前线程并等待其他线程来取得数据&#xff0c;若没有其他线程来取数据则一直等待。 exchange&…

2 如何设置窗口title_如何设置华为4G路由2的WiFi黑白名单【设置方法】

不想让自家的Wi-Fi被蹭网&#xff0c;除了将Wi-Fi隐藏起来&#xff0c;您还可以设置Wi-Fi黑白名单。如果您发现有人蹭网了&#xff0c;可以将蹭网设备直接加入黑名单&#xff0c;这样就可以禁止这个设备再连接到您的Wi-Fi。如果您将家人、朋友的设备加入了白名单&#xff0c;那…

谷歌大脑AutoML最新进展:用进化算法发现神经网络架构

来源&#xff1a;AI中国大脑的进化进程持续已久&#xff0c;从5亿年前的蠕虫大脑到现如今各种现代结构。例如&#xff0c;人类的大脑可以完成各种各样的活动&#xff0c;其中许多活动都是毫不费力的。例如&#xff0c;分辨一个视觉场景中是否包含动物或建筑物对我们来说是微不足…

linux 的 ip 命令 和 ifconfig 命令

From&#xff08;试试Linux下的ip命令&#xff0c;ifconfig已经过时了&#xff09;&#xff1a; https://linux.cn/article-3144-1.html From&#xff08;linux网络配置命令之ifconfig、ip和route&#xff09;&#xff1a; http://chrinux.blog.51cto.com/6466723/1188108 From…

对于Office Live平台的思考

刚接触计算机编程的时候&#xff0c;脑子里想法比肚子里的墨水多得多&#xff0c;那时候想通过网络成立一个游戏开发团队&#xff0c;将不少人都很喜欢的一款FC游戏“重装机兵”&#xff08;Metal Max&#xff09;移植到电脑上来。当时的想法很激进也很宏大&#xff0c;我想的不…