java监控目录实时上传HDFS

背景描述:

为了满足linux服务器上特定目录的非结构化文件的实时监控,并上传HDFS

使用的方法

Apache的Commons-IO,来实现文件的监控功能

所需要的pom

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.0.0</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency><dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version><scope>compile</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.26</version></dependency><!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.22</version></dependency></dependencies>
public static void copyFile2HDFS(URI hdfsURI, String username, String srcPath, String newPath) {try {Configuration conf = new Configuration();FileSystem fs = FileSystem.get(hdfsURI, conf, username);Path src = new Path(srcPath);Path dst = new Path(newPath);if (fs.exists(dst)) {fs.delete(dst, true);}fs.copyFromLocalFile(src, dst);fs.close();System.out.println("Upload Successfully!");} catch (Exception e) {e.printStackTrace();StaticLog.info("复制文件失败{}", e.getMessage());}}
public static String getHDFSPath(File file) {// 判断文件格式,包括视频、图片、文本和音频等,你可以根据实际需求进行修改String fileName = file.getName();String extension = fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase();if (extension.equals("mp4") || extension.equals("avi") || extension.equals("mov")) {return "/data/shipin/" + file.getName();} else if (extension.equals("jpg") || extension.equals("png")) {return "/data/txt/" + file.getName();} else if (extension.equals("m4a") || extension.equals("wav")) {return "/data/yuyin/" + file.getName();} else if (extension.equals("txt")) {return "/data/wenjian/" + file.getName();} else {return "/data/" + file.getName();}}
FileMonitorTest.java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package com.xxx.fileSync;import java.util.concurrent.TimeUnit;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;public class FileMonitorTest {public FileMonitorTest() {}public static void main(String[] arugs) throws Exception {String absolateDir = "/opt/xxxx";long intervalTime = TimeUnit.SECONDS.toMillis(5L);new FileAlterationObserver(absolateDir, FileFilterUtils.and(new IOFileFilter[]{FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(".success")}));FileAlterationObserver observer = new FileAlterationObserver(absolateDir);observer.addListener(new FileListener());FileAlterationMonitor monitor = new FileAlterationMonitor(intervalTime, new FileAlterationObserver[]{observer});monitor.start();}
}
FileListener.java重写方法
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package com.xxx.fileSync;import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class FileListener extends FileAlterationListenerAdaptor {private static final Logger log = LoggerFactory.getLogger(FileListener.class);URI uri = new URI("hdfs://xxxxx:802xx0");String newPath = "";String newHDFSPath = "";String userName = "root";public FileListener() throws URISyntaxException {}public void onStart(FileAlterationObserver observer) {super.onStart(observer);}public void onDirectoryCreate(File directory) {this.newPath = "/data" + directory.getName();System.out.println("文件路径:" + directory.getAbsolutePath() + " 文件夹创建:" + directory.getName());FileUtil.newDir2HDFS(this.uri, this.userName, this.newPath);log.info("[Deleted Directory] : {}", directory.getAbsolutePath());}public void onDirectoryChange(File directory) {log.info("[Changed Directory] : {}", directory.getAbsolutePath());}public void onDirectoryDelete(File directory) {log.info("[Created Directory] : {}", directory.getAbsolutePath());}public void onFileCreate(File file) {try {log.info("[Created File] : {}", file.getAbsolutePath());this.newHDFSPath = FileUtil.getHDFSPath(file);this.newPath = FileUtil.getDestPath(file);System.out.println("监控源文件路径:" + file.toPath());System.out.println("监控源文件路径:" + file.getAbsolutePath() + " 目标HDFS文件创建:" + this.newHDFSPath);System.out.println("监控源文件路径:" + file.getAbsolutePath() + " 目标Linux文件创建:" + this.newPath);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newHDFSPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileChange(File file) {try {log.info("[Amended File] : {}", file.getAbsolutePath());this.newPath = FileUtil.getDestPath(file);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileDelete(File file) {try {log.info("[Deleted File] : {}", file.getAbsolutePath());this.newHDFSPath = FileUtil.getHDFSPath(file);this.newPath = FileUtil.getDestPath(file);FileUtil.delFile2HDFS(this.uri, this.userName, this.newHDFSPath);Files.delete((new File(this.newPath)).toPath());} catch (Throwable var3) {throw var3;}}public void onStop(FileAlterationObserver observer) {super.onStop(observer);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

选择什么电容笔比较好?平板手写笔推荐

由于苹果Pencil的热销&#xff0c;让华国内市场上&#xff0c;也出现了不少的平替式电容笔&#xff0c;这些产品&#xff0c;有好有坏&#xff0c;价格也很公道。不过&#xff0c;也有很多产品的价格都很平价。我是一个拥有多年经验的数码发烧友&#xff0c;在前几年就开始用上…

嵌入式Linux运行一定需要MMU吗

为什么需要MMU &#xff1f; 我们知道应用程序是不能随意访问内存的&#xff0c;如果让应用程序直接访问物理内存&#xff0c;那么计算机是很危险的&#xff0c;计算机内存的所有内容将被完全暴露出来。 所以出现了mmu&#xff0c;mmu是内存管理单元&#xff0c;应用程序访问…

docker load and build过程的一些步骤理解

docker load 命令执行原理 “docker load” command, the following steps are followed to load an image from a specified tar file to the local image repository: Parsing the tar file: Docker first parses the tar file to check its integrity and verify the form…

Spring Boot关掉APR

在Spring Boot中&#xff0c;可以通过设置环境变量或配置文件来关闭APR&#xff08;Apache Portable Runtime&#xff09;。下面是两种常用的方法&#xff1a; 方法一&#xff1a;设置环境变量 在启动Spring Boot应用程序时&#xff0c;可以通过设置-DuseApr环境变量来控制是…

链表 oj2 (7.31)

206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 我们通过头插来实现 将链表上的节点取下来&#xff08;取的时候需要记录下一个节点&#xff09;&#xff0c;形成新的链表&#xff0c;对新的链表进行头插。 /*** Definition for singly-linked list.* struct ListNode…

攻防千层饼

目录 钓鱼 钓鱼攻击 求职招聘 投毒 社会工程学 仿冒IT运维人员 补贴 吃瓜 反钓鱼攻击 DLL劫持制作木马 NSIS制作安装包 mysql蜜罐 LOAD DATA LOCAL INFILE 蜜罐 溯源反制 溯源 钓鱼邮件溯源 后门木马溯源 NPS 未授权访问 默认密码 灯塔 默认密码 反溯源…

Vulnhub系列靶机---Raven2

文章目录 Raven2 渗透测试信息收集提权UDF脚本MySQL提权SUID提权 Raven2 渗透测试 信息收集 查看存活主机 arp-scan -l 找到目标主机。 扫描目标主机上的端口、状态、服务类型、版本信息 nmap -A 192.168.160.47目标开放了 22、80、111 端口 访问一下80端口&#xff0c;并…

亚马逊云科技多项新功能与服务,助力各种规模的组织拥抱生成式 AI

从初创企业到大型企业&#xff0c;各种规模的组织都纷纷开始接触生成式 AI 技术。这些企业希望充分利用生成式 AI&#xff0c;将自身在测试版、原型设计以及演示版中的畅想带到现实场景中&#xff0c;实现生产力的大幅提升并大力进行创新。但是&#xff0c;组织要怎样才能在企业…

pytorch的安装【全官网流程】

1.准备python环境 python环境需要看pytorch上说明的版本本文用的是python3.9 conda create -n pytorch39 python3.92.安装pytorch【要使用GPU的先安装步骤3的CUDA在安装这个】 pytorch官方地址 &#xff08;1&#xff09;官方指出了python版本&#xff1a; &#xff08;2…

接口自动化测试难点:数据库验证解决方案!

接口自动化中的数据库验证&#xff1a;确保数据的一致性和准确性 接口自动化测试是现代软件开发中不可或缺的一环&#xff0c;而数据库验证则是确保接口返回数据与数据库中的数据一致性的重要步骤。本文将介绍接口自动化中的数据库验证的原理、步骤以及示例代码&#xff0c;帮…

rust的Defef和DerefMut学习

rust的Defef 介绍 pub trait Deref {type Target: ?Sized;// Required methodfn deref(&self) -> &Self::Target; }用于不可变的解引用操作,例如 *v ( immutable dereferencing operations)。 除了在不可变上下文中使用(一元)* 运算符进行显式解引用操作(…

am权限系统对接笔记

文章目录 角色如何对应机构如何对应 am需要提供的接口机构、角色、人员查关系 消息的交互方式方式1 接口查询方式2 mq推送消息到业务系统 am是一套通用权限管理系统。 为什么要接入am呢? 举例&#xff0c;甲方有10个供方&#xff0c;每个供方都有单独的权限系统&#xff0c;不…

爬取某音乐榜单歌曲

一、打开网页https://music.163.com/&#xff0c;进入榜单&#xff08;热歌榜&#xff09; 二、右键检查、刷新网页&#xff0c;选择元素&#xff08;点击歌曲名&#xff09; 三、相关代码 import requests #正则表达式模块内置模块 import re import osfilename music\\ if …

“第四十三天”

这个是我自己写的&#xff0c;下面那个是看的别人的&#xff0c;其实大致都是一样的&#xff0c;通过四次循环&#xff0c;挨个求和比较&#xff0c;都很麻烦&#xff0c;但重点在于&#xff0c;对于已知变量的运用&#xff0c;当我需要在最内层循环用变量确定a数组组元时&…

ARM 堆栈寻址类型区分

文章目录 堆栈指向分类堆栈指向数据分类满递增与满递减空递增与空递减 堆栈指向分类 根据堆栈指针的指向的方向不同&#xff0c;可以划分为向上生成型和向下生成型。 向上生成型&#xff1a; 随着数据的入栈&#xff0c;堆栈的指针逐渐增大&#xff0c;称为&#xff1a;递增…

Linux真的很难吗?文末送5本《Linux运维之道(第3版)》

目录 一、百度百科二、VMWare中安装centos1、下载地址2、网络适配器简介 三、克隆和快照1、克隆一般用于项目部署2、快照相当于SVN&#xff0c;是Linux系统的版本管理手段 四、XShell和Xftp1、XShell百度百科2、Xftp百度百科3、xshell7和xftp7下载地址 五、vi和vim1、vim基本编…

【GA-ACO-BP预测】基于混合遗传算法-蚁群算法优化BP神经网络回归预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

python:talib.BBANDS 画股价-布林线图

python 安装使用 TA_lib 安装主要在 http://www.lfd.uci.edu/~gohlke/pythonlibs/ 这个网站找到 TA_Lib-0.4.24-cp310-cp310-win_amd64.whl pip install /pypi/TA_Lib-0.4.24-cp310-cp310-win_amd64.whl 编写 talib_boll.py 如下 # -*- coding: utf-8 -*- import os impor…

《视觉 SLAM 十四讲》V2 第 12 讲 建图

文章目录 12.2 单目稠密 重建12.2.2 极线搜索 && 块匹配12.2.3 高斯分布的深度滤波器 12.3 单目稠密重建 【Code】待改进12.3.4 图像间的变换 12.4 RGB-D 稠密建图12.4.1 点云地图 【Code】查询OpenCV版本 opencv_version 12.4.2 从点云 重建 网格 【Code】查看PCL 版本…

GO-实现简单文本格式 文本字体颜色、大小、突出

毫无疑问GO的生态就是一坨大便。老子英文水平小学啊。 实现简单文本格式 文本字体颜色、大小、突出显示等。 创建要给docx文件容器【我估算的】 doc : document.New() defer doc.Close() doc.SaveToFile("simple.docx") 把容器保存为文件 设置标题 创建自然段…