扩展Spring Batch –步骤分区

在之前的几篇文章中,我们已经讨论了如何启动和运行Spring Batch。 现在,我们将开始讨论可用于扩展Spring Batch的一些策略。

本文将重点介绍如何对步骤进行分区,以使该步骤具有多个线程,每个线程并行处理一块数据。 如果您有大量的数据可以在逻辑上拆分为可以并行处理的较小的块,这将非常有帮助。 这种工作方式是,您将定义一个主要步骤,该步骤负责确定块的基础,然后将所有这些块都植入到一组从属步骤中,以处理每个块。

分区

如果我能回到过去的经验,那么一个很好的例子就是在大型采购系统中处理每个公司的所有每日发票。 我们将要处理的数据可以按处理的每个公司在逻辑上进行拆分。 假设有250家公司参与此采购系统,我们的分区步骤已定义为具有15个线程。 我们的分区程序可能会运行查询,以查找当天有等待处理发票的所有公司。 此时,分区程序的职责是为每个公司创建一个ExecutionContext并将其添加到具有唯一键的地图中。 该ExecutionContext应该包含处理该公司的发票所需的任何信息,例如公司ID和任何其他相关信息。 当分区程序返回ExecutionContexts的映射时,Spring Batch将为映射中的每个条目创建一个新的Step ,并将键值用作步骤名称的一部分。 根据配置(例如15个线程),它将创建15个线程的池并开始一次并行执行15个步骤。 例如,如果您有85个步骤,Spring Batch将开始执行15个步骤,并且在完成每个步骤后,完成该步骤的线程将接下一个步骤并开始执行,直到所有步骤都已完成。

一个例子

现在,我们对分区的工作原理有了基本的了解,让我们看一个简单的示例。 对于我们的用例,我们将检查入站目录,传入的供应商目录文件将被转储到该目录中。 因此,要创建一个Spring Batch分区程序,我们需要创建一个实现Spring Batch的Partitioner接口的类。 由于这是通用的并且可以重用,所以我们将调用此类MultiFileResourcePartitioner ,这是一个简单的POJO,并且只有一个字段名称“ inboundDir”代表包含要处理文件的目录的路径。 Partitioner接口指定该类应实现一个名为“ partition”的方法,该方法采用一个表示网格大小的int参数,并返回一个保存ExecutionContext的Map。

这是MultiFileResourcePartitioner的类列表:

package com.keyhole.example.partition;import java.io.File;
import java.util.HashMap;
import java.util.Map;import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.FileSystemResource;public class MultiFileResourcePartitioner implements Partitioner {private String inboundDir;@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();File dir = new File(inboundDir);if (dir.isDirectory()) {File[] files = dir.listFiles();for (File file : files) {ExecutionContext context = new ExecutionContext();context.put("fileResource", file.toURI().toString());partitionMap.put(file.getName(), context);}}return partitionMap;}public String getInboundDir() {return inboundDir;}public void setInboundDir(String inboundDir) {this.inboundDir = inboundDir;}}

在我们的应用程序上下文中,此bean的配置如下所示:

<bean id="inventoryFilePartitioner"class="com.keyhole.example.partition.MultiFileResourcePartitioner"scope="step"><property name="inboundDir" value="/data/inbound" />
</bean>

查看实现的分区方法,我们只列出所有在指定入站目录中找到的文件,并为找到的每个文件创建一个ExecutionContext并将其添加到返回的地图中。 用于将每个ExecutionContext放入映射中的唯一键也将成为为映射中的每个条目创建的步骤名称的一部分。 Spring Batch将使用该分区映射根据映射中找到的每个键创建一个从属步骤。

要对步骤进行分区,您需要首先创建分区配置将引用的步骤。 该步骤应与Spring Batch中的任何其他步骤一样进行配置,在此示例中,我们将定义FlatFileItemReader和简单的ItemWriter ,它们将仅调用toString()方法并将其记录到控制台。

