并发模式:生产者和消费者

在我15年的职业生涯中,生产者和消费者的问题是我仅遇到过几次。 在大多数编程情况下,我们正在做的事情是以同步方式执行功能,其中JVM或Web容器自行处理多线程的复杂性。 但是,在编写某些需要的用例时。 上周,我遇到了一个这样的用例,使我在上一次这样做的时候回溯了三年。 但是,上次完成的方式却大不相同。

当我第一次听到问题陈述时,我立即知道需要什么。 但是,这次我的做法与上次有所不同。 这与我今天如何看待技术有关。 我不会涉足任何非技术方面,并且会直接跳入问题及其解决方案。 我开始研究市场上存在的东西,并发现了几篇文章,这些文章帮助我以正确的方式传达了我的想法。

问题陈述

我们需要一个用于批量迁移的解决方案。 我们正在将数据从系统1迁移到系统2,在此过程中,我们需要执行以下三个任务:

  • 根据组从数据库加载数据
  • 处理数据
  • 通过修改来更新在步骤1中加载的记录

我们必须处理100个小组,每个小组大约有4万条记录。 您可以想象如果我们以同步方式执行此练习将花费多少时间。 这里的图像有效地解释了这个问题。

生产者消费者:问题

生产者和消费者模式

首先让我们看一下生产者消费者模式。 如果您参考上面的问题说明并查看图片,我们会看到有太多实体准备使用其部分数据。 但是,没有足够的工人可以处理所有数据。 因此,随着生产者继续排队,它只会继续增长。 我们看到系统开始占用线程并花费大量时间。

中级解决方案

生产者消费者:中级方法

我们确实有一个中间解决方案。 参考该图像,您将立即注意到,生产者将他们的工作堆积在文件柜中,而工人在完成上一项任务时继续将其捡起来。 但是,这种方法确实存在一些明显的缺点:

  1. 仍然只有一名工人必须完成所有工作。 外部系统可能很高兴,但是任务将继续存在,直到工作人员完成所有任务为止
  2. 生产者将他们的数据堆积在队列中,并且需要资源来保存它们。 就像在此示例中,机柜可以装满一样,JVM资源也可能发生同样的情况。 我们需要注意要在内存中放入多少数据,在某些情况下可能不会太多。

解决方案

生产者消费者:解决方案

解决方案是我们每天在很多地方都能看到的,例如电影院大厅排队,汽油泵等。有很多人来订票,而根据进来的人数,增加了更多的人来发行票。 本质上,请参考此处的图像,您会注意到生产者将继续向内阁添加他们的工作,并且我们有更多的工人来处理工作量。

Java提供了并发包来解决此问题。 到现在为止,我一直在较低级别上进行线程工作,这是我第一次使用此程序包。 当我开始浏览网络并阅读其他博客作者的言论时,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解决方案并不能帮助我实现所需的高吞吐量。 因此,我开始探索对ArrayBlockingQueue的使用。

控制器

这是管理生产者和消费者之间的合同的第一类。 控制器将为生产者设置1个线程,为消费者设置2个线程。 根据需要,我们可以创建所需数量的线程。 甚至甚至可以从属性中读取数据或做一些动态魔术。 现在,我们将保持简单。

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

我正在使用ExecuteService创建线程池并对其进行管理。 代替使用基本的Thread实现,这是一种更有效的方法,因为它将根据需要处理退出和重新启动线程。 您还将注意到,我正在使用Future类来获取生产者线程的状态。 该类非常有效,它将使我的程序停止进一步执行。 这是在线程上替换“ .join”方法的一种好方法。 注意:在这个例子中,我并不是很有效地使用Future。 因此您可能需要尝试一些适合自己的事情。
另外,您还应注意在生产者和消费者之间用作文件柜的Broker类。 我们将在短时间内看到它的实现。

生产者

此类负责产生需要处理的数据。

package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

此类正在做它所能做的最简单的事情-向代理添加一个整数。 需要注意的一些关键领域是:
1. Broker上有一个属性,生产者在完成生产后最终会对其进行更新。 这也称为“最终”或“毒药”条目。 消费者使用它来知道不再有数据 2.我使用Thread.sleep来模拟某些生产者可能需要更多时间来生产数据。 您可以调整此值并查看消费者的行为

消费者

此类负责从代理读取数据并完成其工作

package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

这还是一个简单的类,它读取Integer并将其打印在控制台上。 但是,要注意的关键点是:
1.处理数据的循环是一个无限循环,它在两种情况下运行–直到生产者消费并且经纪人有一些数据为止
2.同样,Thread.sleep用于创建有效的不同方案

经纪人

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}

