本文整理了分布式文件系统 HDFS 中对文件/目录进行操作的相关代码,大致包括以下部分:
- 文件夹的新建、删除、重命名
- 文件夹中子文件和目录的统计
- 文件的新建及显示文件内容
- 文件在local和remote间的相互复制
- 定位文件在HDFS中的位置,以及副本存放的主机
- HDFS资源使用情况
1. 新建文件夹
/**
 * Creates the given directory on HDFS if it does not already exist.
 *
 * @param folder HDFS path of the directory to create
 * @throws IOException if the filesystem cannot be reached or the mkdir fails
 */
public void mkdirs(String folder) throws IOException {
    Path path = new Path(folder);
    // try-with-resources releases the FileSystem handle even when
    // exists()/mkdirs() throws; the original leaked it on the error path.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
    }
}
2. 删除文件夹
/**
 * Recursively deletes the given path from HDFS.
 *
 * Bug fix: the original called {@code fs.deleteOnExit(path)}, which only
 * schedules the deletion for when the FileSystem is closed / the JVM exits;
 * {@code fs.delete(path, true)} removes the path immediately.
 *
 * @param folder HDFS path (file or directory) to delete
 * @throws IOException if the filesystem cannot be reached or the delete fails
 */
public void rmr(String folder) throws IOException {
    Path path = new Path(folder);
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        fs.delete(path, true); // recursive, effective immediately
        System.out.println("Delete: " + folder);
    }
}
3. 文件重命名
/**
 * Renames (moves) a path on HDFS.
 *
 * Bug fix: {@code FileSystem.rename} reports failure through its boolean
 * return value, not an exception; the original ignored it and always
 * printed a success message.
 *
 * @param src source HDFS path
 * @param dst destination HDFS path
 * @throws IOException if the filesystem cannot be reached
 */
public void rename(String src, String dst) throws IOException {
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        boolean renamed = fs.rename(new Path(src), new Path(dst));
        if (renamed) {
            System.out.println("Rename: from " + src + " to " + dst);
        } else {
            System.out.println("Rename failed: from " + src + " to " + dst);
        }
    }
}
4. 列出文件夹中的子文件及目录
/**
 * Lists the immediate children (files and directories) of an HDFS directory.
 *
 * Fix: the FileSystem handle leaked when {@code listStatus} threw
 * (e.g. for a non-existent path); try-with-resources closes it on all paths.
 *
 * @param folder HDFS directory to list
 * @throws IOException if the filesystem cannot be reached or the path is invalid
 */
public void ls(String folder) throws IOException {
    Path path = new Path(folder);
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
        }
        System.out.println("==========================================================");
    }
}
5. 创建文件,并添加内容
/**
 * Creates (or overwrites) an HDFS file and writes the given content to it.
 *
 * Fixes: encode explicitly as UTF-8 — the no-arg {@code getBytes()} used the
 * platform default charset, so the stored bytes differed between machines;
 * try-with-resources closes both the output stream and the FileSystem even
 * when create()/write() throws.
 *
 * @param file    HDFS path of the file to create
 * @param content text to write into the file
 * @throws IOException if the filesystem cannot be reached or the write fails
 */
public void createFile(String file, String content) throws IOException {
    byte[] buff = content.getBytes("UTF-8");
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
         FSDataOutputStream os = fs.create(new Path(file))) {
        os.write(buff);
        System.out.println("Create: " + file);
    }
}
6. 将local数据复制到remote
/**
 * Uploads a local file to HDFS.
 *
 * Fix: try-with-resources closes the FileSystem even when the copy throws;
 * the original leaked it on the error path.
 *
 * @param local  path of the local source file
 * @param remote destination path on HDFS
 * @throws IOException if the copy fails
 */
public void copyFile(String local, String remote) throws IOException {
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
    }
}
7. 将remote数据下载到local
/**
 * Downloads an HDFS file to the local filesystem.
 *
 * Fixes: the progress message was missing a space after "from"
 * ("download: from/path..."); the FileSystem handle leaked when the copy threw.
 *
 * @param remote source path on HDFS
 * @param local  destination path on the local filesystem
 * @throws IOException if the copy fails
 */
public void download(String remote, String local) throws IOException {
    Path path = new Path(remote);
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from " + remote + " to " + local);
    }
}
8. 显示文件内容
/**
 * Reads an HDFS file, prints its content to stdout, and returns it.
 *
 * Fixes: try-with-resources replaces the manual closeStream/close dance and
 * guarantees both the input stream and the FileSystem are released; the
 * buffer is declared as ByteArrayOutputStream (not the OutputStream
 * supertype) so the toString() call is self-evidently the content dump.
 *
 * NOTE(review): toString() here decodes with the platform default charset —
 * matches the original behavior; consider toString("UTF-8") if files are
 * written as UTF-8.
 *
 * @param remoteFile HDFS path of the file to read
 * @return the file content as a String
 * @throws IOException if the file cannot be opened or read
 */
public String cat(String remoteFile) throws IOException {
    Path path = new Path(remoteFile);
    System.out.println("cat: " + remoteFile);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
         FSDataInputStream fsdis = fs.open(path)) {
        IOUtils.copyBytes(fsdis, baos, 4096, false);
    }
    String str = baos.toString();
    System.out.println(str);
    return str;
}
9. 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上
/**
 * Prints the block locations (replica hosts) of the hard-coded demo file
 * {@code <hdfsPath>/create/t2.txt}.
 *
 * Fixes: use the injected {@code conf} — the original built a fresh
 * {@code new Configuration()} here, silently ignoring any settings the caller
 * supplied to this instance (inconsistent with every other method); the
 * FileSystem handle leaked when getFileStatus threw.
 *
 * @throws IOException if the file does not exist or the filesystem
 *                     cannot be reached
 */
