Hadoop学习:深入解析MapReduce的大数据魔力(三)

Hadoop学习:深入解析MapReduce的大数据魔力(三)

  • 3.5 MapReduce 内核源码解析
    • 3.5.1 MapTask 工作机制
    • 3.5.2 ReduceTask 工作机制
    • 3.5.3 ReduceTask 并行度决定机制
  • 3.6 数据清洗(ETL)
    • 1)需求
    • 2)需求分析
    • 3)实现代码
  • 3.7 MapReduce 开发总结
    • 1)输入数据接口:InputFormat
    • 2)逻辑处理接口:Mapper
    • 3)Partitioner 分区
    • 4)Comparable 排序
    • 5)Combiner 合并
    • 6)逻辑处理接口:Reducer
    • 7)输出数据接口:OutputFormat

3.5 MapReduce 内核源码解析

3.5.1 MapTask 工作机制

在这里插入图片描述
(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

(5)Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件output/file.out.index。 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

3.5.2 ReduceTask 工作机制

在这里插入图片描述
(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort 阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一
起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce 阶段:reduce()函数将计算结果写到HDFS上。

3.5.3 ReduceTask 并行度决定机制

回顾:MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。

思考:ReduceTask并行度由谁决定?

1)设置ReduceTask并行度(个数)
ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

2)实验:测试ReduceTask多少合适
(1)实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G
(2)实验结论:
在这里插入图片描述
3)注意事项
(1)ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
(2)ReduceTask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
(4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
(5)具体多少个ReduceTask,需要根据集群性能而定。
(6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

3.6 数据清洗(ETL)

“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取
(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。==清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。 ==

1)需求

去除日志中字段个数小于等于11的日志。
(1)输入数据
在这里插入图片描述
(2)期望输出数据
每行字段长度都大于11。

2)需求分析

需要在Map阶段对输入的数据根据规则进行过滤清洗。

3)实现代码

(1)编写WebLogMapper类

package com.atguigu.mapreduce.weblog; 
import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; public class WebLogMapper extends Mapper<LongWritable, Text, Text, 
NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException { // 1 获取1行数据 String line = value.toString(); // 2 解析日志 boolean result = parseLog(line,context); // 3 日志不合法退出 if (!result) { return; } // 4 日志合法就直接写出 context.write(value, NullWritable.get()); } // 2 封装解析日志的方法 private boolean parseLog(String line, Context context) { // 1 截取 String[] fields = line.split(" "); // 2 日志长度大于11的为合法 if (fields.length > 11) { return true; }else { return false; } } 
} 

(2)编写WebLogDriver类

package com.atguigu.mapreduce.weblog; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
public class WebLogDriver { 
public static void main(String[] args) throws Exception { 
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置 
args = new String[] { "D:/input/inputlog", "D:/output1" }; 
// 1 获取job信息 
Configuration conf = new Configuration(); 
Job job = Job.getInstance(conf); 
// 2 加载jar包 
job.setJarByClass(LogDriver.class); 
// 3 关联map 
job.setMapperClass(WebLogMapper.class); 
// 4 设置最终输出类型 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NullWritable.class); 
// 设置reducetask个数为0 
job.setNumReduceTasks(0); 
// 5 设置输入和输出路径 
FileInputFormat.setInputPaths(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 
// 6 提交 
boolean b = job.waitForCompletion(true); 
System.exit(b ? 0 : 1); 
} 
} 

3.7 MapReduce 开发总结

1)输入数据接口:InputFormat

(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。

2)逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3)Partitioner 分区

(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。

4)Comparable 排序

(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable 接口,重写其中的compareTo()方法。

(2)部分排序:对最终输出的每一个文件进行内部排序。

(3)全排序:对所有数据进行排序,通常只有一个Reduce。

(4)二次排序:排序的条件有两个。

5)Combiner 合并

Combiner 合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的
业务处理结果。

6)逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

7)输出数据接口:OutputFormat

(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件
输出一行。
(2)用户还可以自定义OutputFormat。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/42674.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)

和线程池类似&#xff0c;数据库连接池的作用是建立一些和数据库的连接供需要连接数据库的业务使用&#xff0c;避免了每次和数据库建立、销毁连接的性能消耗&#xff0c;通过设置连接池参数可以防止建立连接过多导致服务宕机等&#xff0c;以下介绍Java中主要使用的几种数据库…

【学习FreeRTOS】第11章——FreeRTOS中任务相关的其他API函数

1.函数总览 序号函数描述1uxTaskPriorityGet()获取任务优先级2vTaskPrioritySet()设置任务优先级3uxTaskGetNumberOfTasks()获取系统中任务的数量4uxTaskGetSystemState()获取所有任务的状态信息5vTaskGetInfo()获取单个任务的状态信息6xTaskGetCurrentTaskHandle()获取当前任…

Excel自动化办公——Openpyxl的基本使用

Excel自动化办公——Openpyxl的基本使用 个人感觉&#xff0c;相比Pandas&#xff0c;openpyxl对Excel的操作更为细致&#xff0c;Pandas则更适用于统计计算&#xff1b; 01 基本环境02 Excel数据读取操作03 案例04 向Excel写入数据05 表数据定向修改06 单元格样式制定07 单元…

原型对象的简单了解

在前面学习java有一个概念叫做继承&#xff0c;方便我们对父类方法、变量等的调用。对前端的学习我们需要让对象可以访问和继承其他对象的属性和方法&#xff0c;就需要了解原型对象&#xff0c;以及原型链。 一、原型 构造函数通过原型分配的函数是所有对象所共享的。每一个构…