首先要注意的是,我们使用ArrayBlockingQueue作为数据持有人。 我不会说这是什么,而是要您在此处的JavaDocs上阅读它。 但是,我将解释生产者将把数据放入队列,而使用者将以FIFO格式从队列中获取数据。 但是,如果生产者运行缓慢,则消费者将等待数据进入,如果阵列已满,生产者将等待数据填满。

另外,请注意,我使用的是“投票”功能,而不是进入队列。 这是为了确保消费者不会一直等待,等待会在几秒钟后超时。 这有助于我们进行相互交流,并在处理完所有数据后杀死消费者。 (注意:尝试用get代替poll,您将看到一些有趣的输出)。

我的代码位于Google项目托管上 。 随意浏览并从那里下载。 本质上,这是一个蚀(Spring STS)项目。 根据下载时间,您可能还会在下载时获得其他软件包和类。 也可以随意查看这些内容并分享您的评论
–您可以在SVN浏览器中浏览源代码,或者;
–您可以从项目本身下载它 。

侧面解决方案

最初,我在中间发布了此解决方案,但是后来我意识到这不是做事的方法,因此我从主要内容中删除了此内容,并将其放在最后。 最终解决方案的另一种变体是,工人/消费者一次不处理一项工作,而是一起处理多个工作,并在完成下一个工作之前先完成工作。 这种方法可以产生相似的结果,但是在某些情况下,如果我们有一些工作需要花费不同的时间才能完成,那么从本质上讲,这意味着某些工人比其他工人最终会更快地结束工作,从而造成了瓶颈。 并且,如果作业是事先分配的,这意味着所有消费者将在加工之前拥有所有作业(不是生产者-消费者模式),那么这个问题可能加起来甚至更多,并导致处理逻辑的更多延迟。

相关文章

  • 队列是Devil自己的数据结构 (petewarden.typepad.com)
  • 我对撒但的小帮手排队有错吗? (petewarden.typepad.com)
  • http://code.google.com/p/disruptor/

参考: 并发模式: JCG合作伙伴的 生产者和消费者   Scratch Pad博客上的Kapil Viren Ahuja。


翻译自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/373378.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

作业管理系统数据字典

转载于:https://www.cnblogs.com/heyangcan/p/5312394.html

使用Hive和iReport进行大数据分析

每个JJ Abrams的电视连续剧疑犯追踪从主要人物芬奇先生一个下列叙述情节开始&#xff1a;“ 你是被监视。 政府拥有一个秘密系统-每天每天每小时都会对您进行监视的机器。 我知道是因为...我建造了它。 “当然&#xff0c;我们的技术人员知道得更多。 庞大的电气和软件工程师团…

docker集群管理

docker集群管理 ps&#xff1a;docker machine docker swarm docker compose 在Docker Machine发布之前&#xff0c;你可能会遇到以下问题&#xff1a; 你需要登录主机&#xff0c;按照主机及操作系统特有的安装以及配置步骤安装Docker&#xff0c;使其能运行Docker…

从0学java_从零开始学JAVA(一.Java的基础语法)

基本语法编写 Java 程序时&#xff0c;应注意以下几点&#xff1a;大小写敏感&#xff1a;Java 是大小写敏感的&#xff0c;这就意味着标识符 Hello 与 hello 是不同的。类名&#xff1a;对于所有的类来说&#xff0c;类名的首字母应该大写。如果类名由若干单词组成&#xff0c…

mysql添加字符串日期时间_mysql学习笔记--- 字符串函数、日期时间函数

一、常见字符串函数&#xff1a;1、CHAR_LENGTH 获取长度(字符为单位)2、FORMAT 格式化3、INSERT 替换的方式插入4、INSTR 获取位置5、LEFT/RIGHT 取左、取右6、LENGTH 获取长度(字节为单位)7、LTRIM/RTRIM/TRIM 去空格(左/右/自定义)8、STRCMP 字符串比较9、CONCAT 字…

ADO.NET 核心对象简介

ADO.NET ADO.NET是.NET中一组用于和数据源进行交互的面向对象类库&#xff0c;提供了数据访问的高层接口。 ADO.NET类库在System.Data命名空间内&#xff0c;根据我们访问的不同数据库选择命名空间&#xff0c;System.Data.SqlClient。 ADO.NET类最重要的优点是支持数据库以断开…

简学LINGO(三)——实例篇

1. 装配线平衡模型 一个装配线含有一系列的工作站。在终于产品的加工过程中每一个工作站运行一种或者是几种特定的任务。装配线周期是指全部工作站完毕分配给他们各自任务所花费时间的最大值。平衡装配线的目标是为每一个工作站分配加工任务。尽可能使每一个工作站运行同样数量…

