第1章 Hadoop概述
1.1 Hadoop是什么
1.2 Hadoop发展历史(了解)
1.3 Hadoop三大发行版本(了解)
Hadoop三大发行版本:Apache、Cloudera、Hortonworks。
Apache版本最原始(最基础)的版本,对于入门学习最好。2006
Cloudera内部集成了很多大数据框架,对应产品CDH。2008
Hortonworks文档较好,对应产品HDP。2011
Hortonworks现在已经被Cloudera公司收购,推出新的品牌CDP。
1)Apache Hadoop
官网地址:http://hadoop.apache.org
下载地址:https://hadoop.apache.org/releases.html
2)Cloudera Hadoop
官网地址:https://www.cloudera.com/downloads/cdh
下载地址:https://docs.cloudera.com/documentation/enterprise/6/release-notes/topics/rg_cdh_6_download.html
(1)2008年成立的Cloudera是最早将Hadoop商用的公司,为合作伙伴提供Hadoop的商用解决方案,主要是包括支持、咨询服务、培训。
(2)2009年Hadoop的创始人Doug Cutting也加盟Cloudera公司。Cloudera产品主要为CDH,Cloudera Manager,Cloudera Support
(3)CDH是Cloudera的Hadoop发行版,完全开源,比Apache Hadoop在兼容性,安全性,稳定性上有所增强。Cloudera的标价为每年每个节点10000美元。
(4)Cloudera Manager是集群的软件分发及管理监控平台,可以在几个小时内部署好一个Hadoop集群,并对集群的节点及服务进行实时监控。
3)Hortonworks Hadoop
官网地址:https://hortonworks.com/products/data-center/hdp/
下载地址:https://hortonworks.com/downloads/#data-platform
(1)2011年成立的Hortonworks是雅虎与硅谷风投公司Benchmark Capital合资组建。
(2)公司成立之初就吸纳了大约25名至30名专门研究Hadoop的雅虎工程师,上述工程师均在2005年开始协助雅虎开发Hadoop,贡献了Hadoop80%的代码。
(3)Hortonworks的主打产品是Hortonworks Data Platform(HDP),也同样是100%开源的产品,HDP除常见的项目外还包括了Ambari,一款开源的安装和管理系统。
(4)2018年Hortonworks目前已经被Cloudera公司收购。
1.4 Hadoop优势(4高)
1.5 Hadoop组成(面试重点)
hadoop1.x 2.x 3.x 的区别
1.5.1 HDFS架构概述
Hadoop Distributed File System,简称HDFS,是一个分布式文件系统。
1.5.2 YARN架构概述
Yet Another Resource Negotiator简称YARN ,另一种资源协调者,是Hadoop的资源管理器。
1.5.3 MapReduce架构概述
MapReduce将计算过程分为两个阶段:Map和Reduce
- 1)Map阶段并行处理输入数据
- 2)Reduce阶段对Map结果进行汇总
1.5.4 HDFS、YARN、MapReduce三者关系
1.6 大数据技术生态体系
图中涉及的技术名词解释如下:
1)Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
2)Flume:Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;
3)Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统;
4)Spark:Spark是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算。
5)Flink:Flink是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。
6)Oozie:Oozie是一个管理Hadoop作业(job)的工作流程调度管理系统。
7)Hbase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
8)Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
9)ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。
1.7 推荐系统框架图
第2章 Hadoop运行环境搭建(开发重点)
2.1 模板虚拟机环境准备
0)安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G
Linux下安装JDK,Mysql,Ngnix-CSDN博客
1)hadoop100虚拟机配置要求如下(本文Linux系统全部以CentOS-7.5-x86-1804为例)
(1)使用yum安装需要虚拟机可以正常上网,yum安装前可以先测试下虚拟机联网情况
(2)安装epel-release
注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方 repository 中是找不到的)
yum install -y epel-release
杀死3030进程
(3)注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作
- net-tool:工具包集合,包含ifconfig等命令
yum install -y net-tools
- vim:编辑器
yum install -y vim
2)关闭防火墙,关闭防火墙开机自启
systemctl stop firewalld
systemctl disable firewalld.service
注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙
3)创建atguigu用户,并修改atguigu用户的密码
useradd atguigu
passwd atguigu
4)配置atguigu用户具有root权限,方便后期加sudo执行root权限的命令
vim /etc/sudoers
修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示:
atguigu ALL=(ALL) NOPASSWD:ALL
注意:atguigu这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了atguigu具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以atguigu要放到%wheel这行下面。
5)在/opt目录下创建文件夹,并修改所属主和所属组
(1)在/opt目录下创建module、software文件夹
mkdir /opt/module
mkdir /opt/software
(2)修改module、software文件夹的所有者和所属组均为atguigu用户
chown atguigu:atguigu /opt/module
chown atguigu:atguigu /opt/software
(3)查看module、software文件夹的所有者和所属组
cd /opt/
ll
6)卸载虚拟机自带的JDK
注意:如果你的虚拟机是最小化安装不需要执行这一步。
rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps
- rpm -qa:查询所安装的所有rpm软件包
- grep -i:忽略大小写
- xargs -n1:表示每次只传递一个参数
- rpm -e –nodeps:强制卸载软件
7)重启虚拟机
reboot
2.2 克隆虚拟机
1)利用模板机hadoop100,克隆三台虚拟机:hadoop102 hadoop103 hadoop104
注意:克隆时,要先关闭hadoop100
2)修改克隆机IP,以下以hadoop102举例说明
(1)修改克隆虚拟机的静态IP
vim /etc/sysconfig/network-scripts/ifcfg-ens33
改成
IPADDR=192.168.10.102
(2)查看Linux虚拟机的虚拟网络编辑器,编辑->虚拟网络编辑器->VMnet8
(3)查看Windows系统适配器VMware Network Adapter VMnet8的IP地址
(4)保证Linux系统ifcfg-ens33文件中IP地址、虚拟网络编辑器地址和Windows系统VM8网络IP地址相同。
3)修改克隆机主机名,以下以hadoop102举例说明
(1)修改主机名称
vim /etc/hostname
hadoop102
(2)配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
vim /etc/hosts
添加如下内容
192.168.10.100 hadoop100192.168.10.101 hadoop101192.168.10.102 hadoop102192.168.10.103 hadoop103192.168.10.104 hadoop104192.168.10.105 hadoop105192.168.10.106 hadoop106192.168.10.107 hadoop107192.168.10.108 hadoop108
4)重启克隆机hadoop102
reboot
5)修改windows的主机映射文件(hosts文件)
(1)如果操作系统是window7,可以直接修改
(a)进入C:\Windows\System32\drivers\etc路径
(b)打开hosts文件并添加如下内容,然后保存
192.168.10.100 hadoop100192.168.10.101 hadoop101192.168.10.102 hadoop102192.168.10.103 hadoop103192.168.10.104 hadoop104192.168.10.105 hadoop105192.168.10.106 hadoop106192.168.10.107 hadoop107192.168.10.108 hadoop108
(2)如果操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可
(a)进入C:\Windows\System32\drivers\etc路径
(b)拷贝hosts文件到桌面
(c)打开桌面hosts文件并添加如下内容
192.168.10.100 hadoop100192.168.10.101 hadoop101192.168.10.102 hadoop102192.168.10.103 hadoop103192.168.10.104 hadoop104192.168.10.105 hadoop105192.168.10.106 hadoop106192.168.10.107 hadoop107192.168.10.108 hadoop108
(d)将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件
2.3 在hadoop102安装JDK
1)卸载现有JDK
注意:安装JDK前,一定确保提前删除了虚拟机自带的JDK。详细步骤见问文档3.1节中卸载JDK步骤。
2)用XShell传输工具将JDK导入到opt目录下面的software文件夹下面
3)在Linux系统下的opt目录中查看软件包是否导入成功
ls /opt/software/
看到如下结果:
jdk-8u212-linux-x64.tar.gz
4)解压JDK到/opt/module目录下
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
5)配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件
vim /etc/profile
vim /etc/profile.d/my_env.sh
添加如下内容
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
(2)保存后退出
(3)source一下/etc/profile文件,让新的环境变量PATH生效
source /etc/profile
6)测试JDK是否安装成功
java -version
如果能看到以下结果,则代表Java安装成功。
java version "1.8.0_212"
注意:重启(如果java -version可以用就不用重启)
reboot
2.4 在hadoop102安装Hadoop
Hadoop下载地址:https://archive.apache.org/dist/hadoop/common/hadoop-3.1.3/
1)用XShell文件传输工具将hadoop-3.1.3.tar.gz导入到opt目录下面的software文件夹下面
2)进入到Hadoop安装包路径下
cd /opt/software/
3)解压安装文件到/opt/module下面
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
4)查看是否解压成功
ls /opt/module/
hadoop-3.1.3
5)将Hadoop添加到环境变量
(1)获取Hadoop安装路径
/opt/module/hadoop-3.1.3
(2)打开/etc/profile.d/my_env.sh文件
vim /etc/profile.d/my_env.sh
- 在my_env.sh文件末尾添加如下内容:(shift+g)
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
- 保存并退出: :wq
(3)让修改后的文件生效
source /etc/profile
6)测试是否安装成功
hadoop version
Hadoop 3.1.3
7)重启(如果Hadoop命令不能用再重启虚拟机)
reboot
2.5 Hadoop目录结构
1)查看Hadoop目录结构
2)重要目录
(1)bin目录:存放对Hadoop相关服务(hdfs,yarn,mapred)进行操作的脚本
(2)etc目录:Hadoop的配置文件目录,存放Hadoop的配置文件
(3)lib目录:存放Hadoop的本地库(对数据进行压缩解压缩功能)
(4)sbin目录:存放启动或停止Hadoop相关服务的脚本
(5)share目录:存放Hadoop的依赖jar包、文档、和官方案例
第3章 Hadoop运行模式
1)Hadoop官方网站:Apache Hadoop
2)Hadoop运行模式包括:本地模式、伪分布式模式以及完全分布式模式。
- 本地模式:单机运行,只是用来演示一下官方案例。生产环境不用。
- 伪分布式模式:也是单机运行,但是具备Hadoop集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。
- 完全分布式模式:多台服务器组成分布式环境。生产环境使用。
3.1 本地运行模式(官方Word Count)
1)创建在hadoop-3.1.3文件下面创建一个wcinput文件夹
mkdir wcinput
2)在wcinput文件下创建一个word.txt文件
cd wcinput
3)编辑word.txt文件
vim word.txt
- 在文件中输入如下内容
Hello hadoopdoujiang
- 保存退出::wq
4)回到Hadoop目录/opt/module/hadoop-3.1.3
5)执行程序
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount wcinput/ ./wcoutput
6)查看结果
cat part-r-00000
看到如下结果:
3.2 完全分布式运行模式(开发重点)
分析:
1)准备3台客户机(关闭防火墙、静态IP、主机名称)
2)安装JDK
3)配置环境变量
4)安装Hadoop
5)配置环境变量
6)配置集群
7)单点启动
8)配置ssh
9)群起并测试集群
3.2.1 虚拟机准备
详见2.1、2.2两节。
3.2.2 编写集群分发脚本xsync
1)scp(secure copy)安全拷贝
(1)scp定义
scp可以实现服务器与服务器之间的数据拷贝。(from server1 to server2)
(2)基本语法
scp -r $pdir/$fname $user@$host:$pdir/$fname
命令 递归 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称
(3)案例实操
- 前提:在hadoop102、hadoop103、hadoop104都已经创建好的/opt/module、 /opt/software两个目录
(a)在hadoop102上,将hadoop102中/opt/module/jdk1.8.0_212目录拷贝到hadoop103上。
scp -r jdk1.8.0_212/ hadoop103:/opt/module/
(b)在hadoop103上,将hadoop102中/opt/module/hadoop-3.1.3目录拷贝到hadoop103上。
scp -r @hadoop102:/opt/module/hadoop-3.1.3 /opt/module/
(c)在hadoop103上操作,将hadoop102中/opt/module目录下所有目录拷贝到hadoop104上。
scp -r @hadoop102:/opt/module/* @hadoop104:/opt/module/
2)rsync远程同步工具
rsync主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。
rsync和scp区别:用rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新。scp是把所有文件都复制过去。
(1)基本语法
rsync -av $pdir/$fname $user@$host:$pdir/$fname
命令 选项参数 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称
选项参数说明
选项 | 功能 |
-a | 归档拷贝 |
-v | 显示复制过程 |
(2)案例实操
(a)删除hadoop103中/opt/module/hadoop-3.1.3/wcinput
rm -rf wcinput/
(b)同步hadoop102中的/opt/module/hadoop-3.1.3到hadoop103
rsync -av hadoop-3.1.3/ @hadoop103:/opt/module/hadoop-3.1.3/
3)xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析:
(a)rsync命令原始拷贝:
rsync -av /opt/module atguigu@hadoop103:/opt/
(b)期望脚本:
xsync要同步的文件名称
(c)期望脚本在任何路径都能使用(脚本放在声明了全局环境变量的路径)
$echo $PATH
(3)脚本实现
(a)在/root/bin目录下创建xsync文件
cd /root/
mkdir bin
cd bin
vim xsync
在该文件中编写如下代码
#!/bin/bash#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!Exit;
fi#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
doecho ==================== $host ====================#3. 遍历所有目录,挨个发送for file in $@do#4. 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done
(b)修改脚本 xsync 具有执行权限
chmod 777 xsync
chmod +x xsync
(c)测试脚本
xsync bin/
(d)将脚本复制到/bin中,以便全局调用
(e)同步环境变量配置(root所有者)
xsync /etc/profile.d/my_env.sh
注意:如果用了sudo,那么xsync一定要给它的路径补全。
sudo ./bin/xsync /etc/profile.d/my_env.sh
让环境变量生效hadoop103,hadoop104
source /etc/profile
3.2.3 SSH无密登录配置
1)配置ssh
(1)基本语法
ssh另一台电脑的IP地址
(2)ssh连接时出现Host key verification failed的解决方法
[atguigu@hadoop102 ~]$ ssh hadoop103
- 如果出现如下内容
Are you sure you want to continue connecting (yes/no)?
- 输入yes,并回车
(3)退回到hadoop102
[atguigu@hadoop103 ~]$ exit
2)无密钥配置
(1)免密登录原理
(2)生成公钥和私钥
[root@hadoop102 .ssh]$
ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
(3)将公钥拷贝到要免密登录的目标机器上
[root@hadoop102 .ssh]$ ssh-copy-id hadoop102
[root@hadoop102 .ssh]$ ssh-copy-id hadoop103
[root@hadoop102 .ssh]$ ssh-copy-id hadoop104
注意:
还需要在hadoop103上配置一下无密登录到hadoop102、hadoop103、hadoop104服务器上。
还需要在hadoop104上配置一下无密登录到hadoop102、hadoop103、hadoop104服务器上。
还需要在hadoop102上配置一下无密登录到hadoop102、hadoop103、hadoop104;
3).ssh文件夹下(~/.ssh)的文件功能解释
known_hosts | 记录ssh访问过计算机的公钥(public key) |
id_rsa | 生成的私钥 |
id_rsa.pub | 生成的公钥 |
authorized_keys | 存放授权过的无密登录服务器公钥 |
3.2.4 集群配置
1)集群部署规划
注意:
- NameNode和SecondaryNameNode不要安装在同一台服务器
- ResourceManager也很消耗内存,不要和NameNode、SecondaryNameNode配置在同一台机器上。
hadoop102 | hadoop103 | hadoop104 | |
HDFS | NameNode DataNode | DataNode | SecondaryNameNode DataNode |
YARN | NodeManager | ResourceManager NodeManager | NodeManager |
2)配置文件说明
Hadoop配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。
(1)默认配置文件:
要获取的默认文件 | 文件存放在Hadoop的jar包中的位置 |
[core-default.xml] | hadoop-common-3.1.3.jar/core-default.xml |
[hdfs-default.xml] | hadoop-hdfs-3.1.3.jar/hdfs-default.xml |
[yarn-default.xml] | hadoop-yarn-common-3.1.3.jar/yarn-default.xml |
[mapred-default.xml] | hadoop-mapreduce-client-core-3.1.3.jar/mapred-default.xml |
(2)自定义配置文件:
core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml四个配置文件存放在$HADOOP_HOME/etc/hadoop这个路径上,用户可以根据项目需求重新进行修改配置。
3)配置集群
(1)核心配置文件
配置core-site.xml
[atguigu@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[atguigu@hadoop102 hadoop]$
vim core-site.xml
文件内容如下:
<!-- 指定NameNode的地址 --><property><name>fs.defaultFS</name><value>hdfs://hadoop102:8020</value></property><!-- 指定hadoop数据的存储目录 --><property><name>hadoop.tmp.dir</name><value>/opt/module/hadoop-3.1.3/data</value></property><!-- 配置HDFS网页登录使用的静态用户为atguigu --><property><name>hadoop.http.staticuser.user</name><value>atguigu</value></property>
(2)HDFS配置文件
配置hdfs-site.xml
[atguigu@hadoop102 hadoop]$ vim hdfs-site.xml
文件内容如下:
<!-- nn web端访问地址--><property><name>dfs.namenode.http-address</name><value>hadoop102:9870</value></property><!-- 2nn web端访问地址--><property><name>dfs.namenode.secondary.http-address</name><value>hadoop104:9868</value></property>
(3)YARN配置文件
配置yarn-site.xml
[atguigu@hadoop102 hadoop]$ vim yarn-site.xml
文件内容如下:
<!-- 指定MR走shuffle --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 指定ResourceManager的地址--><property><name>yarn.resourcemanager.hostname</name><value>hadoop103</value></property><!-- 环境变量的继承 --><property><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value></property>
(4)MapReduce配置文件
配置mapred-site.xml
[atguigu@hadoop102 hadoop]$ vim mapred-site.xml
文件内容如下:
<!-- 指定MapReduce程序运行在Yarn上 --><property><name>mapreduce.framework.name</name><value>yarn</value></property>
4)在集群上分发配置好的Hadoop配置文件
[atguigu@hadoop102 hadoop]$
xsync /opt/module/hadoop-3.1.3/etc/hadoop/
5)去103和104上查看文件分发情况
[atguigu@hadoop103 ~]$
cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
[atguigu@hadoop104 ~]$
cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
3.2.5 群起集群
1)配置workers
[atguigu@hadoop102 hadoop]$
vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
在该文件中增加如下内容:
注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。
同步所有节点配置文件
[atguigu@hadoop102 hadoop]$
xsync /opt/module/hadoop-3.1.3/etc
2)启动集群
(1)如果集群是第一次启动,需要在hadoop102节点格式化NameNode(注意:格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化。)
[atguigu@hadoop102 hadoop-3.1.3]$
hdfs namenode -format
(2)启动HDFS
[atguigu@hadoop102 hadoop-3.1.3]$
sbin/start-dfs.sh
两种解决ERROR: Attempting to operate on hdfs namenode as root的方法_starting namenodes on [localhost] error: attemptin-CSDN博客
(3)在配置了ResourceManager的节点(hadoop103)启动YARN
[atguigu@hadoop103 hadoop-3.1.3]$
sbin/start-yarn.sh
hadoop | start-yarn.sh与stop-yarn.sh执行失败_starting resourcemanager error: attempting to oper-CSDN博客
使用root用户配置的hadoop集群启动报错:ERROR: Attempting to operate on hdfs namenode as root_[root@master ~]# start-dfs.sh starting namenodes o-CSDN博客
(4)Web端查看HDFS的NameNode
(a)浏览器中输入:http://hadoop102:9870http://hadoop102:9870
(b)查看HDFS上存储的数据信息
(5)Web端查看YARN的ResourceManager
(a)浏览器中输入:http://hadoop103:8088http://hadoop103:8088
(b)查看YARN上运行的Job信息
3)集群基本测试
(1)上传文件到集群
hadoop fs -mkdir /wcinput
- 上传小文件
[atguigu@hadoop102 ~]$
hadoop fs -mkdir /wcinput
[atguigu@hadoop102 ~]$
hadoop fs -put wcinput/word.txt /wcinput
- 上传大文件
[atguigu@hadoop102 ~]$
hadoop fs -put /opt/software/jdk-8u212-linux-x64.tar.gz /
(2)上传文件后查看文件存放在什么位置
- 查看HDFS文件存储路径
[atguigu@hadoop102 subdir0]$ pwd
/opt/module/hadoop-3.1.3/data/dfs/data/current/BP-1436128598-192.168.10.102-1610603650062/current/finalized/subdir0/subdir0
- 查看HDFS在磁盘存储文件内容
[atguigu@hadoop102 subdir0]$ cat blk_1073741825
(3)拼接
-rw-rw-r--. 1 atguigu atguigu 134217728 5月 23 16:01 blk_1073741836
-rw-rw-r--. 1 atguigu atguigu 1048583 5月 23 16:01 blk_1073741836_1012.meta
-rw-rw-r--. 1 atguigu atguigu 63439959 5月 23 16:01 blk_1073741837
-rw-rw-r--. 1 atguigu atguigu 495635 5月 23 16:01 blk_1073741837_1013.meta
[atguigu@hadoop102 subdir0]$ cat blk_1073741836>>tmp.tar.gz
[atguigu@hadoop102 subdir0]$ cat blk_1073741837>>tmp.tar.gz
[atguigu@hadoop102 subdir0]$ tar -zxvf tmp.tar.gz
(4)下载
[atguigu@hadoop104 software]$
hadoop fs -get /jdk-8u212-linux-x64.tar.gz ./
(5)执行wordcount程序
[atguigu@hadoop102 hadoop-3.1.3]$
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /wcinput /wcoutput
sbin /stop-dfs.sh
jps
删除每个集群data,logs
rm -rf data/ logs/
格式化节点
hadoop namenode -format
sbin/start-dfs.sh
jps
3.2.6 配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:
1)配置mapred-site.xml
[atguigu@hadoop102 hadoop]$
vim mapred-site.xml
在该文件里面增加如下配置。
<!-- 历史服务器端地址 --><property><name>mapreduce.jobhistory.address</name><value>hadoop102:10020</value></property><!-- 历史服务器web端地址 --><property><name>mapreduce.jobhistory.webapp.address</name><value>hadoop102:19888</value></property>
2)分发配置
[atguigu@hadoop102 hadoop]$
xsync mapred-site.xml
3)在hadoop102启动历史服务器
关闭 ResourceManager
1 sbin/stop-yarn.sh
2 sbin/start-yarn.sh
[atguigu@hadoop102 hadoop]$
mapred --daemon start historyserver
4)查看历史服务器是否启动
[atguigu@hadoop102 hadoop]$ jps
5)查看JobHistory
http://hadoop102:19888/jobhistoryhttp://hadoop102:19888/jobhistory
3.2.7 配置日志的聚集
日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上。
日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。
注意:开启日志聚集功能,需要重新启动NodeManager 、ResourceManager和HistoryServer。
开启日志聚集功能具体步骤如下:
1)配置yarn-site.xml
[atguigu@hadoop102 hadoop]$
vim yarn-site.xml
在该文件里面增加如下配置。
<!-- 开启日志聚集功能 --><property><name>yarn.log-aggregation-enable</name><value>true</value></property><!-- 设置日志聚集服务器地址 --><property> <name>yarn.log.server.url</name> <value>http://hadoop102:19888/jobhistory/logs</value></property><!-- 设置日志保留时间为7天 --><property><name>yarn.log-aggregation.retain-seconds</name><value>604800</value></property>
2)分发配置
[atguigu@hadoop102 hadoop]$
xsync yarn-site.xml
3)关闭NodeManager 、ResourceManager和HistoryServer
[atguigu@hadoop103 hadoop-3.1.3]$
sbin/stop-yarn.sh
[atguigu@hadoop103 hadoop-3.1.3]$
mapred --daemon stop historyserver
4)启动NodeManager 、ResourceManage和HistoryServer
[atguigu@hadoop103 ~]$
sbin/start-yarn.sh
[atguigu@hadoop102 ~]$
mapred --daemon start historyserver
5)删除HDFS上已经存在的输出文件
[atguigu@hadoop102 ~]$
hadoop fs -rm -r /output
6)执行WordCount程序
[atguigu@hadoop102 hadoop-3.1.3]$
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output
7)查看日志
(1)历史服务器地址
http://hadoop102:19888/jobhistory
(2)历史任务列表
(3)查看任务运行日志
(4)运行日志详情
3.2.8 集群启动/停止方式总结
1)各个模块分开启动/停止(配置ssh是前提)常用
(1)整体启动/停止HDFS
start-dfs.sh/stop-dfs.sh
(2)整体启动/停止YARN
start-yarn.sh/stop-yarn.sh
2)各个服务组件逐一启动/停止
(1)分别启动/停止HDFS组件
hdfs --daemon start/stop namenode/datanode/secondarynamenode
(2)启动/停止YARN
yarn --daemon start/stop resourcemanager/nodemanager
3.2.9 编写Hadoop集群常用脚本
1)Hadoop集群启停脚本(包含HDFS,Yarn,Historyserver):myhadoop.sh
[atguigu@hadoop102 ~]$
cd /root/bin
[atguigu@hadoop102 bin]$
vim myhadoop.sh
- 输入如下内容
#!/bin/bashif [ $# -lt 1 ]
thenecho "No Args Input..."exit ;
ficase $1 in
"start")echo " =================== 启动 hadoop集群 ==================="echo " --------------- 启动 hdfs ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"echo " --------------- 启动 yarn ---------------"ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"echo " --------------- 启动 historyserver ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")echo " =================== 关闭 hadoop集群 ==================="echo " --------------- 关闭 historyserver ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"echo " --------------- 关闭 yarn ---------------"ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"echo " --------------- 关闭 hdfs ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)echo "Input Args Error..."
;;
esac
- 保存后退出,然后赋予脚本执行权限
[atguigu@hadoop102 bin]
chmod +x myhadoop.sh
2)查看三台服务器Java进程脚本:jpsall
[atguigu@hadoop102 ~]$
cd /root/bin
[atguigu@hadoop102 bin]$
vim jpsall
- 输入如下内容
#!/bin/bashfor host in hadoop102 hadoop103 hadoop104
doecho =============== $host ===============ssh $host jps
done
- 保存后退出,然后赋予脚本执行权限
[atguigu@hadoop102 bin]$
chmod +x jpsall
3)分发/root/bin目录,保证自定义脚本在三台机器上都可以使用
[atguigu@hadoop102 ~]$
xsync /root/bin/
jpsall
./jpsall
3.2.10 常用端口号说明
端口名称 | Hadoop2.x | Hadoop3.x |
NameNode内部通信端口 | 8020 / 9000 | 8020 / 9000/9820 |
NameNode HTTP UI | 50070 | 9870 |
MapReduce查看执行任务端口 | 8088 | 8088 |
历史服务器通信端口 | 19888 | 19888 |
3.2.11 集群时间同步
如果服务器在公网环境(能连接外网),可以不采用集群时间同步,因为服务器会定期和公网时间进行校准;
如果服务器在内网环境,必须要配置集群时间同步,否则时间久了,会产生时间偏差,导致集群执行任务时间不同步。
1)需求
找一个机器,作为时间服务器,所有的机器与这台集群时间进行定时的同步,生产环境根据任务对时间的准确程度要求周期同步。测试环境为了尽快看到效果,采用1分钟同步一次。
2)时间服务器配置(必须root用户)
(1)查看所有节点ntpd服务状态和开机自启动状态
[atguigu@hadoop102 ~]$ sudo systemctl status ntpd
[atguigu@hadoop102 ~]$ sudo systemctl start ntpd
[atguigu@hadoop102 ~]$ sudo systemctl is-enabled ntpd
(2)修改hadoop102的ntp.conf配置文件
[atguigu@hadoop102 ~]$ sudo vim /etc/ntp.conf
修改内容如下
(a)修改1(授权192.168.200.0-192.168.200.255网段上的所有机器可以从这台机器上查询和同步时间)
#restrict 192.168.0.0 mask 255.255.255.0 nomodify notrap
为restrict 192.168.200.0 mask 255.255.255.0 nomodify notrap
(b)修改2(集群在局域网中,不使用其他互联网上的时间)
server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst
为
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
(c)添加3(当该节点丢失网络连接,依然可以采用本地时间作为时间服务器为集群中的其他节点提供时间同步)
server 127.127.1.0
fudge 127.127.1.0 stratum 10
(3)修改hadoop102的/etc/sysconfig/ntpd 文件
[atguigu@hadoop102 ~]$ sudo vim /etc/sysconfig/ntpd
增加内容如下(让硬件时间与系统时间一起同步)
SYNC_HWCLOCK=yes
(4)重新启动ntpd服务
[atguigu@hadoop102 ~]$ sudo systemctl start ntpd
(5)设置ntpd服务开机启动
[atguigu@hadoop102 ~]$ sudo systemctl enable ntpd
3)其他机器配置(必须root用户)
(1)关闭所有节点上ntp服务和自启动
[atguigu@hadoop103 ~]$ sudo systemctl stop ntpd
[atguigu@hadoop103 ~]$ sudo systemctl disable ntpd
[atguigu@hadoop104 ~]$ sudo systemctl stop ntpd
[atguigu@hadoop104 ~]$ sudo systemctl disable ntpd
(2)在其他机器配置1分钟与时间服务器同步一次
[atguigu@hadoop103 ~]$ sudo crontab -e
编写定时任务如下:
*/1 * * * * /usr/sbin/ntpdate hadoop102
(3)修改任意机器时间
[atguigu@hadoop103 ~]$ sudo date -s "2021-9-11 11:11:11"
(4)1分钟后查看机器是否与时间服务器同步
[atguigu@hadoop103 ~]$ sudo date
第4章 常见错误及解决方案
1)防火墙没关闭、或者没有启动YARN
INFO client.RMProxy: Connecting to ResourceManager at hadoop108/192.168.10.108:8032
2)主机名称配置错误
3)IP地址配置错误
4)ssh没有配置好
5)root用户和atguigu两个用户启动集群不统一
6)配置文件修改不细心
7)不识别主机名称
java.net.UnknownHostException: hadoop102: hadoop102
at java.net.InetAddress.getLocalHost(InetAddress.java:1475)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:146)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
解决办法:
(1)在/etc/hosts文件中添加192.168.10.102 hadoop102
(2)主机名称不要起hadoop hadoop000等特殊名称
8)DataNode和NameNode进程同时只能工作一个。
删除data,logs
9)执行命令不生效,粘贴Word中命令时,遇到-和长–没区分开。导致命令失效
解决办法:尽量不要粘贴Word中代码。
10)jps发现进程已经没有,但是重新启动集群,提示进程已经开启。
原因是在Linux的根目录下/tmp目录中存在启动的进程临时文件,将集群相关进程删除掉,再重新启动集群。
11)jps不生效
原因:全局变量hadoop java没有生效。解决办法:需要source /etc/profile文件。
12)8088端口连接不上
[atguigu@hadoop102 桌面]$ cat /etc/hosts
注释掉如下代码
#127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
#::1 hadoop102
第1章 HDFS概述
1.1 HDFS产出背景及定义
1)HDFS产生背景
随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。
2)HDFS定义
HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
HDFS的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变。
1.2 HDFS优缺点
HDFS的优点:
HDFS缺点:
1.3 HDFS组成架构
1.4 HDFS文件块大小(面试重点)
如果它的硬盘传输速率为100兆每秒,它的大小就设为128兆,如果是固态硬盘200兆,每秒就选256兆!!!!!!
如果它的硬盘传输速率为100兆每秒,它的大小就设为128兆,如果是固态硬盘200兆,每秒就选256兆!!!!
如果它的硬盘传输速率为100兆每秒,它的大小就设为128兆,如果是固态硬盘200兆,每秒就选256兆!!!
第2章 HDFS的Shell操作(开发重点)
2.1 基本语法
hadoop fs 具体命令 OR hdfs dfs 具体命令
两个是完全相同的。
2.2 命令大全
[atguigu@hadoop102 hadoop-3.1.3]$ bin/hadoop fs[-appendToFile <localsrc> ... <dst>][-cat [-ignoreCrc] <src> ...][-chgrp [-R] GROUP PATH...][-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...][-chown [-R] [OWNER][:[GROUP]] PATH...][-copyFromLocal [-f] [-p] <localsrc> ... <dst>][-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-count [-q] <path> ...][-cp [-f] [-p] <src> ... <dst>][-df [-h] [<path> ...]][-du [-s] [-h] <path> ...][-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-getmerge [-nl] <src> <localdst>][-help [cmd ...]][-ls [-d] [-h] [-R] [<path> ...]][-mkdir [-p] <path> ...][-moveFromLocal <localsrc> ... <dst>][-moveToLocal <src> <localdst>][-mv <src> ... <dst>][-put [-f] [-p] <localsrc> ... <dst>][-rm [-f] [-r|-R] [-skipTrash] <src> ...][-rmdir [--ignore-fail-on-non-empty] <dir> ...]<acl_spec> <path>]][-setrep [-R] [-w] <rep> <path> ...][-stat [format] <path> ...][-tail [-f] <file>][-test -[defsz] <path>][-text [-ignoreCrc] <src> ...]
2.3 常用命令实操
2.3.1 准备工作
1)启动Hadoop集群(方便后续的测试)
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
2)-help:输出这个命令参数
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -help rm
3)创建/sanguo文件夹
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /sanguo
2.3.2 上传
1)-moveFromLocal:从本地剪切粘贴到HDFS
[atguigu@hadoop102 hadoop-3.1.3]$ vim shuguo.txt
输入:
shuguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./shuguo.txt /sanguo
2)-copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去
[atguigu@hadoop102 hadoop-3.1.3]$ vim weiguo.txt
输入:
weiguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -copyFromLocal weiguo.txt /sanguo
3)-put:等同于copyFromLocal,生产环境更习惯用put
[atguigu@hadoop102 hadoop-3.1.3]$ vim wuguo.txt
输入:
wuguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -put ./wuguo.txt /sanguo
4)-appendToFile:追加一个文件到已经存在的文件末尾
[atguigu@hadoop102 hadoop-3.1.3]$ vim liubei.txt
输入:
liubei
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
2.3.3 下载
1)-copyToLocal:从HDFS拷贝到本地
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -copyToLocal /sanguo/shuguo.txt ./
2)-get:等同于copyToLocal,生产环境更习惯用get
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt
2.3.4 HDFS直接操作
1)-ls: 显示目录信息
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -ls /sanguo
2)-cat:显示文件内容
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -cat /sanguo/shuguo.txt
3)-chgrp、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -chmod 666 /sanguo/shuguo.txt
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -chown atguigu:atguigu /sanguo/shuguo.txt
4)-mkdir:创建路径
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /jinguo
5)-cp:从HDFS的一个路径拷贝到HDFS的另一个路径
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -cp /sanguo/shuguo.txt /jinguo
6)-mv:在HDFS目录中移动文件
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/wuguo.txt /jinguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/weiguo.txt /jinguo
7)-tail:显示一个文件的末尾1kb的数据
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -tail /jinguo/shuguo.txt
8)-rm:删除文件或文件夹
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -rm /sanguo/shuguo.txt
9)-rm -r:递归删除目录及目录里面内容
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -rm -r /sanguo
10)-du统计文件夹的大小信息
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -du -s -h /jinguo
27 81 /jinguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -du -h /jinguo
14 42 /jinguo/shuguo.txt
7 21 /jinguo/weiguo.txt
6 18 /jinguo/wuguo.tx
说明:27表示文件大小;81表示27*3个副本;/jinguo表示查看的目录
11)-setrep:设置HDFS中文件的副本数量
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -setrep 10 /jinguo/shuguo.txt
这里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。因为目前只有3台设备,最多也就3个副本,只有节点数的增加到10台时,副本数才能达到10。
第3章 HDFS的API操作
3.1 客户端环境准备
1)找到资料包路径下的Windows依赖文件夹,拷贝hadoop-3.1.0到非中文路径(比如d:\)。
2)配置HADOOP_HOME环境变量
3)配置Path环境变量。
注意:如果环境变量不起作用,可以重启电脑试试。
验证Hadoop环境变量是否正常。双击winutils.exe,如果报如下错误。说明缺少微软运行库(正版系统往往有这个问题)。再资料包里面有对应的微软运行库安装包双击安装即可。
4)在IDEA中创建一个Maven工程HdfsClientDemo,并导入相应的依赖坐标+日志添加
<dependencies><dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency><dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> </dependency></dependencies>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
5)创建包名:com.atguigu.hdfs
6)创建HdfsClient类
package com.atguigu.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.Test;/*** @Author:豆浆* @name :HdfsClient* @Date:2024/4/9 22:55*/
public class HdfsClient {@Testpublic void testmkdir() throws IOException, URISyntaxException, InterruptedException {//连接的集群nn的地址URI uri = new URI("hdfs://hadoop102:8020");//创建一个配置文件Configuration configuration = new Configuration();// 1 获取文件系统FileSystem fileSystem = FileSystem.get(uri, configuration,"root");//2.创建一个文件fileSystem.mkdirs(new Path("/xiyou/huanguoshan"));//3.关闭资源fileSystem.close();}
}
第二种:
package com.atguigu.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.junit.After;
import org.junit.Before;
import org.junit.Test;/*** @Author:豆浆* @name :HdfsClient* @Date:2024/4/9 22:55*/
public class HdfsClient {private FileSystem fileSystem;@Beforepublic void init() throws URISyntaxException, IOException, InterruptedException {//连接的集群nn的地址URI uri = new URI("hdfs://hadoop102:8020");//创建一个配置文件Configuration configuration = new Configuration();//用户String user="root";// 1 获取文件系统fileSystem = FileSystem.get(uri, configuration,user);}@Afterpublic void close() throws IOException {//3.关闭资源fileSystem.close();}@Testpublic void testmkdir() throws IOException, URISyntaxException, InterruptedException {//2.创建一个文件fileSystem.mkdirs(new Path("/xiyou/huanguoshan1"));}
}
7)执行程序
客户端去操作HDFS时,是有一个用户身份的。默认情况下,HDFS客户端API会从采用Windows默认用户访问HDFS,会报权限异常错误。所以在访问HDFS时,一定要配置用户。
org.apache.hadoop.security.AccessControlException: Permission denied: user=56576, access=WRITE, inode="/xiyou/huaguoshan":atguigu:supergroup:drwxr-xr-x
3.2 HDFS的API案例实操
3.2.1 HDFS文件上传(测试参数优先级)
1)编写源代码
package com.atguigu.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.junit.After;
import org.junit.Before;
import org.junit.Test;/*** @Author:豆浆* @name :HdfsClient* @Date:2024/4/9 22:55*/
public class HdfsClient {private FileSystem fileSystem;@Beforepublic void init() throws URISyntaxException, IOException, InterruptedException {//连接的集群nn的地址URI uri = new URI("hdfs://hadoop102:8020");//创建一个配置文件Configuration configuration = new Configuration();//用户String user="root";// 1 获取文件系统fileSystem = FileSystem.get(uri, configuration,user);}@Afterpublic void close() throws IOException {//3.关闭资源fileSystem.close();}@Testpublic void testmkdir() throws IOException, URISyntaxException, InterruptedException {//2.创建一个文件fileSystem.mkdirs(new Path("/xiyou/huanguoshan1"));}//上传@Testpublic void testCopyFromLocalFile() throws IOException {//参数:参数一:表示是否删除 参数二:表示是否被覆盖 参数三:原数据 参数四路径:目的地路径fileSystem.copyFromLocalFile(true,true,new Path("E:/hadoop01.txt"),new Path("/xiyou/hadoop01"));}}
2)将hdfs-site.xml拷贝到项目的resources资源目录下
/*
* 参数优先级:hdfs-default.xml => hdfs-site.xml
* */
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>dfs.replication</name><value>1</value></property>
</configuration>
3)参数优先级
参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的自定义配置(xxx-site.xml) >(4)服务器的默认配置(xxx-default.xml)
3.2.2 HDFS文件下载
package com.atguigu.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.junit.After;
import org.junit.Before;
import org.junit.Test;/*** @Author:豆浆* @name :HdfsClient* @Date:2024/4/9 22:55*/
public class HdfsClient {private FileSystem fileSystem;@Beforepublic void init() throws URISyntaxException, IOException, InterruptedException {//连接的集群nn的地址URI uri = new URI("hdfs://hadoop102:8020");//创建一个配置文件Configuration configuration = new Configuration();configuration.set("dfs.replication","2");//用户String user="root";// 1 获取文件系统fileSystem = FileSystem.get(uri, configuration,user);}@Afterpublic void close() throws IOException {//3.关闭资源fileSystem.close();}@Testpublic void testmkdir() throws IOException, URISyntaxException, InterruptedException {//2.创建一个文件fileSystem.mkdirs(new Path("/xiyou/huanguoshan1"));}//上传@Testpublic void testCopyFromLocalFile() throws IOException {//参数:参数一:表示是否删除 参数二:表示是否被覆盖 参数三:原数据 参数四路径:目的地路径fileSystem.copyFromLocalFile(false,true,new Path("E:/hadoop01.txt"),new Path("/xiyou/hadoop01"));}//下载@Testpublic void testCopyToFromLocalFile() throws IOException {/**boolean delSrc 指是否将原文件删除Path src 指要下载的文件路径Path dst 指将文件下载到的路径boolean useRawLocalFileSystem 是否开启文件校验* */fileSystem.copyToLocalFile(false,new Path("/xiyou/hadoop01"),new Path("E:/"),true);}
}
注意:如果执行上面代码,下载不了文件,有可能是你电脑的微软支持的运行库少,需要安装一下微软运行库。
3.2.3 HDFS文件更名和移动
//文件的更名和移动@Testpublic void testmv() throws IOException {//参数:参数一:表示原文件的路径 参数二:表示目标文件路径fileSystem.rename(new Path("/xiyou/huanguoshan"),new Path("/xiyou/hd"));}
3.2.4 HDFS删除文件和目录
//删除文件,目录,空目录(必true)@Testpublic void testdelect() throws IOException {//参数:参数一:表示删除的路径 参数二:表示是否递归删除fileSystem.delete(new Path("/xiyou/hadoop01"),false);}
3.2.5 HDFS文件详情查看
查看文件名称、权限、长度、块信息
//HDFS文件详情查看//查看文件名称、权限、长度、块信息@Testpublic void fileDetail() throws IOException {//获取所有文件信息RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/wcinput"), true);//遍历文件while (locatedFileStatusRemoteIterator.hasNext()){LocatedFileStatus next = locatedFileStatusRemoteIterator.next();//getSystem.out.println("文件路径:"+next.getPath());System.out.println("文件名:"+next.getPath().getName());System.out.println("各种get........");//获取块信息BlockLocation[] blockLocations = next.getBlockLocations();System.out.println(Arrays.toString(blockLocations));}}
3.2.6 HDFS文件和文件夹判断
//HDFS文件和文件夹判断@Testpublic void testisfile() throws IOException {FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));for (FileStatus fileStatus : fileStatuses) {if (fileStatus.isFile()){System.out.println("这是一个文件"+fileStatus.getPath().getName());}else {System.out.println("这是一个目录"+fileStatus.getPath().getName());}}}
第4章 HDFS的读写流程(面试重点)
4.1 HDFS写数据流程
4.1.1 剖析文件写入
(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
(2)NameNode返回是否可以上传。
(3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3逐级应答客户端。
(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
(8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
4.1.2 网络拓扑-节点距离计算
在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。
例如,假设有数据中心d1机架r1中的节点n1。该节点可以表示为/d1/r1/n1。利用这种标记,这里给出四种距离描述。
大家算一算每两个节点之间的距离。
4.1.3 机架感知(副本存储节点选择)
1)机架感知说明
(1)官方说明
Apache Hadoop 3.1.3 – HDFS Architecture
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.
(2)源码说明
Crtl + n 查找BlockPlacementPolicyDefault,在该类中查找chooseTargetInOrder方法。
2)Hadoop3.1.3副本节点选择
4.2 HDFS串行读数据流程
(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
第5章 NameNode和SecondaryNameNode
5.1 NN和2NN工作机制
思考:NameNode中的元数据是存储在哪里的?
首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。
这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。
但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。
1)第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
2)第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
5.2 Fsimage和Edits解析
1)oiv查看Fsimage文件
(1)查看oiv和oev命令
[atguigu@hadoop102 current]$ hdfs
oiv apply the offline fsimage viewer to an fsimage
oev apply the offline edits viewer to an edits file
(2)基本语法
hdfs oiv -p 文件类型 -i镜像文件 -o 转换后文件输出路径
(3)案例实操
[atguigu@hadoop102 current]$ pwd
/opt/module/hadoop-3.1.3/data/dfs/name/current
[atguigu@hadoop102 current]$ hdfs oiv -p XML -i fsimage_0000000000000000025 -o /opt/module/hadoop-3.1.3/fsimage.xml
[atguigu@hadoop102 current]$ cat /opt/module/hadoop-3.1.3/fsimage.xml
将显示的xml文件内容拷贝到Idea中创建的xml文件中,并格式化。部分显示结果如下。
<inode><id>16386</id><type>DIRECTORY</type><name>user</name><mtime>1512722284477</mtime><permission>atguigu:supergroup:rwxr-xr-x</permission><nsquota>-1</nsquota><dsquota>-1</dsquota></inode><inode><id>16387</id><type>DIRECTORY</type><name>atguigu</name><mtime>1512790549080</mtime><permission>atguigu:supergroup:rwxr-xr-x</permission><nsquota>-1</nsquota><dsquota>-1</dsquota></inode><inode><id>16389</id><type>FILE</type><name>wc.input</name><replication>3</replication><mtime>1512722322219</mtime><atime>1512722321610</atime><perferredBlockSize>134217728</perferredBlockSize><permission>atguigu:supergroup:rw-r--r--</permission><blocks><block><id>1073741825</id><genstamp>1001</genstamp><numBytes>59</numBytes></block></blocks></inode >
思考:可以看出,Fsimage中没有记录块所对应DataNode,为什么?
在集群启动后,要求DataNode上报数据块信息,并间隔一段时间后再次上报。
2)oev查看Edits文件
(1)基本语法
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
(2)案例实操
[atguigu@hadoop102 current]$ hdfs oev -p XML -i edits_0000000000000000012-0000000000000000013 -o /opt/module/hadoop-3.1.3/edits.xml
[atguigu@hadoop102 current]$ cat /opt/module/hadoop-3.1.3/edits.xml
将显示的xml文件内容拷贝到Idea中创建的xml文件中,并格式化。显示结果如下。
<?xml version="1.0" encoding="UTF-8"?><EDITS><EDITS_VERSION>-63</EDITS_VERSION><RECORD><OPCODE>OP_START_LOG_SEGMENT</OPCODE><DATA><TXID>129</TXID></DATA></RECORD><RECORD><OPCODE>OP_ADD</OPCODE><DATA><TXID>130</TXID><LENGTH>0</LENGTH><INODEID>16407</INODEID><PATH>/hello7.txt</PATH><REPLICATION>2</REPLICATION><MTIME>1512943607866</MTIME><ATIME>1512943607866</ATIME><BLOCKSIZE>134217728</BLOCKSIZE><CLIENT_NAME>DFSClient_NONMAPREDUCE_-1544295051_1</CLIENT_NAME><CLIENT_MACHINE>192.168.10.102</CLIENT_MACHINE><OVERWRITE>true</OVERWRITE><PERMISSION_STATUS><USERNAME>atguigu</USERNAME><GROUPNAME>supergroup</GROUPNAME><MODE>420</MODE></PERMISSION_STATUS><RPC_CLIENTID>908eafd4-9aec-4288-96f1-e8011d181561</RPC_CLIENTID><RPC_CALLID>0</RPC_CALLID></DATA></RECORD><RECORD><OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE><DATA><TXID>131</TXID><BLOCK_ID>1073741839</BLOCK_ID></DATA></RECORD><RECORD><OPCODE>OP_SET_GENSTAMP_V2</OPCODE><DATA><TXID>132</TXID><GENSTAMPV2>1016</GENSTAMPV2></DATA></RECORD><RECORD><OPCODE>OP_ADD_BLOCK</OPCODE><DATA><TXID>133</TXID><PATH>/hello7.txt</PATH><BLOCK><BLOCK_ID>1073741839</BLOCK_ID><NUM_BYTES>0</NUM_BYTES><GENSTAMP>1016</GENSTAMP></BLOCK><RPC_CLIENTID></RPC_CLIENTID><RPC_CALLID>-2</RPC_CALLID></DATA></RECORD><RECORD><OPCODE>OP_CLOSE</OPCODE><DATA><TXID>134</TXID><LENGTH>0</LENGTH><INODEID>0</INODEID><PATH>/hello7.txt</PATH><REPLICATION>2</REPLICATION><MTIME>1512943608761</MTIME><ATIME>1512943607866</ATIME><BLOCKSIZE>134217728</BLOCKSIZE><CLIENT_NAME></CLIENT_NAME><CLIENT_MACHINE></CLIENT_MACHINE><OVERWRITE>false</OVERWRITE><BLOCK><BLOCK_ID>1073741839</BLOCK_ID><NUM_BYTES>25</NUM_BYTES><GENSTAMP>1016</GENSTAMP></BLOCK><PERMISSION_STATUS><USERNAME>atguigu</USERNAME><GROUPNAME>supergroup</GROUPNAME><MODE>420</MODE></PERMISSION_STATUS></DATA></RECORD></EDITS >
思考:NameNode如何确定下次开机启动的时候合并哪些Edits?
5.3 CheckPoint时间设置
<property><name>dfs.namenode.checkpoint.period</name><value>3600s</value></property>
1)通常情况下,SecondaryNameNode每隔一小时执行一次。
[hdfs-default.xml]
2)一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次。
[hdfs-default.xml]
<property><name>dfs.namenode.checkpoint.txns</name><value>1000000</value><description>操作动作次数</description></property><property><name>dfs.namenode.checkpoint.check.period</name><value>60s</value><description> 1分钟检查一次操作次数</description></property>
第6章 DataNode
6.1 DataNode工作机制
cd /opt/module/hadoop-3.1.3/data/dfs/data/current/BP-15275544-192.168.200.102-1712634566449/current/finalized/subdir0/subdir0
(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2)DataNode启动后向NameNode注册,通过后,周期性(6小时)的向NameNode上报所有的块信息。
DN向NN汇报当前解读信息的时间间隔,默认6小时;
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
DN扫描自己节点块信息列表的时间,默认6小时
<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600s</value>
<description>Interval in seconds for Datanode to scan data directories and reconcile the difference between blocks in memory and on the disk.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description>
</property>
(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟+30秒没有收到某个DataNode的心跳,则认为该节点不可用。
(4)集群运行中可以安全加入和退出一些机器。
6.2 数据完整性
思考:如果电脑磁盘里面存储的数据是控制高铁信号灯的红灯信号(1)和绿灯信号(0),但是存储该数据的磁盘坏了,一直显示是绿灯,是否很危险?同理DataNode节点上的数据损坏了,却没有发现,是否也很危险,那么如何解决呢?
如下是DataNode节点保证数据完整性的方法。
(1)当DataNode读取Block的时候,它会计算CheckSum。
(2)如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。
(3)Client读取其他DataNode上的Block。
(4)常见的校验算法crc(32),md5(128),sha1(160)
(5)DataNode在其文件创建后周期验证CheckSum。
6.3 掉线时限参数设置
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>300000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
</property>
单节点启动:
第1章 MapReduce概述
1.1 MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.2 MapReduce优缺点
1.2.1 优点
1)MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
4)适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1.2.2 缺点
1)不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce核心思想
(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。
1.4 MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
1.5 官方WordCount源码
采用反编译工具反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是Hadoop自身封装的序列化类型。
1.6 常用数据序列化类型
Java类型 | Hadoop Writable类型 |
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
1.7 MapReduce编程规范
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
1.8 WordCount案例实操
1.8.1 本地测试
1)需求
在给定的文本文件中统计输出每一个单词出现的总次数
(1)输入数据
(2)期望输出数据
atguigu 2
banzhang 1
cls 2
hadoop 1
jiao 1
ss 2
xue 1
2)需求分析
按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。
3)环境准备
(1)创建maven工程,MapReduceDemo
(2)在pom.xml文件中添加如下依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(3)创建包名:com.atguigu.mapreduce.wordcount
4)编写程序
(1)编写Mapper类
package com.atguigu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
(2)编写Reducer类
package com.atguigu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
(3)编写Driver驱动类
package com.atguigu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(WordCountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
5)本地测试
(1)需要首先配置好HADOOP_HOME变量以及Windows运行依赖
(2)在IDEA/Eclipse上运行程序
1.8.2 提交到集群测试
集群上测试
(1)用maven打jar包,需要添加的打包插件依赖
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
注意:如果工程上显示红叉。在项目上右键->maven->Reimport刷新即可。
(2)将程序打成jar包
(3)修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.1.3路径。
(4)启动Hadoop集群
[atguigu@hadoop102 hadoop-3.1.3]sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
(5)执行WordCount程序
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar
com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/input /user/atguigu/output
第2章 Hadoop序列化
2.1 序列化概述
1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
2.2 自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {
super();
}
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
2.3 序列化案例实操
1)需求
统计每一个手机号耗费的总上行流量、总下行流量、总流量
(1)输入数据
(2)输入数据格式:
7 13560436666 120.196.100.99 1116 954 200 id 手机号码 网络ip 上行流量 下行流量 网络状态码 |
(3)期望输出数据格式
13560436666 1116 954 2070 手机号码 上行流量 下行流量 总流量 |
2)需求分析
3)编写MapReduce程序
(1)编写流量统计的Bean对象
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//1 继承Writable接口
public class FlowBean implements Writable {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
//2 提供无参构造
public FlowBean() {
}
//3 提供三个参数的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//4 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//5 重写ToString
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
(2)编写Mapper类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 获取一行数据,转成字符串
String line = value.toString();
//2 切割数据
String[] split = line.split("\t");
//3 抓取我们需要的数据:手机号,上行流量,下行流量
String phone = split[1];
String up = split[split.length - 3];
String down = split[split.length - 2];
//4 封装outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//5 写出outK outV
context.write(outK, outV);
}
}
(3)编写Reducer类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long totalUp = 0;
long totalDown = 0;
//1 遍历values,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
totalUp += flowBean.getUpFlow();
totalDown += flowBean.getDownFlow();
}
//2 封装outKV
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
//3 写出outK outV
context.write(key,outV);
}
}
(4)编写Driver驱动类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
第3章 MapReduce框架原理
已停