public void location() throws IOException {
    String folder = hdfsPath + "create/";
    String file = "t2.txt";
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FileStatus f = fs.getFileStatus(new Path(folder + file));
        BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        System.out.println("File Location: " + folder + file);
        for (BlockLocation bl : list) {
            for (String host : bl.getHosts()) {
                System.out.println("host:" + host);
            }
        }
    }
}
10. 获取HDFS集群存储资源使用情况
/**
 * Prints the HDFS cluster's total / used / remaining capacity in bytes.
 *
 * Fix: the original never closed the FileSystem at all (resource leak);
 * try-with-resources closes it on every path.
 */
public void getTotalCapacity() {
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FsStatus fsStatus = fs.getStatus();
        System.out.println("总容量:" + fsStatus.getCapacity());
        System.out.println("使用容量:" + fsStatus.getUsed());
        System.out.println("剩余容量:" + fsStatus.getRemaining());
    } catch (IOException e) {
        e.printStackTrace();
    }
}
完整代码
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf;/* * HDFS工具类 * */ public class Hdfs {private static final String HDFS = "hdfs://10.20.14.47:8020/";public Hdfs(Configuration conf) {this(HDFS, conf);}public Hdfs(String hdfs, Configuration conf) {this.hdfsPath = hdfs;this.conf = conf;}private String hdfsPath;private Configuration conf;public static void main(String[] args) throws IOException {JobConf conf = config();Hdfs hdfs = new Hdfs(conf);hdfs.createFile("/create/t2.txt", "12");hdfs.location();}public static JobConf config() {JobConf conf = new JobConf(Hdfs.class);conf.setJobName("HdfsDAO");conf.addResource("classpath:/hadoop/core-site.xml");conf.addResource("classpath:/hadoop/hdfs-site.xml");conf.addResource("classpath:/hadoop/mapred-site.xml");return conf;}/** 创建文件夹*/public void mkdirs(String folder) throws IOException {Path path = new Path(folder);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);if (!fs.exists(path)) {fs.mkdirs(path);System.out.println("Create: " + folder);}fs.close();}/** 删除文件夹*/public void rmr(String folder) throws IOException {Path path = new Path(folder);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);fs.deleteOnExit(path);System.out.println("Delete: " + folder);fs.close();}/** 文件重命名*/public void rename(String src, String dst) throws IOException {Path name1 = new Path(src);Path name2 = new Path(dst);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);fs.rename(name1, 
name2);System.out.println("Rename: from " + src + " to " + dst);fs.close();}/** 列出文件夹中的子文件及目录*/public void ls(String folder) throws IOException {Path path = new Path(folder);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);FileStatus[] list = fs.listStatus(path);System.out.println("ls: " + folder);System.out.println("==========================================================");for (FileStatus f : list) {System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());}System.out.println("==========================================================");fs.close();}/** 创建文件,并添加内容*/public void createFile(String file, String content) throws IOException {FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);byte[] buff = content.getBytes();FSDataOutputStream os = null;try {os = fs.create(new Path(file));os.write(buff, 0, buff.length);System.out.println("Create: " + file);} finally {if (os != null)os.close();}fs.close();}/** 将local的数据复制到remote*/public void copyFile(String local, String remote) throws IOException {FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);fs.copyFromLocalFile(new Path(local), new Path(remote));System.out.println("copy from: " + local + " to " + remote);fs.close();}/** 将remote数据下载到local*/public void download(String remote, String local) throws IOException {Path path = new Path(remote);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);fs.copyToLocalFile(path, new Path(local));System.out.println("download: from" + remote + " to " + local);fs.close();}/** 显示文件内容*/public String cat(String remoteFile) throws IOException {Path path = new Path(remoteFile);FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);FSDataInputStream fsdis = null;System.out.println("cat: " + remoteFile);OutputStream baos = new ByteArrayOutputStream();String str = null;try {fsdis = fs.open(path);IOUtils.copyBytes(fsdis, baos, 4096, false);str = baos.toString();} finally 
{IOUtils.closeStream(fsdis);fs.close();}System.out.println(str);return str;}/** 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上*/public void location() throws IOException {String folder = hdfsPath + "create/";String file = "t2.txt";FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration());FileStatus f = fs.getFileStatus(new Path(folder + file));BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());System.out.println("File Location: " + folder + file);for (BlockLocation bl : list) {String[] hosts = bl.getHosts();for (String host : hosts) {System.out.println("host:" + host);}}fs.close();}/** 获取HDFS资源使用情况*/public void getTotalCapacity() {try {FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);FsStatus fsStatus = fs.getStatus();System.out.println("总容量:" + fsStatus.getCapacity());System.out.println("使用容量:" + fsStatus.getUsed());System.out.println("剩余容量:" + fsStatus.getRemaining());} catch (IOException e) {e.printStackTrace();}}/** 获取某文件中包含的目录数,文件数,及占用空间大小*/public void getContentSummary(String path) {ContentSummary cs = null;try {FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);cs = fs.getContentSummary(new Path(path));} catch (Exception e) {e.printStackTrace();}// 目录数Long directoryCount = cs.getDirectoryCount();// 文件数Long fileCount = cs.getFileCount();// 占用空间Long length = cs.getLength();System.out.println("目录数:" + directoryCount);System.out.println("文件数:" + fileCount);System.out.println("占用空间:" + length);} }