并发模式:生产者和消费者

在我15年的职业生涯中,生产者和消费者的问题是我仅遇到过几次。 在大多数编程情况下,我们正在做的事情是以同步方式执行功能,其中JVM或Web容器自行处理多线程的复杂性。 但是,在编写某些需要的用例时。 上周,我遇到了一个这样的用例,使我在上一次这样做的时候回溯了三年。 但是,上次完成的方式却大不相同。

当我第一次听到问题陈述时,我立即知道需要什么。 但是,这次我的做法与上次有所不同。 这与我今天如何看待技术有关。 我不会涉足任何非技术方面,并且会直接跳入问题及其解决方案。 我开始研究市场上存在的东西,并发现了几篇文章,这些文章帮助我以正确的方式传达了我的想法。

问题陈述

我们需要一个用于批量迁移的解决方案。 我们正在将数据从系统1迁移到系统2,在此过程中,我们需要执行以下三个任务:

  • 根据组从数据库加载数据
  • 处理数据
  • 通过修改来更新在步骤1中加载的记录

我们必须处理100个小组,每个小组大约有4万条记录。 您可以想象如果我们以同步方式执行此练习将花费多少时间。 这里的图像有效地解释了这个问题。

生产者消费者:问题

生产者和消费者模式

首先让我们看一下生产者消费者模式。 如果您参考上面的问题说明并查看图片,我们会看到有太多实体准备使用其部分数据。 但是,没有足够的工人可以处理所有数据。 因此,随着生产者继续排队,它只会继续增长。 我们看到系统开始占用线程并花费大量时间。

中级解决方案

生产者消费者:中级方法

我们确实有一个中间解决方案。 参考该图像,您将立即注意到,生产者将他们的工作堆积在文件柜中,而工人在完成上一项任务时继续将其捡起来。 但是,这种方法确实存在一些明显的缺点:

  1. 仍然只有一名工人必须完成所有工作。 外部系统可能很高兴,但是任务将继续存在,直到工作人员完成所有任务为止
  2. 生产者将他们的数据堆积在队列中,并且需要资源来保存它们。 就像在此示例中,机柜可以装满一样,JVM资源也可能发生同样的情况。 我们需要注意要在内存中放入多少数据,在某些情况下可能不会太多。

解决方案

生产者消费者:解决方案

解决方案是我们每天在很多地方都能看到的,例如电影院大厅排队,汽油泵等。有很多人来订票,而根据进来的人数,增加了更多的人来发行票。 本质上,请参考此处的图像,您会注意到生产者将继续向内阁添加他们的工作,并且我们有更多的工人来处理工作量。

Java提供了并发包来解决此问题。 到现在为止,我一直在较低级别上进行线程工作,这是我第一次使用此程序包。 当我开始浏览网络并阅读其他博客作者的言论时,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解决方案并不能帮助我实现所需的高吞吐量。 因此,我开始探索对ArrayBlockingQueue的使用。

控制器

这是管理生产者和消费者之间的合同的第一类。 控制器将为生产者设置1个线程,为消费者设置2个线程。 根据需要,我们可以创建所需数量的线程。 甚至甚至可以从属性中读取数据或做一些动态魔术。 现在,我们将保持简单。

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

我正在使用ExecuteService创建线程池并对其进行管理。 代替使用基本的Thread实现,这是一种更有效的方法,因为它将根据需要处理退出和重新启动线程。 您还将注意到,我正在使用Future类来获取生产者线程的状态。 该类非常有效,它将使我的程序停止进一步执行。 这是在线程上替换“ .join”方法的一种好方法。 注意:在这个例子中,我并不是很有效地使用Future。 因此您可能需要尝试一些适合自己的事情。
另外,您还应注意在生产者和消费者之间用作文件柜的Broker类。 我们将在短时间内看到它的实现。

生产者

此类负责产生需要处理的数据。

package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

此类正在做它所能做的最简单的事情-向代理添加一个整数。 需要注意的一些关键领域是:
1. Broker上有一个属性,生产者在完成生产后最终会对其进行更新。 这也称为“最终”或“毒药”条目。 消费者使用它来知道不再有数据 2.我使用Thread.sleep来模拟某些生产者可能需要更多时间来生产数据。 您可以调整此值并查看消费者的行为

消费者

此类负责从代理读取数据并完成其工作

package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

这还是一个简单的类,它读取Integer并将其打印在控制台上。 但是,要注意的关键点是:
1.处理数据的循环是一个无限循环,它在两种情况下运行–直到生产者消费并且经纪人有一些数据为止
2.同样,Thread.sleep用于创建有效的不同方案

经纪人

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}

