实训笔记7.18
- 7.18
- 一、座右铭
- 二、Hadoop大数据技术 大数据软件一般都要求7*24小时不宕机
- 三、Hadoop的组成
- 3.1 HDFS
- 3.2 MapReduce
- 3.3 YARN
- 3.4 Hadoop Common
- 四、Hadoop生态圈
- 五、Hadoop的安装问题
- 5.1 Hadoop的本地安装模式-基本不用
- 5.2 Hadoop的伪分布安装模式
- 5.3 Hadoop的完全分布式安装模式
- 5.4 Hadoop的HA高可用安装模式--借助zookeeper
- 六、Hadoop的第一个核心组成:HDFS
- 6.1 概念
- 6.2 组成
- 6.2.1 NameNode
- 6.2.2 DataNode
- 6.2.3 SecondaryNameNode
- 6.2.4 客户端:命令行/Java API
- 6.3 HDFS的基本操作
- 6.4 HDFS的原理性内容
- 6.4.1 HDFS的数据流工作原理
- 6.4.2 NameNode的第一个工作机制原理
- 6.4.3 NameNode的第二个工作机制原理
- 6.5 HDFS、YARN的新节点的服役和旧节点的退役
- 6.5.1 新节点服役操作
- 6.5.2 旧节点的退役操作(如果第一次增加退役文件,必须重启HDFS集群)
- 七、Hadoop的第二个核心组成:MapReduce框架概述
- 7.1 基本概念
- 7.2 MapReduce的分布式计算核心思想
- 7.3 MapReduce程序在运行过程中三个核心进程
- 7.4 如何编写MapReduce计算程序:(编程步骤)
- 7.5 MapReduce底层是由Java开发的,因此MR程序我们要编写的话支持使用Java代码来进行编写
- 八、MapReduce的案例实现--大数据分布式计算的经典案例WordCount(单词计数)
- 8.1 案例需求
- 8.2 案例分析(基于MapReduce)
- 8.3 代码开发
7.18
一、座右铭
我的故事你说,我的文字我落,我值几两你定,我去何方我挑。
二、Hadoop大数据技术 大数据软件一般都要求7*24小时不宕机
三、Hadoop的组成
3.1 HDFS
HDFS:解决海量数据的分布式存储的问题
3.2 MapReduce
MapReduce:解决海量数据的分布式计算的问题
3.3 YARN
YARN:解决分布式计算程序的资源分配以及任务监控问题 Mesos: 分布式资源管理系统(YARN的替代品)
3.4 Hadoop Common
四、Hadoop生态圈
flume、sqoop、hive、spark。。。。
五、Hadoop的安装问题
5.1 Hadoop的本地安装模式-基本不用
5.2 Hadoop的伪分布安装模式
5.3 Hadoop的完全分布式安装模式
5.4 Hadoop的HA高可用安装模式–借助zookeeper
5.2~5.4:需要进行配置文件的修改九个配置文件
六、Hadoop的第一个核心组成:HDFS
6.1 概念
分布式文件存储系统,所谓的分布式采用一些架构和一些组件将多台节点的存储资源当作一个整体进行海量资源的存储
6.2 组成
6.2.1 NameNode
- 存储元数据:整个集群中存储的目录和文件的索引
- 管理整个HDFS集群
- 接受客户端的请求
- 负责节点的故障转移
6.2.2 DataNode
- 存储数据(存储数据是以block块的形式进行数据的存放)
- 定时负责汇总整个节点上存储的block块的信息,然后给NN汇报
- 负责和客户端连接进行文件的读写操作
6.2.3 SecondaryNameNode
辅助namenode完成edits编辑日志和fsimage镜像文件的合并操作
6.2.4 客户端:命令行/Java API
-
负责和HDFS集群进行通信实现文件的增删改查
-
负责进行block块的分割
6.3 HDFS的基本操作
-
HDFS的命令行操作
hdfs dfs | hadoop fs
-
HDFS的Java API操作 Configuration FileSystem
- windows安装Hadoop的假环境
- 引入Hadoop的编程依赖
hadoop-client hadoop-hdfs
6.4 HDFS的原理性内容
6.4.1 HDFS的数据流工作原理
- HDFS上传数据的流程
数据上传的时候,会根据配置进行block块的备份,备份的时候,选择哪些节点进行数据备份? 机架感知原则进行备份
- HDFS下载数据的流程
客户端在和DN建立连接的时候,是和距离它最近的某一个DN建立连接怎么判断DN距离客户端的距离:网络拓扑原则客户端和HDFS的节点在同一个集群上
6.4.2 NameNode的第一个工作机制原理
NameNode的第二个工作机制原理:元数据管理的原理 (NN和SNN的联合工作机制)
-
元数据:指的是HDFS存储文件/文件夹的类似的目录结构,目录中记录着每一个文件的大小、时间、每一个文件的block块的份数,block块存储的节点列表信息… NameNode默认的元数据内存是1000M,可以存储管理百万个block块的元数据信息
-
edits编辑日志文件:记录客户端对HDFS集群的写入和修改操作
-
fsimage镜像文件:理解为HDFS元数据的持久点检查文件
2~3:再次启动HDFS之后恢复元数据的机制
-
HDFS的安全模式(safemode)
- HDFS启动之后会先进入安全模式,安全模式就是将edits和fsimage文件加载到nn内存的这一段时间,dn向NN注册的这一段时间
- 安全模式下无法操作HDFS集群的,安全模式会自动退出,NN的内存加载好了(元数据加载好了),同时HDFS集群还满足节点数的启动
-
SNN的作用就是对NN进行checkpoint操作
- checkpoint什么时候触发
- 检查点时间到了—1小时
dfs.namenode.checkpoint.period 3600s
- HDFS距离上一次检查点操作数到达100万次
dfs.namenode.checkpoint.txns 1000000
- 检查点时间到了—1小时
- SNN每隔1分钟会询问一次NN是否要进行checkponit操作
dfs.namenode.checkpoint.check.period 60s
- checkpoint什么时候触发
-
NameNode的出现故障之后,元数据的恢复方式
-
因为元数据的核心是edits和fsimage文件,同时snn工作的时候会把nn的文件复制到snn当中,因此如果NN的元数据丢失,我们可以从SNN中把这些文件再复制到NN的目录下 进行元数据的恢复(恢复可能会导致一部分的元数据丢失)
SNN的目录:
${hadoop.tmp.dir}/dfs/namesecondary/curren
nn的目录:
${hadoop.tmp.dir}/dfs/name/current
-
元数据还有一种恢复方式:配置HDFS的namenode的多目录保存(HDFS的编辑日志和镜像文件在多个目录下保存相同的备份) 方式只能使用在同一个节点上 问题:如果整个节点宕机,无法恢复了
dfs.namenode.name.dir
多个路径 最好把HDFS重新格式化一下,或者手动把目录创建一份 -
HA高可用模式
-
6.4.3 NameNode的第二个工作机制原理
NameNode的第二个工作机制原理:NameNode管理整个HDFS集群的原理 (NN和DN的联合工作机制)
-
DataNode上存储的block块除了数据本身以外,还包含数据的长度、数据校验和、时间戳…
-
数据校验和是为了保证block块的完整性和一致性的,校验和机制,创建block块的时候会根据数据本身计算一个校验和,以后每一次DN进行block汇总的时候会再进行一次校验和的计算,如果两次校验和不一致 认为block块损坏了。
-
DataNode和NameNode心跳,默认三秒心跳一次,默认值可以调整 dfs.heartbeat.interval 3s 关闭HDFS,但是不需要重新格式化 心跳的作用有两个:1、检测DN是否或者 2、把NN让DN做的事情告诉DN
-
NN如何知道DN掉线-死亡-宕机了(掉线的时限):NN如果在某一次心跳中没有收到DN的心跳,NN不会认为DN死亡了,而是会继续心跳,如果超过掉线的时限的时间还没有心跳成功,NN才会认为DN死亡了,然后启动备份恢复机制 掉线时限的时长是有一个计算公式:
timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
dfs.namenode.heartbeat.recheck-interval
心跳检测时间 5分钟dfs.heartbeat.interval
心跳时间 3s默认情况下,如果超过10分30S没有收到DN的心跳 认为DN死亡了
-
DataNode每隔一段时间(默认6小时)会向NameNode汇报一次节点上所有的block块的信息
dfs.blockreport.intervalMsec 21600000ms
每隔6小时向NN汇报一次DN的block块的信息dfs.datanode.directoryscan.interval 21600s
每隔6小时DN自己扫描一下DN上的block块信息
6.5 HDFS、YARN的新节点的服役和旧节点的退役
概念:HDFS是一个分布式文件存储系统,HDFS身为一个大数据软件,基本上都是7*24小时不停机的,那如果HDFS集群的容量不够用了,那么我们需要增加一个新的数据节点,因为HDFS不能停止,因此我们需要在HDFS集群运行过程中动态的增加一个数据节点(新节点的服役操作);旧节点的退役。
6.5.1 新节点服役操作
服役新节点之前,需要创建一台新的虚拟节点,并且配置Java、Hadoop环境、SSH免密登录、ip、主机映射、主机名
- 在Hadoop的配置文件目录创建一个dfs.hosts文件,文件中声明Hadoop集群的从节点的主机名
- 在Hadoop的hdfs-site.xml文件中,增加一个配置项
dfs.hosts 值:文件的路径
- 在HDFS开启的状态下刷新从节点的信息
hdfs dfsadmin -refreshNodes yarn rmadmin -refreshNodes
- 只需要在新节点启动datanode和nodemanager即可成功实现节点的服役
6.5.2 旧节点的退役操作(如果第一次增加退役文件,必须重启HDFS集群)
-
在Hadoop的配置目录创建一个文件dfs.hosts.exclude,文件中编写退役的主机名
-
在Hadoop的hdfs-site.xml配置文件中声明退役的节点文件 dfs.hosts.exclude 值 文件的路径
-
同时需要在服役节点文件中把退役节点删除了dfs.hosts
-
刷新节点信息状态
hdfs dfsadmin -refreshNodes yarn rmadmin -refreshNodes
**【注意】**退役的时候,会先把退役节点的block块复制到没有退役的节点上,然后才会下线,而且退役的时候,必须保证退役之后剩余集群的节点数大于等于副本数
6.5.1~6.5.2在namenode所在节点的hadoop中配置
七、Hadoop的第二个核心组成:MapReduce框架概述
7.1 基本概念
- Hadoop解决了大数据面临的两个核心问题:海量数据的存储问题、海量数据的计算问题
- 其中MapReduce就是专门设计用来解决海量数据计算问题的,同时MapReduce和HDFS不一样的地方在于,虽然两者均为分布式组件,但是HDFS是一个完善的软件,我们只需要使用即可,不需要去进行任何的逻辑的编辑。而MapReduce进行数据计算,计算什么样的数据,使用什么样的逻辑,MR程序都不清楚,因此MR只是一个分布式的计算【框架】,所谓的框架就是MR程序把分布式计算的思想和逻辑全部封装好了,我们只需要安装框架的思维编写计算代码(就是我们自己处理数据的逻辑代码),编写完成之后,我们的程序必然是分布式的程序。
- 使用分布式计算框架的好处就在于我们开发人员只需要把关注点和重点放在业务的逻辑开发,而非分布式计算程序逻辑的逻辑。
7.2 MapReduce的分布式计算核心思想
- MR框架实现分布式计算的逻辑是将MR程序分成了两部分:Map阶段、Reduce阶段
- 其中运行一个计算程序先执行Map阶段,map阶段又可以同时运行多个计算程序(MapTask)去计算map阶段的逻辑,Map阶段主要负责分数据,而且map阶段的多个MapTask并行运行互不干扰。
- 第二阶段Reduce阶段,Reduce阶段也可以同时运行多个计算程序(ReduceTask),Reduce阶段的任务主要负责合数据,同时多个ReduceTask同时运行互不干扰的。
- 任何一个MR程序,只能有一个Map阶段,一个Reduce阶段
7.3 MapReduce程序在运行过程中三个核心进程
- MRAppMaster一个:负责整合分布式程序的监控
- MapTask(多个):Map阶段的核心进程,每一个MapTask处理数据源的一部分数据
- ReduceTask(多个):Reduce阶段的核心进程,每一个ReduceTask负责处理Map阶段输出的一部分数据
7.4 如何编写MapReduce计算程序:(编程步骤)
- 编写MapTask的计算逻辑
- 编写一个Java类继承Mapper类,继承Mapper类之后必须指定四个泛型,四个泛型分别代表了MapTask阶段的输入的数据和输出的数据类型 MR程序要求输入的数据和输出的数据类型必须都得是key-value键值对类型的数据
- 重写继承的Mapper类当中的map方法,map方法处理数据的时候是文件中的一行数据调用一次map方法,map方法的计算逻辑就是MapTask的核心计算逻辑
- 同时map方法中数据计算完成,需要把数据以指定的key-value格式类型输出。
- 编写ReduceTask的计算逻辑
- 编写一个Java类继承Reducer类,继承Reducer类之后必须指定四个泛型,四个泛型分别代表了Reduce阶段的输入和输出的KV数据类型 Reduce的输入的KV类型就是Map阶段的输出的KV类型 Reduce的输出类型自定义的
- 重写Reducer类当中提供的reduce方法,reduce方法处理数据的时候一组相同的key调用一次reduce方法,reduce方法的计算逻辑就是ReduceTask的核心计算逻辑
- 调用reduce方法,reduce逻辑处理完成,需要把数据以指定的key-value格式类型输出
- 编写Driver驱动程序
- Driver驱动程序是用来组装MR程序,组装MR程序的处理的文件路径、MR程序的Map阶段的计算逻辑、MR程序的Reduce阶段的计算逻辑、MR程序运行完成之后的结果的输出路径
- Driver驱动程序本质上就是一个main函数
7.5 MapReduce底层是由Java开发的,因此MR程序我们要编写的话支持使用Java代码来进行编写
八、MapReduce的案例实现–大数据分布式计算的经典案例WordCount(单词计数)
8.1 案例需求
现在有一个文件,文件很大,文件中存储的每一行数据都是由空格分割的多个单词组成的,现在需要通过大数据分布式计算技术去统计文件中每一个单词出现的总次数
8.2 案例分析(基于MapReduce)
8.3 代码开发
- 创建一个maven管理的Java项目
- 引入MR的编程依赖
hadoop-client
- ``hadoop-hdfs`
- 编写Mapper阶段的计算逻辑
- 编写Reducer阶段的计算逻辑
- 编写Driver驱动程序
- 代码示例:
package com.sxuek.wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** Driver驱动程序说白了就是封装MR程序的* Driver驱动程序其实就是一个main函数*/
public class WCDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1、准备一个配置文件对象ConfigurationConfiguration configuration = new Configuration();//指定HDFS的地址configuration.set("fs.defaultFS","hdfs://192.168.68.101:9000");//2、创建封装MR程序使用一个Job对象Job job = Job.getInstance(configuration);//3、封装处理的文件路径hdfs://single:9000/wc.txtFileInputFormat.setInputPaths(job,new Path("/wc.txt"));//4、封装MR程序的Mapper阶段 还要封装Mapper阶段输出的key-value类型job.setMapperClass(WCMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//5、封装MR程序的reduce阶段 还需要封装reduce的输出kv类型job.setReducerClass(WCReducer.class);job.setOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);job.setNumReduceTasks(1);//指定reduce阶段只有一个ReduceTask//6、封装MR程序的输出路径---输出路径一定不能存在 如果存在会报错FileOutputFormat.setOutputPath(job,new Path("/wcoutput"));//7、提交运行MR程序boolean flag = job.waitForCompletion(true);System.exit(flag?0:1);}
}
package com.sxuek.wc;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 单词技术的MapTask的计算逻辑* 1、继承Mapper类,同时需要指定四个泛型 两两一组 分别代表输入的key value 和输出的key value的数据类型* 默认情况下,map阶段读取文件数据是以每一行的偏移量为key 整数类型 每一行的数据为value读取的 字符串类型* LongWritable Text* map阶段输出以单词为key 字符串 以1为value输出 整数* 数据类型不能使用Java中数据类型,数据类型必须是Hadoop的一种序列化类型* Int---hadoop.io.IntWritable* Long-hadoop.io.LongWritable* String--hadoop.io.Text* 2、重写map方法***/
public class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable> {/*** map方法是MapTask的核心计算逻辑方法* map方法是切片中一行数据调用一次* @param key 这一行数据的偏移量* @param value 这一行数据* @param context 上下文对象 用于输出map阶段处理完成的keyvalue数据* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {//拿到一行数据 并且将一行数据转成字符串类型String line = value.toString();//字符串以空格切割得到一个数组,数组中存放的就是一行的多个单词String[] words = line.split(" ");//遍历数组 得到每一个单词 以单词为key 以1为value输出数据即可for (String word : words) {context.write(new Text(word),new LongWritable(1L));}}
}
package com.sxuek.wc;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** Reducer的编程逻辑:* 1、继承Reducer类,指定输入和输出的kv类型* 输入KV就是Map阶段的输出KV Text LongWritable* 输出kv Text LongWritable* 2、重写reduce方法*/
public class WCReducer extends Reducer<Text, LongWritable,Text,LongWritable> {/*** Reduce方法是Reduce阶段的核心计算逻辑* reduce方法是一组相同的key执行一次* @param key 一组相同的key 某一个单词* @param values 是一个集合,集合存放的就是这一个单词的所有的value值* @param context 上下文对象 用于reduce阶段输出数据* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {//只需要将某一个单词聚合起来的value数据累加起来 得到总次数long sum = 0L;for (LongWritable value : values) {sum += value.get();}//只需要以单词为key 以sum为value输出即可context.write(key,new LongWritable(sum));}
}