回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍…

【css动画】向下的动态箭头

前言 使用css实现一组向下的动态箭头效果&#xff0c;如下图 思路 1.使用svg画箭头 2.设置keyframes&#xff0c;主要是每个箭头加不同的延时。 代码 <div class"down-arrow"><svg id"more-arrows"><polygonclass"arrow-top&quo…

Spark第三课

1.分区规则 1.分区规则 shuffle 1.打乱顺序 2.重新组合 1.分区的规则 默认与MapReduce的规则一致,都是按照哈希值取余进行分配. 一个分区可以多个组,一个组的数据必须一个分区 2. 分组的分区导致数据倾斜怎么解决? 扩容 让分区变多修改分区规则 3.HashMap扩容为什么必须…

[JavaWeb]【七】web后端开发-MYSQL

前言&#xff1a;MySQL是一种流行的关系型数据库管理系统,它的作用是存储和管理数据。在Web开发中,MySQL是必备的数据库技能之一,因为它可以帮助Web开发人员处理大量的数据,并且提供了强大的数据查询和管理功能。 一 数据库介绍 1.1 什么是数据库 1.2 数据库产品 二 MySQL概述…

vue3 实现简单瀑布流

一、整理思路 实际场景中&#xff0c;瀑布流一般由 父组件 提供 数据列表&#xff0c;子组件渲染每个图片都是根据容器进行 绝对定位 &#xff0c;从而定好自己的位置取出 屏幕的宽度&#xff0c;设定 图片的宽度 固定 为一个值&#xff0c;计算可以铺 多少列按列数 先铺上第一…

5G科技防汛,助力守护一方平安

“立秋虽已至&#xff0c;炎夏尚还在”&#xff0c;受台风席卷以及季节性影响全国多地正面临强降水的严峻挑战。“落雨又顺秋&#xff0c;绵绵雨不休”&#xff0c;正值“七下八上” 防汛关键时期&#xff0c;贵州省水文水资源局已全面进入备战状态。 为确保及时响应做好防汛抢…

Vue3 setup新特性简单应用

去官网学习→组合式 API&#xff1a;setup() | Vue.js 运行示例&#xff1a; 代码&#xff1a;App.vue <template><div class"home"><img alt"Vue logo" src"../assets/logo.png"><!-- msg 组件传递数据 --><Hell…

VBA_MF系列技术资料1-157

MF系列VBA技术资料 为了让广大学员在VBA编程中有切实可行的思路及有效的提高自己的编程技巧&#xff0c;我参考大量的资料&#xff0c;并结合自己的经验总结了这份MF系列VBA技术综合资料&#xff0c;而且开放源码&#xff08;MF04除外&#xff09;&#xff0c;其中MF01-04属于定…

MySQL 面试题

一、数据库基础 1、MySQL 有哪些数据库类型? (1) 整数类型&#xff1a; TINYINT 1 字节 SMALLINT 2 字节 MEDIUMINT 3 字节 INT 4 字节 BIGINT 8 字节 ① 任何整数类型都可以加上 UNSIGNED …

【学会动态规划】最长湍流子数组(23)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

在 IDEA 中使用 Git开发 图文教程

在 IDEA 中使用 Git开发 图文教程 一、连接远程仓库二、IDEA利用Git进行开发操作三、分支操作3.1 新建分支3.2 切换分支3.3 删除分支3.4 比较分支3.5 合并分支 四、常用快捷键 一、连接远程仓库 一、打开IDEA&#xff0c;进入目录&#xff1a;File ->New ->Project from…

opencv 矩阵运算

1.矩阵乘&#xff08;*&#xff09; Mat mat1 Mat::ones(2,3,CV_32FC1);Mat mat2 Mat::ones(3,2,CV_32FC1);Mat mat3 mat1 * mat2; //矩阵乘 结果 2.元素乘法或者除法&#xff08;mul&#xff09; Mat m Mat::ones(2, 3, CV_32FC1);m.at<float>(0, 1) 3;m.at…

浏览器控制台调试实用方法

许多程序员仅知道控制台的console.log&#xff0c;其实控制台API还包含一些其他实用方法&#xff0c; 这些方法在前端调试时会很有帮助。 目录 console.dir 查看对象属性和方法 输出DOM元素 console.error console.time和console.timeEnd console.log console.clear 总结…

(五)、深度学习框架源码编译

1、源码构建与预构建&#xff1a; 源码构建&#xff1a; 源码构建是通过获取软件的源代码&#xff0c;然后在本地编译生成可执行程序或库文件的过程。这种方法允许根据特定需求进行配置和优化&#xff0c;但可能需要较长的时间和较大的资源来编译源代码。 预构建&#xff1a; 预…

2023年05月 C/C++(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题:数字放大 给定一个整数序列以及放大倍数x,将序列中每个整数放大x倍后输出。 时间限制:1000 内存限制:65536 输入 包含三行: 第一行为N,表示整数序列的长度(N ≤ 100); 第二行为N个整数(不超过整型范围),整数之间以一个空格分开; 第三行包含一个整数(不超过整…

Vue2-全局事件总线、消息的订阅与发布、TodoList的编辑功能、$nextTick、动画与过渡

&#x1f954;&#xff1a;高度自律即自由 更多Vue知识请点击——Vue.js VUE2-Day9 全局事件总线1、安装全局事件总线2、使用事件总线&#xff08;1&#xff09;接收数据&#xff08;2&#xff09;提供数据&#xff08;3&#xff09;组件销毁前最好解绑 3、TodoList中的孙传父&…