【Spring底层原理高级进阶】Spring Batch清洗和转换数据,一键处理繁杂数据!Spring Batch是如何实现IO流优化的?本文详解!

🎉🎉欢迎光临,终于等到你啦🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟持续更新的专栏《Spring 狂野之旅:从入门到入魔》 🚀

本专栏带你从Spring入门到入魔 

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/


Spring Batch的应用场景和作用

批处理是企业级业务系统不可或缺的一部分,spring batch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务.SpringBatch基于POJO和Spring框架,相当容易上手使用,让开发者很容易地访问和利用企业级服务.spring batch具有高可扩展性的框架,简单的批处理,复杂的大数据批处理作业都可以通过SpringBatch框架来实现。

先来个例子

假设一家电商公司,每天从不同渠道收集大量的销售数据。这些数据包含了各种商品的销售记录,但是格式和质量可能不一致。您希望将这些销售数据进行清洗和转换,以便进行后续的分析和报告生成。

使用Spring Batch,可以创建一个批处理作业来处理销售数据。作业的步骤可以包括从不同渠道读取销售数据,对数据进行清洗和转换,例如去除无效数据、修复格式错误、计算额外的指标等。然后,将清洗和转换后的数据写入数据库,以备后续的分析和报告生成使用。

先来介绍其架构

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

为什么它能够如此优秀?

Chunk 的中文意思是:大块、厚块;大部分,大量。Chunk 在Spring Batch 中就是“批量操作”的概念的抽象。它本身是一个类,这个类就是用来将原本的单条操作改成批量进行。
在Spring Batch 中就提出了chunk 的概念。首先我们设定一个chunk 的size,随后Spring Batch 一条条地区处理数据,但是到ItemWriter 阶段,Spirng Batch 不会选择立刻将数据提交到数据库,只有在处理的数据累积数量达到了之前设置的chunk 的size 之后,才会进行提交操作。

 

实战详细操作

引入 依赖

首先,引Spring Batch的依赖项。在Maven项目中,在pom.xml文件中添加以下依赖项:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>
</dependencies>

创建一个Spring配置文件(例如batch-config.xml),并配置Spring Batch的相关组件和属性。

<beans xmlns="http://www.springframework.org/schema/beans"xmlns:batch="http://www.springframework.org/schema/batch"xmlns:task="http://www.springframework.org/schema/task"xmlns:util="http://www.springframework.org/schema/util"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"><!-- 数据源配置 --><bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"><property name="driverClassName" value="com.mysql.jdbc.Driver" /><property name="url" value="jdbc:mysql://localhost:3306/mydb" /><property name="username" value="root" /><property name="password" value="password" /></bean><!-- JobRepository配置 --><bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"><property name="dataSource" ref="dataSource" /><property name="transactionManager" ref="transactionManager" /><property name="databaseType" value="mysql" /></bean><!-- 并发任务执行器配置 --><task:executor id="taskExecutor" pool-size="10" /><!-- 事务管理器配置 --><bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /><!-- JobLauncher配置 --><bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"><property name="jobRepository" ref="jobRepository" /><property name="taskExecutor" ref="taskExecutor" /></bean></beans>

定义数据模型:


根据需求,定义需要清洗和转换的数据模型。例如,假设数据模型是一个简单的用户对象,包含id、姓名和年龄字段。创建一个名为User的Java类,如下所示:

public class User {private Long id;private String name;private Integer age;// 省略构造函数、getter和setter方法
}

创建ItemReader:


创建一个实现ItemReader接口的自定义类,用于从数据源中读取数据。以下是一个读取数据库中用户数据的示例:

public class UserItemReader implements ItemReader<User> {private JdbcTemplate jdbcTemplate;private int rowCount = 0;private int currentRow = 0;public UserItemReader(DataSource dataSource) {this.jdbcTemplate = new JdbcTemplate(dataSource);}@Overridepublic User read() throws Exception {if (currentRow < rowCount) {String sql = "SELECT id, name, age FROM users LIMIT ?, 1";User user = jdbcTemplate.queryForObject(sql, new Object[]{currentRow}, (rs, rowNum) -> {User u = new User();u.setId(rs.getLong("id"));u.setName(rs.getString("name"));u.setAge(rs.getInt("age"));return u;});currentRow++;return user;} else {return null;}}public void setRowCount(int rowCount) {this.rowCount = rowCount;}
}

在此示例中,我们使用JdbcTemplate来执行数据库查询,并在read方法中逐行读取用户数据。

这里就可以根据你的业务需求设置各种各样的任务

