比原始线程版本更好的方法是基于线程池的线程池,其中基于运行任务的系统定义了适当的线程池大小– CPU数量/(任务的1-Blocking Coefficient)。 Venkat Subramaniams书中有更多详细信息:
首先,在给定“报告部件请求”的情况下,我定义了一个自定义任务来生成“报告部件”,将其实现为Callable :
public class ReportPartRequestCallable implements Callable<ReportPart> {private final ReportRequestPart reportRequestPart;private final ReportPartGenerator reportPartGenerator;public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {this.reportRequestPart = reportRequestPart;this.reportPartGenerator = reportPartGenerator;}@Overridepublic ReportPart call() {return this.reportPartGenerator.generateReportPart(reportRequestPart);}
}
public class ExecutorsBasedReportGenerator implements ReportGenerator {private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);private ReportPartGenerator reportPartGenerator;private ExecutorService executors = Executors.newFixedThreadPool(10);@Overridepublic Report generateReport(ReportRequest reportRequest) {List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();for (ReportRequestPart reportRequestPart : reportRequestParts) {tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));}List<Future<ReportPart>> responseForReportPartList;List<ReportPart> reportParts = new ArrayList<ReportPart>();try {responseForReportPartList = executors.invokeAll(tasks);for (Future<ReportPart> reportPartFuture : responseForReportPartList) {reportParts.add(reportPartFuture.get());}} catch (Exception e) {logger.error(e.getMessage(), e);throw new RuntimeException(e);}return new Report(reportParts);}......
}
在这里,使用Executors.newFixedThreadPool(10)调用创建线程池,线程池的大小为10,为每个报告请求部分生成一个可调用任务,并使用ExecutorService抽象将其移交给线程池
responseForReportPartList = executors.invokeAll(tasks);
此调用返回一个期货列表,该列表支持get()方法,这是对响应可用的阻塞调用。
与原始线程版本相比,这显然是一个更好的实现,线程数量在负载下被限制为可管理的数量。
基于Spring集成的实现
我个人最喜欢的方法是使用Spring Integration ,原因是使用Spring Integration时 ,我专注于完成不同任务的组件,并留给Spring Integration使用基于xml或基于注释的配置将流程连接在一起。 在这里,我将使用基于XML的配置:
在我的案例中,这些组件是:
1.给出报告部分的请求,生成报告部分的组件,我之前已经显示过 。
2.用于将报告请求拆分为报告请求部分的组件:
public class DefaultReportRequestSplitter implements ReportRequestSplitter{@Overridepublic List<ReportRequestPart> split(ReportRequest reportRequest) {return reportRequest.getRequestParts();}
}
3.用于将报告部分组装/汇总为整个报告的组件:
public class DefaultReportAggregator implements ReportAggregator{@Overridepublic Report aggregate(List<ReportPart> reportParts) {return new Report(reportParts);}}
这就是Spring Integration所需的所有Java代码,其余的都是接线–在这里,我使用了Spring Integration配置文件:
<?xml version='1.0' encoding='UTF-8'?>
<beans ....<int:channel id='report.partsChannel'/><int:channel id='report.reportChannel'/><int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel> <int:channel id='report.joinPartsChannel'/><int:splitter id='splitter' ref='reportsPartSplitter' method='split' input-channel='report.partsChannel' output-channel='report.partReportChannel'/><task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator><int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway' default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/><bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean><bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/><bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/><bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/></beans>
Spring Source Tool Suite提供了一种可视化此文件的好方法:
这完全符合我对用户流的原始看法:
在代码的Spring Integration版本中,我定义了不同的组件来处理流程的不同部分:
1.将报告请求转换为报告请求部分的拆分器:
<int:splitter id='splitter' ref='reportsPartSplitter' method='split' input-channel='report.partsChannel' output-channel='report.partReportChannel'/>
2.服务激活器组件,用于根据报告部件请求生成报告部件:
<int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>
3.聚合器,用于将报表部分重新加入报表,并且足够智能,可以适当地关联原始拆分报表请求,而无需任何显式编码:
<int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator>
这段代码有趣的是,就像在基于执行者的示例中一样,使用xml文件,通过使用适当的通道将不同的组件连接在一起以及通过使用任务执行器 ,可以完全配置服务于每个组件的线程数。设置为执行程序属性的线程池大小。
在这段代码中,我定义了一个队列通道,其中报告请求部分进入其中:
<int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel>
并由服务激活器组件使用任务执行器提供服务,该任务执行器的线程池大小为10,容量为50:
<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>
所有这些都通过配置!
该示例的完整代码库可在以下github位置获得: https : //github.com/bijukunjummen/si-sample
参考: 并发–来自JCG合作伙伴 Biju Kunjummen的“ 执行程序和Spring集成” ,位于all和其他博客上。
翻译自: https://www.javacodegeeks.com/2012/06/concurrency-executors-and-spring.html