在当今世界,互联网已经改变了我们的生活方式,其主要原因之一是大多数日常琐事都使用互联网。 这导致可用于处理的大量数据。
其中涉及大量数据的一些示例是处理工资单,银行对帐单,利息计算等。因此,请设想一下,如果所有这些工作都必须手动完成,那么完成这些工作将花费很多时间。
在当前年龄该如何做? 答案是批处理。
1.简介
批处理是对批量数据执行的,无需人工干预,并且可以长时间运行。 它可能是数据或计算密集型的。 批处理作业可以按预定义的时间表运行,也可以按需启动。 另外,由于批处理作业通常是长时间运行的作业,因此在批处理作业中发现了经常检查和从特定故障中重新启动的常见功能。
1.1 Java批处理的历史
Java平台的批处理是作为JSR 352规范(Java EE 7平台的一部分)引入的,它定义了批处理应用程序的编程模型以及运行和管理批处理作业的运行时。
1.2 Java Batch的体系结构
下图显示了批处理的基本组件。
批处理应用程序的体系结构解决了批处理问题,例如作业,步骤,存储库,读取器处理器编写器模式,块,检查点,并行处理,流,重试,排序,分区等。
让我们了解架构的流程。
- 作业存储库包含需要运行的作业。
-
JobLauncher
从Job存储库中取出一个作业。 - 每个工作都包含步骤。 这些步骤是
ItemReader
,ItemProcessor
和ItemWriter
。 - 项目读取器是读取数据的一种。
- 项目处理是一种将基于业务逻辑处理数据的处理。
- 条目编写器会将数据写回到定义的源。
1.3批处理组件。
现在,我们将尝试详细了解批处理组件。
- 作业:作业包含整个批处理过程。 它包含一个或多个步骤。 使用作业指定语言(JSL)将作业组合在一起,该语言指定必须执行步骤的顺序。 在JSR 352中,JSL在称为作业XML文件的XML文件中指定。 一项工作基本上就是一个存放步骤的容器。
- 步骤:步骤是一个域对象,其中包含作业的独立顺序阶段。 步骤包含执行实际处理所需的所有必要逻辑和数据。 根据批处理规范,步骤的定义含糊不清,因为步骤的内容纯粹是特定于应用程序的,并且可以像开发人员想要的那样复杂或简单。 有两种步骤: 面向块和面向任务 。
- 作业操作员:它提供了一个界面来管理作业处理的各个方面,包括操作命令(例如开始,重新启动和停止)以及作业存储库命令(例如,检索作业和步骤执行)。
- 作业存储库:它包含有关当前正在运行的作业的信息以及有关该作业的历史数据。
JobOperator
提供用于访问此存储库的API。JobRepository
可以使用数据库或文件系统来实现。
下一节将帮助您了解批处理体系结构的一些常见特征。
1.3工作步骤
步骤是作业的独立阶段。 如上所述,作业中有两种类型的步骤。 我们将在下面尝试详细了解这两种类型。
1.3.1面向块的步骤
块状步骤将一次读取和处理一项,并将结果分组。 然后,当块达到预定义的大小时,将结果存储起来。 当数据集很大时,面向块的处理使存储结果的效率更高。 它包括三个部分。
- 项读取器从数据源一个接一个地读取输入,该数据源可以是数据库,平面文件,日志文件等。
- 处理器将根据已定义的业务逻辑逐一处理数据。
- 编写器将数据分块写入。 块的大小是预定义的并且是可配置的
作为块步骤的一部分,有一些检查点可向框架提供信息以完成块。 如果在块处理期间发生错误,则可以根据最后一个检查点重新开始该过程。
1.3.2面向任务的步骤
除了处理数据源中的项目外,它还执行任务。 其中包括创建或删除目录,移动文件,创建或删除数据库表等。与块步骤相比,任务步骤通常不会长时间运行。
在正常情况下,在需要清理的面向块的步骤之后使用面向任务的步骤。 例如,我们获取日志文件作为应用程序的输出。 块步骤用于处理数据并从日志文件中获取有意义的信息。
然后,使用任务步骤来清理不再需要的较旧的日志文件。
1.3.3并行处理
批处理作业通常执行昂贵的计算操作并处理大量数据。 批处理应用程序可以在两种情况下受益于并行处理。
- 本质上独立的步骤可以在不同的线程上运行。
- 面向块的步骤(其中每个项目的处理独立于处理先前项目的结果)可以在多个线程上运行。
批处理有助于完成任务并更快地执行操作以处理海量数据。
2.工具与技术
让我们看看用于构建程序的技术和工具。
- Eclipse Oxygen.2发布(4.7.2)
- Java –版本9.0.4
- 摇篮– 4.3
- Spring启动– 2.0.1-发布
- HSQL数据库
3.项目结构
项目结构如下图所示。
上面的项目结构使用Gradle。 也可以使用maven创建该项目,并且build.gralde将替换为pom.xml文件。 该项目的结构将稍微延迟使用Maven进行构建。
4.方案目标
作为程序的一部分,我们将尝试使用spring boot创建一个简单的Java批处理应用程序。 该应用程序将执行以下任务。
- 读取:–从CSV文件读取员工数据。
- 处理数据:–将员工数据转换为全部大写。
- 写入:–将已处理的员工数据写回到数据库中。
4.1 Gradle构建
我们正在使用Gradle作为该程序的一部分进行构建。 build.gradle
文件将如下所示。
build.gradle
buildscript {repositories {mavenCentral()}dependencies {classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.1.RELEASE")}
}apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'bootJar {baseName = 'java-batch'version = '1.0'
}repositories {mavenCentral()
}sourceCompatibility = 1.8
targetCompatibility = 1.8dependencies {compile("org.springframework.boot:spring-boot-starter-batch")compile("org.hsqldb:hsqldb")testCompile("junit:junit")
}
在上面的build.gradle
文件中, apply plugin: 'java'
告诉我们需要设置的插件。 对我们来说,它是Java插件。
repositories{}
让我们知道应该从中提取依赖关系的存储库。 我们选择了mavenCentral
拉依赖罐。 我们还可以使用jcenter
提取相应的依赖罐。
dependencies {}
标签用于提供应为项目提取的必要的jar文件详细信息。 apply plugin: 'org.springframework.boot'
此插件用于指定spring-boot项目。 boot jar{}
将指定将从构建生成的jar的属性。
4.2样本数据文件
为了提供读取阶段的数据,我们将使用包含员工数据的CSV文件。
该文件将如下所示。
样本CSV文件
John,Foster
Joe,Toy
Justin,Taylor
Jane,Clark
John,Steve
示例数据文件包含员工的名字和姓氏。 我们将使用相同的数据进行处理,然后将其插入数据库。
4.3 SQL脚本
我们正在使用HSQL数据库,它是基于内存的数据库。 该脚本将如下所示。
SQL脚本
DROP TABLE employee IF EXISTS;CREATE TABLE employee (person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,first_name VARCHAR(20),last_name VARCHAR(20)
);
Spring Boot启动时会自动运行schema-@@platform@@.sql
。 -all
是所有平台的默认设置。 因此,表创建将在应用程序启动时自行进行,直到应用程序启动并运行后才可用。
4.4模型类别
我们将创建一个Employee.java
类作为模型类。 该类如下所示。
程序的模型类
package com.batch;public class Employee {private String lastName;private String firstName;public Employee() {}public Employee(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getFirstName() {return firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}@Overridepublic String toString() {return "firstName: " + firstName + ", lastName: " + lastName;}}
@Override
用于覆盖toString()
方法的默认实现。
4.5配置类
我们将创建一个BatchConfiguration.java
类,它将作为批处理的配置类。 Java文件如下所示。
BatchConfiguration.java
package com.batch.config;import javax.sql.DataSource;import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;import com.batch.Employee;
import com.batch.processor.EmployeeItemProcessor;@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;// tag::readerwriterprocessor[]@Beanpublic FlatFileItemReader reader() {return new FlatFileItemReaderBuilder().name("EmployeeItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper() {{setTargetType(Employee.class);}}).build();}@Beanpublic EmployeeItemProcessor processor() {return new EmployeeItemProcessor();}@Beanpublic JdbcBatchItemWriter writer(DataSource dataSource) {return new JdbcBatchItemWriterBuilder().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO employee (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();}// end::readerwriterprocessor[]// tag::jobstep[]@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter writer) {return stepBuilderFactory.get("step1").<Employee, Employee> chunk(10).reader(reader()).processor(processor()).writer(writer).build();}// end::jobstep[]
}
@EnableBatchProcessing
批注用于启用批处理。
JobBuilderFactory
是用于构建作业的工厂。
StepBuilderFactory
用于创建步骤。 方法step1()
具有属性chunk()
。 这是用于将输入分块为定义大小的属性。 对于我们来说,大小为10。
4.6项目处理器
项目处理器是一个接口,将负责处理数据。 我们将在EmployeeItemProcessor.java
实现该接口。 Java类如下所示。
EmployeeItemProcessor.java
package com.batch.processor;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;import com.batch.Employee;public class EmployeeItemProcessor implements ItemProcessor<Employee, Employee> {private static final Logger log = LoggerFactory.getLogger(EmployeeItemProcessor.class);@Overridepublic Employee process(Employee emp) throws Exception {final String firstName = emp.getFirstName().toUpperCase();final String lastName = emp.getLastName().toUpperCase();final Employee transformedEmployee = new Employee(firstName, lastName);log.info("Converting (" + emp + ") into (" + transformedEmployee + ")");return transformedEmployee;}}
在process()
方法中,我们将获取数据,并将其转换为大写名称。
4.7 JobExecutionSupportListener类
JobExecutionListenerSupport
是将在作业完成时通知的接口。 作为接口的一部分,我们有afterJob
方法。 此方法用于过帐作业的完成。
JobCompletionNotificationListener.java
package com.batch.config;
import java.util.List;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;import com.batch.Employee;@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);private final JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}@Overridepublic void afterJob(JobExecution jobExecution) {RowMapper rowMapper = (rs, rowNum) -> {Employee e = new Employee();e.setFirstName(rs.getString(1));e.setLastName(rs.getString(2));return e;};if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results");List empList= jdbcTemplate.query("SELECT first_name, last_name FROM employee",rowMapper);log.info("Size of List "+empList.size());for (Employee emp: empList) {log.info("Found: "+emp.getFirstName()+" "+emp.getLastName());}}}
}
在这种方法中,我们是在作业完成后从数据库中获取数据的,并将结果打印在控制台上以验证对数据执行的处理。
4.8应用类别
我们将创建一个应用程序类,其中包含负责触发Java批处理程序的main方法。 该类如下所示。
应用程序
package com.batch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}
@SpringBootApplication
是用于将程序指定为Spring Boot程序的注释。
5.输出
让我们将该应用程序作为Java应用程序执行。 我们将在控制台上获得以下输出。
批处理程序的工作流在输出中非常清楚。 Job以importUserJob
,然后第一步执行开始,将读取的数据转换为大写。
步骤的后处理,我们可以在控制台上看到大写的结果。
6.总结
在本教程中,我们学习了以下内容:
- Java批处理包含作业,该作业可以包含多个步骤。
- 每一步都是阅读,处理,写作的结合。
- 我们可以将数据分块为不同大小以进行处理。
7.下载Eclipse项目
这是带有SpringBoot的JavaBatch教程。
翻译自: https://www.javacodegeeks.com/2018/05/java-batch-tutorial.html