创建ItemProcessor:
创建一个实现ItemProcessor接口的自定义类,用于对读取的数据进行清洗和转换。

temProcessor的作用是在Spring Batch的批处理作业中对读取的数据进行处理、清洗和转换。它是Spring Batch框架中的一个关键接口,用于执行中间处理逻辑,并将处理后的数据传递给ItemWriter进行写入操作。

以下是一个对用户数据进行简单处理的示例:

public class UserProcessor implements ItemProcessor<UserData, ProcessedUserData> {@Overridepublic ProcessedUserData process(UserData userData) throws Exception {// 获取用户数据String input = userData.getData();// 去除首尾空格String trimmedInput = input.trim();// 过滤敏感信息String filteredInput = filterSensitiveData(trimmedInput);// 转换为大写String upperCaseInput = filteredInput.toUpperCase();// 创建处理后的用户数据对象ProcessedUserData processedUserData = new ProcessedUserData();processedUserData.setProcessedData(upperCaseInput);return processedUserData;}private String filterSensitiveData(String input) {// 在这里可以根据实际需求实现敏感信息过滤逻辑// 使用正则表达式、敏感词库或其他方法进行过滤// 这里是过滤手机号码和邮箱地址String filteredInput = input.replaceAll("\\b\\d{11}\\b", "[PHONE_NUMBER]").replaceAll("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "[EMAIL]");return filteredInput;}
}

我们做了以下处理和转换:

  1. 使用trim方法去除用户数据字符串首尾的空格。
  2. 使用filterSensitiveData方法过滤敏感信息,例如手机号码和邮箱地址。在示例中,我们使用了简单的正则表达式来过滤手机号码和邮箱地址,并将其替换为占位符。
  3. 使用toUpperCase方法将字符串转换为大写形式。
  4. 创建一个ProcessedUserData对象,将处理后的数据设置到输出对象中。

创建ItemWriter:


创建一个实现ItemWriter接口的自定义类,用于将处理后的数据写入目标位置。以下是一个将用户数据写入数据库的示例:

public class UserItemWriter implements ItemWriter<User> {private JdbcTemplate jdbcTemplate;public UserItemWriter(DataSource dataSource) {this.jdbcTemplate = new JdbcTemplate(dataSource);}@Overridepublic void write(List<? extends User> users) throws Exception {for (User user : users) {String sql = "INSERT INTO processed_users (id, name, age) VALUES (?, ?, ?)";jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());}}
}

在此示例中,我们使用JdbcTemplate将处理后的用户数据插入到名为processed_users的数据库表中。

创建作业配置:


创建一个包含作业配置的类,用于将ItemReader、ItemProcessor和ItemWriter组合在一起,定义一个批处理作业。以下是一个示例的作业配置类:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Beanpublic ItemReader<User> userItemReader() {return new UserItemReader(dataSource);}@Beanpublic ItemProcessor<User, User> userItemProcessor() {return new UserItemProcessor();}@Beanpublic ItemWriter<User> userItemWriter() {return new UserItemWriter(dataSource);}@Beanpublic Step step1(ItemReader<User> reader, ItemProcessor<User, User> processor, ItemWriter<User> writer) {return stepBuilderFactory.get("step1").<User, User>chunk(10).reader(reader).processor(processor).writer(writer).build();}@Beanpublic Job dataCleanupJob(Step step1) {return jobBuilderFactory.get("dataCleanupJob").incrementer(new RunIdIncrementer()).flow(step1).end().build();}
}

在此示例中,我们通过Spring Batch的注解@EnableBatchProcessing启用批处理功能,并定义了一个名为dataCleanupJob的作业,其中包含一个名为step1的步骤。

运行作业:

