背景
笔者有一个需求是把将近一亿条数据上传到FTP服务器中,这些数据目前是存储在mysql中,是通过关联几张表查询出来的,查询出来的数据结果集一共是6个字段。要求传输的时候拆分成一个个小文件,每个文件大小不能超过500M。我的测试思路是对测试数据进行分页查询,比如每次分页查询10万条数据,写入到一个txt格式的文件中,攒到50万条数据时,把这个txt文件上传到Ftp中(粗略估算了一下,每个字段长度假设不超过255,),这就是一个小文件的上传。
一、windows下FTP的安装
笔者的开发环境是windows11,所以必须要搭建一个FTP环境以供测试使用
配置IIS web服务器
打开运行窗口【win+R】快捷键,输入 optionalfeatures
后点击确定:
在出来的弹框中找到Internet信息服务,并打开勾选以下配置 ,点击确定,等待windows系统自行添加相关应用配置
配置IIS web站点
现在本地磁盘创建一个FtpServer
空文件夹
然后查看本机IP地址
打开运行【win+R】窗口输入cmd回车
然后输入ipconfig 查看IP
笔者本机连接的是无线网络,如果是连接的有线网络,则需要找对应的以太网适配器连接配置
接着 在开始栏中搜索 IIS 并点击进入IIS管理器
打开后在左侧 “网站” 右键菜单 打开 “添加FTP站点”
主要是填写FTP站点名称和服务的物理路径
点击下一页,填写本机当前网络的ip地址
再点下一页完成身份验证和授权信息
点击完成后,ftp服务器的windows搭建就结束了
打开防火墙,把以下服务勾选上
建立 FTP 服务之后,默认登陆 FTP 服务器的账号和密码就是本机 Administrator 的账户和密码,但是笔者不记得密码了,所以创建一个用户来管理FTP登录
此电脑->右击->显示更多选项->单击管理->本地用户和用户组->用户->右击创建新用户
ftp用户名和密码记好了
再在开始菜单找到IIS服务,点击FTP授权规则
右击编辑权限
点击添加
输入刚才创建的ftp用户名称,点击检查名称
把下面的权限都勾选上,点击确定
回到 Internet Information Services (IIS) 管理器,双击刚才选中的 “FTP授权规则”,点击右侧的"添加允许规则"
然后别忘了启动ftp,右击管理ftp站点,启动
登录ftp
地址是ftp://192.168.1.105,进入此电脑,输入地址回车
输入用户名和密码可以登录
至于浏览器访问,这在很早之前是可以的,但是后来各大浏览器厂商都禁止使用浏览器访问ftp资源,这里也就作罢了
更换ftp的ip
当本机网络环境发生改变时,比如无线网环境变了,导致ip地址变了,那么之前设置好的ip地址就失效了,ftp无法连接。
点开IIS管理器,点击绑定
点击编辑,修改IP地址即可
二、java连接ftp服务器
笔者使用java语言,所以给出springboot框架下访问ftp的方法
首先引入pom依赖 Apache Commons net
<dependency><groupId>commons-net</groupId><artifactId>commons-net</artifactId><version>3.10.0</version> <!-- 或者使用最新的版本 -->
</dependency>
我这里使用的是最新版,jdk21,可以根据自己的jdk版本适当降低版本,不报错就可以
FTP连接工具类
package com.execute.batch.executebatch.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;import java.io.*;
import java.time.Duration;/*** FTP工具类* @author hulei*/
@Slf4j
public class FtpUtil {/*** 上传文件到FTP服务器的根目录** @param host FTP服务器地址* @param port FTP服务器端口号,默认为21* @param username 用户名* @param password 密码* @param localFile 本地要上传的文件* @return 成功返回true,否则返回false*/public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {FTPClient ftpClient = null;FileInputStream fis = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);ftpClient.setConnectTimeout(1000000000);Duration timeout = Duration.ofSeconds(1000000000);ftpClient.setDataTimeout(timeout);String remoteFileName = localFile.getName();fis = new FileInputStream(localFile);return ftpClient.storeFile(remoteFileName, fis);} catch (IOException e) {log.error("上传文件失败", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);if(fis != null){try {fis.close();} catch (IOException e) {log.error("关闭文件流失败", e);}}}}/*** 上传文件到FTP服务器的指定路径** @param host FTP服务器地址* @param port FTP服务器端口号,默认为21* @param username 用户名* @param password 密码* @param remotePath FTP服务器上的目标路径* @param localFile 本地要上传的文件* @return 成功返回true,否则返回false*/public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {FTPClient ftpClient = null;FileInputStream fis = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);ftpClient.setConnectTimeout(1000000000);Duration timeout = Duration.ofSeconds(1000000000);ftpClient.setDataTimeout(timeout);createRemoteDirectories(ftpClient, remotePath);String remoteFileName = localFile.getName();String fullRemotePath = remotePath + "/" + remoteFileName;fis = new FileInputStream(localFile);return ftpClient.storeFile(fullRemotePath, fis);} catch (IOException e) {log.error("上传文件失败", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);if(fis != null){try {fis.close();} catch (IOException e) {log.error("关闭文件流失败", e);}}}}/*** 在FTP服务器上创建指定路径所需的所有目录** @param ftpClient FTP客户端* @param remotePath 需要创建的远程路径* @throws IOException 如果在创建目录时发生错误*/private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {String[] directories = remotePath.split("/");String currentPath = "";for (String dir : directories) {if (!dir.isEmpty()) {currentPath += "/" + dir;if (!ftpClient.changeWorkingDirectory(currentPath)) {if (!ftpClient.makeDirectory(dir)) {throw new IOException("无法创建远程目录: " + currentPath);}ftpClient.changeWorkingDirectory(dir);}}}}/*** 连接到FTP服务器并登录。** @param host FTP服务器的主机名或IP地址。* @param port FTP服务器的端口号。* @param username 登录FTP服务器的用户名。* @param password 登录FTP服务器的密码。* @return 成功连接并登录后返回一个FTPClient实例,可用于后续操作。* @throws IOException 如果连接或登录过程中遇到任何网络问题,则抛出IOException。*/private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {FTPClient ftpClient = new FTPClient();ftpClient.connect(host, port);ftpClient.login(username, password);int replyCode = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(replyCode)) {throw new IOException("连接FTP服务器失败");}return ftpClient;}/*** 断开与FTP服务器的连接。* 该方法首先检查FTP客户端是否已连接到服务器。如果已连接,则尝试登出,* 如果登出失败,记录错误信息。接着尝试断开与服务器的连接,如果断开失败,同样记录错误信息。** @param ftpClient 与FTP服务器交互的客户端对象。*/private static void disconnect(FTPClient ftpClient) {if (ftpClient.isConnected()) {try {ftpClient.logout();} catch (IOException ioe) {log.error("登出FTP服务器失败", ioe);}try {ftpClient.disconnect();} catch (IOException ioe) {log.error("断开FTP服务器连接失败", ioe);}}}/*** 设置FTP客户端的文件传输类型为二进制。* 这个方法尝试将FTP文件传输类型设置为BINARY,这是进行二进制文件传输的标准方式。* 如果设置失败,会抛出一个运行时异常。** @param ftpClient 用于文件传输的FTP客户端实例。* @throws RuntimeException 如果设置文件传输类型为二进制时发生IOException异常。*/private static void setBinaryFileType(FTPClient ftpClient) {try {ftpClient.setFileType(FTP.BINARY_FILE_TYPE);} catch (IOException e) {throw new RuntimeException("设置传输二进制文件失败", e);}}}
主要提供了两个方法uploadFileToRoot
和uploadFileToPath
,前者是上传到ftp服务器根目录下,后者上传到指定目录下,其中的连接时间设置的有点夸张,主要是传输时间长、数据量大,害怕断开。
注意:所有涉及到操作文件的流,包括输入流和输出流,使用完了,要及时关闭,否则占用资源不说,还会导致临时生成的文件无法删除。
笔者在ftp服务器下新建了一个文件,测试上传一个txt格式的文本文件,一个上传到根目录下,一个上传到newFile
文件夹里
测试用例代码
package com.execute.batch.executebatch.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;/*** FTP工具类* @author hulei*/
@Slf4j
public class FtpUtil {private static FTPClient connectAndLogin(String host, int port, String username, String password) throws IOException {FTPClient ftpClient = new FTPClient();ftpClient.connect(host, port);ftpClient.login(username, password);int replyCode = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(replyCode)) {throw new IOException("连接FTP服务器失败");}return ftpClient;}private static void disconnect(FTPClient ftpClient) {if (ftpClient.isConnected()) {try {ftpClient.logout();} catch (IOException ioe) {log.error("登出FTP服务器失败", ioe);}try {ftpClient.disconnect();} catch (IOException ioe) {log.error("断开FTP服务器连接失败", ioe);}}}private static void setBinaryFileType(FTPClient ftpClient) {try {ftpClient.setFileType(FTP.BINARY_FILE_TYPE);} catch (IOException e) {throw new RuntimeException("设置传输二进制文件失败", e);}}/*** 上传文件到FTP服务器的根目录* @param host FTP服务器地址* @param port FTP服务器端口号,默认为21* @param username 用户名* @param password 密码* @param localFile 本地要上传的文件* @return 成功返回true,否则返回false*/public static boolean uploadFileToRoot(String host, int port, String username, String password, File localFile) {FTPClient ftpClient = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);String remoteFileName = localFile.getName();return ftpClient.storeFile(remoteFileName, new FileInputStream(localFile));} catch (IOException e) {log.error("上传文件失败", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);}}/*** 上传文件到FTP服务器的指定路径* @param host FTP服务器地址* @param port FTP服务器端口号,默认为21* @param username 用户名* @param password 密码* @param remotePath FTP服务器上的目标路径* @param localFile 本地要上传的文件* @return 成功返回true,否则返回false*/public static boolean uploadFileToPath(String host, int port, String username, String password, String remotePath, File localFile) {FTPClient ftpClient = null;try {ftpClient = connectAndLogin(host, port, username, password);setBinaryFileType(ftpClient);createRemoteDirectories(ftpClient, remotePath);String remoteFileName = localFile.getName();String fullRemotePath = remotePath + "/" + remoteFileName;return ftpClient.storeFile(fullRemotePath, new FileInputStream(localFile));} catch (IOException e) {log.error("上传文件失败", e);return false;} finally {assert ftpClient != null;disconnect(ftpClient);}}/*** 在FTP服务器上创建指定路径所需的所有目录* @param ftpClient FTP客户端* @param remotePath 需要创建的远程路径* @throws IOException 如果在创建目录时发生错误*/private static void createRemoteDirectories(FTPClient ftpClient, String remotePath) throws IOException {String[] directories = remotePath.split("/");String currentPath = "";for (String dir : directories) {if (!dir.isEmpty()) {currentPath += "/" + dir;if (!ftpClient.changeWorkingDirectory(currentPath)) {if (!ftpClient.makeDirectory(dir)) {throw new IOException("无法创建远程目录: " + currentPath);}ftpClient.changeWorkingDirectory(dir);}}}}
}
执行后查看ftp服务器
发现根目录下和文件夹下都有上传的文件了
批量数据生成
笔者这里只模拟生成500万条数据,供测试使用
批处理工具类
package com.execute.batch.executebatch.utils;import jakarta.annotation.Resource;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import java.util.List;
import java.util.function.BiFunction;@Component
public class BatchInsertUtil {@Resourceprivate final SqlSessionFactory sqlSessionFactory;public BatchInsertUtil(SqlSessionFactory sqlSessionFactory) {this.sqlSessionFactory = sqlSessionFactory;}/*** 批量插入数据* @param entityList 待插入的数据列表* @param mapperClass 映射器接口的Class对象*/@SuppressWarnings("all")public <T,U,R> int batchInsert(List<T> entityList, Class<U> mapperClass, BiFunction<T,U,R> function) {int i = 1;SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);try {U mapper = sqlSession.getMapper(mapperClass);for (T entity : entityList) {function.apply(entity,mapper);i++;}sqlSession.flushStatements();sqlSession.commit();} catch (Exception e) {throw new RuntimeException("批量插入数据失败", e);}finally {sqlSession.close();}return i-1;}
}
跑批数据
package com.execute.batch.executebatch;import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.BatchInsertUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** DataSeeder 批量生成数据* @author hulei*/
@Component
public class DataSeeder implements CommandLineRunner {@Resourceprivate ApplicationContext applicationContext;private ExecutorService executorService;private static final int TOTAL_RECORDS = 5000000;private static final int BATCH_SIZE = 10000;private static final int THREAD_POOL_SIZE = 10;@PostConstructpublic void init() {executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);}@Overridepublic void run(String... args) {long startTime = System.currentTimeMillis();List<Runnable> tasks = new ArrayList<>();for (int i = 0; i < TOTAL_RECORDS; i += BATCH_SIZE) {int finalI = i;tasks.add(() -> insertBatch(finalI, BATCH_SIZE));}tasks.forEach(executorService::execute);executorService.shutdown();long endTime = System.currentTimeMillis();System.out.println("Total time taken: " + (endTime - startTime) / 1000 + " seconds.");}public void insertBatch(int startId, int batchSize) {List<User> batch = new ArrayList<>(batchSize);Random random = new Random();for (int i = 0; i < batchSize; i++) {User user = createUser(startId + i, random);batch.add(user);System.out.println(user);}BatchInsertUtil util = new BatchInsertUtil(applicationContext.getBean(SqlSessionFactory.class));util.batchInsert(batch, UserMapper.class, (item,mapper)-> mapper.insertBatch(item));}private User createUser(int id, Random random) {User user = new User();user.setId(id);user.setName("User" + id);user.setEmail("user" + id + "@example.com");user.setAge(random.nextInt(80) + 20); // 年龄在20到99之间user.setAddress("Address" + id);user.setPhoneNumber("1234567890"); // 简化处理,实际应生成随机电话号码return user;}
}
整个生成过程是十分漫长的,40分钟左右,数据查询结果生成了500万条数据
测试上传ftp
下面展示的两个是mybatis手动分页的写法,如果有其他查询参数,则可以建一个实体类,把rowbounds参数囊括进去作为一个属性即可
mapper接口层
mybatis的xml
测试用例
package com.execute.batch.executebatch.controller;import com.execute.batch.executebatch.entity.User;
import com.execute.batch.executebatch.mapper.UserMapper;
import com.execute.batch.executebatch.utils.FtpUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.RowBounds;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;/*** @author hulei* @date 2024/5/22 10:42*/@RestController
@RequestMapping("/FTP")
@Slf4j
public class FTPController {@Resourceprivate UserMapper userMapper;private final Object lock = new Object();@GetMapping(value = "/upload")public void upload() throws InterruptedException {String host = "192.168.1.103";int port = 21; // 默认FTP端口String username = "hulei";String password = "hulei";int pageSize = 450000;int offset = 0;int uploadCycle = 0;int totalUploaded = 0;boolean noData = false;while (true) {uploadCycle++;// 将查询结果处理并写入本地文件File tempFile = new File("D:/FTPFile", "user_data_" + uploadCycle + ".txt");while (true) {RowBounds rowBounds = new RowBounds(offset, pageSize);List<User> list = userMapper.queryBatch(rowBounds);if (!list.isEmpty()) {MultiThreadWriteToFile(list, tempFile, getConsumer());offset += pageSize;totalUploaded += list.size();}if (list.isEmpty()) {noData = true;break;}// 检查总数据量是否达到500000,如果达到则上传文件if (totalUploaded >= 600000) {break;}}// 上传本地文件到FTP服务器if(!tempFile.exists()){break;}boolean uploadSuccess = FtpUtil.uploadFileToRoot(host, port, username, password, tempFile);if (uploadSuccess) {System.out.println("文件上传成功");} else {System.out.println("文件上传失败");}System.out.println("上传完成,已上传" + uploadCycle + "个批次");totalUploaded = 0;if (noData) {break;}}}private <T> void MultiThreadWriteToFile(List<T> list, File tempFile, BiConsumer<BufferedWriter, T> writeItemConsumer) throws InterruptedException {Path filePath = tempFile.toPath(); // 将文件对象转换为路径对象,用于后续的文件写入操作。try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8,StandardOpenOption.CREATE, // 如果文件不存在则创建StandardOpenOption.WRITE, // 打开文件进行写入StandardOpenOption.APPEND)) { // 追加模式写入,而不是覆盖 // 使用 UTF-8 编码打开文件缓冲写入器。ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池,包含10个线程。BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(list.size()); // 创建一个阻塞队列,用于存储要处理的任务索引。for (int i = 0; i < list.size(); i++) { // 预填充任务队列,为每个列表元素创建一个任务。taskQueue.add(i);}for (int i = 0; i < list.size(); i++) { // 提取队列中的索引,并提交相应的任务给线程池执行。int index = taskQueue.take();executor.submit(() -> writeItemConsumer.accept(writer, list.get(index)));}executor.shutdown(); // 关闭线程池,等待所有任务完成。boolean terminated = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 等待线程池中的所有任务完成。if (!terminated) { // 如果线程池在指定时间内未能关闭,则记录警告信息。log.warn("线程池关闭超时");}} catch (IOException e) { // 捕获并记录文件操作相关的异常。log.error("创建或写入文件发生错误: {},异常为: {}", tempFile.getAbsolutePath(), e.getMessage());}}private BiConsumer<BufferedWriter, User> getConsumer() {return (writer, item) -> {String str = String.join("|",String.valueOf(item.getId()),item.getName(),item.getEmail(),String.valueOf(item.getAge()),item.getAddress(),item.getPhoneNumber());log.info("告警入湖数据拼接字符串:{}", str);try {synchronized (lock) {writer.write(str);writer.newLine();}} catch (IOException e) {log.error("写入告警入湖数据发生异常: {}", e.getMessage());}};}
}
简单分析下:分页查询数据,每次查询pageSize条数据,写入一个txt文件,当写入的总条数超过totalUpload时,就跳出内部while循环,上传当前txt文件。然后进入第二次外层while循环,创建第二个txt文件,内部循环分页查询数据写入第二个txt文件。。。以此类推,直至最后查不出数据为止。
注意:pageSize和totalUpload最好是倍数关系,比如pageSize = 50000,那么totalUpload最好是pageSize 的整数倍,如100000,150000,200000,这样可以保证当文件数较多时,大部分的文件中数据条数一样。
以下是我分批上传到ftp服务器的文件,一共500万条数据,字段做了处理,使用 | 拼接
写入的数据是乱序的,要求顺序写入的话,就不要使用多线程了。