首先要注意的是,我们使用ArrayBlockingQueue作为数据持有人。 我不会说这是什么,而是要您在此处的JavaDocs上阅读它。 但是,我将解释生产者将把数据放入队列,而使用者将以FIFO格式从队列中获取数据。 但是,如果生产者运行缓慢,则消费者将等待数据进入,如果阵列已满,生产者将等待数据填满。

另外,请注意,我使用的是“投票”功能,而不是进入队列。 这是为了确保消费者不会一直等待,等待会在几秒钟后超时。 这有助于我们进行相互交流,并在处理完所有数据后杀死消费者。 (注意:尝试用get代替poll,您将看到一些有趣的输出)。

我的代码位于Google项目托管上 。 随意浏览并从那里下载。 本质上,这是一个蚀(Spring STS)项目。 根据下载时间,您可能还会在下载时获得其他软件包和类。 也可以随意查看这些内容并分享您的评论
–您可以在SVN浏览器中浏览源代码,或者;
–您可以从项目本身下载它 。

侧面解决方案

最初,我在中间发布了此解决方案,但是后来我意识到这不是做事的方法,因此我从主要内容中删除了此内容,并将其放在最后。 最终解决方案的另一种变体是,工人/消费者一次不处理一项工作,而是一起处理多个工作,并在完成下一个工作之前先完成工作。 这种方法可以产生相似的结果,但是在某些情况下,如果我们有一些工作需要花费不同的时间才能完成,那么从本质上讲,这意味着某些工人比其他工人最终会更快地结束工作,从而造成了瓶颈。 并且,如果作业是事先分配的,这意味着所有消费者将在加工之前拥有所有作业(不是生产者-消费者模式),那么这个问题可能加起来甚至更多,并导致处理逻辑的更多延迟。

相关文章

  • 队列是Devil自己的数据结构 (petewarden.typepad.com)
  • 我对撒但的小帮手排队有错吗? (petewarden.typepad.com)
  • http://code.google.com/p/disruptor/

参考: 并发模式: JCG合作伙伴的 生产者和消费者   Scratch Pad博客上的Kapil Viren Ahuja。


翻译自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/373378.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

POJ 1006 - Biorhythms (中国剩余定理)

B - BiorhythmsTime Limit:1000MS Memory Limit:10000KB 64bit IO Format:%I64d & %I64u Submit Status Practice POJ 1006Description 人生来就有三个生理周期&#xff0c;分别为体力、感情和智力周期&#xff0c;它们的周期长度为23天、28天和33天。每一个周期中…

子线程中更新UI线程的三个方法

1、通过handler方式&#xff0c;sendmessage。 多个类间传递比较麻烦&#xff0c;也懒的写... 2、线程中通过runOnUiThread&#xff08;&#xff09; new Thread() { public void run() { //这儿是耗时操作&#xff0c;完成之后更新UI&#xff1b; runOnUiThread(new Runnab…

mysql limit acs_mysql查询操作

简单查询&#xff1a;select * from 表名;避免重复&#xff1a;select distinct 字段 from 表名;条件查询&#xff1a;select 字段,字段 from 表名 where id<5(条件);四则运算查询&#xff1a;select id,dep_id,id*dep_id from company.employee5 where id<5;定义显示格式…

作业管理系统数据字典

转载于:https://www.cnblogs.com/heyangcan/p/5312394.html

使用Hive和iReport进行大数据分析

每个JJ Abrams的电视连续剧疑犯追踪从主要人物芬奇先生一个下列叙述情节开始&#xff1a;“ 你是被监视。 政府拥有一个秘密系统-每天每天每小时都会对您进行监视的机器。 我知道是因为...我建造了它。 “当然&#xff0c;我们的技术人员知道得更多。 庞大的电气和软件工程师团…

docker集群管理

docker集群管理 ps&#xff1a;docker machine docker swarm docker compose 在Docker Machine发布之前&#xff0c;你可能会遇到以下问题&#xff1a; 你需要登录主机&#xff0c;按照主机及操作系统特有的安装以及配置步骤安装Docker&#xff0c;使其能运行Docker…

从0学java_从零开始学JAVA(一.Java的基础语法)

基本语法编写 Java 程序时&#xff0c;应注意以下几点&#xff1a;大小写敏感&#xff1a;Java 是大小写敏感的&#xff0c;这就意味着标识符 Hello 与 hello 是不同的。类名&#xff1a;对于所有的类来说&#xff0c;类名的首字母应该大写。如果类名由若干单词组成&#xff0c…

linux mount (挂载命令)详解

挂接命令(mount) 首先&#xff0c;介绍一下挂接(mount)命令的使用方法&#xff0c;mount命令参数非常多&#xff0c;这里主要讲一下今天我们要用到的。 命令格式&#xff1a;mount [-t vfstype] [-o options] device dir 其中&#xff1a; 1.-t vfstype 指定文件系统的类型&…

