一、摘要(本系列汇总说明)
- 总纲
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(一)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(二)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(三)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(四)
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(五)【完结篇】
FTP、SFTP上传下传、进度监控、断点续传、连接池封装JAVA一网打尽(六)【汇总篇】
- 篇章内容说明
第一篇:基础篇,讲FTP常规上传下载实现、SFTP常规上传下载实现、单元测试类
第二篇:FTP高级篇,讲FTP上传进度监控、断点续传,FTP下载进度监控、断点续传
第三篇:SFTP高级篇,讲SFTP上传进度监控、断点续传,SFTP下载进度监控、断点续传
第四篇:FTP进阶篇,讲FTP池化处理(连接池封装)
第五篇【完结篇】:SFTP进阶篇,讲SFTP池化处理(连接池封装)
第六篇:汇总篇,包含前面1~5篇所有内容,且增加更高级的相关知识点
- 本篇
本文是SFTP进阶篇,讲SFTP池化处理(连接池封装)
二、环境
- SpringBoot 2.7.18 官方下载地址:SpringBoot 2.7.18
- commons-net-3.10.0.jar 官方下载地址:commons-net-3.10.0.jar
- commons-pool2-2.12.0.jar 官方下载地址:commons-pool2-2.12.0.jar
- jsch-0.1.55.jar 官方下载地址:jsch-0.1.55.jar
- Oracle JDK8u202(Oracle JDK8最后一个非商业版本) 下载地址:Oracle JDK8u202
- FileZilla Client 官方下载地址:FileZilla Client
注意:
- (特别是MacOS用户)FileZilla有MacOS版本,下载客户端是下Client,不是Server(注意一下名字,不要下错了)。
三、POM依赖
该系列文章通用,几篇FTP文章的pom文件都一样
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>person.brickman</groupId><artifactId>ftp</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><httpclient.version>4.5.14</httpclient.version><!-- 工具 --><lombok.version>1.18.32</lombok.version><commons-logging.version>1.3.1</commons-logging.version><commons-lang3.version>3.14.0</commons-lang3.version><commons-io.version>2.15.1</commons-io.version><commons-configuration.version>1.10</commons-configuration.version><commons-net.version>3.10.0</commons-net.version><commons-pool2.version>2.12.0</commons-pool2.version><jsch.version>0.1.55</jsch.version>
<!-- <sshd.version>2.12.1</sshd.version>--><!-- 2.20.1 2.22.2 3.0.0-M2 3.2.5 --><maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version><!-- 3.0.1 2.4 --><maven-source-plugin.version>3.0.1</maven-source-plugin.version><!--忽略本包测试--><maven.test.skip>false</maven.test.skip><skipTests>false</skipTests></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId><scope>provided</scope></dependency><dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><!-- 工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>commons-net</groupId><artifactId>commons-net</artifactId><version>${commons-net.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>${commons-logging.version}</version></dependency><dependency><groupId>com.jcraft</groupId><artifactId>jsch</artifactId><version>${jsch.version}</version></dependency><!-- 测试相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.6</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-source-plugin</artifactId><version>${maven-source-plugin.version}</version><configuration><attach>true</attach></configuration><executions><execution><phase>compile</phase></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version><executions><execution><id>deploy</id><phase>deploy</phase><goals><goal>deploy</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><!-- 跳过失败的单元测试 --><testFailureIgnore>false</testFailureIgnore><skipTests>${skipTests}</skipTests><argLine>${junit.test.params} -Xmx512m -XX:MaxPermSize=256m</argLine></configuration></plugin></plugins></build></project>
四、实现类
1、(常量)接口类:FtpKeyValue
接口类,读者觉得常量类更顺眼也可以改(阿里原装的)
package person.brickman.ftp.consts;/*** alibaba.datax.ftpreader** @author datax*/
public interface FtpKeyValue {/*** FTP 常用键定义*/String PROTOCOL = "protocol";String HOST = "host";String USERNAME = "username";String PASSWORD = "password";String PORT = "port";String TIMEOUT = "timeout";String CONNECTPATTERN = "connectPattern";String PATH = "path";String MAXTRAVERSALLEVEL = "maxTraversalLevel";/*** 默认值定义*/int DEFAULT_FTP_PORT = 21;int DEFAULT_SFTP_PORT = 22;int DEFAULT_TIMEOUT = 60000;int DEFAULT_MAX_TRAVERSAL_LEVEL = 100;String DEFAULT_FTP_CONNECT_PATTERN = "PASV";String CONTROL_ENCODING = "utf8";String NO_SUCH_FILE = "no such file";char C_STAR = '*';String STAR = "*";char C_QUESTION = '?';String QUESTION = "?";String SLASH = "/";String DOT = ".";String DOUBLE_DOT = "..";
}
2、统一抽象类:AbstractFtpHelper
抽象类,不管ftp还是sftp都使用此抽象类,而不是直接操作实现类
package person.brickman.ftp;import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;/*** alibaba.datax.ftpreader** @author datax*/
public abstract class AbstractFtpHelper {public abstract void setFtpClient(Object ftpClient);public abstract Object getFtpClient() ;/*** 与ftp服务器建立连接** @param @param host* @param @param username* @param @param password* @param @param port* @param @param timeout* @param @param connectMode PASV PORT* @return void* @throws*/public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode) throws InterruptedException;/*** 断开与ftp服务器的连接** @param* @return void* @throws*/public abstract void logoutFtpServer();/*** 判断指定路径是否是目录** @param @param directoryPath* @param @return* @return boolean* @throws*/public abstract boolean isDirExist(String directoryPath);/*** 判断指定路径是否是文件** @param @param filePath* @param @return* @return boolean* @throws*/public abstract boolean isFileExist(String filePath);/*** 判断指定路径是否是软链接** @param @param filePath* @param @return* @return boolean* @throws*/public abstract boolean isSymbolicLink(String filePath);/*** 递归获取指定路径下符合条件的所有文件绝对路径** @param @param directoryPath* @param @param parentLevel 父目录的递归层数(首次为0)* @param @param maxTraversalLevel 允许的最大递归层数* @param @return* @return HashSet<String>* @throws*/public abstract HashSet<String> getAllFilesInDir(String directoryPath, int parentLevel, int maxTraversalLevel);/*** 获取指定路径的输入流** @param @param filePath* @param @return* @return InputStream* @throws*/public abstract InputStream getInputStream(String filePath);/*** 写入指定路径的输出流** @param @param filePath* @param @return* @return InputStream* @throws*/public abstract OutputStream getOutputStream(String filePath);/*** 写入指定路径的输出流** @param @param filePath* @param @param mode OVERWRITE = 0; RESUME = 1; APPEND = 2;* @param @return* @return InputStream* @throws*/public abstract OutputStream getOutputStream(String filePath, int mode);/*** 获取指定路径列表下符合条件的所有文件的绝对路径** @param @param srcPaths 路径列表* @param @param parentLevel 父目录的递归层数(首次为0)* @param @param maxTraversalLevel 允许的最大递归层数* @param @return* @return HashSet<String>* @throws*/public HashSet<String> getAllFilesInDir(List<String> srcPaths, int parentLevel, int maxTraversalLevel) {HashSet<String> sourceAllFiles = new HashSet<String>();if (!srcPaths.isEmpty()) {for (String eachPath : srcPaths) {sourceAllFiles.addAll(getAllFilesInDir(eachPath, parentLevel, maxTraversalLevel));}}return sourceAllFiles;}/*** 创建远程目录* 不支持递归创建, 比如 mkdir -p** @param directoryPath*/public abstract void mkdir(String directoryPath);/*** 创建远程目录* 支持目录递归创建** @param directoryPath*/public abstract void mkDirRecursive(String directoryPath);/*** Q:After I perform a file transfer to the server,* printWorkingDirectory() returns null. A:You need to call* completePendingCommand() after transferring the file. wiki:* http://wiki.apache.org/commons/Net/FrequentlyAskedQuestions*/public abstract void completePendingCommand();/*** 删除文件* warn: 不支持文件夹删除, 比如 rm -rf** @param filesToDelete*/public abstract void deleteFiles(Set<String> filesToDelete);/*** 移动文件* warn: 不支持文件夹删除, 比如 rm -rf** @param filesToMove* @param targetPath*/public abstract void moveFiles(Set<String> filesToMove, String targetPath);
}
3、STP实现类:SftpHelper
FTP上传下载实现类
package person.brickman.ftp;import com.jcraft.jsch.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import person.brickman.ftp.consts.FtpKeyValue;import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;/*** alibaba.datax.ftpreader** @author datax*/
@Slf4j
public class SftpHelper extends AbstractFtpHelper {Session session = null;ChannelSftp channelSftp = null;HashSet<String> sourceFiles = new HashSet<String>();@Overridepublic void setFtpClient(Object channelSftp) {this. channelSftp=(ChannelSftp)channelSftp;}@Overridepublic Object getFtpClient() {return channelSftp;}@Overridepublic void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode) throws InterruptedException {JSch jsch = new JSch();try {session = jsch.getSession(username, host, port);
// Thread.sleep(3*1000);// 根据用户名,主机ip,端口获取一个Session对象// 如果服务器连接不上,则抛出异常if (session == null) {throw new RuntimeException("session is null,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确." );}// 设置密码session.setPassword(password);Properties config = new Properties();config.put("StrictHostKeyChecking" , "no" );// 为Session对象设置propertiessession.setConfig(config);// 设置timeout时间session.setTimeout(timeout);// 通过Session建立链接session.connect( );// 打开SFTP通道channelSftp = (ChannelSftp) session.openChannel("sftp" );
// channelSftp.// 建立SFTP通道的连接channelSftp.connect( 5*1000);//设置命令传输编码/// String fileEncoding = System.getProperty("file.encoding");// channelSftp.setFilenameEncoding(fileEncoding);} catch (JSchException e) {e.printStackTrace();if (null != e.getCause()) {String cause = e.getCause().toString();String unknownHostException = "java.net.UnknownHostException: " + host;String illegalArgumentException = "java.lang.IllegalArgumentException: port out of range:" + port;String wrongPort = "java.net.ConnectException: Connection refused";if (unknownHostException.equals(cause)) {String message = String.format("请确认ftp服务器地址是否正确,无法连接到地址为: [%s] 的ftp服务器" , host);log.error(message);throw new RuntimeException(message);} else if (illegalArgumentException.equals(cause) || wrongPort.equals(cause)) {String message = String.format("请确认连接ftp服务器端口是否正确,错误的端口: [%s] " , port);log.error(message);throw new RuntimeException(message);}} else {if ("Auth fail".equals(e.getMessage())) {String message = String.format("与ftp服务器建立连接失败,请检查用户名和密码是否正确: [%s]" , "message:host =" + host + ",username = " + username + ",port =" + port);log.error(message);throw new RuntimeException(message);} else {String message = String.format("与ftp服务器建立连接失败 : [%s]" , "message:host =" + host + ",username = " + username + ",port =" + port);log.error(message);throw new RuntimeException(message);}}}}@Overridepublic void logoutFtpServer() {if (channelSftp != null) {channelSftp.disconnect();}if (session != null) {session.disconnect();}}@Overridepublic boolean isDirExist(String directoryPath) {try {SftpATTRS sftpATTRS = channelSftp.lstat(directoryPath);return sftpATTRS.isDir();} catch (SftpException e) {if (e.getMessage().toLowerCase().equals(FtpKeyValue.NO_SUCH_FILE)) {String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取" , directoryPath);log.error(message);throw new RuntimeException(message);}String message = String.format("进入目录:[%s]时发生I/O异常,请确认与ftp服务器的连接正常" , directoryPath);log.error(message);throw new RuntimeException(message);}}@Overridepublic boolean isFileExist(String filePath) {boolean isExitFlag = false;try {SftpATTRS sftpATTRS = channelSftp.lstat(filePath);if (sftpATTRS.getSize() >= 0) {isExitFlag = true;}} catch (SftpException e) {if (e.getMessage().toLowerCase().equals(FtpKeyValue.NO_SUCH_FILE)) {String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取" , filePath);log.error(message);throw new RuntimeException(message);} else {String message = String.format("获取文件:[%s] 属性时发生I/O异常,请确认与ftp服务器的连接正常" , filePath);log.error(message);throw new RuntimeException(message);}}return isExitFlag;}@Overridepublic boolean isSymbolicLink(String filePath) {try {SftpATTRS sftpATTRS = channelSftp.lstat(filePath);return sftpATTRS.isLink();} catch (SftpException e) {if (e.getMessage().toLowerCase().equals(FtpKeyValue.NO_SUCH_FILE)) {String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取" , filePath);log.error(message);throw new RuntimeException(message);} else {String message = String.format("获取文件:[%s] 属性时发生I/O异常,请确认与ftp服务器的连接正常" , filePath);log.error(message);throw new RuntimeException(message);}}}@Overridepublic HashSet<String> getAllFilesInDir(String directoryPath, int parentLevel, int maxTraversalLevel) {if (parentLevel < maxTraversalLevel) {// 父级目录,以'/'结尾String parentPath;int pathLen = directoryPath.length();// *和?的限制if (directoryPath.contains(FtpKeyValue.STAR) || directoryPath.contains(FtpKeyValue.QUESTION)) {// path是正则表达式String subPath = UnstructuredStorageReaderUtil.getRegexPathParentPath(directoryPath);if (isDirExist(subPath)) {parentPath = subPath;} else {String message = String.format("不能进入目录:[%s]," + "请确认您的配置项path:[%s]存在,且配置的用户有权限进入" , subPath, directoryPath);log.error(message);throw new RuntimeException(message);}} else if (isDirExist(directoryPath)) {// path是目录if (directoryPath.charAt(pathLen - 1) == File.separatorChar) {parentPath = directoryPath;} else {parentPath = directoryPath + File.separatorChar;}} else if (isSymbolicLink(directoryPath)) {//path是链接文件String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取" , directoryPath);log.error(message);throw new RuntimeException(message);} else if (isFileExist(directoryPath)) {// path指向具体文件sourceFiles.add(directoryPath);return sourceFiles;} else {String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取" , directoryPath);log.error(message);throw new RuntimeException(message);}try {Vector vector = channelSftp.ls(directoryPath);for (int i = 0; i < vector.size(); i++) {ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i);String strName = le.getFilename();String filePath = parentPath + strName;if (isDirExist(filePath)) {// 是子目录if (!(strName.equals(FtpKeyValue.DOT) || strName.equals(FtpKeyValue.DOUBLE_DOT))) {//递归处理getAllFilesInDir(filePath, parentLevel + 1, maxTraversalLevel);}} else if (isSymbolicLink(filePath)) {//是链接文件String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取" , filePath);log.error(message);throw new RuntimeException(message);} else if (isFileExist(filePath)) {// 是文件sourceFiles.add(filePath);} else {String message = String.format("请确认path:[%s]存在,且配置的用户有权限读取" , filePath);log.error(message);throw new RuntimeException(message);}}} catch (SftpException e) {String message = String.format("获取path:[%s] 下文件列表时发生I/O异常,请确认与ftp服务器的连接正常" , directoryPath);log.error(message);throw new RuntimeException(message);}return sourceFiles;} else {//超出最大递归层数String message = String.format("获取path:[%s] 下文件列表时超出最大层数,请确认路径[%s]下不存在软连接文件" , directoryPath, directoryPath);log.error(message);throw new RuntimeException(message);}}@Overridepublic InputStream getInputStream(String filePath) {try {return channelSftp.get(filePath);} catch (SftpException e) {String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取" , filePath, filePath);log.error(message);throw new RuntimeException(message);}}@Overridepublic OutputStream getOutputStream(String filePath) {return getOutputStream(filePath, ChannelSftp.APPEND);}@Overridepublic OutputStream getOutputStream(String filePath, int mode) {try {this.printWorkingDirectory();String parentDir = filePath.substring(0, filePath.lastIndexOf(File.separatorChar));this.channelSftp.cd(parentDir);this.printWorkingDirectory();OutputStream writeOutputStream = this.channelSftp.put(filePath, mode);String message = String.format("打开FTP文件[%s]获取写出流时出错,请确认文件%s有权限创建,有权限写出等" , filePath, filePath);if (null == writeOutputStream) {throw new RuntimeException(message);}return writeOutputStream;} catch (SftpException e) {String message = String.format("写出文件[%s] 时出错,请确认文件%s有权限写出, errorMessage:%s" , filePath, filePath, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void mkdir(String directoryPath) {boolean isDirExist = false;try {this.printWorkingDirectory();SftpATTRS sftpATTRS = this.channelSftp.lstat(directoryPath);isDirExist = sftpATTRS.isDir();} catch (SftpException e) {if (e.getMessage().toLowerCase().equals(FtpKeyValue.NO_SUCH_FILE)) {log.warn(String.format("您的配置项path:[%s]不存在,将尝试进行目录创建, errorMessage:%s" , directoryPath, e.getMessage()), e);isDirExist = false;}}if (!isDirExist) {try {// warn 检查mkdir -pthis.channelSftp.mkdir(directoryPath);} catch (SftpException e) {String message = String.format("创建目录:%s时发生I/O异常,请确认与ftp服务器的连接正常,拥有目录创建权限, errorMessage:%s" , directoryPath, e.getMessage());log.error(message, e);throw new RuntimeException(message);}}}@Overridepublic void mkDirRecursive(String directoryPath) {boolean isDirExist = false;try {this.printWorkingDirectory();SftpATTRS sftpATTRS = this.channelSftp.lstat(directoryPath);isDirExist = sftpATTRS.isDir();} catch (SftpException e) {if (e.getMessage().toLowerCase().equals(FtpKeyValue.NO_SUCH_FILE)) {log.warn(String.format("您的配置项path:[%s]不存在,将尝试进行目录创建, errorMessage:%s" , directoryPath, e.getMessage()), e);isDirExist = false;}}if (!isDirExist) {StringBuilder dirPath = new StringBuilder();dirPath.append(FtpKeyValue.SLASH);String[] dirSplit = StringUtils.split(directoryPath, FtpKeyValue.SLASH);try {// ftp server不支持递归创建目录,只能一级一级创建for (String dirName : dirSplit) {dirPath.append(dirName);mkDirSingleHierarchy(dirPath.toString());dirPath.append(FtpKeyValue.SLASH);}} catch (SftpException e) {String message = String.format("创建目录:%s时发生I/O异常,请确认与ftp服务器的连接正常,拥有目录创建权限, errorMessage:%s" , directoryPath, e.getMessage());log.error(message, e);throw new RuntimeException(message);}}}@Overridepublic void completePendingCommand() {}@Overridepublic void deleteFiles(Set<String> filesToDelete) {String eachFile = null;try {this.printWorkingDirectory();for (String each : filesToDelete) {log.info(String.format("delete file [%s]." , each));eachFile = each;this.channelSftp.rm(each);}} catch (SftpException e) {String message = String.format("删除文件:[%s] 时发生异常,请确认指定文件有删除权限,以及网络交互正常, errorMessage:%s" , eachFile, e.getMessage());log.error(message);throw new RuntimeException(message);}}@Overridepublic void moveFiles(Set<String> filesToMove, String targetPath) {if (StringUtils.isBlank(targetPath)) {throw new RuntimeException("目标目录路径为空!" );}String eachFile = null;try {this.printWorkingDirectory();// 创建目录mkdir(targetPath);for (String each : filesToMove) {log.info(String.format("rename file [%s]." , each));eachFile = each;String targetName = String.format("%s%s" , targetPath.endsWith(File.separator) ?targetPath.substring(0, targetPath.length() - 1) : targetPath, each.substring(each.lastIndexOf(File.separator)));this.channelSftp.rename(each, targetPath);}} catch (SftpException e) {String message = String.format("移动文件:[%s] 时发生异常,请确认指定文件有删除权限,以及网络交互正常, errorMessage:%s" , eachFile, e.getMessage());log.error(message);throw new RuntimeException(message);}}public boolean mkDirSingleHierarchy(String directoryPath) throws SftpException {boolean isDirExist = false;try {SftpATTRS sftpATTRS = this.channelSftp.lstat(directoryPath);isDirExist = sftpATTRS.isDir();} catch (SftpException e) {if (!isDirExist) {log.info(String.format("正在逐级创建目录 [%s]" , directoryPath));this.channelSftp.mkdir(directoryPath);return true;}}if (!isDirExist) {log.info(String.format("正在逐级创建目录 [%s]" , directoryPath));this.channelSftp.mkdir(directoryPath);}return true;}private void printWorkingDirectory() {try {log.info(String.format("current working directory:%s" , this.channelSftp.pwd()));} catch (Exception e) {log.warn(String.format("printWorkingDirectory error:%s" , e.getMessage()));}}
}
4、SFTP进度监控实现类:FileProgressMonitor
- FTP上传下载进度监控实现类
- 此类可以日志中打印上传/下载进度
- 日志打印的精度可通过调整代码中被除数的陪数控制
- 实际应用中如果是web应用可通过session变量实现前台页面进度展示
- 前台进度更新精度实现同程序日志进度打印精度控制逻辑
package person.brickman.ftp;import com.jcraft.jsch.SftpProgressMonitor;
import lombok.extern.slf4j.Slf4j;
/*** @Description: sftp 上传下载进度监控* @Author brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
public class FileProgressMonitor implements SftpProgressMonitor {private long count = 0; //当前接收的总字节数private long max = 0; /* 最终文件大小 */private long percent = -1;//进度/*** 当每次传输了一个数据块后,调用count方法,count方法的参数为这一次传输的数据块大小* 大** 这里显示的百分比是整数*/@Overridepublic boolean count(long count) {this.count += count;if (percent >= this.count * 100 / max) {return true;}percent = this.count * 100 / max;log.info("Completed {}({}%) out of {}.", this.count, percent, max ); //打印当前进度return true;}/*** 大大* 当传输结束时,调用end方法* 大*/@Overridepublic void end() {log.info("Transferring done.");}/*** 当文件开始传输时,调用init方法* 大*/@Overridepublic void init(int op, String src, String dest, long max) {log.info("Transferring begin. ");this.max = max;this.count = 0;this.percent = -1;}
}
5、SFTP客户端工厂类:SftpClientFactory
package person.brickman.ftp.pool;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool2.DestroyMode;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;import java.net.UnknownHostException;
/*** @Description: ftp 池化封装,工厂类* @Author: brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
public class FtpClientFactory implements PooledObjectFactory<FTPClient> {private String host;private int port;private String user;private String password;/** FTP主动模式(port)和被动模式(PASV) default:PASV */private String connectMode="PASV";public FtpClientFactory(String host, int port, String user, String password) {this.host = host;this.port = port;this.user = user;this.password = password;}public FtpClientFactory(String host, int port, String user, String password, String connectMode) {this.host = host;this.port = port;this.user = user;this.password = password;this.connectMode= connectMode;}@Overridepublic void activateObject(PooledObject<FTPClient> pooledObject) throws Exception {if(pooledObject.getObject().isAvailable()){pooledObject.getObject().sendNoOp();}else{pooledObject.getObject().disconnect();}}@Overridepublic void destroyObject(PooledObject<FTPClient> pooledObject) throws Exception {if (pooledObject.getObject() != null) {pooledObject.getObject().disconnect();}}@Overridepublic void destroyObject(PooledObject<FTPClient> p, DestroyMode destroyMode) throws Exception {PooledObjectFactory.super.destroyObject(p, destroyMode);}@Overridepublic PooledObject<FTPClient> makeObject() throws Exception {FTPClient ftpClient = new FTPClient();try {// 连接ftpClient.connect(host, port);// 登录ftpClient.login(user, password);// 不需要写死ftp server的OS TYPE,FTPClient getSystemType()方法会自动识别/// ftpClient.configure(new FTPClientConfig(FTPClientConfig.SYST_UNIX));ftpClient.setConnectTimeout(15*1000);// 0 infinite 无穷大, 当前设置15秒ftpClient.setDataTimeout(15*1000);if ("PASV".equals(connectMode)) {ftpClient.enterRemotePassiveMode();ftpClient.enterLocalPassiveMode();} else if ("PORT".equals(connectMode)) {ftpClient.enterLocalActiveMode();/// ftpClient.enterRemoteActiveMode(host, port);}int reply = ftpClient.getReplyCode();if (!FTPReply.isPositiveCompletion(reply)) {ftpClient.disconnect();String message = String.format("与ftp服务器建立连接失败,请检查用户名和密码是否正确: [%s]","message:host =" + host + ",username = " + user + ",port =" + port);log.error(message);throw new RuntimeException(message);}//设置命令传输编码String fileEncoding = System.getProperty("file.encoding");ftpClient.setControlEncoding(fileEncoding);} catch (UnknownHostException e) {String message = String.format("请确认ftp服务器地址是否正确,无法连接到地址为: [%s] 的ftp服务器", host);log.error(message);throw new RuntimeException(message);} catch (IllegalArgumentException e) {String message = String.format("请确认连接ftp服务器端口是否正确,错误的端口: [%s] ", port);log.error(message);throw new RuntimeException(message);} catch (Exception e) {String message = String.format("与ftp服务器建立连接失败 : [%s]","message:host =" + host + ",username = " + user + ",port =" + port);log.error(message);throw new RuntimeException(message);}return new DefaultPooledObject<FTPClient>(ftpClient);}@Overridepublic void passivateObject(PooledObject<FTPClient> pooledObject) throws Exception {// 可不使用钝化方法就不用实现
// pooledObject.getObject().exit();}@Overridepublic boolean validateObject(PooledObject<FTPClient> pooledObject) {return pooledObject.getObject().isConnected();}
}
6、SFTP连接池类:SftpClientPool
package person.brickman.ftp.pool;import com.jcraft.jsch.ChannelSftp;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import person.brickman.ftp.utils.EncryptUtil;/*** @Description: sftp 池化封装,池访问类* @Author: brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
public class SftpClientPool {private GenericObjectPool<ChannelSftp> pool;public SftpClientPool(String host, int port, String user, String password) {GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();poolConfig.setMaxTotal(50); // 最大连接数poolConfig.setMaxIdle(20); // 最大空闲连接数poolConfig.setMinIdle(10); // 最小空闲连接数pool = new GenericObjectPool<>(new SftpClientFactory(host, port, user, password), poolConfig);}public SftpClientPool( Boolean configDecrypt, String configDecryptKey,String host, int port, String user, String password) throws Exception {GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();poolConfig.setMaxTotal(50); // 最大连接数poolConfig.setMaxIdle(20); // 最大空闲连接数poolConfig.setMinIdle(10); // 最小空闲连接数if(configDecrypt){user = EncryptUtil.aesDecrypt( user, configDecryptKey );password = EncryptUtil.aesDecrypt( password, configDecryptKey );}pool = new GenericObjectPool<>(new SftpClientFactory(host, port, user, password), poolConfig);}public ChannelSftp getSftpClient() throws Exception {return pool.borrowObject();}public void returnSftpClient(ChannelSftp sftpClient) {pool.returnObject(sftpClient);}}
五、单元测试类
1、SFTP单元测试类:SftpClientPoolTest
使用springboot自带junit5实现
testUpload 上传带池
testUploadWithProgressMonitorWithRESUME 带池上传支持断点续传和进度监控
testDownload 下载带池
testDownloadWithProgerssMonitorWithRESUME 带池下载支持断点续传和进度监控
说明:
读者本地运行前先修改单元测试类中的常量:ip、port、username、password、路径、文件名等
package persion.brickman.ftp;import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpProgressMonitor;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.*;
import org.springframework.boot.test.context.SpringBootTest;
import person.brickman.ftp.AbstractFtpHelper;
import person.brickman.ftp.FileProgressMonitor;
import person.brickman.ftp.SftpHelper;
import person.brickman.ftp.pool.SftpClientPool;import java.io.File;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;/*** @Description: sftpclient 连接池测试** 同:SftpClientPoolTest,只是换了种写法,一个是在每个方法执行前都读配置,一个是所有方法执行前读配置* 实际两种都只创建了一次池,不过是创建的时机不一样** 上传:含 进度监控、断点续传* 下载:含 进度监控、断点续传** @Author: brickman* @CreateDate: 2025/1/2 20:30* @Version: 1.0*/
@Slf4j
@SpringBootTest
public class SftpClientPoolTest {private static Boolean configDecrypt;private static String configDecryptKey;private static SftpClientPool pool;private static String host = "172.16.0.232";private static int port = 22;private static String username = "root";private static String password = "root02027";private static String localDir = "/Users/brickman/tmp";private static String uploadLocalFileName = "工作JUDE_v5测试.zip";private static String uploadLocalFilePath = localDir+"/"+uploadLocalFileName;private static String remoteDir = "/root/tmp/sftpdir";private static String downloadRemoteFileName = "工作JUDE_v5测试.zip";private static String downloadRemoteFilePath = remoteDir+"/"+downloadRemoteFileName;@BeforeAllpublic static void setUpBeforeClass() throws Exception {System.out.println("BeforeClass: 在所有测试方法执行前只执行一次");if(pool==null){pool = new SftpClientPool(host, port, username, password );}}@BeforeEachpublic void setUp() throws Exception {System.out.println("Before: 在每个测试方法执行前都会执行");}/*** sftp上传功能单元测试,* 本地路径:uploadLocalFilePath eg: "/Users/brickman/tmp/工作JUDE_v5测试.zip";* 远程路径:remoteFilePath eg:"/root/tmp/sftpdir/"+ unitTestMethodName+"-测试.zip";* @throws Exception*/@Test@Disabledpublic void testUpload( ) throws Exception {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;long start = System.currentTimeMillis();ChannelSftp sftp = pool.getSftpClient();// 用于调用统一封装的方法AbstractFtpHelper ftpHelper = new SftpHelper();ftpHelper.setFtpClient(sftp);try {
// sftp.lcd("");
// sftp.cd("");// 使用sftp客户端执行操作, 默认模式:OVERWRITE(覆盖)sftp.put( uploadLocalFilePath, remoteFilePath );log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> ftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);}catch (Exception e){log.error("文件上传失败 : ", e);throw e;} finally {pool.returnSftpClient(sftp); // 归还到对象池}}/*** sftp上传功能单元测试,* 本地路径:uploadLocalFilePath eg: "/Users/brickman/tmp/工作JUDE_v5测试.zip";* 远程路径:remoteFilePath eg:"/root/tmp/sftpdir/"+ unitTestMethodName+"-测试.zip";* @throws Exception*/@Test
// @Disabledpublic void testUploadWithProgressMonitorWithRESUME2( ) throws Exception {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String remoteFileName = method+"-测试.zip";String remoteFilePath = remoteDir+"/"+remoteFileName;long start = System.currentTimeMillis();ChannelSftp sftp = pool.getSftpClient();// 使用实现了SftpProgressMonitor接口的monitor对象来监控文件传输的进度SftpProgressMonitor monitor = new FileProgressMonitor();// 用于调用统一封装的方法AbstractFtpHelper ftpHelper = new SftpHelper();ftpHelper.setFtpClient(sftp);try {log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());// 切换到远程目录
// sftpChannel.cd(remoteDir);sftp.cd(remoteDir);log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());// 上传文件
// sftp.put(localFilePath, remoteFileName); // 上传到远程目录并重命名为 remoteFileName
// /* 上传到远程目录并重命名为remoteFileName, 带进度 */
// sftp.put(localFilePath, remoteFileName, monitor);// mode(mode可选值为:ChannelSftp.OVERWRITE,ChannelSftp.RESUME,ChannelSftp.APPEND)/* 上传到远程目录并重命名为remoteFileName, 带进度、断点续传 */sftp.put(uploadLocalFilePath, remoteFileName, monitor, ChannelSftp.RESUME );log.info("File transfer(upload) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> sftp://{}{}, ", end-start, host, remoteFilePath);log.info(" localFilePath:{}, ", uploadLocalFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( ftpHelper.isFileExist(remoteFilePath) );Set<String> set = new HashSet<String>();set.add(remoteFilePath);ftpHelper.deleteFiles(set);} catch (Exception e){log.error("文件上传失败 : ", e);throw e;}finally {pool.returnSftpClient(sftp); // 归还到对象池}}/*** 使用 jsch* sftp文件下载,普通下载(覆盖)* 远程路径:downloadRemoteFilePath eg:"/root/tmp/sftpdir/工作JUDE_v5测试.zip";* 本地路径:localFilePath eg: "/Users/brickman/tmp/"+ unitTestMethodName+"-测试.zip";*/@Test@Disabledpublic void testDownload() throws Exception {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;long start = System.currentTimeMillis();ChannelSftp sftp = pool.getSftpClient();// 用于调用统一封装的方法try{// 切换本地目录log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());sftp.lcd(localDir);log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());// 文件传输模式为mode(mode可选值为:ChannelSftp.OVERWRITE,ChannelSftp.RESUME,ChannelSftp.APPEND)// 下载文件
// sftp.get( remoteFilePath ); // 这个并不是下载到默认目录,而是返回一个流
// sftp.get(remoteFilePath, monitor, mode);sftp.get(downloadRemoteFilePath, localFileName); // 下载到本地目录并重命名为 localFileName
// sftp.get(remoteFilePath, localFileName, monitor, mode);log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> sftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch (Exception e){log.error("文件下载失败 : ", e);throw e;}finally {pool.returnSftpClient(sftp); // 归还到对象池}}/*** 文件下载* 使用 jsch* 带进度监控,带断点续传* 远程路径:downloadRemoteFilePath eg:"/root/tmp/sftpdir/工作JUDE_v5测试.zip";* 本地路径:localFilePath eg: "/Users/brickman/tmp/"+ unitTestMethodName+"-测试.zip";*/@Testpublic void testDownloadWithProgressMonitorWithRESUME2() throws Exception {String method = Thread.currentThread().getStackTrace()[1].getMethodName();String localFileName = method+"-测试.zip";String localFilePath = localDir+"/"+localFileName;long start = System.currentTimeMillis();ChannelSftp sftp = pool.getSftpClient();// 使用实现了SftpProgressMonitor接口的monitor对象来监控文件传输的进度SftpProgressMonitor monitor = new FileProgressMonitor();try{// 切换本地目录log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());sftp.lcd(localDir);log.info("sftp.lpwd():{}",sftp.lpwd());log.info("sftp.pwd():{}",sftp.pwd());// 文件传输模式为mode(mode可选值为:ChannelSftp.OVERWRITE,ChannelSftp.RESUME,ChannelSftp.APPEND)/* 下载到本地目录并重命名为 localFileName , 带进度监控、断点续传 */sftp.get(downloadRemoteFilePath, localFileName, monitor, ChannelSftp.RESUME);log.info("File transfer(download) completed successfully.");long end = System.currentTimeMillis();log.info(" time cost: {} ms, req url ====> sftp://{}{}, ", end-start, host, downloadRemoteFilePath);log.info(" localFilePath:{}, ", localFilePath);// 校验并删除(单元测试不留痕)Assertions.assertTrue( new File(localFilePath).exists() );Files.delete( new File(localFilePath).toPath() );}catch (Exception e){log.error("文件下载失败 : ", e);throw e;}finally {pool.returnSftpClient(sftp); // 归还到对象池}}
}
六、总结
1、第五篇为SFTP进阶篇,讲SFTP池化处理(SFTP客户端连接池封装),单元测试类全是干货
2、【重点】带池上传支持断点续传和进度监控的单元测试方法:testUploadWithProgressMonitorWithRESUME
3、【重点】带池下载支持断点续传和进度监控的单元测试方法:testDownloadWithProgerssMonitorWithRESUME
4、请务必阅读前面的“篇章内容说明”,以便精准命中读者需求