This post walks through how to configure and use the HDFS Java API.
1. First, sort out the dependency in the pom:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
    <scope>provided</scope>
</dependency>
2. A properties file that holds the HDFS cluster connection settings. The values basically come from core-site.xml and hdfs-site.xml, so you can fill them in from the cluster's client-side configuration files:
#============== hadoop ===================
hdfs.fs.defaultFS=hdfs://mycluster-tj
hdfs.ha.zookeeper.quorum=XXXX-apache00.XX01,XXXX-apache01.XX01,XXXX-apache02.XX01
hdfs.dfs.nameservices=XXXX
hdfs.dfs.ha.namenodes.mycluster-tj=XX1,XX2
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1=XXXX-apachenn01.XX01:8020
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2=XXXX-apachenn02.XX01:8020
3. The Java client API:
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;
    private String defaultFS;
    private String zKQuorum;
    private String nameServices;
    private String nameNodes;
    private String rpcAddressNN1;
    private String rpcAddressNN2;

    public void setDefaultFS(String defaultFS) { this.defaultFS = defaultFS; }
    public String getDefaultFS() { return defaultFS; }
    public void setZKQuorum(String zKQuorum) { this.zKQuorum = zKQuorum; }
    public String getzKQuorum() { return zKQuorum; }
    public void setNameServices(String nameServices) { this.nameServices = nameServices; }
    public String getNameServices() { return nameServices; }
    public void setNameNodes(String nameNodes) { this.nameNodes = nameNodes; }
    public String getNameNodes() { return nameNodes; }
    public void setRpcAddressNN1(String rpcAddressNN1) { this.rpcAddressNN1 = rpcAddressNN1; }
    public String getRpcAddressNN1() { return rpcAddressNN1; }
    public void setRpcAddressNN2(String rpcAddressNN2) { this.rpcAddressNN2 = rpcAddressNN2; }
    public String getRpcAddressNN2() { return rpcAddressNN2; }

    // Build the HA configuration from the injected properties and open the FileSystem once.
    public void init() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", defaultFS);
            conf.set("ha.zookeeper.quorum", zKQuorum);
            conf.set("dfs.nameservices", nameServices);  // note: the key is "dfs.nameservices", not "dfs.nameservice"
            // the cluster name in the keys below must match the nameservice in fs.defaultFS
            conf.set("dfs.ha.namenodes.mycluster-tj", nameNodes);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn1", rpcAddressNN1);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn2", rpcAddressNN2);
            fs = FileSystem.get(new URI(defaultFS), conf);
        } catch (Exception ex) {
            logger.error("init error", ex);
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch (Exception e) {
            // ignore close failures on shutdown
        }
    }

    // Check whether a path exists on HDFS; returns false if the check itself fails.
    public boolean exists(String path) {
        boolean isExists = false;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception ex) {
            logger.error("exists error: {}", ex.getMessage());
        }
        return isExists;
    }

    // Return the last modification time of a path, formatted as yyyyMMddHHmmss.
    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch (Exception ex) {
            logger.error("getModificationTime error: {}", ex.getMessage());
        }
        return modifyTime;
    }
}
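For reference, a minimal sketch of driving this client outside Spring; the property values below are placeholders and must be replaced with your own cluster's client settings:

public class HadoopClientDemo {
    public static void main(String[] args) throws Exception {
        HadoopClient client = new HadoopClient();
        // placeholder values; copy the real ones from your cluster's client configuration
        client.setDefaultFS("hdfs://mycluster-tj");
        client.setZKQuorum("zk01,zk02,zk03");
        client.setNameServices("mycluster-tj");
        client.setNameNodes("nn1,nn2");
        client.setRpcAddressNN1("namenode01:8020");
        client.setRpcAddressNN2("namenode02:8020");

        client.init();
        System.out.println(client.exists("/user/demo"));
        System.out.println(client.getModificationTime("/user/demo"));
        client.stop();
    }
}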
4. The Spring configuration:
import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfiguration {
    @Value("${hdfs.fs.defaultFS}")
    private String defaultFS;
    @Value("${hdfs.ha.zookeeper.quorum}")
    private String zKQuorum;
    @Value("${hdfs.dfs.nameservices}")
    private String nameServices;
    @Value("${hdfs.dfs.ha.namenodes.mycluster-tj}")
    private String nameNodes;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1}")
    private String rpcAddressNN1;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2}")
    private String rpcAddressNN2;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        HadoopClient hadoopClient = new HadoopClient();
        hadoopClient.setDefaultFS(defaultFS);
        hadoopClient.setZKQuorum(zKQuorum);
        hadoopClient.setNameServices(nameServices);
        hadoopClient.setNameNodes(nameNodes);
        hadoopClient.setRpcAddressNN1(rpcAddressNN1);
        hadoopClient.setRpcAddressNN2(rpcAddressNN2);
        return hadoopClient;
    }
}
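With the bean defined, other components can simply inject it. A minimal sketch; the service class and its method are made up for illustration:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class HdfsCheckService {
    @Autowired
    private HadoopClient hadoopClient;

    // hypothetical helper: report whether a given HDFS directory has landed
    public boolean partitionReady(String path) {
        return hadoopClient.exists(path);
    }
}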
I got badly burned by a problem today, so I'm coming back to update this post.
If the cluster you want to access manages its data with viewfs, connecting the way described above is broken: only the namenodes that the URI and nameservices settings resolve to can be reached, and everything else cannot! The mount-table sketch below shows why.
To fix this, drop the URI argument and the hand-set nameservices keys from the API code, and load the cluster client's hdfs-site.xml and core-site.xml directly.
The client should then look like this:
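Roughly, a viewfs setup points fs.defaultFS at a viewfs:// cluster and uses a mount table to map different directories onto different nameservices, so hard-coding a single nameservice only covers part of the namespace. An illustrative fragment (cluster name, nameservices, and paths are made-up examples, not the real cluster's values):

<!-- illustrative core-site.xml / mount-table.xml fragment; all names are hypothetical -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>viewfs://mycluster-tj</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.mycluster-tj.link./user</name>
    <value>hdfs://nameservice1/user</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.mycluster-tj.link./logs</name>
    <value>hdfs://nameservice2/logs</value>
  </property>
</configuration>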
package com.xiaoju.dqa.jazz.hadoop.client;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;

    // Load the cluster's own client configuration from the classpath instead of setting
    // fs.defaultFS / nameservices by hand; the xml files must be on the classpath
    // (e.g. under src/main/resources) for Configuration.addResource to find them.
    public void init() {
        try {
            Configuration conf = new Configuration();
            conf.addResource("core-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("mount-table.xml");
            fs = FileSystem.get(conf);
        } catch (Exception ex) {
            logger.error("init error", ex);
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch (Exception e) {
            // ignore close failures on shutdown
        }
    }

    public boolean exists(String path) {
        boolean isExists = true;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception e) {
            logger.error("[HDFS] exists check failed", e);
        }
        return isExists;
    }

    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch (Exception e) {
            logger.error("[HDFS] getModificationTime failed", e);
        }
        return modifyTime;
    }

    // Total size in bytes of everything under the path.
    public long getPathSize(String path) throws IOException {
        long size = -1L;
        try {
            Path hdfsPath = new Path(path);
            size = fs.getContentSummary(hdfsPath).getLength();
        } catch (Exception e) {
            logger.error("[HDFS] getPathSize failed", e);
        }
        return size;
    }
}
The configuration class no longer needs to pass in any parameters either:
package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfig {
    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        return new HadoopClient();
    }
}