大数据集的处理是软件世界中最重要的问题之一。 Spring Batch是一个轻量级且强大的批处理框架,用于处理数据集。
Spring Batch Framework提供了“面向TaskletStep”和“面向块”的处理风格。 在本文中,将解释面向块的处理模型。 此外,绝对建议在Spring Batch中使用面向TaskletStep的处理在本文中,绝对建议您研究如何在Spring Batch中开发面向TaskletStep的处理。
Spring Batch v2.0附带了面向块的处理功能。 它是指一次读取一个数据,并在事务边界内创建要写出的“块”。 从ItemReader读取一项,并将其交给ItemProcessor并写入。 一旦读取的项目数等于提交间隔,就通过ItemWriter写入整个块,然后提交事务。
基本上,如果需要至少一个数据项的读写,则应使用此功能。 否则,如果只需要读取或写入数据项,则可以使用面向TaskletStep的处理。
面向块的处理模型通过org.springframework.batch.item包公开了三个重要的接口,分别是ItemReader , ItemProcessor和ItemWriter 。
- ItemReader:此接口用于提供数据。 它读取将要处理的数据。
- ItemProcessor:此接口用于项目转换。 它处理输入对象并转换为输出对象。
- ItemWriter:此接口用于常规输出操作。 它写入由ItemProcessor转换的数据。 例如,可以将数据写入数据库,内存或输出流(等)。 在此示例应用程序中,我们将写入数据库。
让我们看一下如何开发面向块的处理模型。
二手技术:
- JDK 1.7.0_09
- Spring3.1.3
- Spring批次2.1.9
- 休眠4.1.8
- Tomcat JDBC 7.0.27
- MySQL 5.5.8
- MySQL连接器5.1.17
- Maven的3.0.4
步骤1:建立已完成的专案
创建一个Maven项目,如下所示。 (可以使用Maven或IDE插件来创建它)。
步骤2:图书馆
通过执行以下脚本来创建新的用户表:
CREATE TABLE ONLINETECHVISION.USER (id int(11) NOT NULL AUTO_INCREMENT,name varchar(45) NOT NULL,surname varchar(45) NOT NULL,PRIMARY KEY (`id`)
);
步骤3:图书馆
首先,将依赖项添加到Maven的pom.xml中。
<properties><spring.version>3.1.3.RELEASE</spring.version><spring-batch.version>2.1.9.RELEASE</spring-batch.version></properties><dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency> <dependency><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-orm</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework.batch</groupId><artifactId>spring-batch-core</artifactId><version>${spring-batch.version}</version></dependency><!-- Hibernate dependencies --><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-core</artifactId><version>4.1.8.Final</version></dependency><!-- Tomcat DBCP --><dependency><groupId>org.apache.tomcat</groupId><artifactId>tomcat-jdbc</artifactId><version>7.0.27</version></dependency><!-- MySQL Java Connector library --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.17</version></dependency><!-- Log4j library --><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency></dependencies>
maven-compiler-plugin (Maven插件)用于使用JDK 1.7编译项目
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.7</source><target>1.7</target></configuration></plugin>
以下Maven插件可用于创建runnable-jar ,
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><configuration><source>1.7</source><target>1.7</target></configuration><transformers><transformerimplementation='org.apache.maven.plugins.shade.resource.
ManifestResourceTransformer'><mainClass>com.onlinetechvision.exe.Application</mainClass></transformer><transformerimplementation='org.apache.maven.plugins.shade.resource.
AppendingTransformer'><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation='org.apache.maven.plugins.shade.resource.
AppendingTransformer'><resource>META-INF/spring.schemas</resource></transformer></transformers></configuration></execution></executions></plugin>
步骤4:建立使用者实体
用户实体已创建。 该实体将在处理后存储。
package com.onlinetechvision.user;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;/*** User Entity** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
@Entity
@Table(name='USER')
public class User {private int id;private String name;private String surname;@Id@GeneratedValue(strategy=GenerationType.AUTO)@Column(name='ID', unique = true, nullable = false)public int getId() {return id;}public void setId(int id) {this.id = id;}@Column(name='NAME', unique = true, nullable = false)public String getName() {return name;}public void setName(String name) {this.name = name;}@Column(name='SURNAME', unique = true, nullable = false)public String getSurname() {return surname;}public void setSurname(String surname) {this.surname = surname;} @Overridepublic String toString() {StringBuffer strBuff = new StringBuffer();strBuff.append('id : ').append(getId());strBuff.append(', name : ').append(getName());strBuff.append(', surname : ').append(getSurname());return strBuff.toString();}
}
步骤5:建立IUserDAO介面
创建IUserDAO接口以公开数据访问功能。
package com.onlinetechvision.user.dao;import java.util.List;import com.onlinetechvision.user.User;/*** User DAO Interface** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public interface IUserDAO {/*** Adds User** @param User user*/void addUser(User user);/*** Gets User List**/List<User> getUsers();
}
步骤6:建立UserDAO IMPL
通过实现IUserDAO接口创建UserDAO类。
package com.onlinetechvision.user.dao;import java.util.List;import org.hibernate.SessionFactory;import com.onlinetechvision.user.User;/*** User DAO** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class UserDAO implements IUserDAO {private SessionFactory sessionFactory;/*** Gets Hibernate Session Factory** @return SessionFactory - Hibernate Session Factory*/public SessionFactory getSessionFactory() {return sessionFactory;}/*** Sets Hibernate Session Factory** @param SessionFactory - Hibernate Session Factory*/public void setSessionFactory(SessionFactory sessionFactory) {this.sessionFactory = sessionFactory;}/*** Adds User** @param User user*/@Overridepublic void addUser(User user) {getSessionFactory().getCurrentSession().save(user);}/*** Gets User List** @return List - User list*/@SuppressWarnings({ 'unchecked' })@Overridepublic List<User> getUsers() {List<User> list = getSessionFactory().getCurrentSession().createQuery('from User').list();return list;}}
步骤7:建立IUserService介面
为服务层创建了IUserService接口。
package com.onlinetechvision.user.service;import java.util.List;import com.onlinetechvision.user.User;/**** User Service Interface** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public interface IUserService {/*** Adds User** @param User user*/void addUser(User user);/*** Gets User List** @return List - User list*/List<User> getUsers();
}
步骤8:创建UserService IMPL
通过实现IUserService接口创建UserService类。
package com.onlinetechvision.user.service;import java.util.List;import org.springframework.transaction.annotation.Transactional;import com.onlinetechvision.user.User;
import com.onlinetechvision.user.dao.IUserDAO;/**** User Service** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
@Transactional(readOnly = true)
public class UserService implements IUserService {IUserDAO userDAO;/*** Adds User** @param User user*/@Transactional(readOnly = false)@Overridepublic void addUser(User user) {getUserDAO().addUser(user);}/*** Gets User List**/@Overridepublic List<User> getUsers() {return getUserDAO().getUsers();}public IUserDAO getUserDAO() {return userDAO;}public void setUserDAO(IUserDAO userDAO) {this.userDAO = userDAO;}
}
步骤9:建立TestReader IMPL
TestReader类是通过实现ItemReader接口创建的。 调用该类是为了读取项目。 当read方法返回null时,读取操作完成。 以下步骤详细说明了如何执行firstJob。
firstjob的commit-interval值为2,并执行以下步骤:
1)调用firstTestReader以读取第一项(firstname_0,firstsurname_0)
2)再次调用firstTestReader以读取第二个项目(firstname_1,firstsurname_1)
3)调用testProcessor处理第一项(FIRSTNAME_0,FIRSTSURNAME_0) 4)调用testProcessor处理第二个项目(FIRSTNAME_1,FIRSTSURNAME_1) 5)调用testWriter将第一项(FIRSTNAME_0,FIRSTSURNAME_0)写入数据库 6)调用testWriter将第二项(FIRSTNAME_1,FIRSTSURNAME_1)写入数据库 7)提交第一项和第二项,并且关闭交易。 调用firstTestReader以读取第三项(firstname_2,firstsurname_2) 9)firstTestReader的maxIndex值为3。read方法返回null,并且项读取操作完成。 10)调用testProcessor处理第三项(FIRSTNAME_2,FIRSTSURNAME_2) 11)调用testWriter将第一项(FIRSTNAME_2,FIRSTSURNAME_2)写入数据库 12)第三项已提交,交易已关闭。
第一步已完成,状态为“已完成”,第二步已开始。 secondJob和thirdJob以相同的方式执行。
package com.onlinetechvision.item;import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;import com.onlinetechvision.user.User;/*** TestReader Class is created to read items which will be processed** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class TestReader implements ItemReader<User> {private int index;private int maxIndex;private String namePrefix;private String surnamePrefix;/*** Reads items one by one** @return User** @throws Exception* @throws UnexpectedInputException* @throws ParseException* @throws NonTransientResourceException**/@Overridepublic User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {User user = new User();user.setName(getNamePrefix() + '_' + index);user.setSurname(getSurnamePrefix() + '_' + index);if(index > getMaxIndex()) {return null;}incrementIndex();return user;}/*** Increments index which defines read-count** @return int**/private int incrementIndex() {return index++;}public int getMaxIndex() {return maxIndex;}public void setMaxIndex(int maxIndex) {this.maxIndex = maxIndex;}public String getNamePrefix() {return namePrefix;}public void setNamePrefix(String namePrefix) {this.namePrefix = namePrefix;}public String getSurnamePrefix() {return surnamePrefix;}public void setSurnamePrefix(String surnamePrefix) {this.surnamePrefix = surnamePrefix;}}
步骤10:创建FailedCaseTestReader IMPL
创建FailedCaseTestReader类以模拟失败的作业状态。 在此示例应用程序中,当在thirdStep处理thirdJob时,将调用failedCaseTestReader并引发异常,因此其状态将为FAILED。
package com.onlinetechvision.item;import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;import com.onlinetechvision.user.User;/*** FailedCaseTestReader Class is created in order to simulate the failed job status.** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class FailedCaseTestReader implements ItemReader<User> {private int index;private int maxIndex;private String namePrefix;private String surnamePrefix;/*** Reads items one by one** @return User** @throws Exception* @throws UnexpectedInputException* @throws ParseException* @throws NonTransientResourceException**/@Overridepublic User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {User user = new User();user.setName(getNamePrefix() + '_' + index);user.setSurname(getSurnamePrefix() + '_' + index);if(index >= getMaxIndex()) {throw new Exception('Unexpected Error!');}incrementIndex();return user;}/*** Increments index which defines read-count** @return int**/private int incrementIndex() {return index++;}public int getMaxIndex() {return maxIndex;}public void setMaxIndex(int maxIndex) {this.maxIndex = maxIndex;}public String getNamePrefix() {return namePrefix;}public void setNamePrefix(String namePrefix) {this.namePrefix = namePrefix;}public String getSurnamePrefix() {return surnamePrefix;}public void setSurnamePrefix(String surnamePrefix) {this.surnamePrefix = surnamePrefix;}}
步骤11:创建TestProcessor IMPL
通过实现ItemProcessor接口来创建TestProcessor类。 此类称为处理项目。 从TestReader接收用户项,对其进行处理并返回给TestWriter。
package com.onlinetechvision.item;import java.util.Locale;import org.springframework.batch.item.ItemProcessor;import com.onlinetechvision.user.User;/*** TestProcessor Class is created to process items.** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class TestProcessor implements ItemProcessor<User, User> {/*** Processes items one by one** @param User user* @return User* @throws Exception**/@Overridepublic User process(User user) throws Exception {user.setName(user.getName().toUpperCase(Locale.ENGLISH));user.setSurname(user.getSurname().toUpperCase(Locale.ENGLISH));return user;}}
步骤12:建立TestWriter IMPL
TestWriter类是通过实现ItemWriter接口创建的。 此类称为将项目写入DB,内存等…
package com.onlinetechvision.item;import java.util.List;import org.springframework.batch.item.ItemWriter;import com.onlinetechvision.user.User;
import com.onlinetechvision.user.service.IUserService;/*** TestWriter Class is created to write items to DB, memory etc...** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class TestWriter implements ItemWriter<User> {private IUserService userService;/*** Writes items via list** @throws Exception**/@Overridepublic void write(List<? extends User> userList) throws Exception {for(User user : userList) {getUserService().addUser(user);}System.out.println('User List : ' + getUserService().getUsers());}public IUserService getUserService() {return userService;}public void setUserService(IUserService userService) {this.userService = userService;}}
步骤13:创建失败的StepTasklet类
通过实现Tasklet接口创建FailedStepTasklet 。 它说明了失败步骤中的业务逻辑。
package com.onlinetechvision.tasklet;import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;/*** FailedStepTasklet Class illustrates a failed job.** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class FailedStepTasklet implements Tasklet {private static final Logger logger = Logger.getLogger(FailedStepTasklet.class);private String taskResult;/*** Executes FailedStepTasklet** @param StepContribution stepContribution* @param ChunkContext chunkContext* @return RepeatStatus* @throws Exception**/public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {logger.debug('Task Result : ' + getTaskResult());throw new Exception('Error occurred!');}public String getTaskResult() {return taskResult;}public void setTaskResult(String taskResult) {this.taskResult = taskResult;} }
步骤14:创建BatchProcessStarter类
创建BatchProcessStarter类以启动作业。 此外,它记录他们的执行结果。
package com.onlinetechvision.spring.batch;import org.apache.log4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;/*** BatchProcessStarter Class launches the jobs and logs their execution results.** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class BatchProcessStarter {private static final Logger logger = Logger.getLogger(BatchProcessStarter.class);private Job firstJob;private Job secondJob;private Job thirdJob;private JobLauncher jobLauncher;private JobRepository jobRepository;/*** Starts the jobs and logs their execution results.**/public void start() {JobExecution jobExecution = null;JobParametersBuilder builder = new JobParametersBuilder();try {getJobLauncher().run(getFirstJob(), builder.toJobParameters());jobExecution = getJobRepository().getLastJobExecution(getFirstJob().getName(), builder.toJobParameters());logger.debug(jobExecution.toString()); getJobLauncher().run(getSecondJob(), builder.toJobParameters());jobExecution = getJobRepository().getLastJobExecution(getSecondJob().getName(), builder.toJobParameters());logger.debug(jobExecution.toString());getJobLauncher().run(getThirdJob(), builder.toJobParameters());jobExecution = getJobRepository().getLastJobExecution(getThirdJob().getName(), builder.toJobParameters());logger.debug(jobExecution.toString());} catch (JobExecutionAlreadyRunningException| JobRestartException| JobInstanceAlreadyCompleteException| JobParametersInvalidException e) {logger.error(e);}} public Job getFirstJob() {return firstJob;}public void setFirstJob(Job firstJob) {this.firstJob = firstJob;}public Job getSecondJob() {return secondJob;}public void setSecondJob(Job secondJob) {this.secondJob = secondJob;} public Job getThirdJob() {return thirdJob;}public void setThirdJob(Job thirdJob) {this.thirdJob = thirdJob;}public JobLauncher getJobLauncher() {return jobLauncher;}public void setJobLauncher(JobLauncher jobLauncher) {this.jobLauncher = jobLauncher;}public JobRepository getJobRepository() {return jobRepository;}public void setJobRepository(JobRepository jobRepository) {this.jobRepository = jobRepository;} }
步骤15:创建dataContext.xml
jdbc.properties已创建。 它定义数据源信息,并通过dataContext.xml读取
jdbc.db.driverClassName=com.mysql.jdbc.Driver
jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision
jdbc.db.username=root
jdbc.db.password=root
jdbc.db.initialSize=10
jdbc.db.minIdle=3
jdbc.db.maxIdle=10
jdbc.db.maxActive=10
jdbc.db.testWhileIdle=true
jdbc.db.testOnBorrow=true
jdbc.db.testOnReturn=true
jdbc.db.initSQL=SELECT 1 FROM DUAL
jdbc.db.validationQuery=SELECT 1 FROM DUAL
jdbc.db.timeBetweenEvictionRunsMillis=30000
步骤16:创建dataContext.xml
Spring配置文件dataContext.xml已创建。 它涵盖了dataSource,sessionFactory和transactionManager定义。
<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xmlns:context='http://www.springframework.org/schema/context'xmlns:p='http://www.springframework.org/schema/p'xmlns:batch='http://www.springframework.org/schema/batch'xmlns:tx='http://www.springframework.org/schema/tx'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/batchhttp://www.springframework.org/schema/batch/spring-batch-2.1.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-3.0.xsd'><context:property-placeholder location='classpath:jdbc.properties'/><!-- Enable the configuration of transactional behavior based on annotations --><tx:annotation-driven transaction-manager='transactionManager'/><!-- Data Source Declaration --><bean id='dataSource' class='org.apache.tomcat.jdbc.pool.DataSource' destroy-method='close'p:driverClassName='${jdbc.db.driverClassName}'p:url='${jdbc.db.url}'p:username='${jdbc.db.username}'p:password='${jdbc.db.password}'p:initialSize='${jdbc.db.initialSize}'p:minIdle='${jdbc.db.minIdle}'p:maxIdle='${jdbc.db.maxIdle}'p:maxActive='${jdbc.db.maxActive}'p:testWhileIdle='${jdbc.db.testWhileIdle}'p:testOnBorrow='${jdbc.db.testOnBorrow}'p:testOnReturn='${jdbc.db.testOnReturn}'p:initSQL='${jdbc.db.initSQL}'p:validationQuery='${jdbc.db.validationQuery}'p:timeBetweenEvictionRunsMillis='${jdbc.db.timeBetweenEvictionRunsMillis}'/> <!-- Session Factory Declaration --><bean id='sessionFactory' class='org.springframework.orm.hibernate4.LocalSessionFactoryBean'><property name='dataSource' ref='dataSource' /><property name='annotatedClasses'><list><value>com.onlinetechvision.user.User</value></list></property><property name='hibernateProperties'><props><prop key='hibernate.dialect'>org.hibernate.dialect.MySQLDialect</prop><prop key='hibernate.show_sql'>true</prop></props></property></bean><!-- Transaction Manager Declaration --><bean id='transactionManager' class='org.springframework.orm.hibernate4.HibernateTransactionManager'><property name='sessionFactory' ref='sessionFactory'/></bean></beans>
步骤17:创建jobContext.xml
Spring配置文件jobContext.xml已创建。 它涵盖jobRepository,jobLauncher,项目读取器,项目处理器,项目编写器,tasklet和作业定义。
<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xmlns:batch='http://www.springframework.org/schema/batch'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/batchhttp://www.springframework.org/schema/batch/spring-batch-2.1.xsd'><import resource='dataContext.xml'/><!-- jobRepository Declaration --><bean id='jobRepository' class='org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean'><property name='transactionManager' ref='transactionManager' /></bean><!-- jobLauncher Declaration --><bean id='jobLauncher' class='org.springframework.batch.core.launch.support.SimpleJobLauncher' ><property name='jobRepository' ref='jobRepository'/></bean><!-- Reader Bean Declarations --><bean id='firstTestReader' class='com.onlinetechvision.item.TestReader'><property name='maxIndex' value='2'/><property name='namePrefix' value='firstname'/><property name='surnamePrefix' value='firstsurname'/></bean><bean id='secondTestReader' class='com.onlinetechvision.item.TestReader'><property name='maxIndex' value='2'/><property name='namePrefix' value='secondname'/><property name='surnamePrefix' value='secondsurname'/></bean><bean id='thirdTestReader' class='com.onlinetechvision.item.TestReader'><property name='maxIndex' value='3'/><property name='namePrefix' value='thirdname'/><property name='surnamePrefix' value='thirdsurname'/></bean><bean id='fourthTestReader' class='com.onlinetechvision.item.TestReader'><property name='maxIndex' value='3'/><property name='namePrefix' value='fourthname'/><property name='surnamePrefix' value='fourthsurname'/></bean><bean id='fifthTestReader' class='com.onlinetechvision.item.TestReader'><property name='maxIndex' value='3'/><property name='namePrefix' value='fifthname'/><property name='surnamePrefix' value='fifthsurname'/></bean><bean id='failedCaseTestReader' class='com.onlinetechvision.item.FailedCaseTestReader'><property name='maxIndex' value='1'/><property name='namePrefix' value='failedcasename'/><property name='surnamePrefix' value='failedcasesurname'/></bean><!-- Processor Bean Declaration --><bean id='testProcessor' class='com.onlinetechvision.item.TestProcessor' /><!-- Writer Bean Declaration --><bean id='testWriter' class='com.onlinetechvision.item.TestWriter' ><property name='userService' ref='userService'/></bean><!-- Failed Step Tasklet Declaration --><bean id='failedStepTasklet' class='com.onlinetechvision.tasklet.FailedStepTasklet'><property name='taskResult' value='Error occurred!' /></bean> <!-- Batch Job Declarations --><batch:job id='firstJob'><batch:step id='firstStep' next='secondStep'><batch:tasklet><batch:chunk reader='firstTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet></batch:step><batch:step id='secondStep'><batch:tasklet><batch:chunk reader='secondTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet></batch:step></batch:job><batch:job id='secondJob'><batch:step id='thirdStep'><batch:tasklet><batch:chunk reader='thirdTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet><batch:next on='*' to='fourthStep' /><batch:next on='FAILED' to='firstFailedStep' /></batch:step><batch:step id='fourthStep'><batch:tasklet><batch:chunk reader='fourthTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet></batch:step><batch:step id='firstFailedStep'><batch:tasklet ref='failedStepTasklet' /></batch:step></batch:job><batch:job id='thirdJob'><batch:step id='fifthStep'><batch:tasklet><batch:chunk reader='failedCaseTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet><batch:next on='*' to='sixthStep' /><batch:next on='FAILED' to='secondFailedStep' /></batch:step><batch:step id='sixthStep'><batch:tasklet><batch:chunk reader='fifthTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/></batch:tasklet></batch:step><batch:step id='secondFailedStep'><batch:tasklet ref='failedStepTasklet' /></batch:step></batch:job></beans>
步骤18:创建applicationContext.xml
Spring配置文件applicationContext.xml已创建。 它涵盖了bean的定义。
<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xmlns:batch='http://www.springframework.org/schema/batch'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/batchhttp://www.springframework.org/schema/batch/spring-batch-2.1.xsd'><import resource='jobContext.xml'/><!-- User DAO Declaration --><bean id='userDAO' class='com.onlinetechvision.user.dao.UserDAO'><property name='sessionFactory' ref='sessionFactory' /></bean><!-- User Service Declaration --><bean id='userService' class='com.onlinetechvision.user.service.UserService'><property name='userDAO' ref='userDAO' /></bean> <!-- BatchProcessStarter Declaration --><bean id='batchProcessStarter' class='com.onlinetechvision.spring.batch.BatchProcessStarter'><property name='jobLauncher' ref='jobLauncher'/><property name='jobRepository' ref='jobRepository'/><property name='firstJob' ref='firstJob'/><property name='secondJob' ref='secondJob'/><property name='thirdJob' ref='thirdJob'/></bean> </beans>
步骤19:创建应用程序类
创建应用程序类以运行应用程序。
package com.onlinetechvision.exe;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;import com.onlinetechvision.spring.batch.BatchProcessStarter;/*** Application Class starts the application.** @author onlinetechvision.com* @since 10 Dec 2012* @version 1.0.0**/
public class Application {/*** Starts the application** @param String[] args**/public static void main(String[] args) {ApplicationContext appContext = new ClassPathXmlApplicationContext('applicationContext.xml');BatchProcessStarter batchProcessStarter = (BatchProcessStarter)appContext.getBean('batchProcessStarter');batchProcessStarter.start();}}
步骤20:建立专案
在构建OTV_SpringBatch_Chunk_Oriented_Processing项目之后,将创建OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar 。
步骤21:运行项目
运行创建的OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar文件后,将显示以下数据库和控制台输出日志:
数据库截图:
First Job的控制台输出:
16.12.2012 19:30:41 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=firstJob]] launched with the following parameters: [{}]16.12.2012 19:30:41 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=0, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:41 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2]16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:43) - JobExecution: id=0, version=2, startTime=Sun Dec 16 19:30:41 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]
Second Job的控制台输出:
16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=secondJob]] launched with the following parameters: [{}]16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=1, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2, id : 187, name : THIRDNAME_0, surname : THIRDSURNAME_0, id : 188, name : THIRDNAME_1, surname : THIRDSURNAME_1, id : 189, name : THIRDNAME_2, surname : THIRDSURNAME_2, id : 190, name : THIRDNAME_3, surname : THIRDSURNAME_3, id : 191, name : FOURTHNAME_0, surname : FOURTHSURNAME_0, id : 192, name : FOURTHNAME_1, surname : FOURTHSURNAME_1, id : 193, name : FOURTHNAME_2, surname : FOURTHSURNAME_2, id : 194, name : FOURTHNAME_3, surname : FOURTHSURNAME_3]16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:47) - JobExecution: id=1, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]
Third Job的控制台输出:
16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=thirdJob]] launched with the following parameters: [{}]16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=2, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]16.12.2012 19:30:42 DEBUG (TransactionTemplate.java:159) - Initiating transaction rollback on application exception
org.springframework.batch.repeat.RepeatException: Exception in batch process; nested exception is java.lang.Exception: Unexpected Error!
...16.12.2012 19:30:43 DEBUG (BatchProcessStarter.java:51) - JobExecution: id=2, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:43 GMT 2012, lastUpdated=Sun Dec 16 19:30:43 GMT 2012, status=FAILED, exitStatus=exitCode=FAILED;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]
步骤22:下载
https://github.com/erenavsarogullari/OTV_SpringBatch_Chunk_Oriented_Processing
资源:
Spring Batch中的块处理
参考: Online Technology Vision博客中来自我们JCG合作伙伴 Eren Avsarogullari的Spring Batch中面向块的处理 。
翻译自: https://www.javacodegeeks.com/2012/12/chunk-oriented-processing-in-spring-batch.html