背景描述:
需要对 Linux 服务器上特定目录中的非结构化文件进行实时监控,并将这些文件上传到 HDFS。
使用的方法
使用 Apache Commons IO 提供的文件监控组件(FileAlterationMonitor / FileAlterationObserver)来实现文件的监控功能。
所需要的pom
<!--
  Maven dependencies for the directory-watch-and-upload-to-HDFS example.
  NOTE(review): the code shown alongside this fragment uses neither fastjson
  artifact; confirm whether both fastjson and fastjson2 are really needed.
-->
<dependencies>
    <!-- Hadoop client libraries -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0</version>
    </dependency>

    <!-- Apache Commons -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>

    <!-- Annotations and code generation -->
    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>1.3.9</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>

    <!-- JSON -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.26</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.22</version>
    </dependency>
</dependencies>
/**
 * Uploads a local file to HDFS, replacing whatever already exists at the destination.
 *
 * <p>Failures are logged and not rethrown, so callers cannot observe an upload error
 * through an exception.
 *
 * @param hdfsURI  URI of the target HDFS file system
 * @param username user the file system is opened as
 * @param srcPath  path of the local file to upload
 * @param newPath  destination path inside HDFS; an existing entry is deleted recursively first
 */
public static void copyFile2HDFS(URI hdfsURI, String username, String srcPath, String newPath) {
    // try-with-resources closes the FileSystem handle even when the upload fails part-way;
    // previously close() was skipped whenever exists/delete/copy threw.
    try (FileSystem fs = FileSystem.get(hdfsURI, new Configuration(), username)) {
        Path src = new Path(srcPath);
        Path dst = new Path(newPath);
        if (fs.exists(dst)) {
            fs.delete(dst, true);
        }
        fs.copyFromLocalFile(src, dst);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can still see the interruption.
        Thread.currentThread().interrupt();
        StaticLog.error(e, "复制文件失败{}", e.getMessage());
        return;
    } catch (Exception e) {
        // Log through the logger with the full throwable instead of printStackTrace().
        StaticLog.error(e, "复制文件失败{}", e.getMessage());
        return;
    }
    // Reached only after the copy succeeded and the file system was closed.
    System.out.println("Upload Successfully!");
}
public static String getHDFSPath(File file) {// 判断文件格式,包括视频、图片、文本和音频等,你可以根据实际需求进行修改String fileName = file.getName();String extension = fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase();if (extension.equals("mp4") || extension.equals("avi") || extension.equals("mov")) {return "/data/shipin/" + file.getName();} else if (extension.equals("jpg") || extension.equals("png")) {return "/data/txt/" + file.getName();} else if (extension.equals("m4a") || extension.equals("wav")) {return "/data/yuyin/" + file.getName();} else if (extension.equals("txt")) {return "/data/wenjian/" + file.getName();} else {return "/data/" + file.getName();}}
FileMonitorTest.java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.xxx.fileSync;

import java.util.concurrent.TimeUnit;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;

/**
 * Command-line entry point that polls a directory for changes and forwards every
 * change event to a {@link FileListener}.
 */
public class FileMonitorTest {

    /** Directory that is watched when no directory is given on the command line. */
    private static final String DEFAULT_WATCH_DIR = "/opt/xxxx";

    /** Pause between two polling passes, in seconds. */
    private static final long POLL_INTERVAL_SECONDS = 5L;

    public FileMonitorTest() {
    }

    /**
     * Starts the directory monitor.
     *
     * @param args optional; {@code args[0]} overrides the watched directory, otherwise
     *             {@value #DEFAULT_WATCH_DIR} is used
     * @throws Exception if the listener cannot be created or the monitor cannot be started
     */
    public static void main(String[] args) throws Exception {
        String watchDir = (args != null && args.length > 0) ? args[0] : DEFAULT_WATCH_DIR;
        long intervalMillis = TimeUnit.SECONDS.toMillis(POLL_INTERVAL_SECONDS);

        // The observer is intentionally unfiltered: every file and directory under
        // watchDir is reported. An earlier version also built a ".success"-suffix
        // filtered observer but threw it away without registering it, so it never
        // had any effect and has been dropped.
        FileAlterationObserver observer = new FileAlterationObserver(watchDir);
        observer.addListener(new FileListener());

        FileAlterationMonitor monitor = new FileAlterationMonitor(intervalMillis, observer);
        monitor.start();
    }
}
FileListener.java(重写监听器的回调方法)
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.xxx.fileSync;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class FileListener extends FileAlterationListenerAdaptor {private static final Logger log = LoggerFactory.getLogger(FileListener.class);URI uri = new URI("hdfs://xxxxx:802xx0");String newPath = "";String newHDFSPath = "";String userName = "root";public FileListener() throws URISyntaxException {}public void onStart(FileAlterationObserver observer) {super.onStart(observer);}public void onDirectoryCreate(File directory) {this.newPath = "/data" + directory.getName();System.out.println("文件路径:" + directory.getAbsolutePath() + " 文件夹创建:" + directory.getName());FileUtil.newDir2HDFS(this.uri, this.userName, this.newPath);log.info("[Deleted Directory] : {}", directory.getAbsolutePath());}public void onDirectoryChange(File directory) {log.info("[Changed Directory] : {}", directory.getAbsolutePath());}public void onDirectoryDelete(File directory) {log.info("[Created Directory] : {}", directory.getAbsolutePath());}public void onFileCreate(File file) {try {log.info("[Created File] : {}", file.getAbsolutePath());this.newHDFSPath = FileUtil.getHDFSPath(file);this.newPath = FileUtil.getDestPath(file);System.out.println("监控源文件路径:" + file.toPath());System.out.println("监控源文件路径:" + file.getAbsolutePath() + " 目标HDFS文件创建:" + this.newHDFSPath);System.out.println("监控源文件路径:" + file.getAbsolutePath() + " 目标Linux文件创建:" + this.newPath);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newHDFSPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileChange(File file) {try {log.info("[Amended File] : {}", file.getAbsolutePath());this.newPath = FileUtil.getDestPath(file);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileDelete(File file) {try 
{log.info("[Deleted File] : {}", file.getAbsolutePath());this.newHDFSPath = FileUtil.getHDFSPath(file);this.newPath = FileUtil.getDestPath(file);FileUtil.delFile2HDFS(this.uri, this.userName, this.newHDFSPath);Files.delete((new File(this.newPath)).toPath());} catch (Throwable var3) {throw var3;}}public void onStop(FileAlterationObserver observer) {super.onStop(observer);}
}