这是该踏板及其相关弹簧弹片的配置详细信息。 这里要注意的重要一点是,我们将ItemReader的作用域限定在步骤级别,这样我们就不会在使用同一bean处理数据的多个线程中遇到任何问题。 我们还需要以这种方式确定它们的作用域,以便我们可以使用Spring后期绑定在Step的ExecutionContext中指定保存文件资源的值。

<batch:step id="inventoryLoadStep"xmlns="http://www.springframework.org/schema/batch"><batch:tasklet transaction-manager="transactionManager"><batch:chunk reader="inventoryLoadReader" writer="logItemWriter"commit-interval="5000" /></batch:tasklet>
</batch:step>
<bean name="inventoryLoadReader" scope="step"class="org.springframework.batch.item.file.FlatFileItemReader"><property name="resource"value="#{stepExecutionContext['fileResource']}" /><property name="lineMapper" ref="inventoryLineMapper" />
<property name="linesToSkip" value="1" />
</bean><bean name="inventoryLineMapper"class="org.springframework.batch.item.file.mapping.DefaultLineMapper"><property name="fieldSetMapper" ref="inventoryFieldMapper" /><property name="lineTokenizer" ref="inventoryLineTokenizer" />
</bean><bean name="inventoryLineTokenizer" 		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

由于在此示例中我们正在读取和处理以逗号分隔的文本文件,因此对于此步骤配置,我们只需编写很少的代码。 我们唯一需要实现的代码是将的内容映射到表示文件记录的对象所需的FieldSetMapper 。 文件中的每一行都将包含“类别”,“子类别”,“描述”,“目录编号”,“颜色”,“尺寸”,“价格”和“数量”字段。 因此,我们的对象将包含这些字段,并且我们的FieldSetMapper代码清单将如下所示。

package com.keyhole.example.partition;import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;@Component("inventoryFieldMapper")
public class InventoryItemFieldSetMapper implements FieldSetMapper<InventoryItem> {@Overridepublic InventoryItem mapFieldSet(FieldSet fieldSet) throws BindException {InventoryItem item = new InventoryItem();item.setCategory(fieldSet.readString(0));item.setSubCategory(fieldSet.readString(1));item.setDescription(fieldSet.readString(2));item.setCatalogNum(fieldSet.readString(3));item.setColor(fieldSet.readString(4));item.setSize(fieldSet.readString(5));item.setPrice(fieldSet.readDouble(6));item.setQty(fieldSet.readInt(7));return item;}}

现在我们已经创建并配置了Partitioner和Step,剩下要做的就是配置分区步骤本身! 就像这样简单:

<batch:job id="InventoryLoader"><batch:step id="partitionedInventoryLoadStep"><batch:partition step="inventoryLoadStep" partitioner="inventoryFilePartitioner"><batch:handler grid-size="10" task-executor="inventoryLoadTaskExecutor" /></batch:partition></batch:step>
</batch:job>

在配置分区步骤时,您可以像定义其他步骤一样定义一个步骤,方法是为其指定一个ID,并根据需要指定下一步骤的值。 Spring Batch没有将步骤的内容定义为普通的块或任务集,而是提供了一个分区标签,该标签要求您指定要分区的作业步骤以及将用于确定数据块的Partitioner。 您还需要定义将要处理这些步骤的分区处理程序,在这种情况下,我们将使用ThreadPoolTask​​Executor ,该线程处理程序的线程池大小为10,如果不使用它们,则允许它们超时。

<bean id="inventoryLoadTaskExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="10" /><property name="maxPoolSize" value="10" /><property name="allowCoreThreadTimeOut" value="true" />
</bean>

因此,如果您的Spring Batch流程具有处理大量记录的步骤,并且您对提高性能感兴趣,请考虑尝试进行步骤分区。 它应该易于实现,并为您提供一些其他性能,以帮助加快处理时间。

其他资源

对于与本文相关的示例代码,我已将源代码上传到位于https://github.com/jonny-hackett/batch-example的github存储库中。 要执行与本文相关的示例代码,有一个JUnit测试名称InventoryLoadTest 。 数据文件位于src / test / resources / data / inbound下,需要放置在与Partitioner入站目录匹配的本地目录中。 另外,请访问http://docs.spring.io/spring-batch/reference/html/scalability.html 。