Android之卫星菜单的实现

卫星菜单是现在一个非常受欢迎的“控件”&#xff0c;很多Android程序员都趋之若鹜&#xff0c;预览如下图。传统的卫星菜单是用Animation实现的&#xff0c;需要大量的代码&#xff0c;而且算法极多&#xff0c;一不小心就要通宵Debug。本帖贴出用属性动画Animator来实现卫星菜…

为云量身定制您的服务

相信大家都听说过Amazon的AWS。作为业内最为成熟的云服务提供商&#xff0c;其运行规模&#xff0c;稳定性&#xff0c;安全性都已经经过了市场的考验。时至今日&#xff0c;越来越多的应用被部署在了AWS之上。这其中不乏Zynga及Netflix这样著名的服务。 然而这一切并没有停滞不…

在Vaadin和JSF之间选择

随着最新版本的Primefaces 3.0的发布&#xff0c;JSF终于达到了前所未有的成熟度和实用性&#xff0c;使其与其他流行的Rich Internet Applications&#xff08;RIA&#xff09;选项如Google Web Toolkit&#xff08;GWT&#xff09;&#xff0c;ExtJS&#xff0c;Vaadin&#…

flask开发restful api系列(1)

在此之前&#xff0c;向大家说明的是&#xff0c;我们整个框架用的是flask sqlalchemy redis。如果没有开发过web&#xff0c;还是先去学习一下&#xff0c;这边只是介绍如果从开发web转换到开发移动端。如果flask还不是很熟悉&#xff0c;我建议先到这个网站简单学习一下&am…

JS显示当前时间(包含农历时间)

时间格式&#xff1a; JavaScript代码&#xff1a; var sWeek new Array("星期日", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六");var dNow new Date();var CalendarData new Arra…

MyBatis操作指南-与Spring集成(基于注解)

转载于:https://www.cnblogs.com/weilu2/p/mybatis_spring_integration_basic_on_annotation.html

JVM:如何分析线程转储

本文将教您如何分析JVM线程转储&#xff0c;并查明问题的根本原因。 从我的角度来看&#xff0c;线程转储分析是掌握Java EE生产支持的任何个人最重要的技能。 您可以从线程转储快照中获取的信息量通常远远超出您的想象。 我的目标是与您分享我在过去10年中积累的有关线程转储分…

极光推送JPush的快速集成

首先到极光推送的官网上创建一个应用&#xff0c;填写对应的应用名和包名。 创建好之后下载Demo 提取Sdk里面的图片和xml等资源文件放自己项目的相应位置&#xff0c;然后要注意的是.so文件的放置位置&#xff1a; 在main目录下新建一个jniLibs文件夹&#xff0c;放在这个文件夹…

elk系列1之入门安装与基本操作

preface 我们每天都要查看服务器的日志&#xff0c;一方面是为了开发的同事翻找日志&#xff0c;另一方面是巡检服务器查看日志&#xff0c;而随着服务器数量以及越来越多的业务上线&#xff0c;日志越来越多&#xff0c;人肉运维相当痛苦了&#xff0c;此时&#xff0c;参考现…

Java 7 –反编译项目硬币

大家好&#xff0c;该是从2012年开始写作的时候了。正如您在其他博客中可能已经看到的那样&#xff0c;有一些更改可以使您使用Java编程时的开发人员生活变得更加轻松&#xff1a;Diamond运算符&#xff0c;Switchs中的Strings&#xff0c;尝试使用资源&#xff0c;多次捕获等 …

在Excel表里面插入背景图

工作中我们会经常用到MS Excel&#xff0c;通常我们打开MS Excel&#xff0c;里面的工作表都是空白单调的背景。当然了&#xff0c;MS Excel可以在工作簿里面插入背景图片。那么问题来了&#xff0c;如果你没有安装Microsoft Office&#xff0c;该如何在Excel文件里面插入好看的…

实现两级下拉框的联动

1.实现两级下拉框的联动。 功能&#xff1a;实现点击年级下拉框&#xff0c;加载对应科目的下拉框。 第一步&#xff1a;首先要加载年级下拉框中的数据。 01.在GradeDAL层&#xff08;数据访问层&#xff09;写一个方法&#xff0c;查询所有年级的信息。 /// <summary>//…

python连接SQL Server取多个结果集:Pymssql模块

基本的用法可以参考&#xff1a;python连接SQL Server&#xff1a;Pymssql模块 和上一篇文章中的代码&#xff0c;只取一个结果集不同&#xff0c;这次会一次运行2个sql语句&#xff0c;然后分别取出2个结果集&#xff0c;打印输出。 代码中有详细的注释&#xff0c;一看就明白…