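Spring Batch article series
Spring Batch Tutorial (1): A brief introduction, and converting an XML file to a TXT file with Spring Batch
Spring Batch Tutorial (2) Example: converting a TXT file to an XML file, and reading an XML file and storing its contents in a MySQL database
Spring Batch Tutorial (3) Example: reading data from MySQL and writing it to a text file, and reading from multiple text files and writing to MySQL
Spring Batch Tutorial (4) Tasklet example: scheduled jobs with Spring Batch
Spring Batch Tutorial (5) Annotation-based batch example with Spring Boot: reading and writing text files
Spring Batch Tutorial (6) Annotation-based batch example with Spring Boot: reading a file and writing to MySQL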
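Table of contents
- Spring Batch article series
- I. Example 1: Reading data from MySQL and writing it to a TXT file
- 1. Maven dependencies
- 2. Create the MySQL table and insert data
- 3. PersonInfo bean
- 4. Create the RowMapper
- 5. Create the ItemProcessor implementation
- 6. Add a job listener (JobExecutionListener)
- 7. Configure the job
- 1) Job configuration
- 2) Data source configuration
- 8. Create a main class to run the job
- 9. Verification
- 1) Console output
- 2) Program output
- II. Example 2: Reading from multiple source files and writing to MySQL
- 1. Maven dependencies
- 2. Create the table
- 3. PersonInfo bean
- 4. Create the FieldSetMapper
- 5. Create the ItemProcessor implementation
- 6. Add a job listener (JobExecutionListener)
- 7. Configure the job
- 1) Data source configuration
- 2) Hibernate configuration
- 3) Job configuration
- 8. Create a main class to run the job
- 9. Prepare test data
- 10. Verification
- 1) Console output
- 2) Program output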
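This article presents two examples: reading data from MySQL and writing it to a text file, and reading from multiple text files and writing to MySQL.
The examples use Spring 5.2.25.RELEASE and Spring Batch 4.2.8.RELEASE, because the latest versions of Spring Core and Spring Batch do not work with the JDK version used here.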
I. Example 1: Reading data from MySQL and writing it to a TXT file
1. Maven dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <springframework.version>5.2.25.RELEASE</springframework.version>
    <joda-time.version>2.12.5</joda-time.version>
    <mysql.connector.version>5.1.31</mysql.connector.version>
    <springbatch.version>4.2.8.RELEASE</springbatch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>${springbatch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
        <version>${springbatch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>${joda-time.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
</dependencies>
2. Create the MySQL table and insert data
DROP TABLE IF EXISTS `personinfo`;
CREATE TABLE `personinfo` (
    `name` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `birthday` varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `salary` double NOT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of personinfo
-- ----------------------------
INSERT INTO `personinfo` VALUES ('alanchanchn', '1985-02-01', 76);
INSERT INTO `personinfo` VALUES ('alan', '1979-09-01', 91.5);
INSERT INTO `personinfo` VALUES ('chan', '1993-03-01', 92);
INSERT INTO `personinfo` VALUES ('alanchan', '1995-08-01', 83);

SET FOREIGN_KEY_CHECKS = 1;
3. PersonInfo bean
package com.win.mysql2xml.bean;

import lombok.Data;

/**
 * @author alanchan
 */
@Data
public class PersonInfo {
    private String name;
    private String birthday;
    private double salary;
}
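The `@Data` annotation comes from Lombok, which generates getters, setters, and a `toString()` at compile time; note that the dependency list above does not include Lombok, so it has to be on the classpath separately. As a rough illustration of what the annotation amounts to, here is a hand-written equivalent. The class name `PlainPersonInfo` is made up for this sketch and is not part of the original project, and the `equals`/`hashCode` methods that Lombok also generates are left out for brevity.

```java
// Hand-written stand-in for a Lombok @Data class (illustrative only, not from the original project).
class PlainPersonInfo {
    private String name;
    private String birthday;
    private double salary;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getBirthday() { return birthday; }
    public void setBirthday(String birthday) { this.birthday = birthday; }

    public double getSalary() { return salary; }
    public void setSalary(double salary) { this.salary = salary; }

    // Same ClassName(field=value, ...) layout that Lombok's generated toString() uses.
    @Override
    public String toString() {
        return "PlainPersonInfo(name=" + name + ", birthday=" + birthday + ", salary=" + salary + ")";
    }

    public static void main(String[] args) {
        PlainPersonInfo p = new PlainPersonInfo();
        p.setName("alan");
        p.setBirthday("1979-09-01");
        p.setSalary(91.5);
        System.out.println(p); // PlainPersonInfo(name=alan, birthday=1979-09-01, salary=91.5)
    }
}
```

This is why the processor's console output prints each item as `PersonInfo(name=..., birthday=..., salary=...)` without any explicit `toString()` in the bean.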
4. Create the RowMapper
package com.win.mysql2xml;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.win.mysql2xml.bean.PersonInfo;

/**
 * Maps one row of the personinfo table to a PersonInfo object.
 *
 * @author alanchan
 */
public class PersonInfoRowMapper implements RowMapper<PersonInfo> {

    @Override
    public PersonInfo mapRow(ResultSet rs, int rowNum) throws SQLException {
        PersonInfo personInfo = new PersonInfo();
        personInfo.setName(rs.getString("name"));
        personInfo.setBirthday(rs.getString("birthday"));
        personInfo.setSalary(rs.getDouble("salary"));
        return personInfo;
    }
}
5. Create the ItemProcessor implementation
The processor in this example applies one simple rule: if a record's salary is below 77, the salary is replaced with salary * 1.3. All other records pass through unchanged.
package com.win.mysql2xml;

import org.springframework.batch.item.ItemProcessor;

import com.win.mysql2xml.bean.PersonInfo;

/**
 * @author alanchan
 */
public class PersonInfoItemProcessor implements ItemProcessor<PersonInfo, PersonInfo> {

    @Override
    public PersonInfo process(PersonInfo personInfo) throws Exception {
        // Logs the item as it was read, before any adjustment
        System.out.println("Processing result :" + personInfo);

        if (personInfo.getSalary() < 77) {
            // Build a new object rather than mutating the item that was read
            PersonInfo tempPersonInfo = new PersonInfo();
            tempPersonInfo.setName(personInfo.getName());
            tempPersonInfo.setBirthday(personInfo.getBirthday());
            tempPersonInfo.setSalary(personInfo.getSalary() * 1.3);
            personInfo = tempPersonInfo;
        }
        return personInfo;
    }
}
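To see which of the sample rows the rule actually touches, the same threshold logic can be tried out in plain Java without Spring. This is only a sketch: the class `SalaryRuleSketch` and its constants are names invented here, and the rows are copied from the INSERT statements above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the adjustment rule used by PersonInfoItemProcessor (no Spring required).
class SalaryRuleSketch {
    static final double THRESHOLD = 77;
    static final double FACTOR = 1.3;

    // Returns the salary that would be passed on to the writer.
    static double adjust(double salary) {
        return salary < THRESHOLD ? salary * FACTOR : salary;
    }

    public static void main(String[] args) {
        // name -> salary, copied from the sample data in the personinfo table
        Map<String, Double> rows = new LinkedHashMap<>();
        rows.put("alanchanchn", 76.0);
        rows.put("alan", 91.5);
        rows.put("chan", 92.0);
        rows.put("alanchan", 83.0);

        rows.forEach((name, salary) ->
                System.out.println(name + ": " + salary + " -> " + adjust(salary)));
    }
}
```

Of the four sample rows, only alanchanchn (76) falls below the threshold, so it is the only record whose salary changes. Because the arithmetic is done on `double` values, the adjusted figure may carry a small floating-point rounding error.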
6. Add a job listener (JobExecutionListener)
package com.win.mysql2xml;

import java.util.List;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Logs the start time, end time, duration, and final status of the job.
 *
 * @author alanchan
 */
public class PersonInfoJobListener implements JobExecutionListener {

    private DateTime startTime, stopTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = new DateTime();
        // Prints "job started at: <timestamp>"
        System.out.println("job开始 at :" + startTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        stopTime = new DateTime();
        // Prints "job ended at: <timestamp>" and "elapsed time (ms): <n>"
        System.out.println("job结束 at :" + stopTime);
        System.out.println("任务耗时(毫秒) :" + getTimeInMillis(startTime, stopTime));

        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            // Prints "job completed"
            System.out.println("job任务完成");
            // Here you can perform some other business logic like cleanup
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            // Prints "job failed with the following exceptions", then one line per exception
            System.out.println("job任务异常如下 ");
            List<Throwable> exceptionList = jobExecution.getAllFailureExceptions();
            for (Throwable th : exceptionList) {
                System.err.println("异常 :" + th.getLocalizedMessage());
            }
        }
    }

    private long getTimeInMillis(DateTime start, DateTime stop) {
        return stop.getMillis() - start.getMillis();
    }
}
7. Configure the job
1) Job configuration
File location: /sping-batch/src/main/resources/spring-batch-context4.xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <import resource="classpath:context-datasource.xml"/>

    <!-- JobRepository and JobLauncher are configuration/setup classes -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- ItemReader which reads from database and returns the row mapped by rowMapper -->
    <bean id="databaseItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
        <property name="sql" value="SELECT name,birthday,salary FROM `personinfo`" />
        <property name="rowMapper">
            <bean class="com.win.mysql2xml.PersonInfoRowMapper" />
        </property>
    </bean>

    <!-- ItemWriter writes a line into output flat file -->
    <bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <property name="resource" value="file:d:/personInfo.txt" />
        <property name="lineAggregator">
            <!-- An Aggregator which converts an object into delimited list of strings -->
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value="|" />
                <property name="fieldExtractor">
                    <!-- Extractor which returns the value of beans property through reflection -->
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="name,birthday,salary" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Optional JobExecutionListener to perform business logic before and after the job -->
    <bean id="jobListener" class="com.win.mysql2xml.PersonInfoJobListener" />

    <!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
    <bean id="itemProcessor" class="com.win.mysql2xml.PersonInfoItemProcessor" />

    <!-- Step will need a transaction manager -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- Actual Job -->
    <batch:job id="personInfoJob">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="databaseItemReader" writer="flatFileItemWriter"
                    processor="itemProcessor" commit-interval="10" />
            </batch:tasklet>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobListener" />
        </batch:listeners>
    </batch:job>
</beans>
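In the writer configuration, the field extractor pulls the `name`, `birthday`, and `salary` properties from each item, and the line aggregator joins them with the `|` delimiter to form one output line. The plain-Java sketch below imitates that joining step so the resulting line format is easy to picture. `DelimitedLineSketch` and `aggregate` are names invented for this illustration; this is not Spring Batch's own implementation.

```java
import java.util.StringJoiner;

// Plain-Java imitation of joining extracted field values into one delimited line.
class DelimitedLineSketch {

    // Converts each field with String.valueOf and joins the results with the delimiter.
    static String aggregate(String delimiter, Object... fields) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object field : fields) {
            joiner.add(String.valueOf(field));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Field order follows the "names" property: name,birthday,salary
        System.out.println(aggregate("|", "alan", "1979-09-01", 91.5)); // alan|1979-09-01|91.5
    }
}
```

The order of the values in each line follows the order given in the `names` property, so changing that property changes the column order in the output file.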
2) Data source configuration
File location: /sping-batch/src/main/resources/context-datasource.xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://192.168.10.44:3306/test" />
        <property name="username" value="root" />
        <property name="password" value="1234" />
    </bean>
</beans>
8. Create a main class to run the job
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @author alanchan
 */
public class App {

    @SuppressWarnings("resource")
    public static void main(String[] args) {
        // Load the job configuration from the classpath
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context4.xml");

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("personInfoJob");

        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            // Prints "Job execution status: <status>"
            System.out.println("Job执行状态 : " + execution.getStatus());
        } catch (JobExecutionException e) {
            // Prints "Job execution failed"
            System.out.println("Job 执行失败");
            e.printStackTrace();
        }
    }
}
9. Verification
Run the program, then check the contents of the output file and the console.
1) Console output
job开始 at :2023-07-21T10:49:44.683+08:00
Processing result :PersonInfo(name=alanchanchn, birthday=1985-02-01, salary=76.0)
Processing result :PersonInfo(name=alan, birthday=1979-09-01, salary=91.5)
Processing result :PersonInfo(name=chan, birthday=1993-03-01, salary=92.0)
Processing result :PersonInfo(name=alanchan, birthday=1995-08-01, salary=83.0)
job结束 at :2023-07-21T10:49:44.918+08:00
任务耗时(毫秒) :235
job任务完成
Job执行状态 : COMPLETED
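The elapsed time reported by the listener can be cross-checked from the two timestamps in the log. The listener itself uses Joda-Time; the sketch below swaps in the JDK's `java.time` classes instead, purely so that it runs without extra dependencies. The class and method names are invented for this illustration.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

// Recomputes the job duration from the ISO-8601 timestamps printed in the console output.
class ElapsedSketch {

    // Parses two ISO-8601 offset timestamps and returns the difference in milliseconds.
    static long elapsedMillis(String startIso, String stopIso) {
        OffsetDateTime start = OffsetDateTime.parse(startIso);
        OffsetDateTime stop = OffsetDateTime.parse(stopIso);
        return Duration.between(start, stop).toMillis();
    }

    public static void main(String[] args) {
        long millis = elapsedMillis("2023-07-21T10:49:44.683+08:00", "2023-07-21T10:49:44.918+08:00");
        System.out.println(millis); // 235
    }
}
```

The result matches the 235 ms printed by the listener, which confirms that the figure is simply the difference between the start and end timestamps.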
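2) Program output
The job writes one line per record to d:/personInfo.txt, with the name, birthday, and salary fields separated by |.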
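II. Example 2: Reading from multiple source files and writing to MySQL
1. Maven dependencies
On top of the dependencies from Example 1, the classes and configuration below rely on spring-orm (for the org.springframework.orm.hibernate5 support classes), Hibernate 5, and c3p0 (for com.mchange.v2.c3p0.ComboPooledDataSource).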
2. Create the table
DROP TABLE IF EXISTS `personinfo`;
CREATE TABLE `personinfo` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `salary` double(10, 2) NOT NULL,
    `birthday` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
3. PersonInfo bean
package com.win.multireaderhibernatewriter.bean;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.Data;

/**
 * @author alanchan
 */
@Data
@Entity
@Table(name = "personinfo")
public class PersonInfo {

    // Generated by the database (AUTO_INCREMENT column)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private int id;

    @Column(name = "name", nullable = false)
    private String name;

    @Column(name = "birthday", nullable = false)
    private String birthday;

    @Column(name = "salary", nullable = false)
    private double salary;
}
4. Create the FieldSetMapper
package com.win.multireaderhibernatewriter;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.win.multireaderhibernatewriter.bean.PersonInfo;

/**
 * Maps the tokenized fields of one input line to a PersonInfo object.
 *
 * @author alanchan
 */
public class PersonInfoFieldSetMapper implements FieldSetMapper<PersonInfo> {

    @Override
    public PersonInfo mapFieldSet(FieldSet fieldSet) throws BindException {
        PersonInfo personInfo = new PersonInfo();
        // Fields are read by position: name, birthday, salary
        personInfo.setName(fieldSet.readString(0));
        personInfo.setBirthday(fieldSet.readString(1));
        personInfo.setSalary(fieldSet.readDouble(2));
        return personInfo;
    }
}
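The mapper reads fields by position, so it depends on each input line being split into exactly name, birthday, and salary, in that order. As a rough picture of the two steps together (tokenizing on `|` and then mapping by index), here is a plain-Java sketch. `LineParseSketch` and its nested `Person` class are invented for this illustration and only stand in for the tokenizer, the mapper, and the bean.

```java
import java.util.regex.Pattern;

// Plain-Java sketch: split a delimited line and map the fields by position.
class LineParseSketch {

    // Minimal stand-in for the PersonInfo bean.
    static final class Person {
        final String name;
        final String birthday;
        final double salary;

        Person(String name, String birthday, double salary) {
            this.name = name;
            this.birthday = birthday;
            this.salary = salary;
        }
    }

    static Person parse(String line) {
        // "|" is a regex metacharacter, so it must be quoted before splitting
        String[] fields = line.split(Pattern.quote("|"));
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields but got " + fields.length + ": " + line);
        }
        return new Person(fields[0], fields[1], Double.parseDouble(fields[2]));
    }

    public static void main(String[] args) {
        Person p = parse("lisi|1995-08-01|60");
        System.out.println(p.name + " " + p.birthday + " " + p.salary); // lisi 1995-08-01 60.0
    }
}
```

Note that a salary written as `60` in the file still parses to the double value 60.0, which is how it later appears in the console output.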
5. Create the ItemProcessor implementation
package com.win.multireaderhibernatewriter;

import org.springframework.batch.item.ItemProcessor;

import com.win.multireaderhibernatewriter.bean.PersonInfo;

/**
 * Raises any salary below 60 to salary * 1.5; other records pass through unchanged.
 *
 * @author alanchan
 */
public class PersonInfoItemProcessor implements ItemProcessor<PersonInfo, PersonInfo> {

    @Override
    public PersonInfo process(PersonInfo personInfo) throws Exception {
        // Logs the item as it was read, before any adjustment
        System.out.println("Processing result :" + personInfo);

        if (personInfo.getSalary() < 60) {
            PersonInfo tempPersonInfo = new PersonInfo();
            tempPersonInfo.setName(personInfo.getName());
            tempPersonInfo.setBirthday(personInfo.getBirthday());
            tempPersonInfo.setSalary(personInfo.getSalary() * 1.5);
            personInfo = tempPersonInfo;
        }
        return personInfo;
    }
}
6. Add a job listener (JobExecutionListener)
package com.win.multireaderhibernatewriter;

import java.util.List;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Logs the start time, end time, duration, and final status of the job.
 *
 * @author chenw
 */
public class PersonInfoJobListener implements JobExecutionListener {

    private DateTime startTime, stopTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = new DateTime();
        // Prints "job started at: <timestamp>"
        System.out.println("job开始 at :" + startTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        stopTime = new DateTime();
        // Prints "job ended at: <timestamp>" and "elapsed time (ms): <n>"
        System.out.println("job结束 at :" + stopTime);
        System.out.println("任务耗时(毫秒) :" + getTimeInMillis(startTime, stopTime));

        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            // Prints "job completed"
            System.out.println("job任务完成");
            // Here you can perform some other business logic like cleanup
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            // Prints "job failed with the following exceptions", then one line per exception
            System.out.println("job任务异常如下 ");
            List<Throwable> exceptionList = jobExecution.getAllFailureExceptions();
            for (Throwable th : exceptionList) {
                System.err.println("异常 :" + th.getLocalizedMessage());
            }
        }
    }

    private long getTimeInMillis(DateTime start, DateTime stop) {
        return stop.getMillis() - start.getMillis();
    }
}
7. Configure the job
1) Data source configuration
File location: /sping-batch/src/main/resources/context-datasource2.xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
        <property name="driverClass" value="com.mysql.jdbc.Driver" />
        <property name="jdbcUrl" value="jdbc:mysql://192.168.10.44:3306/test" />
        <property name="user" value="root" />
        <property name="password" value="1234" />
    </bean>
</beans>
2) Hibernate configuration
File location: /sping-batch/src/main/resources/context-model.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-autowire="byName" default-init-method="init">

    <import resource="classpath:context-datasource2.xml"/>

    <bean id="sessionFactory" class="org.springframework.orm.hibernate5.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="packagesToScan">
            <list>
                <value>com.win.multireaderhibernatewriter.bean</value>
            </list>
        </property>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.dialect">org.hibernate.dialect.MySQLDialect</prop>
                <prop key="hibernate.show_sql">true</prop>
                <!-- <prop key="hibernate.format_sql">true</prop> -->
            </props>
        </property>
    </bean>

    <!-- sessionFactory is injected by name because of default-autowire="byName" -->
    <bean id="transactionManager" class="org.springframework.orm.hibernate5.HibernateTransactionManager" />

    <tx:annotation-driven transaction-manager="transactionManager"/>
</beans>
3) Job configuration
File location: /sping-batch/src/main/resources/spring-batch-context5.xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <import resource="classpath:context-model.xml"/>

    <!-- JobRepository and JobLauncher are configuration/setup classes -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Reads every file matching the pattern by delegating each one to flatFileItemReader -->
    <bean id="multiResourceItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
        <property name="resources" value="classpath:testmultireader/personinfo*.txt" />
        <property name="delegate" ref="flatFileItemReader" />
    </bean>

    <!-- ItemReader reads a complete line one by one from input file -->
    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="fieldSetMapper">
                    <!-- Mapper which maps each individual items in a record to properties in POJO -->
                    <bean class="com.win.multireaderhibernatewriter.PersonInfoFieldSetMapper" />
                </property>
                <property name="lineTokenizer">
                    <!-- A tokenizer class to be used when items in input record are separated by specific characters -->
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value="|" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!-- ItemWriter which writes data to database -->
    <bean id="databaseItemWriter" class="org.springframework.batch.item.database.HibernateItemWriter">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>

    <!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
    <bean id="itemProcessor" class="com.win.multireaderhibernatewriter.PersonInfoItemProcessor" />

    <!-- Optional JobExecutionListener to perform business logic before and after the job -->
    <bean id="jobListener" class="com.win.multireaderhibernatewriter.PersonInfoJobListener" />

    <!-- Actual Job -->
    <batch:job id="personInfoJob">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="multiResourceItemReader" writer="databaseItemWriter"
                    processor="itemProcessor" commit-interval="10" />
            </batch:tasklet>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobListener" />
        </batch:listeners>
    </batch:job>
</beans>
8. Create a main class to run the job
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @author alanchan
 */
public class App {

    @SuppressWarnings("resource")
    public static void main(String[] args) {
        // Load the job configuration from the classpath
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context5.xml");

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("personInfoJob");

        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            // Prints "Job execution status: <status>"
            System.out.println("Job 执行状态 : " + execution.getStatus());
        } catch (JobExecutionException e) {
            // Prints "Job failed"
            System.out.println("Job 失败");
            e.printStackTrace();
        }
    }
}
9. Prepare test data
Directory location: /sping-batch/src/main/resources/testmultireader
All of the source data files are placed in this directory.
- /sping-batch/src/main/resources/testmultireader/personinfo-1.txt
alanchanchn|1985-02-01|98.8
alan|1979-09-01|91.5
- /sping-batch/src/main/resources/testmultireader/personinfo-2.txt
zhangsan|1998-03-01|92.0
lisi|1995-08-01|60
- /sping-batch/src/main/resources/testmultireader/personinfo-3.txt
wangking|1989-04-01|18.0
tony|1995-08-01|86.0
- /sping-batch/src/main/resources/testmultireader/personinfo-4.txt
zhaoqin|1997-03-01|36.0
sunmonkey|1999-08-01|23.0
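The job configuration points the reader at `testmultireader/personinfo*.txt`, so every file matching that pattern is read and its lines are fed into the same step. The plain-Java sketch below imitates that behaviour with `java.nio.file`: it collects the files matching a glob, sorts them by file name, and concatenates their lines. Sorting by file name is an assumption made here so that the order is deterministic, and `MultiFileReadSketch` with its methods is invented for this illustration rather than taken from Spring Batch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch: read all lines from every file matching a glob, in file-name order.
class MultiFileReadSketch {

    static List<String> readAll(Path dir, String glob) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path file : stream) {
                files.add(file);
            }
        }
        // Directory listings have no guaranteed order, so sort explicitly (assumption: by file name)
        files.sort(Comparator.comparing((Path p) -> p.getFileName().toString()));

        List<String> lines = new ArrayList<>();
        for (Path file : files) {
            lines.addAll(Files.readAllLines(file));
        }
        return lines;
    }

    // Builds a small temporary directory with two matching files and one non-matching file.
    static List<String> sample() {
        try {
            Path dir = Files.createTempDirectory("testmultireader");
            Files.write(dir.resolve("personinfo-2.txt"),
                    Arrays.asList("zhangsan|1998-03-01|92.0", "lisi|1995-08-01|60"));
            Files.write(dir.resolve("personinfo-1.txt"),
                    Arrays.asList("alanchanchn|1985-02-01|98.8", "alan|1979-09-01|91.5"));
            Files.write(dir.resolve("notes.txt"), Arrays.asList("not matched by the glob"));
            return readAll(dir, "personinfo*.txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        sample().forEach(System.out::println);
    }
}
```

Running `main` prints the two lines of personinfo-1.txt followed by the two lines of personinfo-2.txt, and skips notes.txt because it does not match the pattern.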
10. Verification
1) Console output
job开始 at :2023-07-21T13:00:47.780+08:00
Processing result :PersonInfo(id=0, name=alanchanchn, birthday=1985-02-01, salary=98.8)
Processing result :PersonInfo(id=0, name=alan, birthday=1979-09-01, salary=91.5)
Processing result :PersonInfo(id=0, name=zhangsan, birthday=1998-03-01, salary=92.0)
Processing result :PersonInfo(id=0, name=lisi, birthday=1995-08-01, salary=60.0)
Processing result :PersonInfo(id=0, name=wangking, birthday=1989-04-01, salary=18.0)
Processing result :PersonInfo(id=0, name=tony, birthday=1995-08-01, salary=86.0)
Processing result :PersonInfo(id=0, name=zhaoqin, birthday=1997-03-01, salary=36.0)
Processing result :PersonInfo(id=0, name=sunmonkey, birthday=1999-08-01, salary=23.0)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
Hibernate: insert into personinfo (birthday, name, salary) values (?, ?, ?)
job结束 at :2023-07-21T13:00:48.044+08:00
任务耗时(毫秒) :264
job任务完成
Job 执行状态 : COMPLETED
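In the log, all eight "Processing result" lines appear before the first Hibernate insert. That is consistent with `commit-interval="10"`: the step gathers up to ten items into a chunk before handing the whole chunk to the writer, and with only eight input lines everything fits into a single chunk. The sketch below is a simplified plain-Java model of such a chunk loop. It is not Spring Batch's implementation, and `ChunkLoopSketch` and `run` are names invented here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Simplified model of chunk-oriented processing: read up to N items, process them, write them together.
class ChunkLoopSketch {

    // Returns the number of chunks handed to the writer.
    static <T> int run(Iterator<T> reader, UnaryOperator<T> processor,
                       Consumer<List<T>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most chunkSize items
            List<T> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: transform each item of the chunk
            List<T> outputs = new ArrayList<>();
            for (T item : inputs) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the whole chunk goes to the writer in one call
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alanchanchn", "alan", "zhangsan", "lisi",
                "wangking", "tony", "zhaoqin", "sunmonkey");
        int chunks = run(names.iterator(),
                name -> { System.out.println("process " + name); return name; },
                chunk -> System.out.println("write " + chunk.size() + " items"),
                10);
        System.out.println("chunks: " + chunks); // chunks: 1
    }
}
```

With a chunk size of 3 instead of 10, the same eight items would be written in three chunks (3, 3, and 2 items), and the process and write messages would interleave accordingly.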
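2) Program output
The eight records from the four input files are inserted into the personinfo table.
This article presented two examples: reading data from MySQL and writing it to a text file, and reading from multiple text files and writing to MySQL.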