参考:在Keyhole Software博客上从我们的JCG合作伙伴 Keyhole Software 扩展Spring Batch –步骤分区 。

翻译自: https://www.javacodegeeks.com/2013/12/scaling-spring-batch-step-partitioning.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366109.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix磁盘的自动发现与磁盘指标监控

由于最近项目上需要对服务器监控进行规范化监控&#xff0c;再磁盘这块有几种方式 1.如果每台设备的磁盘是一样的 比如都有vda,vdb两块磁盘那么可以采用 1.1 每台客户端写脚本&#xff0c;服务端每台设备去加上监控项&#xff08;------最次的手段-------------&#xff09; 1.…

[代码笔记]VUE路由根据返回状态判断添加响应拦截器

//返回状态判断(添加响应拦截器)Axios.interceptors.response.use(res > {//对响应数据做些事if (res.data && !res.data.success) {Message({// 饿了么的消息弹窗组件,类似toastshowClose: true,message: res.data.error.message.message? res.data.error.messag…

php前台用户权限开通,vue实现网站前台的权限管理

本文主要介绍了基于vue实现网站前台的权限管理(前后端分离实践)&#xff0c;小编觉得挺不错的&#xff0c;现在分享给大家&#xff0c;也给大家做个参考。一起跟随小编过来看看吧&#xff0c;希望能帮助到大家。Javascript做为当下的热门语言&#xff0c;用途很广泛&#xff0c…

一种替代的多生产者方法

最近在InfoQ上&#xff0c;Aliasei Papou发表了一篇关于他的一些实验的文章 &#xff0c;该实验在线程之间进行了高性能的消息交换。 本文中有许多示例&#xff0c;但我将重点介绍多生产者案例。 文章显示的一种优化方法是&#xff0c;如果您知道初始化时拥有的生产者数量&…

maven连接国内仓库

<mirrors> <!-- 阿里云仓库 --> <mirror> <id>alimaven</id> <mirrorOf>central</mirrorOf> <name>aliyun maven</name> <url>http://maven.aliyun.c…

python知识点汇总_Python知识点总结大全(一)

python逻辑运算符 1.成员 and or not 优先级&#xff1a;() > not > and > or 2.and 逻辑运算符and&#xff0c;a andb&#xff0c;如果a和b都为True&#xff0c;则返回True&#xff0c;如果其中一个为False&#xff0c;返回False&#xff0c;简言之&#xff1a;一假则…

JS ES6中的箭头函数(Arrow Functions)使用

转载这篇ES6的箭头函数方便自己查阅。 ES6可以使用“箭头”&#xff08;>&#xff09;定义函数&#xff0c;注意是函数&#xff0c;不要使用这种方式定义类&#xff08;构造器&#xff09;。 一、语法 基础语法 (参数1, 参数2, …, 参数N) > { 函数声明 }(参数1, 参数…

php中trim的用法和例子,PHP ltrim()用法及代码示例

ltrim()函数是PHP中的内置函数&#xff0c;可从字符串左侧删除空格或其他字符(如果指定)。用法:ltrim( $string, $charlist )参数&#xff1a;函数ltrim()接受两个参数&#xff0c;如上面的语法所示。在这两个参数中&#xff0c;一个是必需的&#xff0c;另一个是可选的。下面将…

python怎样编写定时程序_Python如何实现定时器功能

Timer: 隔一定时间调用一个函数,如果想实现每隔一段时间就调用一个函数的话&#xff0c;就要在Timer调用的函数中&#xff0c;再次设置Timer。Timer是Thread的一个派生类 python中的线程提供了java线程功能的子集。 #!/usr/bin/env python from threading import Timer import …

应用ForkJoin –从最佳到快速