创建Job和Step配置:使用Spring Batch的配置文件,配置Job和Step。使用JobParametersBuilder创建一个包含当前时间戳的Job参数,然后通过jobLauncher.run()方法启动作业。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;public class BatchApplication {public static void main(String[] args) {// 创建Spring应用上下文ApplicationContext context = new AnnotationConfigApplicationContext(BatchConfiguration.class);// 获取JobLauncher和Job实例JobLauncher jobLauncher = context.getBean(JobLauncher.class);Job job = context.getBean("dataCleanupJob", Job.class);try {// 创建Job参数JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();// 启动作业jobLauncher.run(job, jobParameters);} catch (Exception e) {e.printStackTrace();}}
}

监听Listener

可以通过Listener接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

import org.springframework.batch.core.*;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;public class MyJobListener extends JobExecutionListenerSupport {@Overridepublic void beforeJob(JobExecution jobExecution) {// 在作业执行之前执行的逻辑System.out.println("作业开始执行");}@Overridepublic void afterJob(JobExecution jobExecution) {// 在作业执行之后执行的逻辑if (jobExecution.getStatus() == BatchStatus.COMPLETED) {System.out.println("作业执行成功");} else if (jobExecution.getStatus() == BatchStatus.FAILED) {System.out.println("作业执行失败");}}@Overridepublic void onSkipInRead(Throwable t) {// 在读取过程中发生跳过记录的逻辑System.out.println("跳过读取记录");}@Overridepublic void onSkipInProcess(Object item, Throwable t) {// 在处理过程中发生跳过记录的逻辑System.out.println("跳过处理记录");}@Overridepublic void onSkipInWrite(Object item, Throwable t) {// 在写入过程中发生跳过记录的逻辑System.out.println("跳过写入记录");}
}

将这个自定义的监听器添加到作业配置中:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;// 省略其他配置@Beanpublic Job dataCleanupJob(Step step1, JobExecutionListener jobListener) {return jobBuilderFactory.get("dataCleanupJob").incrementer(new RunIdIncrementer()).listener(jobListener) // 添加自定义的监听器.flow(step1).end().build();}@Beanpublic JobExecutionListener jobListener() {return new MyJobListener();}// 省略其他配置
}

这样  我们就能很清晰的看到 任务运行的情况啦

Spring Batch 使用内存缓冲机制,将读取的数据记录暂存于内存中,然后批量处理这些数据。通过减少对磁盘或数据库的频繁访问,内存缓冲可以提高读取和处理的效率,而且Spring Batch 提供了批量读取的机制,允许一次性读取和处理多个数据记录,这两点都减轻 I/O 压力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/727584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读_世界模型

1 2 3 4 5 6 7 8英文名称: World Models 中文名称: 世界模型 链接: https://arxiv.org/abs/1803.10122 示例: https://worldmodels.github.io/ 作者: David Ha, Jurgen Schmidhuber 机构: Google Brain, NNAISENSE, Swiss AI Lab, IDSIA (USI & SUPSI) 日期: 27 Mar 2018 引…

【MetaGPT】多智能体协作——你画我猜(文字版)

多智能体协作 本篇将学习 MetaGPT中的 Environment 、 Team 组件。 1. Muti Agent 概念概述 多智能体系统 (Multi-Agent System, MAS) 是由一群具有一定自主性、协同性和学习能力的智能体组成的系统。智能体在环境中相互协作&#xff0c;以达到某种目标或完成特定任务。 2. 多…

阿珊解说Vue中`$route`和`$router`的区别

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

ResponseStatusException

目录 概述&#xff1a; 综合实例&#xff1a; 继承 ResponseStatusException-自定义异常类 继承 ResponseStatusException-自定义响应头信息 继承 ResponseStatusException-定制更多异常处理逻辑 继承 ResponseStatusException-根据异常发生的上下文动态改变 HTTP 状态码…

C++之类(一)

1&#xff0c;封装 1.1 封装的引用 封装是C面向对象三大特性之一 封装的意义&#xff1a; 将属性和行为作为一个整体&#xff0c;表现生活中的事物 将属性和行为加以权限控制 1.1.1 封装意义一&#xff1a; 在设计类的时候&#xff0c;属性和行为写在一起&#xff0c;表…

事务失效的八种情况!!!!

一、非publi修饰的方法。 /*** 私有方法上的注解&#xff0c;不生效&#xff08;因私有方法Spring扫描不到该方法&#xff0c;所以无法生成代理&#xff09;*/ Transactional private boolean test() {//test code }二、类内部访问。 类内部非直接访问带注解标记的方法 B&…

类初步认识与对象

一&#xff0c;对于面向对象的认识 Java是一门面向对象的语言&#xff0c;一切都可以称为对象。将一个大象装进冰箱&#xff0c;甭管步骤多复杂&#xff0c;大象便是对象&#xff1b;将牛奶放进冰箱&#xff0c;牛奶便是对象&#xff1b;你我均是对像。 再比如洗一个衣服&…

如何在Linux中安装ARM交叉环境编译链

安装ARM交叉环境编译链过程如下&#xff1a; 首先创建一个文件夹如下&#xff1a; mkdir -p Linux_ALPHA/toolcahin然后将arm交叉编译工具链安装包拖到Linux中如下&#xff1a; 先输入mv 拖入的安装包即可 mv /var/run/vmblock-fuse/blockdir/pXeysK/gcc-4.6.4.tar.xz .直接…

进程:守护进程

一、守护进程的概念 守护进程是脱离于终端控制&#xff0c;且运行在后端的进程。&#xff08;孤儿进程&#xff09;守护进程不会将信息显示在任何终端上影响前端的操作&#xff0c;也不会被终端产生的任何信息打断&#xff0c;例如&#xff08;ctrlc&#xff09;.守护进程独立…

【数据结构】哈希

在一个数据序列中查找某一个数据元素&#xff0c;是数据管理时经常涉及的&#xff0c;通常以比较的方式来完成&#xff0c;典型的案例有无序序列的暴力查找&#xff08;O(N)&#xff09;、有序序列的二分查找&#xff08;O(logN)&#xff09;、平衡搜索树&#xff08;O(logN)&a…

融合软硬件串流多媒体技术的远程控制方案

远程技术已经发展得有相当水平了&#xff0c;在远程办公&#xff0c;云游戏&#xff0c;云渲染等领域有相当多的应用场景&#xff0c;以向日葵&#xff0c;todesk rustdesk等优秀产品攻城略地&#xff0c;估值越来越高。占据了通用应用的方方面面。 但是细分市场&#xff0c;还…

试用Claude3

1 简介 好消息是&#xff0c;2024 年 3 月 4 日发布了 Claude3&#xff0c;据传比 GPT-4 更好&#xff0c;snooet 版本可以免费试用&#xff0c;坏消息是我们这儿不能用。 在官网注册时&#xff0c;需要选择国家并使用手机接收短信验证码。而在选项中没有中国这个选项。即使成…

IT外包怎样帮助企业控制成本?

在当今激烈的商业竞争中&#xff0c;企业不仅需要保持创新&#xff0c;还需要有效控制成本。IT外包作为一种管理模式&#xff0c;成为许多企业降低成本的得力工具。究竟IT外包如何帮助企业控制成本呢&#xff1f; 首先&#xff0c;IT外包在减少人力资源成本方面发挥了至关重要的…

【微服务生态】Nginx

文章目录 一、概述二、Nginx 的安装三、常用命令四、Nginx 配置4.1 反向代理配置&#xff08;1&#xff09;反向代理实例1&#xff08;2&#xff09;反向代理实例2 4.2 负载均衡配置4.3 动静分离4.4 集群配置 五、nginx 原理与优化参数配置 一、概述 本次为简易版&#xff0c;…

3.6 day1 FreeRTOS

1.总结keil5下载代码和编译代码需要注意的事项 注意要将魔术棒的的debug选项中的setting中的flashdownload中的reset and run 勾选上&#xff0c;同时将pack中的enable取消勾选 2.总结STM32Cubemx的使用方法和需要注意的事项 可以通过功能列表对引脚进行设置&#xff0c;并且可…

调用Mybatis plus中的saveBatch方法报找不到表的问题

1.问题现象 在用Mybatis plus开发的项目中&#xff0c;用自带的API批量保存的方法saveBatch操作时&#xff0c;发现报没有找到表的错误。 错误日志截图如下&#xff1a; 表实际是存在的&#xff0c;且发现其他的方法都没有问题&#xff0c;包括save、update等单个的方法&…

springcloud2022 feign超时时间配置

spring:application:name: order-webcloud:openfeign:client:config:default:connectTimeout: 60000readTimeout: 60000 默认connection10秒,readTimeout 60秒

C# Mel-Spectrogram 梅尔频谱

目录 介绍 Main features Philosophy of NWaves 效果 项目 代码 下载 C# Mel-Spectrogram 梅尔频谱 介绍 利用NWaves实现Mel-Spectrogram 梅尔频谱 NWaves github 地址&#xff1a;https://github.com/ar1st0crat/NWaves NWaves is a .NET DSP library with a lot …

计算机mfc140.dll文件缺失的修复方法分析,一键修复mfc140.dll

电脑显示mfc140.dll文件缺失信息时&#xff0c;不必担心&#xff0c;这通常是个容易解决的小问题。接下来让我们详细探究并解决mfc140.dll文件缺失的状况。以下将详述相应的解决方案&#xff0c;从而帮助您轻松克服这一技术难题。通过几个简单步骤&#xff0c;即可恢复正常使用…

elementUI表单验证遇到的问题

1.同一个addForm表单&#xff0c;同样的验证规则&#xff0c;有的输入框在没填写时能够显示红色&#xff0c;有的却毫无反应 解决方案&#xff1a;去elementUI官网看了一下验证表单的规则及属性&#xff0c;第一句就写 Form 组件提供了表单验证的功能&#xff0c;只需要通过 r…