我的需求是将FTP上的文件下载并入库:大约有两万多个文本文件,共约36G,涉及六张表,总计约2亿条数据。
过程中遇到的错误
Connection closed without indication. 错误1
421 Could not create socket . 错误2
原因:应该是下载文件数量比较多,下载一定数量以后网络不稳定,这个时候就需要重新连一下ftp
ftpClient.listFiles反应很慢
原因是默认缓冲区大小只有1024字节(1K),速度自然很慢。在调用传输API(上传/下载)之前修改默认设置即可,例如将缓冲区改为10M:
ftpClient.setBufferSize(1024 * 1024 * 10)
FTP断点续传代码(公司的FTP目前不支持多线程下载文件,因此这里是单线程下载;如果FTP服务器开启了相关设置,也可以改为多线程下载)
package sample.util.xxzf.sftp;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Properties;@Slf4j
public class FTPUtil {/*** FTP地址**/private static String ftpHost;/*** FTP端口**/private static int ftpPort;/*** FTP用户名**/private static String ftpUsername;/*** FTP密码**/private static String ftpPassword;/*** FTP协议里面,规定文件名编码为iso-8859-1**/private static String serverCharset = "ISO-8859-1";/*** UTF-8字符编码**/private static final String CHARSET_UTF8 = "UTF-8";/*** 设置缓冲区大小4M**/private static final int BUFFER_SIZE = 1024 * 1024 * 4;/** 五分钟的毫秒数 */private static final long TEN_MINUTE = 5 * 600 * 1000L;public FTPClient ftpClient = null;/*** 加载ftp配置信息*/static {try {Properties properties = new Properties();InputStream in = FTPUtil.class.getClassLoader().getResourceAsStream( "db.properties" );properties.load( in );ftpHost = properties.getProperty( "ftp_hostname" );String port = properties.getProperty( "ftp_port" );if (port != null && !"".equals( port )) {ftpPort = Integer.valueOf( port );}ftpUsername = properties.getProperty( "ftp_username" );ftpPassword = properties.getProperty( "ftp_password" );in.close();} catch (Exception e) {e.printStackTrace();}}/*** 下载整个文件夹** @param ftpPath /usr/local/tmp* @param localPath F:/bak/v1* @return*/public static boolean downloadFolder(String ftpPath, String localPath) {FTPClient ftpClient = null;try {// build clientftpClient = buildClient();File savePathFile = new File( localPath );if (!savePathFile.exists()) {savePathFile.mkdirs();}downloadDir(ftpClient, ftpPath, localPath);return true;} catch (Exception e) {e.printStackTrace();log.error( "ftp download fail:" + e.getMessage() );} finally {close( ftpClient );}return false;}/*** 构造FTPClient** @return*/private static FTPClient buildClient() throws Exception {FTPClient ftpClient = new FTPClient();if (null != ftpClient && ftpClient.isConnected() && ftpClient.isAvailable()) {// 防止假卡死ftpClient.enterLocalPassiveMode();}ftpClient = new FTPClient();ftpClient.setControlEncoding( "GBK" );try {// 连接FTP服务器ftpClient.connect( ftpHost, ftpPort );// 登录FTP服务器ftpClient.login( ftpUsername, ftpPassword );// 是否登录成功if 
(!FTPReply.isPositiveCompletion( ftpClient.getReplyCode() )) {// 登录失败log.info( "用户名或密码错误,登录FTP服务器失败:" + ftpHost + ":" + ftpPort );// 关闭连接close( ftpClient );} else {// 登录成功log.info( "登录FTP服务器成功:" + ftpHost + ":" + ftpPort );//设置被动模式//ftpClient.enterLocalPassiveMode();// 设置文件类型ftpClient.setFileType( FTP.BINARY_FILE_TYPE );// 防止假卡死ftpClient.enterLocalPassiveMode();// 中文支持ftpClient.setControlEncoding( "GBK" );// 服务器获取自身Ip地址和提交的host进行匹配ftpClient.setRemoteVerificationEnabled( false );// 设置连接超时时间ftpClient.setConnectTimeout( 60 * 10000 );// 设置数据传输超时时间ftpClient.setDataTimeout( 2 * 60 * 10000 );// 设置缓冲大小ftpClient.setReceiveBufferSize( 10240 * 10240 );ftpClient.setBufferSize( 10240 * 10240 );}} catch (Exception e) {log.info( "无法登录FTP服务器:" + ftpHost + ":" + ftpPort );e.printStackTrace();}return ftpClient;}/*** 关闭FTP客户端** @param ftpClient*/private static void close(FTPClient ftpClient) {if (ftpClient != null) {try {ftpClient.logout();ftpClient.disconnect();} catch (IOException e) {e.printStackTrace();log.error( "ftp logout fail:" + e.getMessage() );}}}/*** 功能描述: 下载目录下所有文件 ** @param pathname* FTP服务器文件目录 ** @param localpath* 下载后的文件路径 * void* @param*/public static boolean downloadDir(FTPClient ftpClient, String pathname, String localpath) {boolean flag = false;int count=0;try {// 切换FTP目录ftpClient.changeWorkingDirectory(pathname);FTPFile[] ftpFiles = ftpClient.listFiles();for (FTPFile file : ftpFiles) {if (".".equals(file.getName()) || "..".equals(file.getName())) {continue;}if (count==1000){//当文件下载到一定数量以后就会报错,reponse 421 received closed connection 或者 出现 connection closed without indicationftpClient = buildClient();ftpClient.changeWorkingDirectory(pathname);count=0;}count++;FileOutputStream os = null;long size = file.getSize();File localFile = new File(localpath + "/" + file.getName());if (localFile.length() == size) {log.info("文件{}已下载完成", localFile.getName());continue;}if (localFile != null && localFile.exists() && localFile.isFile() && localFile.length() > 0 && localFile.length() 
< size) {// 需要断点续传os = new FileOutputStream(localFile, true);} else {if (!checkFile(localFile)) {return false;}os = new FileOutputStream(localFile);}log.info("正在下载文件:{},总大小:{}", file.getName(), size);long start = System.currentTimeMillis();try {InputStream in = null;try {byte[] bytes = new byte[1024 * 32];long step = size /100;long process = 0L;long localSize = localFile.length();if (localSize > size) {log.error("本地文件大于服务器文件,终止下载");checkFile(localFile);return false;}if (localSize > 0) {ftpClient.setRestartOffset(localSize);}int c;in = ftpClient.retrieveFileStream(file.getName());if (in == null) {log.info("连接异常,将退出登录");try {ftpClient.logout();} catch (Exception e) {log.debug("关闭连接错误", e); // 可以忽略的错误}try {ftpClient.disconnect();} catch (Exception e) {log.debug("关闭连接错误", e); // 可以忽略的错误}ftpClient = null;log.info("退出登录成功");return false;}while((c = in.read(bytes))!= -1){os.write(bytes, 0, c);localSize += c;long nowProcess=0;if (step!=0){nowProcess = localSize /step;}if(size > 50000000 && nowProcess > process){ // 大于50兆才显示进度process = nowProcess;log.info("{}%", process);if (System.currentTimeMillis() - start > TEN_MINUTE) { // 大于指定时间log.info("时间已到,未下载部分将在下次任务中下载");try {ftpClient.logout();} catch (Exception e) {log.debug("关闭连接错误", e); // 可以忽略的错误}try {ftpClient.disconnect();} catch (Exception e) {log.debug("关闭连接错误", e); // 可以忽略的错误}return false;}}}log.info("文件下载完成到:{}", localpath + "/" + file.getName());} catch (SocketTimeoutException e) {log.info("下载出错:", e);return false;} catch (Exception e) {log.error("下载出错:", e);return false;}finally {try {in.close();ftpClient.completePendingCommand(); // 如果没有这一句 的流是空的 ftpClient.retrieveFileStream(file.getName()); 只有这个文件读取结束以后才能调,不能重复调或者乱掉} catch (Exception e) {log.debug("关闭流错误", e); // 可以忽略的错误}}} catch (Exception e) {log.error("下载文件错误:{}", file.getName(), e);return false;} finally {try {os.close();} catch (Exception e) {log.debug("关闭连接异常"); // 可以忽略的错误类型}}}flag = true;} catch (Exception e) {log.error("下载文件错误:{}", pathname, e);return 
false;}return flag;}/*** 功能描述: 初始化文件* @param localFile* void*/private static boolean checkFile(File localFile) {if (localFile.exists()) { // 已经存在则删除localFile.delete();}// 然后再创建localFile.mkdirs();if (localFile.exists()) {localFile.delete();}try {localFile.createNewFile();return true;} catch (IOException e) {log.error("创建文件失败", e);return false;}}}
对下载好的数据进行入库操作:service层
package sample.service.yssjcj.ky;import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import sample.service.yssjcj.ky.kp.ImportFileThread;
import sample.util.file.FilesUtil;
import sample.util.xxzf.sftp.FTPUtil;import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Service
public class Kpbcs4Service {@Value("${threads.size}")private int threadsSie;@Value("${threads.batch-size}")private int batchSize;public void ftpFile(String ftpPath, String localPath) {//FTP下载,ftp目前应该设置的是不支持多线程下载,需要开启设置,boolean b = FTPUtil.downloadFolder( ftpPath, localPath );if (!b) {return;}List<File> fileList = FilesUtil.getFileList( localPath );if (fileList.size() > 0) {//创建多线程List<ListenableFuture<Integer>> futures = new ArrayList();ExecutorService pool = Executors.newFixedThreadPool( threadsSie );ListeningExecutorService executorService = MoreExecutors.listeningDecorator( pool );for (File file : fileList) {//判断是否是文件if (!file.isFile()) {continue;}// 创建线程ImportFileThread importFileThread = new ImportFileThread( file.getPath(), batchSize );// 添加到队列futures.add( executorService.submit( importFileThread ) );}final ListenableFuture<List<Integer>> resultFuture = Futures.successfulAsList( futures );try {// 获取所有线程的执行结果,如果还未执行完毕,则处于等待状态,直至所有线程执行完毕resultFuture.get();} catch (Exception e) {e.printStackTrace();} finally {if (pool != null && !pool.isShutdown()) {pool.shutdown();}}}}
}
线程方法层
package sample.service.yssjcj.ky.kp;import lombok.extern.slf4j.Slf4j;
import sample.util.file.FilesUtil;import java.util.concurrent.Callable;@Slf4j
public class ImportFileThread implements Callable<Integer> {// 文件全名private String filePath;private int batchSize;public ImportFileThread(String filePath, int batchSize) {this.filePath = filePath;this.batchSize = batchSize;}@Overridepublic Integer call() {importfile( filePath ,batchSize);return 1;}public void importfile(String filePath,int batchSize) {log.info( "入库的文件名称{}", filePath );//读取文本数据。FilesUtil.readTxtFile( filePath ,batchSize);}}
dao层:使用多线程读取文件,每读取一行就调用一次addBatch(),累计到一定数量后批量提交。如果把一个文件的所有数据一次性加载到内存中,会出现内存溢出问题。
package sample.util.file;import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.Count;
import sample.mapper.yssjcj.ky.KpYrDao;
import sample.mapper.yssjcj.ky.KpYsDao;
import sample.mapper.yssjcj.ky.dao.KpKcs4Dao;
import sample.mapper.yssjcj.ky.dao.KpLrDao;
import sample.mapper.yssjcj.ky.dao.KpLsDao;
import sample.util.StringUtils;
import sample.util.db.DBConnect;import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;@Slf4j
public class FilesUtil {/*** 递归读取文件夹内文件* <p>* fileList* new一个list* strPath* 文件夹路径** @return 文件夹内文件list*/public static List<File> getFileList(String strPath) {List<File> fileList = new ArrayList<>();File dir = new File( strPath );File[] files = dir.listFiles(); // 该文件目录下文件全部放入数组if (files != null) {for (int i = 0; i < files.length; i++) {fileList.add( files[i] );}}return fileList;}/*** 存在的问题就是对于超大型文本,会内存溢出* 读取文本文件,每行按","分隔为数组,去掉空行。* fileFullName 文件名称* charsetName 编码方式*/public static void readTxtFile(String filePath, int batchSize) {BufferedReader reader = null;int count = 0;File file = new File( filePath );String fileName = file.getName();PreparedStatement ps = null;DBConnect dbc = new DBConnect( "daas" );Connection conn = dbc.getConnection();try {log.info( "插入数据的表名【" + fileName + "】" );// 数据库操作,TODO 多线程跑会不会关闭其他的数据库连接reader = new BufferedReader( new InputStreamReader( new FileInputStream( filePath ), "GBK" ) );if (fileName.contains( "LS" )) {String inSql = "INSERT INTO KP_LS ";inSql += "(CZID, SPCDM, CKH, JZRQ, PZTBZ, PJPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZHZM, XB, PB, CC, CCZZCC, DDQYDM, CCRQ, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCCCZCPJ, ZZYTPCC, ZZYTPCCRQ, ZZYTPCCZ, DZZFJYLSH, DZZFFPTKJYLSH, DPBZ, KPXSFWF, YDSPSXF, GSXPBS, GSYPDSPRQ, GSYPDSPCDM, GSYPDSPCKH, GSYPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQJTK, BDH, SXJE, JLJSBZ, WJM, YF) ";inSql += " VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";ps = conn.prepareStatement( inSql );String lineTempValue = "";//读取第一行文本while ((lineTempValue = reader.readLine()) != null) {if (lineTempValue.split( "," ).length > 0 && 
!(lineTempValue.equals( "#" ))) {String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );KpLsDao lsDao = new KpLsDao();lsDao.saveKpLs( strAarray, fileName, ps );ps.addBatch();/** 批量提交* */count++;if (count % batchSize == 0) {ps.executeBatch();conn.commit();ps.clearBatch();}}}//文件读取完也关闭ps.executeBatch();conn.commit();ps.clearBatch();log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );} else if (fileName.contains( "LR" )) {String inSql = "INSERT INTO KP_LR ";inSql += "(YPDSPRQ, YPDSPCDM, YPDSPCKH, TPZID, TPCDM, TPCKH, TPRQ, TPSJ, JZRQ, TPPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, ZXTZ, ZZZID, ZZZHZM, XB, PB, CC, ZCC, DDQYDM, CCRQ, TPYY, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, YSPJ, YTPJ, TPSXF, JTK, TPRS, SGTPBZ, BZCCSJ, DZZFJYLSH, DPBZ, KDWLDH, KDFTK, KDQYDM, DBH, TBJE, JLJSBZ,WJM,YF) ";inSql += " VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?)";ps = conn.prepareStatement( inSql );String lineTempValue = "";//读取第一行文本while ((lineTempValue = reader.readLine()) != null) {if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );KpLrDao kpLrDao = new KpLrDao();kpLrDao.saveKpLr( strAarray, fileName, ps );ps.addBatch();/** 批量提交* */count++;if (count % batchSize == 0) {ps.executeBatch();conn.commit();ps.clearBatch();}}}if (count > 0) {ps.executeBatch();conn.commit();ps.clearBatch();}log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );} else if (fileName.contains( "kcs4" )) {log.info( "线程名称" + Thread.currentThread().getName() + "文件名" + reader );String inSql = "INSERT INTO KP_KCS4 ";inSql += "(JYBZ, 
JZRQ, ZCHZM, SPCDM, ZS, ZFZS, FSRS, GMRS, ZZRS, ZJE, KDJS, KDF, SBZFS, FBFS, SSFS, XSBF,WJM,YF) ";inSql += " VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";ps = conn.prepareStatement( inSql );String lineTempValue = "";//读取第一行文本while ((lineTempValue = reader.readLine()) != null) {if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );KpKcs4Dao kpKcs4Dao = new KpKcs4Dao();kpKcs4Dao.saveKpKcs4( strAarray, fileName, ps );ps.addBatch();/** 批量提交* */count++;if (count % batchSize == 0) {ps.executeBatch();conn.commit();ps.clearBatch();}}}if (count > 0) {ps.executeBatch();conn.commit();ps.clearBatch();}log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );} else if (fileName.contains( "YS" )) {String inSql = "INSERT INTO KYPG_LS ";inSql += "(YF, WJM, SPBZ, WJRQ, SFCC, CZID, SPCDM, CKH, JZRQ, PZTBZ, PH, SPRQ, FZID, FZ, FZJ, DZID, DZ, DZJ, JYZID, LC, SY1, SY2, SY3, LCTZ, KTTZ, ZZZID, ZZZ, XB, PB, CC, CCZCC, DDQYDM, CCRQ, TSJJ1, TSJJLC1, TSJJPJ1, TSJJ2, TSJJLC2, TSJJPJ2, TSJJ3, TSJJLC3, TSJJPJ3, TSJJ4, TSJJLC4, TSJJPJ4, TSJJ5, TSJJLC5, TSJJPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, PJHJ, ZZBCPJ, YTPCC, YTPCCRQ, YTPCCZ, DZLSH, FPTKLSH, DPBZ, XSFWF, YDPSXF, GSXPBS, YPSPRQ, YPSPCDM, YPSPCKH, YPPH, KDWLDH, KDF, KDQYDM, GQCJ, GQTPF, GQTPK, BDH, SBJ, ZXBS, JCPJ_S, WPDPF7_S, WPDPF3_S, CZKTF_S, RPF_S, JCPJ_J, WPDPF7_J, WPDPF3_J, CZKTF_J, RPF_J, PJHJ_J, GQTPF_J, YL1, ZXPJ, KJPJPJ,YF) ";inSql += " VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)";ps = conn.prepareStatement( inSql );String lineTempValue = "";//读取第一行文本while ((lineTempValue = reader.readLine()) != null) {if (lineTempValue.split( "," ).length > 0 && 
!(lineTempValue.equals( "#" ))) {String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );KpYsDao kpYsDao = new KpYsDao();kpYsDao.saveKpYs( strAarray, fileName, ps );ps.addBatch();/** 批量提交* */count++;if (count % batchSize == 0) {ps.executeBatch();conn.commit();ps.clearBatch();}}}if (count > 0) {ps.executeBatch();conn.commit();ps.clearBatch();}log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );} else if (fileName.contains( "YR" )) {String inSql = "INSERT INTO KP_LR ";inSql += "(YPDSPRQ, YPDSPCDM, YPDSPCKH, TPZID, TPCDM, TPCKH, TPRQ, TPSJ, JZRQ, TPPH, SPRQ, FZID, FZHZM, FZSSJM, DZID, DZHZM, DZSSJM, JYZID, LC, SY1, SY2, SY3, LCTZ, ZXTZ, ZZZID, ZZZHZM, XB, PB, CC, ZCC, DDQYDM, CCRQ, TPYY, TSJJD1, TSJJDLC1, TSJJDPJ1, TSJJD2, TSJJDLC2, TSJJDPJ2, TSJJD3, TSJJDLC3, TSJJDPJ3, TSJJD4, TSJJDLC4, TSJJDPJ4, TSJJD5, TSJJDLC5, TSJJDPJ5, JCPJ, YZYJK, JJFDF, WPDPF, CZKTF, RPF, YHJ, YSPJ, YTPJ, TPSXF, JTK, TPRS, SGTPBZ, BZCCSJ, DZZFJYLSH, DPBZ, KDWLDH, KDFTK, KDQYDM, DBH, TBJE, JLJSBZ,WJM,YF) ";inSql += " VALUES(?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?)";ps = conn.prepareStatement( inSql );String lineTempValue = "";//读取第一行文本while ((lineTempValue = reader.readLine()) != null) {if (lineTempValue.split( "," ).length > 0 && !(lineTempValue.equals( "#" ))) {String[] strAarray = StringUtils.substringBefore( lineTempValue, ";" ).split( "," );KpYrDao kpYrDao = new KpYrDao();kpYrDao.saveKpYr( strAarray, fileName, ps );ps.addBatch();/** 批量提交* */count++;if (count % batchSize == 0) {ps.executeBatch();conn.commit();ps.clearBatch();}}}if (count > 0) {ps.executeBatch();conn.commit();ps.clearBatch();}log.info( "【{}】文件插入数据库的条数是【{}】", fileName, count );}// 关闭reader.close();//TODO 是否关闭文件,这里关闭跟下面关闭的区别,这里关闭文件会不会关闭错误;} catch (IOException e) {e.printStackTrace();} catch (SQLException e) {e.printStackTrace();} 
finally {try {if (null != ps) {ps.close();}if (null != conn) {conn.close();}} catch (Exception e) {log.error( e.getMessage(), e );}}}public static int lineNumber(String filePath) {int lineNumber = 0;try {FileReader fileReader = new FileReader( new File( filePath ) );LineNumberReader lineNumberReader = new LineNumberReader( fileReader );lineNumberReader.skip( Long.MAX_VALUE );lineNumber = lineNumberReader.getLineNumber();} catch (IOException e) {e.printStackTrace();}return lineNumber;}public static void main(String[] args) {String fileName = "kcs40201.00P";System.out.println( fileName.contains( "kcs4" ) );}
}
数据库连接方法
package sample.util.db;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.io.InputStream;
import java.sql.*;
import java.util.Hashtable;
import java.util.Properties;/*** @Description* @Author chenfei* @Date 2024/3/20 17:47*/
public class DBConnect { private Connection conn = null;static Logger logger = LoggerFactory.getLogger(DBConnect.class.getName());private static DataSource dataSource;public static Properties dbProps = null;public static String dbName;public static Hashtable<String, DataSource> hmDataSource = new Hashtable<String, DataSource>();static {// 项目执行使用InputStream is = DBConnect.class.getClassLoader().getResourceAsStream("db.properties");//本地执行使用
// File file = new File("src/main/resources/db.properties");
// InputStream is = new FileInputStream(file);dbProps = new Properties();try {dbProps.load(is);dbName = dbProps.getProperty("defaultDb");} catch (Exception e) {logger.error("不能读取属性文件. " + "请确保db.properties在CLASSPATH指定的路径中");}}public static DataSource getdataSource() {return dataSource;}/*** 构造数据库的连接和访问类*/public DBConnect(String dbName) {initializeDataSource(dbName);}/*** 无参数构造方法,默认找ldtj数据源*/public DBConnect() {initializeDataSource(dbName);}private void initializeDataSource(String dbName) {dataSource = hmDataSource.get(dbName);if (dataSource == null)// 建立数据库连接{String dbPool = dbProps.getProperty(dbName + ".dbPool");if (dbPool.equalsIgnoreCase("weblogic")) {String weblogic_pool = dbProps.getProperty(dbName + ".weblogic.poolName");Context ctx = null;Hashtable<Object, Object> ht = new Hashtable<Object, Object>();ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");// ht.put(Context.PROVIDER_URL, "t3://10.1.3.49:7003");try {ctx = new InitialContext(ht);dataSource = (DataSource) ctx.lookup(weblogic_pool);} catch (Exception e) {logger.error("数据库连接失败");e.printStackTrace();}} else if (dbPool.equalsIgnoreCase("druid")) {try {Hashtable<String, String> map = new Hashtable<String, String>();map.put("driverClassName", dbProps.getProperty(dbName + ".driverClassName"));map.put("url", dbProps.getProperty(dbName + ".url"));map.put("username", dbProps.getProperty(dbName + ".username"));map.put("password", dbProps.getProperty(dbName + ".password"));map.put("initialSize", dbProps.getProperty(dbName + ".initialSize"));map.put("maxActive", dbProps.getProperty(dbName + ".maxActive"));map.put("timeBetweenEvictionRunsMillis", "60000");map.put("minEvictableIdleTimeMillis", "300000");if (map.get("driverClassName").indexOf("oracle") != -1) {map.put("validationQuery", "SELECT 1 FROM DUAL");map.put("testWhileIdle", "true");map.put("testOnBorrow", "false");map.put("testOnReturn", "false");}// map.put("poolPreparedStatements", "true");// 
map.put("maxPoolPreparedStatementPerConnectionSize",// "50");map.put("removeAbandoned", "true");map.put("removeAbandonedTimeout", "1800");map.put("logAbandoned", "true");// DruidDataSourceFactory.createDataSource(map).getConnection();dataSource = DruidDataSourceFactory.createDataSource(map);} catch (Exception e) {dataSource = null;logger.error("数据库连接失败");e.printStackTrace();}}hmDataSource.put(dbName, dataSource);}try {conn = hmDataSource.get(dbName).getConnection();} catch (SQLException e) {logger.error("数据库连接失败:" + e.getMessage());e.printStackTrace();}}public Connection getConnection() {return conn;}public void close() {try {conn.close();conn = null;} catch (SQLException e) {e.printStackTrace();}}public static void closeConn(Connection conn, PreparedStatement ps, Statement st, ResultSet rs) {try {if (null != rs) {rs.close();}if (null != st) {st.close();}if (null != ps) {ps.close();}if (null != conn) {conn.close();}} catch (SQLException e) {e.printStackTrace();}}public static void closeConn(Connection conn, PreparedStatement pst) {DBConnect.closeConn(conn, pst, null, null);}public static void closePst(PreparedStatement pst) {DBConnect.closeConn(null, pst, null, null);}public static void main(String[] args) {DBConnect dbConnect = new DBConnect("jcpt");System.out.println(dbConnect.getConnection());}/*** @param dbUrl: 数据库 IP:port/dbname* @param username: 用户名* @param password: 密码* @description 通过JDBC获取数据库连接* @return: java.sql.Connection* @author Xin.Lu* @date 2020/8/11 16:18*/public static Connection getConneciton(String dbUrl, String username, String password) {Connection conn = null;try {Class.forName("oracle.jdbc.driver.OracleDriver");conn = DriverManager.getConnection("jdbc:oracle:thin:@" + dbUrl, username, password);} catch (ClassNotFoundException | SQLException e) {e.printStackTrace();}return conn;}/*** @param i:* @param size:* @description 判断是否需要提交* @return: boolean* @author Xin.Lu* @date 2020/8/12 16:40*/public static boolean isNeedCommit(int i, int size) {if 
((i != 0 && i % 1000 == 0) || (i == size - 1)) {return true;} else {return false;}}
}