Android官方培训课程中文版(v0.9.5)

http://hukai.me/android-training-course-in-chinese/index.html转载于:https://www.cnblogs.com/xiaoyao095/p/6125715.html

使用SaxParser和完整代码进行XML解析

SAX解析器使用回调函数&#xff08;org.xml.sax.helpers.DefaultHandler&#xff09;通知客户端XML文档结构。 您应该扩展DefaultHandler并重写一些方法来实现xml解析。 覆盖的方法是 startDocument&#xff08;&#xff09;和endDocument&#xff08;&#xff09;–在XML文档…

mysql添加字符串日期时间_mysql学习笔记--- 字符串函数、日期时间函数

一、常见字符串函数&#xff1a;1、CHAR_LENGTH 获取长度(字符为单位)2、FORMAT 格式化3、INSERT 替换的方式插入4、INSTR 获取位置5、LEFT/RIGHT 取左、取右6、LENGTH 获取长度(字节为单位)7、LTRIM/RTRIM/TRIM 去空格(左/右/自定义)8、STRCMP 字符串比较9、CONCAT 字…

Android异常和工具使用笔记

Android异常和工具使用笔记 1、r文件找不到去你的工程目录下&#xff0c;手动的把gen删掉&#xff0c;然后去project中刷新一下&#xff0c;在编译看看。以前遇到过类似的问题&#xff0c;实在不行就把你的eclispe,adt升级到最新的版本吧 抓住那么一点点线索&#xff0c;就要去…

ADO.NET 核心对象简介

ADO.NET ADO.NET是.NET中一组用于和数据源进行交互的面向对象类库&#xff0c;提供了数据访问的高层接口。 ADO.NET类库在System.Data命名空间内&#xff0c;根据我们访问的不同数据库选择命名空间&#xff0c;System.Data.SqlClient。 ADO.NET类最重要的优点是支持数据库以断开…

MongoDB与Spring Data项目

如今&#xff0c;我们所有人都在观察NoSql解决方案的爆炸式增长。 我已经习惯了RDBMS&#xff0c;但这些并不是您可能遇到的所有挑战的解决方案。 根据最近的经验&#xff0c;我有机会使用MongoDB –文档数据库。 在本文中&#xff0c;我打算介绍将MongoDB与Spring Data项目一起…

java转换为字符串_java – 如何从int转换为字符串?

正常方式是Integer.toString(i)或String.valueOf(i)。串联将工作&#xff0c;但它是非常规的&#xff0c;可能是一个难闻的气味&#xff0c;因为它暗示作者不知道上述两种方法(他们不知道什么&#xff1f;)。Java在使用字符串(见the documentation)时对操作符提供了特殊的支持&…

简学LINGO(三)——实例篇

1. 装配线平衡模型 一个装配线含有一系列的工作站。在终于产品的加工过程中每一个工作站运行一种或者是几种特定的任务。装配线周期是指全部工作站完毕分配给他们各自任务所花费时间的最大值。平衡装配线的目标是为每一个工作站分配加工任务。尽可能使每一个工作站运行同样数量…

Hibernate缓存级别教程

开始使用Hibernate的人们常见的问题之一就是性能&#xff0c;如果您没有太多的Hibernate经验&#xff0c;您会发现应用程序变慢的速度。 如果启用sql跟踪&#xff0c;您将看到有多少查询被发送到数据库&#xff0c;而这些查询几乎不需要Hibernate知识就可以避免。 在当前文章中…

java方法执行的时间_计算Java中任意一个方法的执行时间的工具类

1 packagealgorithm.study.utils;23 importjava.lang.reflect.Method;45 /**6 * This class is getting a method execute time and provide some other functions.7 *8 *authorygh 2017年2月24日9 */10 public classMethodExecuteTimeUtils {1112 /**13 * Get a method execut…

如何在 IIS 中设置 HTTPS 服务

Windows Server2008、IIS7启用CA认证及证书制作完整过程 这篇文章介绍了如何安装证书申请工具&#xff1b; 如何在iis创建证书申请&#xff1b; 如何使用iis申请证书生成的txt文件&#xff0c;在工具中开始申请证书&#xff1b; 如何导出证书&#xff1b; 以及在网站中开始使用…

Android之卫星菜单的实现

卫星菜单是现在一个非常受欢迎的“控件”&#xff0c;很多Android程序员都趋之若鹜&#xff0c;预览如下图。传统的卫星菜单是用Animation实现的&#xff0c;需要大量的代码&#xff0c;而且算法极多&#xff0c;一不小心就要通宵Debug。本帖贴出用属性动画Animator来实现卫星菜…