目录
文件传输步骤
windows的本机文件传输
linux的虚拟机文件传输
文件传输步骤
建立连接
在connect2HDFS()方法中,通过设置Configuration对象来指定HDFS的URI(在这个例子中为hdfs://192.168.12.133:9000),并初始化一个FileSystem实例fs,用于后续的所有HDFS操作。
关闭连接
close()方法用于在完成所有HDFS操作后关闭与HDFS的连接,确保资源被正确释放。
上传文件并分类 (uploadAndClassify(File file)):方法接收一个本地文件作为参数。
将本地文件上传到HDFS上的相应目录中。
业务逻辑
该方法接受一个字符串类型的目录路径作为参数,将其转换为Path对象,并检查该路径是否已存在。如果不存在,则创建新的目录。
主函数执行
首先调用connect2HDFS()方法与HDFS建立连接。指定一个本地目录(在这个例子中是/home/covid_data),然后遍历这个目录下的所有.json文件。对每个符合条件的文件调用uploadAndClassify(File file)方法进行处理。
package hdfs.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;import java.io.File;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class HDFSApiDemo {private static FileSystem fs = null;// 用于和HDFS建立连接public static void connect2HDFS() throws IOException {Configuration conf = new Configuration();conf.set("fs.defaultFS","hdfs://192.168.12.133:9000");// 显式指定本地文件系统的实现类conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");System.out.println("111");fs = FileSystem.get(conf);}// 关闭客户端和HDFS的连接public static void close(){if(fs != null){try{fs.close();} catch (IOException e){e.printStackTrace();}}}/*** 上传文件并分类* @param file* @throws IOException*/public static void uploadAndClassify(File file) throws IOException {// 提取所有汉字作为省份名称Pattern pattern = Pattern.compile("([\\p{IsHan}]+).*\\.json");// 匹配所有汉字Matcher matcher = pattern.matcher(file.getName());// 匹配汉字成功if (matcher.find()) {String province = matcher.group(1); // 获取所有汉字组成的字符串String targetDir = "/covid_data/" + province + "/"; // 使用全部汉字作为目录名String fileName = file.getName();System.out.println("Processing file: " + fileName);// 创建省份目录(如果不存在)mkdir(targetDir);// HDFS目的路径Path dst = new Path(targetDir + fileName);// 上传文件fs.copyFromLocalFile(new Path(file.getAbsolutePath()), dst);System.out.println("Uploaded: " + fileName + " to " + targetDir);} else {System.out.println("File does not match the expected pattern: " + file.getName());}}// 重载mkdir()方法,支持String类型参数public static void mkdir(String dir) throws IOException {Path path = new Path(dir);if (!fs.exists(path)) {fs.mkdirs(path); // 创建目录}}public static void main(String[] args) throws IOException {try {connect2HDFS();// 虚拟机上的本地目录File localDir = new File("/home/covid_data");// 遍历目录下的所有文件for (File file : localDir.listFiles()) {if (file.isFile() && file.getName().endsWith(".json")) {uploadAndClassify(file);}}close();} catch (IOException e) {e.printStackTrace();}}}
windows的本机文件传输
对于上述代码只需要更改上传路径即可
遭遇问题1:端口无法访问
// 第一步 cd /usr/local/hadoop(安装路径)
// 第二步 vi ./etc/hadoop/hdfs-site.xml
// 第三步 加入下列配置 目的:让NameNode监听所有网络接口上的9000端口
<property>
<name>dfs.namenode.rpc-bind-host</name>
<value>0.0.0.0</value>
</property>
// 第四步 让配置生效
先关闭HDFS命令 ./sbin/stop-dfs.sh
在重启HDFS命令 ./sbin/start-dfs.sh
遭遇问题2:用户权限不足
// 改变hadoop用户权限 这会将/data目录的权限设置为rwxrwxrwx,允许所有用户读写执行
// hdfs dfs -chmod 777 /data
linux的虚拟机文件传输
遭遇问题1:无法找到主类
<!-- 添加maven-shade-plugin插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>hdfs.demo.HDFSApiDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
// 这段Maven配置的作用是使用maven-shade-plugin插件在打包阶段创建一个包含所有依赖的可执行JAR文件,并指定hdfs.demo.HDFSApiDemo作为JAR文件的主类(即包含main方法的入口类)
遭遇问题1:Hadoop尝试使用file:// URI方案时找不到对应的文件系统实现
// 显式指定本地文件系统的实现类
conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");