一、摘要(本系列汇总说明)
- 总纲
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(一)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(二)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(三)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(四)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(五)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(六)
- 篇章内容说明
第一篇:基础篇,讲FTP常规上传下载实现、SFTP常规上传下载实现、单元测试类
第二篇:FTP高级篇,讲FTP上传进度监控、断点续传,FTP下载进度监控、断点续传
第三篇:SFTP高级篇,讲SFTP上传进度监控、断点续传,SFTP下载进度监控、断点续传
第四篇:FTP进阶篇,讲FTP池化处理(连接池封装)
第五篇:SFTP进阶篇,讲SFTP池化处理(连接池封装)
第六篇:汇总篇,包含前面1~5篇所有内容,且增加更高级的相关知识点
- 本篇
本文是FTP高级篇,讲FTP上传进度监控、断点续传,FTP下载进度监控、断点续传
二、环境
- SpringBoot 2.7.18 官方下载地址:SpringBoot 2.7.18
- commons-net-3.10.0.jar 官方下载地址:commons-net-3.10.0.jar
- commons-pool2-2.12.0.jar 官方下载地址:commons-pool2-2.12.0.jar
- jsch-0.1.55.jar 官方下载地址:jsch-0.1.55.jar
- Oracle JDK8u202(Oracle JDK8最后一个非商业版本) 下载地址:Oracle JDK8u202
- FileZilla Client 官方下载地址:FileZilla Client
注意:
- (特别是MacOS用户)FileZilla有MacOS版本,下载客户端是下Client,不是Server(注意一下名字,不要下错了)。
三、POM依赖
该系列文章通用,几篇FTP文章的pom文件都一样
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>person.brickman</groupId><artifactId>ftp</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><httpclient.version>4.5.14</httpclient.version><!-- 工具 --><lombok.version>1.18.32</lombok.version><commons-logging.version>1.3.1</commons-logging.version><commons-lang3.version>3.14.0</commons-lang3.version><commons-io.version>2.15.1</commons-io.version><commons-configuration.version>1.10</commons-configuration.version><commons-net.version>3.10.0</commons-net.version><commons-pool2.version>2.12.0</commons-pool2.version><jsch.version>0.1.55</jsch.version>
<!-- <sshd.version>2.12.1</sshd.version>--><!-- 2.20.1 2.22.2 3.0.0-M2 3.2.5 --><maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version><!-- 3.0.1 2.4 --><maven-source-plugin.version>3.0.1</maven-source-plugin.version><!--忽略本包测试--><maven.test.skip>false</maven.test.skip><skipTests>false</skipTests></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId><scope>provided</scope></dependency><dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><!-- 工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>commons-net</groupId><artifactId>commons-net</artifactId><version>${commons-net.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>${commons-logging.version}</version></dependency><dependency><groupId>com.jcraft</groupId><artifactId>jsch</artifactId><version>${jsch.version}</version></dependency><!-- 测试相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.6</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-source-plugin</artifactId><version>${maven-source-plugin.version}</version><configuration><attach>true</attach></configuration><executions><execution><phase>compile</phase></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version><executions><execution><id>deploy</id><phase>deploy</phase><goals><goal>deploy</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><!-- 跳过失败的单元测试 --><testFailureIgnore>false</testFailureIgnore><skipTests>${skipTests}</skipTests><argLine>${junit.test.params} -Xmx512m -XX:MaxPermSize=256m</argLine></configuration></plugin></plugins></build></project>
四、实现类
1、(常量)接口类:FtpKeyValue
接口类,读者觉得常量类更顺眼也可以改(阿里原装的)
package person.brickman.ftp.consts;/*** alibaba.datax.ftpreader** @author datax*/
public interface FtpKeyValue {/*** FTP 常用键定义*/String PROTOCOL = "protocol";String HOST = "host";String USERNAME = "username";String PASSWORD = "password";String PORT = "port";String TIMEOUT = "timeout";String CONNECTPATTERN = "connectPattern";String PATH = "path";String MAXTRAVERSALLEVEL = "maxTraversalLevel";/*** 默认值定义*/int DEFAULT_FTP_PORT = 21;int DEFAULT_SFTP_PORT = 22;int DEFAULT_TIMEOUT = 60000;int DEFAULT_MAX_TRAVERSAL_LEVEL = 100;String DEFAULT_FTP_CONNECT_PATTERN = "PASV";String CONTROL_ENCODING = "utf8";String NO_SUCH_FILE = "no such file";char C_STAR = '*';String STAR = "*";char C_QUESTION = '?';String QUESTION = "?";String SLASH = "/";String DOT = ".";String DOUBLE_DOT = "..";
}
2、统一抽象类:AbstractFtpHelper
抽象类,不管ftp还是sftp都使用此抽象类,而不是直接操作实现类
package person.brickman.ftp;import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;/*** alibaba.datax.ftpreader** @author datax*/
public abstract class AbstractFtpHelper {public abstract void setFtpClient(Object ftpClient);public abstract Object getFtpClient() ;/*** 与ftp服务器建立连接** @param @param host* @param @param username* @param @param password* @param @param port* @param @param timeout* @param @param connectMode PASV PORT* @return void* @throws*/public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode) throws InterruptedException;/*** 断开与ftp服务器的连接** @param* @return void* @throws*/public abstract void logoutFtpServer();/*** 判断指定路径是否是目录** @param @param directoryPath* @param @return* @return boolean* @throws*/public abstract boolean isDirExist(String directoryPath);/*** 判断指定路径是否是文件** @param @param filePath* @param @return* @return boolean* @throws*/public abstract boolean isFileExist(String filePath);/*** 判断指定路径是否是软链接** @param @param filePath* @param @return* @return boolean* @throws*/public abstract boolean isSymbolicLink(String filePath);/*** 递归获取指定路径下符合条件的所有文件绝对路径** @param @param directoryPath* @param @param parentLevel 父目录的递归层数(首次为0)* @param @param maxTraversalLevel 允许的最大递归层数* @param @return* @return HashSet<String>* @throws*/public abstract HashSet<String> getAllFilesInDir(String directoryPath, int parentLevel, int maxTraversalLevel);/*** 获取指定路径的输入流** @param @param filePath* @param @return* @return InputStream* @throws*/public abstract InputStream getInputStream(String filePath);/*** 写入指定路径的输出流** @param @param filePath* @param @return* @return InputStream* @throws*/public abstract OutputStream getOutputStream(String filePath);/*** 写入指定路径的输出流** @param @param filePath* @param @param mode OVERWRITE = 0; RESUME = 1; APPEND = 2;* @param @return* @return InputStream* @throws*/public abstract OutputStream getOutputStream(String filePath, int mode);/*** 获取指定路径列表下符合条件的所有文件的绝对路径** @param @param srcPaths 路径列表* @param @param parentLevel 父目录的递归层数(首次为0)* @param @param maxTraversalLevel 允许的最大递归层数* @param @return* @return HashSet<String>* @throws*/public HashSet<String> getAllFilesInDir(List<String> srcPaths, int parentLevel, int maxTraversalLevel) {HashSet<String> sourceAllFiles = new HashSet<String>();if (!srcPaths.isEmpty()) {for (String eachPath : srcPaths) {sourceAllFiles.addAll(getAllFilesInDir(eachPath, parentLevel, maxTraversalLevel));}}return sourceAllFiles;}/*** 创建远程目录* 不支持递归创建, 比如 mkdir -p** @param directoryPath*/public abstract void mkdir(String directoryPath);/*** 创建远程目录* 支持目录递归创建** @param directoryPath*/public abstract void mkDirRecursive(String directoryPath);/*** Q:After I perform a file transfer to the server,* printWorkingDirectory() returns null. A:You need to call* completePendingCommand() after transferring the file. wiki:* http://wiki.apache.org/commons/Net/FrequentlyAskedQuestions*/public abstract void completePendingCommand();/*** 删除文件* warn: 不支持文件夹删除, 比如 rm -rf** @param filesToDelete*/public abstract void deleteFiles(Set<String> filesToDelete);/*** 移动文件* warn: 不支持文件夹删除, 比如 rm -rf** @param filesToMove* @param targetPath*/public abstract void moveFiles(Set<String> filesToMove, String targetPath);
}
3、FTP实现类:StandardFtpHelper
FTP上传下载实现类
package person.brickman.ftp;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import person.brickman.ftp.consts.FtpKeyValue;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;/*** alibaba.datax.ftpreader** @author datax*/
@Slf4j
public class StandardFtpHelper extends AbstractFtpHelper {FTPClient ftpClient = null;HashSet<String> sourceFiles = new HashSet<String>();@Overridepublic void setFtpClient(Object ftpClient) {this. ftpClient=(FTPClient)ftpClient;}@Overridepublic Object getFtpClient() {return ftpClient;}@Overridepublic void loginFtpServer(String host, String username, String password, int port, int timeout,String connectMode) {ftpClient = new FTPClient();try {// 连接ftpClient.connect(host, port);// 登录ftpClient.login(username, password);// 不需要写死ftp server的OS TYPE,FTPClient getSystemType()方法会自动识别/// ftpClient.configure(new FTPClientConfig(FTPClientConfig.SYST_UNIX));ftpClient.setConnectTimeout(timeout);ftpClient.setDataTimeout(timeout);if ("PASV".equals(connectMode)) {ftpClient.enterRemotePassiveMode();ftpClient.enterLocalPassiveMode();} else if ("PORT".equals(connectMode)) {ftpClient.enterLocalActiveMode();/// ftpClient.enterRemoteActiveMode(host, port);}int reply = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(reply)) {ftpClient.disconnect();String message = String.format("与ftp服务器建立连接失败,请检查用户名和密码是否正确: [%s]","message:host =" + host + ",username = " + username + ",port =" + port);log.error(message);throw new RuntimeException(message);}//设置命令传输编码String fileEncoding = System.getProperty("file.encoding");ftpClient.setControlEncoding(fileEncoding);} catch (UnknownHostException e) {String message = String.format("请确认ftp服务器地址是否正确,无法连接到地址为: [%s] 的ftp服务器", host);log.error(message);throw new RuntimeException(message);} catch (IllegalArgumentException e) {String message = String.format("请确认连接ftp服务器端口是否正确,错误的端口: [%s] ", port);log.error(message);throw new RuntimeException(message);} catch (Exception e) {String message = String.format("与ftp服务器建立连接失败 : [%s]","message:host =" + host + ",username = " + username + ",port =" + port);log.error(message);throw new RuntimeException(message);}}@Overridepublic void logoutFtpServer() {if (ftpClient.isConnected()) {try {//todo ftpClient.completePendingCommand();//打开流操作之后必须,原因还需要深究ftpClient.logout();} catch (IOException e) {String message = "与ftp服务器断开连接失败";log.error(message);throw new RuntimeException(message);} finally {if (ftpClient.isConnected()) {try {ftpClient.disconnect();} catch (IOException e) {String message = "与ftp服务器断开连接失败";log.error(message);throw new RuntimeException(message);}}}}}@Overridepublic boolean isDirExist(String directoryPath) {try {return ftpClient.changeWorkingDirectory(new String(directoryPath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));} catch (IOException e) {String message = String.format("进入目录:[%s]时发生I/O异常,请确认与ftp服务器的连接正常", directoryPath);log.error(message);throw new RuntimeException(message);}}@Overridepublic boolean isFileExist(String filePath) {boolean isExitFlag = false;try {FTPFile[] ftpFiles = ftpClient.listFiles(new String(filePath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));if (ftpFiles.length == 1 && ftpFiles[0].isFile()) {isExitFlag = true;}} catch (IOException e) {String message = String.format("获取文件:[%s] 属性时发生I/O异常,请确认与ftp服务器的连接正常", filePath);log.error(message);throw new RuntimeException(message);}return isExitFlag;}@Overridepublic boolean isSymbolicLink(String filePath) {boolean isExitFlag = false;try {FTPFile[] ftpFiles = ftpClient.listFiles(new String(filePath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));if (ftpFiles.length == 1 && ftpFiles[0].isSymbolicLink()) {isExitFlag = true;}} catch (IOException e) {String message = String.format("获取文件:[%s] 属性时发生I/O异常,请确认与ftp服务器的连接正常", filePath);log.error(message);throw new RuntimeException(message);}return isExitFlag;}@Overridepublic HashSet<String> getAllFilesInDir(String directoryPath, int parentLevel, int maxTraversalLevel) {if (parentLevel < maxTraversalLevel) {// 父级目录,以'/'结尾String parentPath = null;int pathLen = directoryPath.length();if (directoryPath.contains(FtpKeyValue.STAR) || directoryPath.contains(FtpKeyValue.QUESTION)) {// path是正则表达式String subPath = UnstructuredStorageReaderUtil.getRegexPathParentPath(directoryPath);if (isDirExist(subPath)) {parentPath = subPath;} else {String message = String.format("不能进入目录:[%s]," + "请确认您的配置项path:[%s]存在,且配置的用户有权限进入", subPath, directoryPath);log.error(message);throw new RuntimeException(message);}} else if (isDirExist(directoryPath)) {// path是目录if (directoryPath.charAt(pathLen - 1) == File.separatorChar) {parentPath = directoryPath;} else {parentPath = directoryPath + File.separatorChar;}} else if (isFileExist(directoryPath)) {// path指向具体文件sourceFiles.add(directoryPath);return sourceFiles;} else if (isSymbolicLink(directoryPath)) {//path是链接文件String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取", directoryPath);log.error(message);throw new RuntimeException(message);} else {String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取", directoryPath);log.error(message);throw new RuntimeException(message);}try {FTPFile[] fs = ftpClient.listFiles(new String(directoryPath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));for (FTPFile ff : fs) {String strName = ff.getName();String filePath = parentPath + strName;if (ff.isDirectory()) {if (!(strName.equals(FtpKeyValue.DOT) || strName.equals(FtpKeyValue.DOUBLE_DOT))) {//递归处理getAllFilesInDir(filePath, parentLevel + 1, maxTraversalLevel);}} else if (ff.isFile()) {// 是文件sourceFiles.add(filePath);} else if (ff.isSymbolicLink()) {//是链接文件String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取", filePath);log.error(message);throw new RuntimeException(message);} else {String message = String.format("请确认path:[%s]存在,且配置的用户有权限读取", filePath);log.error(message);throw new RuntimeException(message);}}} catch (IOException e) {String message = String.format("获取path:[%s] 下文件列表时发生I/O异常,请确认与ftp服务器的连接正常", directoryPath);log.error(message);throw new RuntimeException(message);}return sourceFiles;} else {//超出最大递归层数String message = String.format("获取path:[%s] 下文件列表时超出最大层数,请确认路径[%s]下不存在软连接文件", directoryPath, directoryPath);log.error(message);throw new RuntimeException(message);}}@Overridepublic InputStream getInputStream(String filePath) {try {return ftpClient.retrieveFileStream(new String(filePath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));} catch (IOException e) {String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath);log.error(message);throw new RuntimeException(message);}}@Overridepublic OutputStream getOutputStream(String filePath){return getOutputStream(filePath, 2);}@Overridepublic OutputStream getOutputStream(String filePath, int mode) {try {this.printWorkingDirectory();String parentDir = filePath.substring(0, filePath.lastIndexOf(File.separatorChar));this.ftpClient.changeWorkingDirectory(parentDir);this.printWorkingDirectory();OutputStream writeOutputStream = this.ftpClient.appendFileStream(filePath);String message = String.format("打开FTP文件[%s]获取写出流时出错,请确认文件%s有权限创建,有权限写出等", filePath, filePath);if (null == writeOutputStream) {throw new RuntimeException(message);}return writeOutputStream;} catch (IOException e) {String message = String.format("写出文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限写, errorMessage:%s", filePath, filePath, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void mkdir(String directoryPath) {String message = String.format("创建目录:%s时发生异常,请确认与ftp服务器的连接正常,拥有目录创建权限", directoryPath);try {this.printWorkingDirectory();boolean isDirExist = this.ftpClient.changeWorkingDirectory(directoryPath);if (!isDirExist) {int replayCode = this.ftpClient.mkd(directoryPath);message = String.format("%s,replayCode:%s", message, replayCode);if (replayCode != FTPReply.COMMAND_OK && replayCode != FTPReply.PATHNAME_CREATED) {throw new RuntimeException(message);}}} catch (IOException e) {message = String.format("%s, errorMessage:%s", message, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void mkDirRecursive(String directoryPath) {StringBuilder dirPath = new StringBuilder();dirPath.append(FtpKeyValue.SLASH);String[] dirSplit = StringUtils.split(directoryPath, FtpKeyValue.SLASH);String message = String.format("创建目录:%s时发生异常,请确认与ftp服务器的连接正常,拥有目录创建权限", directoryPath);try {// ftp server不支持递归创建目录,只能一级一级创建for (String dirName : dirSplit) {dirPath.append(dirName);boolean mkdirSuccess = mkDirSingleHierarchy(dirPath.toString());dirPath.append(FtpKeyValue.SLASH);if (!mkdirSuccess) {throw new RuntimeException(message);}}} catch (IOException e) {message = String.format("%s, errorMessage:%s", message, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void completePendingCommand() {/** Q:After I perform a file transfer to the server,* printWorkingDirectory() returns null. A:You need to call* completePendingCommand() after transferring the file. wiki:* http://wiki.apache.org/commons/Net/FrequentlyAskedQuestions*/try {boolean isOk = this.ftpClient.completePendingCommand();if (!isOk) {throw new RuntimeException("完成ftp completePendingCommand操作发生异常");}} catch (IOException e) {String message = String.format("完成ftp completePendingCommand操作发生异常, errorMessage:%s", e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void deleteFiles(Set<String> filesToDelete) {String eachFile = null;boolean deleteOk = false;try {this.printWorkingDirectory();for (String each : filesToDelete) {log.info(String.format("delete file [%s].", each));eachFile = each;deleteOk = this.ftpClient.deleteFile(each);if (!deleteOk) {String message = String.format("删除文件:[%s] 时失败,请确认指定文件有删除权限", eachFile);throw new RuntimeException(message);}}} catch (IOException e) {String message = String.format("删除文件:[%s] 时发生异常,请确认指定文件有删除权限,以及网络交互正常, errorMessage:%s", eachFile, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void moveFiles(Set<String> filesToMove, String targetPath) {if (StringUtils.isBlank(targetPath)) {throw new RuntimeException("目标目录路径为空!" );}String eachFile = null;try {this.printWorkingDirectory();// 创建目录mkdir(targetPath);for (String each : filesToMove) {eachFile = each;String targetName = String.format("%s%s" , targetPath.endsWith(File.separator) ?targetPath.substring(0, targetPath.length() - 1) : targetPath, each.substring(each.lastIndexOf(File.separator)));log.info(String.format("rename file [%s] to [%s] ." , each, targetName));this.ftpClient.rename(each, targetPath);}} catch (IOException e) {String message = String.format("移动文件:[%s] 时发生异常,请确认指定文件有删除权限,以及网络交互正常, errorMessage:%s" , eachFile, e.getMessage());log.error(message);throw new RuntimeException(message);}}public boolean mkDirSingleHierarchy(String directoryPath) throws IOException {boolean isDirExist = this.ftpClient.changeWorkingDirectory(directoryPath);// 如果directoryPath目录不存在,则创建if (!isDirExist) {int replayCode = this.ftpClient.mkd(directoryPath);if (replayCode != FTPReply.COMMAND_OK && replayCode != FTPReply.PATHNAME_CREATED) {return false;}}return true;}private void printWorkingDirectory() {try {log.info(String.format("current working directory:%s", this.ftpClient.printWorkingDirectory()));} catch (Exception e) {log.warn(String.format("printWorkingDirectory error:%s", e.getMessage()));}}
}
4、FTP进度监控实现类:FileProgressMonitor
- FTP上传下载进度监控实现类
- 此类可以日志中打印上传/下载进度
- 日志打印的精度可通过调整代码中被除数的陪数控制
- 实际应用中如果是web应用可通过session变量实现前台页面进度展示
- 前台进度更新精度实现同程序日志进度打印精度控制逻辑
package person.brickman.ftp;import com.jcraft.jsch.SftpProgressMonitor;
import lombok.extern.slf4j.Slf4j;
/*** @Description: sftp 上传下载进度监控* @Author brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
public class FileProgressMonitor implements SftpProgressMonitor {private long count = 0; //当前接收的总字节数private long max = 0; /* 最终文件大小 */private long percent = -1;//进度/*** 当每次传输了一个数据块后,调用count方法,count方法的参数为这一次传输的数据块大小* 大** 这里显示的百分比是整数*/@Overridepublic boolean count(long count) {this.count += count;if (percent >= this.count * 100 / max) {return true;}percent = this.count * 100 / max;log.info("Completed {}({}%) out of {}.", this.count, percent, max ); //打印当前进度return true;}/*** 大大* 当传输结束时,调用end方法* 大*/@Overridepublic void end() {log.info("Transferring done.");}/*** 当文件开始传输时,调用init方法* 大*/@Overridepublic void init(int op, String src, String dest, long max) {log.info("Transferring begin. ");this.max = max;this.count = 0;this.percent = -1;}
}
5、文件过滤器 CustomFTPFileFilter
目前判断服务器文件是否存在时用,因不能直接判断某个文件是否存在,所以先列目录过滤出文件,再做判断
package person.brickman.ftp;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPFileFilter;/*** @Description: ftp 文件过滤用,* 目前判断服务器文件是否存在时用,因不能直接判断某个文件是否存在,所以先列目录过滤出文件,再做判断* @Author brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
public class CustomFTPFileFilter implements FTPFileFilter {private String fileName;public CustomFTPFileFilter(String fileName){this.fileName=fileName;}@Overridepublic boolean accept(FTPFile ftpFile) {// 文件名String name = ftpFile.getName();
// log.info("Objects.equals(name,fileName ) :{}",Objects.equals(name,fileName ) );
// log.info("name:{}, fileName:{}",name, fileName );// 获取指定文件return fileName.equalsIgnoreCase( name ) ;}
}
五、【重点】单元测试类
1、FTP单元测试类:StandardFtpHelperTest
使用springboot自带junit5实现
testUploadWithRESUME 上传支持断点续传的单元测试方法
testUploadWithProgressMonitor 上传支持进度监控的单元测试方法
testUploadWithProgressMonitorWithRESUME 上传支持断点续传和进度监控的单元测试方法
testDownloadWithProgressMonitor 下载支持断点续传的单元测试方法
testDownloadWithRESUME 下载支持进度监控的单元测试方法
testDownloadWithProgerssMonitorWithRESUME 下载支持断点续传和进度监控的单元测试方法
说明:
读者本地运行前先修改单元测试类中的常量:ip、port、username、password、路径、文件名等
package persion.brickman.ftp;import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import person.brickman.ftp.AbstractFtpHelper;
import person.brickman.ftp.CustomFTPFileFilter;
import person.brickman.ftp.StandardFtpHelper;
import person.brickman.ftp.consts.FtpKeyValue;import java.io.*;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;/*** @Description: 标准ftp单元测试类(不用池)** 上传:含 进度监控、断点续传* 下载:含 进度监控、断点续传* @Author: brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
@SpringBootTest
public class StandardFtpHelperTest {private static final int BUFFER_SIZE = 1024;String host = "172.16.0.232";int port = 21;String username = "ftpuser";String password = "ftpuser";String localDir = "/Users/brickman/tmp";String uploadLocalFileName = "工作JUDE_v5测试.zip";String uploadLocalFilePath = localDir+"/"+uploadLocalFileName;String remoteDir = "/usr/local/ftpdir";String downloadRemoteFileName = "工作JUDE_v5测试.zip";String downloadRemoteFilePath = remoteDir+"/"+downloadRemoteFileName;AbstractFtpHelper ftpHelper = new StandardFtpHelper();@Test@Disabledpublic void testGetFtpClient() {}@Test@Disabledpublic void testLoginFtpServer() {}/*** 使用 jsch* 文件上传,普通上传(覆盖)* @throws SftpException*/@Test@Disabledpublic void testUpload() throws SftpException, InterruptedException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;// 单位:毫秒int timeout = 60*1000;String connectMode= FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.cwd(remoteDir);
// ftp.changeWorkingDirectory(remoteDir);
// ftp.changeWorkingDirectory(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());//long start = System.currentTimeMillis();// 上传文件InputStream is = null;try{// 读源文件上传is = Files.newInputStream(new File(uploadLocalFilePath).toPath());// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);// 解决中文名乱码问题remoteFilePath = new String(remoteFilePath.getBytes("UTF-8"),"ISO-8859-1");boolean ret = ftp.storeFile(remoteFilePath, is );Assertions.assertTrue(ret);log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);}catch(Exception e){log.error("upload failed dir:{} fileName:{} isCover: {}", localDir, uploadLocalFileName, true, e);}finally {try {is.close();} catch (IOException e) {e.printStackTrace();}if (!FTPReply.isPositiveIntermediate(ftp.getReplyCode())) {ftpHelper.logoutFtpServer();// 包含logout和disconnectlog.error("File transfer failed.");}// 一定要放在关流之后
// if (null!=ftp && !ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}
// }}}/*** 使用 jsch* 文件上传 , 带断点续传* @throws SftpException*/@Test@Disabledpublic void testUploadWithRESUME() throws SftpException, InterruptedException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;// 单位:毫秒int timeout = 60*1000;String connectMode= FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.cwd(remoteDir);
// ftp.changeWorkingDirectory(remoteDir);
// ftp.changeWorkingDirectory(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());//File f = new File(uploadLocalFilePath);FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) );log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length);long start = System.currentTimeMillis();// 上传文件InputStream is = null;try{// 读源文件上传is = new FileInputStream( f );// 远程文件存在if(files.length == 1){long lRemoteSize = files[0].getSize();log.info("远程文件大小为:{}, 启动断点续传...", lRemoteSize );if( lRemoteSize >= f.length()){log.error("远程文件大小大于本地文件大小,上传中止");return;}if(!(is.skip(lRemoteSize)==lRemoteSize)){log.error("尝试断点续传失败,上传中止。可能原因:stream does not support seek, or some other I/O error occurs.");return;}ftp.setRestartOffset(lRemoteSize);}else{log.info("远程没有该文件,普通模式上传开始..." );}ftp.enterLocalPassiveMode();// 被动模式 被动模式是“服务器”开放端口给客户端连接;
// ftp.enterLocalActiveMode(); // 主动模式 主动模式传输数据时是“服务器”连接到“客户端”的端口;// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);// 解决中文名乱码问题remoteFilePath = new String(remoteFilePath.getBytes("UTF-8"),"ISO-8859-1");boolean ret = ftp.storeFile(remoteFilePath, is );Assertions.assertTrue(ret);log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);}catch(Exception e){log.error("upload failed dir:{} fileName:{} isCover: {}", localDir, uploadLocalFileName, true, e);}finally {try {is.close();} catch (IOException e) {e.printStackTrace();}// 一定要放在关流之后if (null!=ftp && !ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}}/*** 使用 jsch* 文件上传, 普通上传(覆盖),带进度监控* @throws SftpException*/@Test@Disabledpublic void testUploadWithProgressMonitor() throws SftpException, InterruptedException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;// 单位:毫秒int timeout = 60*1000;String connectMode= FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.cwd(remoteDir);
// ftp.changeWorkingDirectory(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());//long start = System.currentTimeMillis();// 上传文件InputStream is = null;OutputStream os = null;try{File sourceFile = new File(uploadLocalFilePath);// 读源文件上传is = new FileInputStream( sourceFile );// 如果使用断点续传,这个必须设置
// ftp.setFileType( FTP.BINARY_FILE_TYPE);// 解决中文名乱码问题
// remoteFilePath = new String(remoteFilePath.getBytes("UTF-8"),"ISO-8859-1");os = ftp.storeFileStream(new String(remoteFilePath.getBytes("UTF-8"),"ISO-8859-1") );byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;long totalBytesRead = 0;long fileSize = sourceFile.length();double percentCompleted = (double) totalBytesRead / fileSize * 100;while ((bytesRead = is.read(buffer)) != -1) {os.write(buffer, 0, bytesRead);totalBytesRead += bytesRead;
// double percentCompleted = (double) totalBytesRead / fileSize * 100;
// log.info("Transfer progress: {}", String.format("%.2f%%",percentCompleted) );// 保留两位时日志打太多了/*%s: 字符串%d: 十进制整数%f: 浮点数%c: 字符%b: 布尔值%n: 换行符*/if ((int)percentCompleted >= (int)totalBytesRead * 100 / fileSize) {}else{percentCompleted = (double) totalBytesRead / fileSize * 100;log.info("Completed {}({}) out of {}.", totalBytesRead, String.format("%.0f%%",percentCompleted), fileSize ); //打印当前进度}}log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕) java.net.SocketException: Connection resetAssertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);}catch(Exception e){log.error("upload failed dir:{} fileName:{} isCover: {}", localDir, uploadLocalFileName, true, e);}finally {try {is.close();} catch (IOException e) {e.printStackTrace();}try {os.close();} catch (IOException e) {e.printStackTrace();}// 一定要放在关流之后if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}}/*** 使用 jsch* 文件上传, 带进度监控、带断点续传** @throws SftpException*/@Test@Disabledpublic void testUploadWithProgressMonitorWithRESUME() throws SftpException, InterruptedException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;// 单位:毫秒int timeout = 60*1000;String connectMode= FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());
// ftp.cwd(remoteDir);ftp.changeWorkingDirectory(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());//// ftp.setControlEncoding("UTF-8");// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);File f = new File(uploadLocalFilePath);FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) );log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length);long start = System.currentTimeMillis();// 上传文件InputStream is = null;OutputStream os = null;try{is = Files.newInputStream(f.toPath());byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;long totalBytesRead = 0;long fileSize = f.length();// 远程文件存在if(files.length == 1){long lRemoteSize = files[0].getSize();log.info("远程文件大小为:{}, 启动断点续传...", lRemoteSize );if( lRemoteSize >= f.length()){log.error("远程文件大小大于本地文件大小,上传中止");return;}if(!(is.skip(lRemoteSize)==lRemoteSize)){log.error("尝试断点续传失败,上传中止。可能原因:stream does not support seek, or some other I/O error occurs.");return;}ftp.setRestartOffset(lRemoteSize);totalBytesRead = lRemoteSize;}else{log.info("远程没有该文件,普通模式上传开始..." );}// 解决中文名乱码问题remoteFilePath = new String(remoteFilePath.getBytes("UTF-8"),"ISO-8859-1");os = ftp.storeFileStream( remoteFilePath );double percentCompleted = (double) totalBytesRead / fileSize * 100;while ((bytesRead = is.read(buffer)) != -1) {os.write(buffer, 0, bytesRead);totalBytesRead += bytesRead;// double percentCompleted = (double) totalBytesRead / fileSize * 100;
// log.info("Transfer progress: {}", String.format("%.2f%%",percentCompleted) );// 保留两位时日志打太多了/*%s: 字符串%d: 十进制整数%f: 浮点数%c: 字符%b: 布尔值%n: 换行符*/if ((int)percentCompleted >= (int)totalBytesRead * 100 / fileSize) {}else{percentCompleted = (double) totalBytesRead / fileSize * 100;log.info("Completed {}({}) out of {}.", totalBytesRead, String.format("%.0f%%",percentCompleted), fileSize ); //打印当前进度}}log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);}catch(Exception e){log.error("upload failed dir:{} fileName:{} isCover: {}", localDir, uploadLocalFileName, true, e);}finally {try {is.close();} catch (IOException e) {e.printStackTrace();}try {os.close();} catch (IOException e) {e.printStackTrace();}// 一定要放在关流之后if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}}/*** 文件下载,普通下载(覆盖)* 使用 jsch*/@Test@Disabledpublic void testDownload() throws SftpException, InterruptedException, JSchException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;// 单位:毫秒int timeout = 60*1000;// FTP主动模式(port)和被动模式(PASV)String connectMode=FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();// 切换本地目录log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.changeWorkingDirectory(remoteDir);
// ftp.cwd(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());// FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) );
// log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",
// ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) ).length);long start = System.currentTimeMillis();// 下载文件OutputStream os = null;try{os = new BufferedOutputStream(new FileOutputStream(localFilePath));ftp.enterLocalPassiveMode();// 被动模式 被动模式是“服务器”开放端口给客户端连接;
// ftp.enterLocalActiveMode(); // 主动模式 主动模式传输数据时是“服务器”连接到“客户端”的端口;// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);// 解决中文名乱码问题 否则带中文名的文件下载不下来(下载需要)downloadRemoteFilePath = new String(downloadRemoteFilePath.getBytes("UTF-8"),"ISO-8859-1");boolean ret = ftp.retrieveFile(downloadRemoteFilePath, os);Assertions.assertTrue(ret);log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch(Exception e){log.error("download failed dir:{} fileName:{} isCover: {}", remoteDir, localFileName, true, e);}finally {try {os.close();} catch (IOException e) {e.printStackTrace();}if (!FTPReply.isPositiveIntermediate(ftp.getReplyCode())) {ftpHelper.logoutFtpServer();// 包含logout和disconnectlog.error("File transfer failed.");}// 一定要放在关流之后, 下载if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}}/*** 文件下载* 普通下载(覆盖),带进度监控* 使用 jsch*/@Test@Disabledpublic void testDownloadWithProgressMonitor() throws SftpException, InterruptedException, JSchException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;// 单位:毫秒int timeout = 60*1000;// FTP主动模式(port)和被动模式(PASV)String connectMode=FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();// 切换本地目录log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.changeWorkingDirectory(remoteDir);
// ftp.cwd(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());long start = System.currentTimeMillis();// 下载文件OutputStream os = null;InputStream is = null;try{ftp.enterLocalPassiveMode();// 被动模式 被动模式是“服务器”开放端口给客户端连接;
// ftp.enterLocalActiveMode(); // 主动模式 主动模式传输数据时是“服务器”连接到“客户端”的端口;// 如果使用断点续传,这个必须设置
// ftp.setFileType( FTP.BINARY_FILE_TYPE);// int fileSize = ftp.size(remoteFilePath); // 不能用,会一直阻塞FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) );log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) ).length);long lRemoteSize = files[0].getSize();// 解决中文名乱码问题 否则带中文名的文件下载不下来downloadRemoteFilePath = new String(downloadRemoteFilePath.getBytes("UTF-8"),"ISO-8859-1");// 读远程文件下载is = ftp.retrieveFileStream(downloadRemoteFilePath);os = new BufferedOutputStream(new FileOutputStream(localFilePath));byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;long totalBytesRead = 0;log.info("remote fileSize:{}", lRemoteSize);double percentCompleted = (double) totalBytesRead / lRemoteSize * 100;// 这里日志中显示的百分比精确到2位小数while ((bytesRead = is.read(buffer)) != -1) {os.write(buffer, 0, bytesRead);totalBytesRead += bytesRead;
// double percentCompleted = (double) totalBytesRead / fileSize * 100;
// log.info("Transfer progress: {}", String.format("%.2f%%",percentCompleted) ); // 保留两位时日志打太多了/*%s: 字符串%d: 十进制整数%f: 浮点数%c: 字符%b: 布尔值%n: 换行符*/if ((int)percentCompleted >= (int)totalBytesRead * 100 / lRemoteSize) {}else{percentCompleted = (double) totalBytesRead / lRemoteSize * 100;log.info("Completed {}({}) out of {}.", totalBytesRead, String.format("%.0f%%",percentCompleted), lRemoteSize ); //打印当前进度}}log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch(Exception e){log.error("download failed dir:{} fileName:{} isCover: {}", remoteDir, localFileName, true, e);}finally {try {os.close();} catch (IOException e) {e.printStackTrace();}// 一定要放在关流之后, 下载if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}} /*** 文件下载* 带断点续传* 使用 jsch*/@Test@Disabledpublic void testDownloadWithRESUME() throws SftpException, InterruptedException, JSchException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;// 单位:毫秒int timeout = 60*1000;// FTP主动模式(port)和被动模式(PASV)String connectMode=FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();// 切换本地目录log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.changeWorkingDirectory(remoteDir);
// ftp.cwd(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());File f = new File(localFilePath);// FTPFile[] files = ftp.listFiles(remoteFilePath );FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) );
// log.info("ftp.listFiles().length:{}", ftp.listFiles().length);
// log.info("ftp.listFiles(remoteDir).length:{}", ftp.listFiles(remoteDir).length);log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) ).length);
// log.info("ftp.listFiles(remoteFilePath).length:{}", ftp.listFiles(remoteFilePath).length);if(files.length != 1){System.out.println("远程文件不唯一");// 一般是不存在return;}long lRemoteSize = files[0].getSize();long start = System.currentTimeMillis();// 本地文件不存在// 下载文件OutputStream os = null;try{// 本地文件存在if(f.exists()){log.info("本地文件大小为:{}, 启动断点续传...", f.length());if(f.length() >= lRemoteSize){log.warn("本地文件大小大于远程文件大小,下载中止");return;}ftp.setRestartOffset(f.length());os = new BufferedOutputStream(new FileOutputStream(localFilePath, true ));}else{log.info("本地没有该文件,普通模式下载开始..." );os = new BufferedOutputStream(new FileOutputStream(localFilePath));}ftp.enterLocalPassiveMode();// 被动模式 被动模式是“服务器”开放端口给客户端连接;
// ftp.enterLocalActiveMode(); // 主动模式 主动模式传输数据时是“服务器”连接到“客户端”的端口;// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);// 解决中文名乱码问题 否则带中文名的文件下载不下来(下载需要)downloadRemoteFilePath = new String(downloadRemoteFilePath.getBytes("UTF-8"),"ISO-8859-1");boolean ret = ftp.retrieveFile(downloadRemoteFilePath, os);Assertions.assertTrue(ret);log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch(Exception e){log.error("download failed dir:{} fileName:{} isCover: {}", remoteDir, localFileName, true, e);}finally {try {os.close();} catch (IOException e) {e.printStackTrace();}if (!FTPReply.isPositiveIntermediate(ftp.getReplyCode())) {ftpHelper.logoutFtpServer();// 包含logout和disconnectlog.error("File transfer failed.");}// 一定要放在关流之后, 下载if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}} /*** 文件下载* 带进度监控、 带断点续传* 使用 jsch*/@Test@Disabledpublic void testDownloadWithProgerssMonitorWithRESUME() throws SftpException, InterruptedException, JSchException, IOException {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;// 单位:毫秒int timeout = 60*1000;// FTP主动模式(port)和被动模式(PASV)String connectMode=FtpKeyValue.DEFAULT_FTP_CONNECT_PATTERN;ftpHelper.loginFtpServer( host, username, password, port, timeout, connectMode );FTPClient ftp = (FTPClient)ftpHelper.getFtpClient();// 切换本地目录log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());ftp.changeWorkingDirectory(remoteDir);
// ftp.cwd(remoteDir);log.info("ftp.lpwd():{}",ftp.printWorkingDirectory());log.info("ftp.pwd():{}",ftp.pwd());File f = new File(localFilePath);FTPFile[] files = ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) );log.info("ftp.listFiles(remoteDir, new CustomFTPFileFilter( remoteFileName ) ).length:{}",ftp.listFiles(remoteDir, new CustomFTPFileFilter( downloadRemoteFileName ) ).length);if(files.length != 1){System.out.println("远程文件不唯一");// 一般是不存在return;}long lRemoteSize = files[0].getSize();long start = System.currentTimeMillis();// 下载文件OutputStream os = null;InputStream is = null;try{ftp.enterLocalPassiveMode();// 被动模式 被动模式是“服务器”开放端口给客户端连接;
// ftp.enterLocalActiveMode(); // 主动模式 主动模式传输数据时是“服务器”连接到“客户端”的端口;// 如果使用断点续传,这个必须设置ftp.setFileType( FTP.BINARY_FILE_TYPE);byte[] buffer = new byte[BUFFER_SIZE];int bytesRead;long totalBytesRead = 0;log.info("remote lRemoteSize:{}", lRemoteSize);// 本地文件存在if(f.exists()){log.info("本地文件大小为:{}, 启动断点续传...", f.length());if(f.length() >= lRemoteSize){log.warn("本地文件大小大于远程文件大小,下载中止");return;}ftp.setRestartOffset(f.length());os = new BufferedOutputStream(new FileOutputStream(localFilePath, true ));totalBytesRead = f.length();}else{log.info("本地没有该文件,普通模式下载开始..." );os = new BufferedOutputStream(new FileOutputStream(localFilePath));}// 解决中文名乱码问题 否则带中文名的文件下载不下来downloadRemoteFilePath = new String(downloadRemoteFilePath.getBytes("UTF-8"),"ISO-8859-1");// 读远程文件下载is = ftp.retrieveFileStream(downloadRemoteFilePath);double percentCompleted = totalBytesRead * 100 / lRemoteSize;// 这里日志中显示的百分比精确到2位小数while ((bytesRead = is.read(buffer)) != -1) {os.write(buffer, 0, bytesRead);totalBytesRead += bytesRead;/*%s: 字符串%d: 十进制整数%f: 浮点数%c: 字符%b: 布尔值%n: 换行符*/if ((int)percentCompleted >= (int)totalBytesRead * 100 / lRemoteSize) {}else{percentCompleted = (double) totalBytesRead / lRemoteSize * 100;log.info("Completed {}({}) out of {}.", totalBytesRead, String.format("%.0f%%",percentCompleted), lRemoteSize ); //打印当前进度}
// log.info("Transfer progress: {}", String.format("%.2f%%",percentCompleted) ); // 保留两位时日志打太多了}log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch(Exception e){log.error("download failed dir:{} fileName:{} isCover: {}", remoteDir, localFileName, true, e);}finally {try {os.close();} catch (IOException e) {e.printStackTrace();}// 一定要放在关流之后, 下载if (!ftp.completePendingCommand()) {try {ftpHelper.logoutFtpServer();// 包含logout和disconnect} catch (Exception e) {e.printStackTrace();}}}}
}
六、总结
1、第二篇为FTP高级篇,讲FTP上传进度监控、断点续传,FTP下载进度监控、断点续传,单元测试类全是干货,啃完你就会变得很棒~
2、【重点】上传支持断点续传和进度监控的单元测试方法:testUploadWithProgressMonitorWithRESUME
3、【重点】下载支持断点续传和进度监控的单元测试方法:testDownloadWithProgerssMonitorWithRESUME
4、请务必阅读前面的“篇章内容说明”,以便精准命中目标