SpringBoot:使用Spring Batch实现批处理任务

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency><groupId>org.hsqldb</groupId><artifactId>hsqldb</artifactId><scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;@Entity
public class Person {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String firstName;private String lastName;// getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;@Configuration
public class BatchConfiguration {@Beanpublic FlatFileItemReader<Person> reader() {return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}}).build();}
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {@Overridepublic Person process(Person person) throws Exception {final String firstName = person.getFirstName().toUpperCase();final String lastName = person.getLastName().toUpperCase();final Person transformedPerson = new Person();transformedPerson.setFirstName(firstName);transformedPerson.setLastName(lastName);return transformedPerson;}
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;@Configuration
public class BatchConfiguration {@Beanpublic JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(jdbcTemplate.getJdbcTemplate().getDataSource()).build();}
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();}
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;@Component
public class JobCompletionNotificationListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("Job Started");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Ended");}
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BatchApplication implements CommandLineRunner {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate Job job;public static void main(String[] args) {SpringApplication.run(BatchApplication.class, args);}@Overridepublic void run(String... args) throws Exception {jobLauncher.run(job, new JobParameters());}
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {return jobBuilderFactory.get("multiStepJob").listener(listener).start(step1).next(step2).end().build();
}@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step2").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).taskExecutor(taskExecutor()).build();
}@Bean
public TaskExecutor taskExecutor() {SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();taskExecutor.setConcurrencyLimit(10);return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/38135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

6-Pandas使用自定义函数

Pandas使用自定义函数 如果想要应用自定义的函数&#xff0c;或者把其他库中的函数应用到 Pandas 对象中&#xff0c;有以下三种方法&#xff1a; 1) 操作整个 DataFrame 的函数&#xff1a;pipe()2) 操作行或者列的函数&#xff1a;apply()3) 操作单一元素的函数&#xff1a…

Python 教程---面向对象编程

面向对象编程 4.1 类和对象的概念类&#xff08;Class&#xff09;对象&#xff08;Object&#xff09;示例创建和使用类 4.2 类成员实例成员类成员静态成员示例 4.3 面向对象三要素封装&#xff08;Encapsulation&#xff09;继承&#xff08;Inheritance&#xff09;多态&…

单晶层状氧化物制作方法技术资料 纳离子技术

网盘 https://pan.baidu.com/s/1hjHsXvTXG74-0fDo5TtXWQ?pwd10jk 单晶型高熵普鲁士蓝正极材料及其制备方法与应用.pdf 厘米级铬氧化物单晶及其制备方法和存储器件.pdf 多孔氧化物单晶材料及其制备方法和应用.pdf 大单晶层状氧化物正极材料及其制备方法和应用.pdf 富钠P2相层状…

docker k8s

1、docker是什么&#xff1f; 将环境和程序一起打包给到 服务器运行的工具软件。 2、基础镜像base image是什么&#xff1f; 操作系统&#xff1a;用户空间、内核空间 阉割操作系统&#xff0c;利用其的用户空间&#xff08;因为应用程序运行在用户空间&#xff09;&#xf…

量化交易之机器学习篇 - 实现K近邻模型的两种方式

# 导入相关模块import numpy as npfrom collections import Counter import matplotlib.pyplot as pltfrom sklearn import datasets from sklearn.utils import shuffledef load_data():iris datasets.load_iris()# 打乱数据后的数据和标签X, y shuffle(iris.data, iris.tar…

【机器学习】在【Pycharm】中的应用:【线性回归模型】进行【房价预测】

专栏&#xff1a;机器学习笔记 pycharm专业版免费激活教程见资源&#xff0c;私信我给你发 python相关库的安装&#xff1a;pandas,numpy,matplotlib&#xff0c;statsmodels 1. 引言 线性回归&#xff08;Linear Regression&#xff09;是一种常见的统计方法和机器学习算法&a…

弹性力学讲义

弹性力学讲义 1. 基本假设和一些概念2. 应力3. 二维应力状态与摩尔库伦屈服准则 1. 基本假设和一些概念 力学&#xff1a;变形体力学–固体力学和流体力学&#xff08;连续介质力学&#xff09; 刚体力学–理论力学&#xff08;一般力学&#xff09; 物理受理后&#xff1a;要…

Facebook的投流技巧有哪些?

相信大家都知道Facebook拥有着巨大的用户群体和高转化率&#xff0c;在国外社交推广中的影响不言而喻。但随着Facebook广告的竞争越来越激烈&#xff0c;在Facebook广告上获得高投资回报率也变得越来越困难。IPIDEA代理IP今天就教大家如何在Facebook上投放广告的技巧&#xff0…

python–基础篇–正则表达式–是什么

文章目录 定义一&#xff1a;正则表达式就是记录文本规则的代码定义一&#xff1a;正则表达式是一个特殊的字符序列&#xff0c;用于判断一个字符串是否与我们所设定的字符序列是否匹配&#xff0c;也就是说检查一个字符串是否与某种模式匹配。初识 Python 正则表达式 定义一&a…

D : 合适的顺序

Description 给定 8 个数&#xff0c;如果将它们排成一列&#xff0c;每个数的权值是它与相邻的数之积&#xff0c;求一个排列方式&#xff0c;所有数的权值之和最大&#xff0c;输出该权值和. 例如 13242315 的权值和为 1∗33∗1∗22∗3∗44∗2∗22∗4∗33∗2∗11∗3∗55∗1…

新工具:轻松可视化基因组,部分功能超IGV~

本次分享一个Python基因组数据可视化工具figeno。 figeno擅长可视化三代long reads、跨区域基因组断点视图&#xff08;multi-regions across genomic breakpoints&#xff09;、表观组数据&#xff08;HiC、ATAC-seq和ChIP-seq等&#xff09;可视化、WGS中的CNV和SV可视化等。…

第四周——单词记忆

deploy 部署 attorney 律师 discrimination 歧视&#xff0c;区别 implicit 含蓄的 disposition 性格&#xff0c;倾向 entail 牵涉 retail 零售 imposing 印象深刻的 壮观的 implication 含义 entrenched 根深蒂固的 perplex 使复杂化 comply 遵守 composed 沉着…

小米平板6系列对比

小米平板6系列目前有4款&#xff0c;分别为6、6 Pro、6 Max、6S Pro。具体对比如下表所示。 小米平板型号66 Pro6 Max6S Pro实物图发布时间2023年4月21日2023年4月21日2023年8月14日2024年2月22 日屏幕大小11英寸11英寸14英寸12.4英寸分辨率2.8K2.8K2.8K3K刷新率144Hz144Hz120…

43 - 部门工资前三高的所有员工(高频 SQL 50 题基础版)

43 - 部门工资前三高的所有员工 # dense_rank 排名selectDepartment,Employee,Salary from(selectd.name as Department,e.name as Employee,e.salary as Salary,(dense_rank() over (partition by d.name order by e.salary desc)) as rankingfrom Employee e left join Depar…

数据库-存储过程,函数与触发器

创建存储过程&#xff1a;create procedure 存储过程名(参数) eg: CREATE PROCEDURE proc1() BEGIN SELECT * FROM user; END; 执行存储过程&#xff1a;call 存储过程名 创建带有参数的存储过程 存储过程的参数有三种&#xff1a; IN&#xff1a;输入参数&#xff0c;也是…

18 学渣的逆袭之路

在小学阶段&#xff08;本篇特指五年级&#xff0c;一到四年级随便学学就可以逆袭90分&#xff0c;六年级难度飙升&#xff09;&#xff0c;无论你的分数怎么低&#xff0c;只要有一颗上进的心&#xff0c;就绝对可以逆袭95&#xff01; 在本篇文章&#xff0c;我将会讲解“对于…

【前端那些事】Node.js的安装并配置镜像源

1、官网下载地址 Download Node.js 一步一步点击安装即可&#xff0c;可自定义安装目录 2、配置镜像源 # 设置淘宝镜像源 npm config set registry https://registry.npmmirror.com# 查看使用的镜像源 npm config get registry 如果需要恢复为npm默认的官方源&#xf…

intellij idea安装R包ggplot2报错问题求解

1、intellij idea安装R包ggplot2问题 在我上次解决图形显示问题后&#xff0c;发现安装ggplot2包时出现了问题&#xff0c;这在之前高版本中并没有出现问题&#xff0c; install.packages(ggplot2) ERROR: lazy loading failed for package lifecycle * removing C:/Users/V…

java:aocache的单实例缓存(一)

上一篇博客《java:aocache:基于aspectJ实现的方法缓存工具》介绍了aocache的基本使用&#xff0c; 介绍AoCacheable注解时说过&#xff0c;AoCacheable可以定义在构造方法上&#xff0c;定义在构造方法&#xff0c;该构建方法就成了单实例模式。 也就是说&#xff0c;只要构建…

Java实现按高度或宽度等比压缩图片尺寸

要实现一个能够按高度或宽度等比压缩图片并返回InputStream的Java方法&#xff0c;你需要先计算图片的原始宽高比&#xff0c;然后根据目标尺寸&#xff08;宽度或高度&#xff09;计算出等比缩放后的另一个维度。以下是一个示例代码&#xff1a; import javax.imageio.ImageI…