到目前为止&#xff0c;JDK 7已很好地掌握在开发人员手中&#xff0c;并且大多数人都听说过ForkJoin&#xff0c;但是没有多少人有时间或机会去尝试它。 它引起了&#xff0c;并且可能仍然引起一些混乱&#xff0c;与普通线程池有什么不同。 [1] 我在本文中的目标是通过一个代…

Echarts-K线图提示框改头换面

工作&#xff1a; 使用Hbuilder建web工程&#xff0c;加入echarts相关库&#xff0c;根据需要更改K线图及其的提示样式&#xff0c;去除默认提示&#xff0c;使用异步加载echarts的数据&#xff0c;数据格式为json。 需要注意的K线图和5日均线&#xff0c;10日均线的意义&#…

JS对url进行编码和解码(三种方式区别)

Javascript语言用于编码的函数&#xff0c;一共有三个&#xff0c;最古老的一个就是escape()。虽然这个函数现在已经不提倡使用了&#xff0c;但是由于历史原因&#xff0c;很多地方还在使用它&#xff0c;所以有必要先从它讲起。 escape 和 unescape 实际上&#xff0c;esca…

elasticsearch启动错误解决

es启动默认不能使用root用户&#xff0c;所以需要新创建一个用户来启动。 启动时可能出现的问题: [1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536] [2]: max number of threads [1024] for user [esuser1] is too low, …

vc6.0 matlab混合编程,matlab2010 + vc6.0混合编程实例(调用dll)

不想解释太多直接上代码吧&#xff01;&#xff01;&#xff01;在对matlab配置后&#xff0c;上代码mcc -W cpplib:libMyAdd -T link:lib MyAdd -C就可以了&#xff0c;记得加上-C在对VC6.0进行配置的时候只要把 matlabroot\extern\include和matlabroot\extern\lib\win32\micr…

canvas换图时候会闪烁_Canvas实现图片上标注、缩放、移动和保存历史状态,纯干货(附CSS 3变化公式)...

(给前端大学加星标&#xff0c;提升前端技能.)作者&#xff1a;zhcxk1998https://juejin.im/user/5d4304bdf265da03d15531dc哈哈哈俺又来啦&#xff0c;这次带来的是canvas实现一些画布功能的文章&#xff0c;希望大家喜欢&#xff01;这个css3变化公式可以适用于平常我们使用的…

Azure DevOps Server (TFS)中代码文件换行问题解决方案(Git)

之前写过一篇博客“探索TFS Git 库文件换行&#xff08;CRLF&#xff09;的处理方式”&#xff0c;主要是针对TFVC代码库的。下面这篇文章说明如何在TFS的Git库中处理代码换行的问题。概述在Azure DevOps Server&#xff08;之前叫TFS&#xff09; 中使用Git管理源代码&#xf…

RMI强制Full GC每小时运行一次

在我职业生涯中进行的所有故障排除练习中&#xff0c;我感到随着时间的推移&#xff0c;我所追寻的错误在不断发展&#xff0c;变得越来越卑鄙和丑陋。 也许仅仅是我的年龄开始了。这个特别的Heisenbug –看起来像这篇帖子一样&#xff0c;再次让我清醒了很多&#xff0c;而不是…

apache配置-html碎片shtml格式

修改SSI 文件 conf–httpd.conf <Directory "D:/Android/Apache2.2/htdocs"> //修改文件目录 # # Possible values for the Options directive are "None", "All", # or any combination of: # Indexes Includes FollowSymLinks SymL…

php+条件限定符,const 限定符

const 限定符const 对象一旦创建后不可改变,所以const必须初始化.const int iget_size(); //运行时初始化const int j43;const int k; //错误,必须初始化默认状态下,const对象仅在文件中有效,解决办法是对于const变量不管是声明还说定义都添加extern关键字extern const int buf…

git删除远程分支文件,不改变本地文件

git提交项目时候踩的Git的坑 特别 由于准备春招&#xff0c;所以希望各位看客方便的话&#xff0c;能去github上面帮我Star一下项目https://github.com/Draymonders/Campus-Shop 经历 由于刚开始没有设置.gitignore文件&#xff0c;导致项目中所有的文件都被提交到了github上面…