Hadoop学习之HDFS
1 HDFS相关概念
1.1 设计思路
分散存储,冗余备份。
分散存储:大文件被切割成小文件,使用分而治之的思想让多个服务器对同一个文件进行联合管理;
冗余备份:每个小文件做冗余备份,并且分散存到不同的服务器,做到高可靠不丢失。
1.2 架构
主从架构
(1)namenode(nn主节点): 存储元数据(目录结构,文件详情,文件块列表,块所在的节点列表)请求和响应,接收datanode汇报(节点状态,块状况),向datanode发送命令(负载均衡)等;
(2)datanode(dn从节点): 存储具体的block块,校验文件,汇报状况,接收命令等;
(3)secondarynamenode(2nn): 辅助作用。
1.3 特点
(1)HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数(dfs.blocksize) 来规定,默认大小在 hadoop3.x 版本中是 256M,hadoop2.x 版本中是 128M,老版本中是 64M
(2)HDFS 文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件
(3)目录结构及文件分块位置信息(元数据)的管理由 namenode 节点承担,负责维护整个 hdfs 文件系统的目录树,以及每一个路径(文 件)所对应的 block 块信息(block 的 id,及所在的 datanode 服务器)
(4)文件的各个 block 的存储管理由 datanode 节点承担,每一个 block 都可以在多个 datanode 上存储多个副本(参数设置 dfs.replication,默认是 3)
(5)HDFS 是设计成适应一次写入,多次读出的场景,且不支持文件的修改
1.4 优缺点
优点:可构建在廉价机器上,高容错性(多个副本,丢失自动恢复),适合批处理(移动计算而非数据,数据位置暴露给计算框架),适合大数据处理,流式文件访问(一次性写入,多次读取,保证数据一致性)
缺点:不适合低延时操作(延时大),不适合频繁修改(网络开销大) ,不适合处理小文件(小文件寻道时间超过读取时间,元数据是在namenode的内存,一个数据块的元数据大小大约是150byte,存储 1 亿个 block,大约需要 20GB 内存,如果一个文件大小为 10K,则 1 亿个文件大小仅为 1TB(但要消耗掉 NameNode 20GB 内存)
1.5 作用
主要用来解决海量数据的存 储问题
2 核心设计思想
2.1 心跳机制
Hadoop 是 Master/Slave 结构,Master 有 NameNode 和 ResourceManager,Slave 有 Datanode 和 NodeManager。master 启动的时候会启动一个 ipc server 服务等待Slave连接;slave 启动时,会主动链接 master 的 ipc server 服务 。master和slave之间通过ipc服务通信,通信有固定时间周期(默认3秒),称之为心跳。
NameNode 通过心跳得知 Datanode 的状态 ResourceManager 通过心跳得知 NodeManager 的状态。master 长时间都没有收到 slave 的心跳,就认为该 slave 挂掉了。默认的超时时间为:
timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
hdfs-site.xml 配置
配置文件中的 heartbeat.recheck.interval 的单位为毫秒(默认5 min),dfs.heartbeat.interval 的单位为秒(默认3 sec)
检查时间:在master在接收不到slave的心跳时,此时会向slave主动发送检查 <property> <name>heartbeat.recheck.interval</name> <value>5000</value> </property>
心跳报告周期 <property> <name>dfs.heartbeat.interval</name> <value>3 </value> </property> |
2.2 安全模式
safemode是namenode的一种状态(有三种状态,分别为:active,standby,safemode),集群中的文件不能被操作(自我保护)。
进入safemode的状况:namenode发现集群中块的丢失率达到0.1%时(丢失率通过dfs.namenode.safemode.threshold-pc 配置,默认等于0.999f ),namenode就会进入安全模式,客户端不能对任何数据进行操作,只能查看元数据信息。
退出safemode:①找到问题所在,进行修复,如修复宕机的打他弄得;②手动强行退出,没有解决问题,可能再次出现数据丢失
为什么冷启动集群的时候会自动进入安全模式,又自动退出安全模式?因为block块所在的datanode信息(文件路径、副本数、blockid,及每一个 block 所在 datanode 的信息)存在内存中, fsimage 中,不包含 block 所在的 datanode 信息。在冷启动namenode的时候,内存中的元数据是从 fsimage中加载的,所以就没有datanode信息,namenode就认为所有的block丢失,进入安全模式。datanode启动后会定期向namenode汇报自己block块信息,namenode 就会将内存元数据中的 block 所 在 datanode 信息补全更新,就有了datanode信息,从而自动退出安全模式
相关命令:
hdfs dfsadmin -safemode get | 获取当前safemode的状态 |
hdfs dfsadmin -safemode enter | 进入safemode状态 |
hdfs dfsadmin -safemode leave | 强制退出safemode |
hdfs dfsadmin -safemode wait | /等待,一直到安全模式结束 |
2.3 副本存放策略
作用:分散冗余存储,保证可靠性和高效性。
副本存放原则:将每个文件的数据进行分块存储,每一个数据块又保存有多个副本,这些数据块副本分 布在不同的机器节点上,需要考虑高可靠、负载均衡、带宽。
副本存放方法:①尽可能存放在本地节点 :第一个 block 副本放在和 client 所在的 node 里,如果client不在集群中则随机抽取一个node;②存放在不同的机架的节点 :第二个副本放在与第一个节点不同机架的node中,随机选择;③存放在和第二个副本同机架的不同节点 :第三个副本和第二个副本存放在同一个机架的不同node中。
修改副本数:
①修改hdfs-site.xml
<property> <name>>dfs.replication</name> <value>1</value> </property> |
②命令设置:hdfs dfs -setrep 2 文件
2.4 负载均衡
机器与机器之间磁盘利用率不平衡是 HDFS 集群非常容易出现的情况,尤其是在 DataNode 节点出现故障或在现有的集群上增添新的DataNode 的时候。
如何实现负载均衡:
①start-balancer.sh或start-balancer.sh -threshold 5(开启自动进行均衡)
②自动进行均衡非常慢,一天能移动的数据量在 10G-10T 的级别,很难满足超大集群的需求。因为HDFS 集群默认不允许 balance 操作占用很大的网络带宽,这个带宽是可以调整的 hdfs dfsadmin -setBalanacerBandwidth 10485760 该数值的单位是字节,这个配置是 10M/s,默认是 1M/s。这个配置也可以在hdfs-site.xml中配置
<property> <name>>>dfs.balance.bandwidthPerSec</name> <value>>10485760</value> </property> |
③stat-balancer.sh -t 10%: 负载最高的节点和最低节点之间的数据差距比例不超过10%
3 HDFS 的 shell(命令行客户端)操作
支持的命令及参数如下
[-appendToFile <localsrc> ... <dst>] [-cat [-ignoreCrc] <src> ...] [-checksum <src> ...] [-chgrp [-R] GROUP PATH...] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>] [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-count [-q] [-h] <path> ...] [-cp [-f] [-p | -p[topax]] <src> ... <dst>] [-createSnapshot <snapshotDir> [<snapshotName>]] [-deleteSnapshot <snapshotDir> <snapshotName>] [-df [-h] [<path> ...]] [-du [-s] [-h] <path> ...] [-expunge] [-find <path> ... <expression> ...] [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-getfacl [-R] <path>] [-getfattr [-R] {-n name | -d} [-e en] <path>] [-getmerge [-nl] <src> <localdst>] [-help [cmd ...]] [-ls [-d] [-h] [-R] [<path> ...]] [-mkdir [-p] <path> ...] [-moveFromLocal <localsrc> ... <dst>] [-moveToLocal <src> <localdst>] [-mv <src> ... <dst>] [-put [-f] [-p] [-l] <localsrc> ... <dst>] [-renameSnapshot <snapshotDir> <oldName> <newName>] [-rm [-f] [-r|-R] [-skipTrash] <src> ...] [-rmdir [--ignore-fail-on-non-empty] <dir> ...] [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]] [-setfattr {-n name [-v value] | -x name} <path>] [-setrep [-R] [-w] <rep> <path> ...] [-stat [format] <path> ...] [-tail [-f] <file>] [-test -[defsz] <path>] [-text [-ignoreCrc] <src> ...] [-touchz <path> ...] [-truncate [-w] <length> <path> ...] [-usage [cmd ...]] |
常用命令如下:
命令:
hadoop fs :运行fs
hdfs dfs:运行fs的命令
hdfs dfsadmin -report: 报告整个集群的状态
hdfs getconf -confKey key:获取hdfs集群的配置信息
-help | 输出这个命令参数手册 |
-ls | 显示目录信息 |
-mkdir | 在 hdfs 上创建目录 |
-put | 等同于 copyFromLocal,进行文件上传 |
-get | 等同于 copyToLocal,就是从 hdfs 下载文件到本地 |
-copyFromLocal | 从本地文件系统中拷贝文件到 hdfs 文件系统去 |
-copyToLocal | 从 hdfs 拷贝到本地 |
-cp | 复制(hdfs文件系统) |
-mv | 移动(hdfs文件系统) |
-moveFromLocal | 从本地剪切到 hdfs |
-moveToLocal | 从 hdfs 剪切到本地 |
-getMerge | 合并下载 |
-appendToFile | 追加内容 |
-cat | 查看文件内容 |
-rm | 删除 |
-rmdir | 删除空目录 |
-text | 以字符形式展示文件内容 |
-setrep | 设置副本的数量 |
4 Java API操作HDFS
4.1 环境搭建
(1)下载并复制插件hadoop-eclipse-plugin.jar --> eclipse/plugins
(2)解压windows 平台编译的hadoop放在非中文路径下
(3)eclipse-->preferences-->hadoop map/reduce 选择上一步解压的hadoop路径配置
(4)windows-->show view -->mapreduce,双击选中
出现如下对话框
(5)点击上一步对象框右上角的小象图标,修改相应的信息
(6)完成后,在项目资源管理器下可以看到如下图标
4.2 Java API操作
首先要导入相关的jar包。
(1)Configuration
代表client与文件系统连接的配置对象
默认配置:
①configration对象中默认的配置
②common.jar/hdfs.jar/map.jar/yarn.jar等jar包下的core-default.xml,hdfs-deafult.xml,mapred-default.xml,
yarn-default.xml
③自定义一些xml文件,放置在classpath下,core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml
自定义配置:
①如果是xml文件,conf.addResource("core-aaa.xml");
② 使用conf方法,conf.set("fs.defaultFS", "hdfs://hdp01:9000");
(2)FileSystem
Configuration conf = new Configuration() FileSystem fs = FileSystem.get(conf) |
要获得一个客户端实例对象,get 方法是从从 conf 中的一个参数 fs.defaultFS 的配置值判断具体实例化哪种客户端类。如果没有指定fs.defaultFS,并且工程 classpath 下也没有给定相应的配置,conf 中的默认值就来自于 hadoop 的 jar 包中的 core-default.xml,默认值为: file:///,在代码中设置了conf.set("fs.defaultFS", "hdfs://bigdata01:9000")这个代码获得的才是hdfs的客户端对象
建立与hdfs的连接还可以如下操作
(3)相关api操作
package com.refuel;import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class HDFSDemo {FileSystem fs = null;@Beforepublic void before() throws Exception {URI uri = new URI("hdfs://bigdata01:9000");Configuration conf = new Configuration();fs = FileSystem.get(uri, conf, "refuel");}// 创建文件夹@Testpublic void mkdir() throws IOException {Path dir = new Path("/mkdir");boolean bool = fs.mkdirs(dir);System.out.println(bool);}// 上传@Testpublic void copyFromLocal() throws IOException {// 本地文件Path src = new Path("e:/copyFromLocal.txt");// hdfs文件系统Path dst = new Path("/copyFromLocal.txt");fs.copyFromLocalFile(src, dst);System.out.println("上传成功!");}// 下载@Testpublic void copytoLocal() throws IOException {// hdfs文件系统Path src = new Path("/copytoLocal.txt");// 本地文件系统Path dst = new Path("e:/copytoLocal.txt");fs.copyToLocalFile(false, src, dst, true);}// 流式数据访问@Testpublic void get() throws IllegalArgumentException, IOException {// 获取hdfs文件系统的输入流FSDataInputStream in = fs.open(new Path("/get.txt"));// 获取本地文件系统输出流FileOutputStream out = new FileOutputStream("e://get.txt");// 使用工具类读写IOUtils.copyBytes(in, out, 4096);System.out.println("下载成功!");in.close();out.close();}// 下载jdk-8u73-linux-x64.tar.gz第二块文件@Testpublic void getBlock() throws Exception {// 1.获取文件的属性Path f = new Path("/jdk-8u73-linux-x64.tar.gz");FileStatus fileStatus = fs.getFileStatus(f);// 块信息BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());// 2.判断是否存在第二块if (blkLocations.length < 2) {throw new RuntimeException("该文件不存在第二块");}// 3.1获取第二块的offset的三种方法如下// 1.获取文件的blocksize: fileStatus.getBlockSize()// 2.获取第一块长度: blkLocations[0].getLength()// 3.直接获取第二块的offset,blkLocation[1].getoffset()long offset = blkLocations[1].getOffset();// 3.2 获取第二块的长度long length = blkLocations[1].getLength();// 4.构建输入输出流,指定offset读取lengthFSDataInputStream in = fs.open(f);FileOutputStream out = new FileOutputStream("e:/jdk");// 指定offsetin.seek(offset);// 指定长度,注意length是long类型的IOUtils.copyBytes(in, out, length, true);System.out.println("下载块成功!");}// 遍历文件@Testpublic void listFile() throws Exception {// 实现递归操作,返回迭代器RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/"), true);while (iterator.hasNext()) {// FileStatus的子类,包含文件的详细信息,包含属性blockLocations(文件的块信息)LocatedFileStatus status = iterator.next();System.out.println("path=" + status.getPath());System.out.println("owner=" + status.getOwner());System.out.println("len=" + status.getLen());System.out.println("rep=" + status.getReplication());// 获取该文件的块详情BlockLocation[] blockLocations = status.getBlockLocations();System.out.println("块的个数:" + blockLocations.length);for (int i = 0; i < blockLocations.length; i++) {System.out.println("names=" + Arrays.toString(blockLocations[i].getNames()));System.out.println("hosts=" + Arrays.toString(blockLocations[i].getHosts()));System.out.println("offset=" + blockLocations[i].getOffset());System.out.println("length=" + blockLocations[i].getLength());}System.out.println("----------------------------");}}@Afterpublic void after() throws IOException {fs.close();}}