How to avoid OOM when reading very large files
The requirement is as follows:
Read data from a file, apply business processing to it, and store the results in a database, all while avoiding OOM (Out of Memory) errors.
1. Process the file data in batches
- Read the file in batches, handling only a portion of the data at a time, instead of loading the entire file into memory at once.
- Java's BufferedReader and InputStreamReader classes can be used to read the file in batches (the example below obtains a BufferedReader via Files.newBufferedReader).
import java.io.*;
import java.util.*;
import java.nio.file.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class FileDataProcessor {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private static final int BATCH_SIZE = 1000; // number of records processed per batch

    public void processFile(String filePath) {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath))) {
            String line;
            List<String> batch = new ArrayList<>();
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() >= BATCH_SIZE) {
                    processBatch(batch);
                    batch.clear();
                }
            }
            // Process the final batch
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processBatch(List<String> batch) {
        List<ProcessedData> processedDataList = new ArrayList<>();
        for (String line : batch) {
            // Business processing logic
            ProcessedData processedData = processLine(line);
            processedDataList.add(processedData);
        }
        saveToDatabase(processedDataList);
    }

    private ProcessedData processLine(String line) {
        // Example business processing logic
        ProcessedData processedData = new ProcessedData();
        processedData.setField(line);
        return processedData;
    }

    private void saveToDatabase(List<ProcessedData> processedDataList) {
        String sql = "INSERT INTO processed_data (field) VALUES (?)";
        List<Object[]> batchArgs = new ArrayList<>();
        for (ProcessedData data : processedDataList) {
            batchArgs.add(new Object[] { data.getField() });
        }
        jdbcTemplate.batchUpdate(sql, batchArgs);
    }

    public static class ProcessedData {
        private String field;

        public String getField() {
            return field;
        }

        public void setField(String field) {
            this.field = field;
        }
    }
}
2. Process the file data as a stream
- Streaming the file data also avoids loading the entire file into memory.
- Java's InputStream and BufferedInputStream classes support byte-oriented streaming; the example below uses Files.lines, which lazily streams the file line by line (a byte-oriented sketch follows after it).
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.stream.Stream;

@Service
public class FileDataProcessor {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void processFile(String filePath) {
        try (Stream<String> lines = Files.lines(Paths.get(filePath))) {
            lines.forEach(line -> {
                // Business processing logic
                ProcessedData processedData = processLine(line);
                saveToDatabase(processedData);
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private ProcessedData processLine(String line) {
        // Example business processing logic
        ProcessedData processedData = new ProcessedData();
        processedData.setField(line);
        return processedData;
    }

    private void saveToDatabase(ProcessedData processedData) {
        String sql = "INSERT INTO processed_data (field) VALUES (?)";
        jdbcTemplate.update(sql, processedData.getField());
    }

    public static class ProcessedData {
        private String field;

        public String getField() {
            return field;
        }

        public void setField(String field) {
            this.field = field;
        }
    }
}
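As promised above, here is the byte-oriented variant using BufferedInputStream. It is useful for binary files or when you want to control the read granularity in bytes rather than lines; a minimal sketch, where the 8 KB buffer size and the processChunk hook are illustrative assumptions rather than part of the original example:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ChunkedFileReader {

    private static final int BUFFER_SIZE = 8192; // assumed chunk size; tune to your workload

    public void processFile(String filePath) {
        try (InputStream in = new BufferedInputStream(
                Files.newInputStream(Paths.get(filePath)), BUFFER_SIZE)) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int bytesRead;
            // Only one buffer's worth of data is held in memory at a time
            while ((bytesRead = in.read(buffer)) != -1) {
                processChunk(buffer, bytesRead);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processChunk(byte[] chunk, int length) {
        // Hypothetical hook: apply your business logic to chunk[0..length)
    }
}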
3. Use Spring Batch for batch processing
- Spring Batch is a framework for batch jobs and is well suited to processing large volumes of data.
- Spring Batch's ItemReader, ItemProcessor, and ItemWriter can be combined to implement the batch job.
3.1 Configure Spring Batch
First, add the dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
3.2 Create the Batch configuration class
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public FlatFileItemReader<InputData> reader() {
        FlatFileItemReader<InputData> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("path/to/your/input/file.csv"));
        reader.setLineMapper(new DefaultLineMapper<InputData>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames("field1", "field2"); // column names of the CSV file
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<InputData>() {{
                setTargetType(InputData.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<InputData, ProcessedData> processor() {
        return inputData -> {
            ProcessedData processedData = new ProcessedData();
            processedData.setField(inputData.getField1() + "-" + inputData.getField2());
            return processedData;
        };
    }

    @Bean
    public ItemWriter<ProcessedData> writer() {
        JdbcBatchItemWriter<ProcessedData> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("INSERT INTO processed_data (field) VALUES (:field)");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public Job importUserJob(Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(ItemReader<InputData> reader,
                      ItemProcessor<InputData, ProcessedData> processor,
                      ItemWriter<ProcessedData> writer) {
        return stepBuilderFactory.get("step1")
                .<InputData, ProcessedData>chunk(1000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
The code above defines a configuration class named BatchConfig, which configures reading the CSV file, processing the data, and writing it to the database. Note that you need to replace path/to/your/input/file.csv with the actual path of your CSV file, and adjust the SQL statement in the writer method to match your database table structure.
3.3 Define the data model
- Create two simple Java classes to represent the input data and the processed data.
public class InputData {
    private String field1;
    private String field2;

    // Getters and setters
    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField2() {
        return field2;
    }

    public void setField2(String field2) {
        this.field2 = field2;
    }
}

public class ProcessedData {
    private String field;

    // Getters and setters
    public String getField() {
        return field;
    }

    public void setField(String field) {
        this.field = field;
    }
}
3.4 Configure the data source
- Configure your database connection information in the application.properties file.
spring.datasource.url=jdbc:mysql://localhost:3306/your_database
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Spring Batch specific properties
spring.batch.jdbc.initialize-schema=always
Note that you need to replace your_database, your_username, and your_password with your actual database information.
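The examples in this article write to a processed_data table with a single field column, which spring.batch.jdbc.initialize-schema does not create (it only initializes Spring Batch's own metadata tables). A minimal schema sketch, where the id column and the VARCHAR length are assumptions:

CREATE TABLE processed_data (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    field VARCHAR(255)
);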
3.5 Run the batch job
- Create a simple controller or command-line runner to launch the batch job (a controller sketch follows the runner example below).
Launching the batch job via CommandLineRunner:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobLauncher.run(job, new JobParameters());
    }
}
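As mentioned above, a controller can also launch the job. Assuming spring-boot-starter-web is on the classpath, a minimal sketch of a REST endpoint might look like this; the /jobs/run path and the run.id parameter are assumptions, and the unique parameter value lets the same job be launched repeatedly:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    // Hypothetical endpoint; the timestamp parameter makes each launch a new job instance
    @PostMapping("/jobs/run")
    public String runJob() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, params);
        return "Job started";
    }
}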
Running the Spring Boot application with the configuration above starts the Spring Batch job, which reads the data from the file, processes it, and stores it in the database.
4. Use Apache Commons IO
- Apache Commons IO is a Java library for working with files and streams.
- Commons IO provides FileUtils#lineIterator, which reads a file line by line (a plain usage sketch follows the ItemReader example below).
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class CsvReader implements ItemReader<InputData> {

    private final File file;
    private LineIterator iterator;

    public CsvReader(File file) {
        this.file = file;
    }

    @Override
    public InputData read() throws Exception {
        // Lazily open the file on the first call
        if (iterator == null) {
            iterator = FileUtils.lineIterator(file, StandardCharsets.UTF_8.name());
        }
        if (!iterator.hasNext()) {
            iterator.close(); // release the underlying file handle
            return null;
        }
        String line = iterator.nextLine();
        // Parse the CSV line and populate an InputData object
        InputData inputData = new InputData();
        // ...
        return inputData;
    }
}
The code above defines a class named CsvReader that implements the ItemReader interface and reads data from a CSV file. Note that you need to parse each CSV line and populate the InputData object according to your CSV file format and your InputData class.
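For reference, here is lineIterator in its plainest form, outside of Spring Batch. A minimal sketch, assuming Commons IO 2.6+ (where LineIterator implements Closeable, as the concurrency example later in this article also assumes) and a placeholder file path:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;

public class LineIteratorExample {
    public static void main(String[] args) throws IOException {
        File file = new File("path/to/your/large/file.txt"); // placeholder path
        try (LineIterator it = FileUtils.lineIterator(file, StandardCharsets.UTF_8.name())) {
            while (it.hasNext()) {
                String line = it.nextLine();
                // Only the current line is held in memory; process it here
            }
        }
    }
}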
5. Use the Java 8 Stream API
Java 8 introduced the Stream API, which offers a concise way to work with collections of data.
You can use the Stream API to process each line of the file.
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.stream.Stream;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class LineReader implements ItemReader<String> {

    private final String filePath;
    private Stream<String> lines;
    private Iterator<String> iterator;

    public LineReader(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public String read() throws Exception {
        // Lazily open the file on the first call
        if (lines == null) {
            lines = Files.lines(Paths.get(filePath));
            iterator = lines.iterator();
        }
        // Return one line per call; null signals the end of input
        if (iterator.hasNext()) {
            return iterator.next();
        }
        lines.close(); // release the underlying file handle
        return null;
    }
}
The code above defines a class named LineReader that implements the ItemReader interface and reads data from the file line by line. Note that you need to create the LineReader object with your actual file path.
6. Read concurrently
- Line-by-line reading solves the OOM problem, but only one thread is processing the data. Next, we can have several threads process the data together to increase parallelism.
- First read the data line by line into memory; once a certain amount has accumulated, hand it off to a thread pool for asynchronous processing.
import com.google.common.collect.Lists; // Guava, for Lists.partition
import lombok.SneakyThrows; // Lombok
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FileProcessor {

    @SneakyThrows
    public static void readInApacheIOWithThreadPool() {
        // Create a thread pool with at most 10 threads and a queue capacity of 100
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
        // Read the data line by line using Apache Commons IO
        try (LineIterator fileContents = FileUtils.lineIterator(
                new File("temp/test.txt"), StandardCharsets.UTF_8.name())) {
            List<String> lines = new ArrayList<>();
            while (fileContents.hasNext()) {
                String nextLine = fileContents.nextLine();
                lines.add(nextLine);
                // At 100,000 lines, split into two 50,000-line lists and hand them to async threads
                if (lines.size() == 100000) {
                    processLinesInBatches(lines, threadPoolExecutor);
                    lines.clear(); // drop the lines that have already been processed
                }
            }
            // Process the remaining lines
            if (!lines.isEmpty()) {
                processTask(lines);
            }
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

    private static void processLinesInBatches(List<String> lines, ThreadPoolExecutor threadPoolExecutor)
            throws InterruptedException, ExecutionException {
        List<List<String>> partitions = Lists.partition(lines, 50000);
        List<Future<?>> futureList = new ArrayList<>();
        for (List<String> partition : partitions) {
            Future<?> future = threadPoolExecutor.submit(() -> processTask(partition));
            futureList.add(future);
        }
        // Wait for all tasks to finish before returning, to prevent OOM
        // (and because the partitions are views of the caller's list, which it clears afterwards)
        for (Future<?> future : futureList) {
            future.get();
        }
    }

    private static void processTask(List<String> lines) {
        for (String line : lines) {
            // Simulate business processing
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
In the code above, once 100,000 lines have accumulated in memory, they are split into two tasks of 50,000 lines each and handed to async threads for execution.
Afterwards, future#get() makes the main thread wait for the async threads to finish before it continues reading data.
Note: the thresholds above (the 100,000-line accumulation limit and the 50,000-line partition size) need to be adjusted to your actual situation.
Additionally, if the volume of data to process is very large, consider splitting the file into smaller parts and then processing each part in parallel with multiple threads, as the next section shows.
7. Split the large file into small files
- Split the large file into multiple small files, each containing a fixed number of lines.
- Use multiple async threads to process the data line by line, one file per thread.
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LargeFileProcessor {

    private static final Logger logger = LogManager.getLogger(LargeFileProcessor.class);
    private static final int LINES_PER_FILE = 100000;
    private static final int BATCH_SIZE = 1000;

    public static void main(String[] args) {
        try {
            splitFileAndRead("temp/test.txt");
        } catch (Exception e) {
            logger.error("Failed to process large file", e);
        }
    }

    public static void splitFileAndRead(String largeFileName) throws Exception {
        // First split the large file into small files
        List<File> fileList = splitLargeFile(largeFileName);
        // Create a thread pool with at most 10 threads and a queue capacity of 100
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        List<Future<?>> futureList = new ArrayList<>();
        for (File file : fileList) {
            Future<?> future = threadPoolExecutor.submit(() -> {
                try (Stream<String> inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8);
                     Connection conn = getConnection()) {
                    List<String> batch = new ArrayList<>();
                    inputStream.forEach(line -> {
                        batch.add(line);
                        if (batch.size() == BATCH_SIZE) {
                            insertBatch(conn, batch);
                            batch.clear();
                        }
                    });
                    if (!batch.isEmpty()) {
                        insertBatch(conn, batch);
                    }
                } catch (IOException | SQLException e) {
                    logger.error("Error processing file: " + file.getName(), e);
                }
            });
            futureList.add(future);
        }
        for (Future<?> future : futureList) {
            future.get(); // wait for all tasks to finish
        }
        threadPoolExecutor.shutdown();
    }

    private static List<File> splitLargeFile(String largeFileName) throws IOException {
        List<String> lines = new ArrayList<>();
        int num = 1;
        List<File> files = new ArrayList<>();
        try (LineIterator fileContents = FileUtils.lineIterator(
                new File(largeFileName), StandardCharsets.UTF_8.name())) {
            while (fileContents.hasNext()) {
                String nextLine = fileContents.nextLine();
                lines.add(nextLine);
                if (lines.size() == LINES_PER_FILE) {
                    createSmallFile(lines, num++, files);
                }
            }
        }
        if (!lines.isEmpty()) {
            createSmallFile(lines, num, files);
        }
        return files;
    }

    private static void createSmallFile(List<String> lines, int num, List<File> files) throws IOException {
        Path filePath = Files.createTempFile("smallfile_" + num, ".txt");
        Files.write(filePath, lines, StandardCharsets.UTF_8);
        files.add(filePath.toFile());
        lines.clear(); // empty the list so it can be reused
    }

    private static void insertBatch(Connection conn, List<String> batch) {
        String insertSQL = "INSERT INTO my_table (column1, column2) VALUES (?, ?)";
        try (PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {
            for (String line : batch) {
                String[] parts = line.split(","); // assumes two comma-separated columns per line
                pstmt.setString(1, parts[0]);
                pstmt.setString(2, parts[1]);
                pstmt.addBatch();
            }
            pstmt.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            logger.error("Error inserting batch into database", e);
        }
    }

    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:mysql://localhost:3306/mydatabase";
        String username = "username";
        String password = "password";
        Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
        conn.setAutoCommit(false); // commit transactions manually
        return conn;
    }
}
The code above first splits the large file into multiple small files, then uses a thread pool to process those small files in parallel: each thread handles one small file and inserts its data into the database in batches.