当我第一次听到问题陈述时,我立即知道需要什么。 但是,这次我的做法与上次有所不同。 这与我今天如何看待技术有关。 我不会涉足任何非技术方面,并且会直接跳入问题及其解决方案。 我开始研究市场上存在的东西,并发现了几篇文章,这些文章帮助我以正确的方式传达了我的想法。
问题陈述
我们需要一个用于批量迁移的解决方案。 我们正在将数据从系统1迁移到系统2,在此过程中,我们需要执行以下三个任务:
- 根据组从数据库加载数据
- 处理数据
- 通过修改来更新在步骤1中加载的记录
我们必须处理100个小组,每个小组大约有4万条记录。 您可以想象如果我们以同步方式执行此练习将花费多少时间。 这里的图像有效地解释了这个问题。
生产者消费者:问题 |
生产者和消费者模式
首先让我们看一下生产者消费者模式。 如果您参考上面的问题说明并查看图片,我们会看到有太多实体准备使用其部分数据。 但是,没有足够的工人可以处理所有数据。 因此,随着生产者继续排队,它只会继续增长。 我们看到系统开始占用线程并花费大量时间。
中级解决方案
生产者消费者:中级方法 |
我们确实有一个中间解决方案。 参考该图像,您将立即注意到,生产者将他们的工作堆积在文件柜中,而工人在完成上一项任务时继续将其捡起来。 但是,这种方法确实存在一些明显的缺点:
- 仍然只有一名工人必须完成所有工作。 外部系统可能很高兴,但是任务将继续存在,直到工作人员完成所有任务为止
- 生产者将他们的数据堆积在队列中,并且需要资源来保存它们。 就像在此示例中,机柜可以装满一样,JVM资源也可能发生同样的情况。 我们需要注意要在内存中放入多少数据,在某些情况下可能不会太多。
解决方案
生产者消费者:解决方案 |
解决方案是我们每天在很多地方都能看到的,例如电影院大厅排队,汽油泵等。有很多人来订票,而根据进来的人数,增加了更多的人来发行票。 本质上,请参考此处的图像,您会注意到生产者将继续向内阁添加他们的工作,并且我们有更多的工人来处理工作量。
Java提供了并发包来解决此问题。 到现在为止,我一直在较低级别上进行线程工作,这是我第一次使用此程序包。 当我开始浏览网络并阅读其他博客作者的言论时,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解决方案并不能帮助我实现所需的高吞吐量。 因此,我开始探索对ArrayBlockingQueue的使用。
控制器
这是管理生产者和消费者之间的合同的第一类。 控制器将为生产者设置1个线程,为消费者设置2个线程。 根据需要,我们可以创建所需数量的线程。 甚至甚至可以从属性中读取数据或做一些动态魔术。 现在,我们将保持简单。
package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
我正在使用ExecuteService创建线程池并对其进行管理。 代替使用基本的Thread实现,这是一种更有效的方法,因为它将根据需要处理退出和重新启动线程。 您还将注意到,我正在使用Future类来获取生产者线程的状态。 该类非常有效,它将使我的程序停止进一步执行。 这是在线程上替换“ .join”方法的一种好方法。 注意:在这个例子中,我并不是很有效地使用Future。 因此您可能需要尝试一些适合自己的事情。
另外,您还应注意在生产者和消费者之间用作文件柜的Broker类。 我们将在短时间内看到它的实现。
生产者
此类负责产生需要处理的数据。
package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
此类正在做它所能做的最简单的事情-向代理添加一个整数。 需要注意的一些关键领域是:
1. Broker上有一个属性,生产者在完成生产后最终会对其进行更新。 这也称为“最终”或“毒药”条目。 消费者使用它来知道不再有数据 2.我使用Thread.sleep来模拟某些生产者可能需要更多时间来生产数据。 您可以调整此值并查看消费者的行为
消费者
此类负责从代理读取数据并完成其工作
package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
这还是一个简单的类,它读取Integer并将其打印在控制台上。 但是,要注意的关键点是:
1.处理数据的循环是一个无限循环,它在两种情况下运行–直到生产者消费并且经纪人有一些数据为止
2.同样,Thread.sleep用于创建有效的不同方案
经纪人
package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}
首先要注意的是,我们使用ArrayBlockingQueue作为数据持有人。 我不会说这是什么,而是要您在此处的JavaDocs上阅读它。 但是,我将解释生产者将把数据放入队列,而使用者将以FIFO格式从队列中获取数据。 但是,如果生产者运行缓慢,则消费者将等待数据进入,如果阵列已满,生产者将等待数据填满。
另外,请注意,我使用的是“投票”功能,而不是进入队列。 这是为了确保消费者不会一直等待,等待会在几秒钟后超时。 这有助于我们进行相互交流,并在处理完所有数据后杀死消费者。 (注意:尝试用get代替poll,您将看到一些有趣的输出)。
码
我的代码位于Google项目托管上 。 随意浏览并从那里下载。 本质上,这是一个蚀(Spring STS)项目。 根据下载时间,您可能还会在下载时获得其他软件包和类。 也可以随意查看这些内容并分享您的评论
–您可以在SVN浏览器中浏览源代码,或者;
–您可以从项目本身下载它 。
侧面解决方案
最初,我在中间发布了此解决方案,但是后来我意识到这不是做事的方法,因此我从主要内容中删除了此内容,并将其放在最后。 最终解决方案的另一种变体是,工人/消费者一次不处理一项工作,而是一起处理多个工作,并在完成下一个工作之前先完成工作。 这种方法可以产生相似的结果,但是在某些情况下,如果我们有一些工作需要花费不同的时间才能完成,那么从本质上讲,这意味着某些工人比其他工人最终会更快地结束工作,从而造成了瓶颈。 并且,如果作业是事先分配的,这意味着所有消费者将在加工之前拥有所有作业(不是生产者-消费者模式),那么这个问题可能加起来甚至更多,并导致处理逻辑的更多延迟。
相关文章
- 队列是Devil自己的数据结构 (petewarden.typepad.com)
- 我对撒但的小帮手排队有错吗? (petewarden.typepad.com)
- http://code.google.com/p/disruptor/
参考: 并发模式: JCG合作伙伴的 生产者和消费者 Scratch Pad博客上的Kapil Viren Ahuja。
翻译自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html