MapReduce超详解

简介

概述

MapReduce是Hadoop提供的一套用于进行分布式计算的模型,本身是Doug Cutting根据Google的<MapReduce: Simplified Data Processing on Large Clusters>仿照实现的。

MapReduce由两个阶段组成:Map(映射)阶段和Reduce(规约)阶段,用户只需要实现map以及reduce两个函数,即可实现分布式计算,这样做的目的是简化分布式程序的开发和调试周期。

特点

MapReduce的优点:

1)MapReduce易于编程:用户只需要简单的实现MapReduce提供的一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。

2)具有良好的扩展性:当当前的集群的计算资源不能得到满足的时候,可以通过简单的增加机器来扩展它的计算能力。

3)高容错性:MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。例如,如果集群中某一台服务器宕机,那么MapReduce可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4)适合PB级以上海量数据的离线处理:可以实现上千台服务器集群并发工作,提供数据处理能力。

MapReduce的缺点:

1)不擅长实时计算:MapReduce的运行速度相对比较低,一般在毫秒或者秒级内返回结果,因此不适合于实时分析的场景。

2)不擅长流式计算:流式计算的输入数据是动态的,而MapReduce要求输入的数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3)不擅长DAG(有向图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

入门案例

思路

案例:统计一个文件中每一个字符出现的次数(处理文件:characters.txt)。

在MapReduce刚开始的时候,会先对文件进行切片(Split)处理。需要注意的是,切片本身是一种逻辑切分而不是物理切分,本质上就是在划分任务量,之后每一个切片会交给一个单独的MapTask来进行处理。默认情况下,Split和Block的大小是一致的。

切片之后,每一个切片(Split)会分配给一个单独的MapTask来处理。而MapTask确定好要处理的切片之后,默认情况下会对切片进行按行处理。需要注意,不同的MapTask之间只是处理的数据不同,但是处理的逻辑是相同的。

MapTask处理完数据之后,会将数据交给ReduceTask进行汇总。ReduceTask收到数据之后,会先将相同的键对应的值放到一组去,形成一个迭代器,这个过程称之为分组(group)。分组之后,再调用reduce方法对数据进行汇总处理,最终将处理结果写出到指定的文件系统中。

实现过程

导入POM依赖:

<dependencies><!--单元测试--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency><!--日志打印--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.20.0</version></dependency><!--Hadoop通用包--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.4</version></dependency><!--Hadoop客户端--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.4</version></dependency><!--Hadoop HDFS--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.2.4</version></dependency></dependencies>

定义Mapper类:

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;// 需要继承Mapper类// 需要注意的是,MapReduce要求被处理的传输的数据能够被序列化// MapReduce提供了一套单独的序列化机制// KEYIN - 输入的键的类型。默认情况下,是行的字节偏移量// VALUEIN - 输入的值的类型。默认情况下,是输入的一行数据// KEYOUT - 输出的键的类型。本案例中,输出的是字符,所以类型是Text// VALUEOUT - 输出的值的类型。本案例中,输出的是个数,所以类型是LongWritablepublic class CharCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {    // 次数    private final LongWritable once = new LongWritable(1);    // 需要覆盖map方法,将处理逻辑放入map方法中    // key:键。行的字节偏移量    // value:值,读取的一行数据    // context:环境参数,可以利用这个参数将数据传递给ReduceTask    @Override    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {        // 获取一行数据        String line = value.toString();        // 拆分字符        char[] cs = line.toCharArray();        // 遍历数据,写出        for (char c : cs) {            context.write(new Text(String.valueOf(c)), once);        }    }}

定义Reducer类:

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;// 需要继承Reducer// KEYIN,VALUEIN - 输入的键值类型。Reducer的数据从Mapper来,所以Mapper输出什么类型,Reducer就接收什么类型// KEYOUT,VALUEOUT - 输出的值的类型。本案例中,输出的是字符和次数public class CharCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {    // 覆盖reduce方法,将逻辑写到reduce方法中    // key:键。本案例中,是字符    // values:值。本案例中,是字符对应的次数    // context:环境参数    @Override    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {        // 定义变量记录次数        long sum = 0;        // 遍历次数        for (LongWritable value : values) {            // 次数累计            sum += value.get();        }        // 写出结果        context.write(key, new LongWritable(sum));    }}

定义入口类(驱动类):

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CharCountDriver {    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {        // 获取环境变量        Configuration conf = new Configuration();        // 获取任务        Job job = Job.getInstance(conf);        // 指定入口类        job.setJarByClass(CharCountDriver.class);        // 设置Mapper类        job.setMapperClass(CharCountMapper.class);        // 设置Reducer类        job.setReducerClass(CharCountReducer.class);        // 设置Mapper的输出的键的类型        job.setMapOutputKeyClass(Text.class);        // 设置Mapper的输出的值的类型        job.setMapOutputValueClass(LongWritable.class);        // 设置Reducer的输出的键的类型        job.setOutputKeyClass(Text.class);        // 设置Reducer的输出的值的类型        job.setOutputValueClass(LongWritable.class);        // 设置输入路径        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/characters.txt"));        // 设置输出路径        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/char_count"));        // 提交任务,等待结束        job.waitForCompletion(true);    }}

在本地运行MapReduce之前,需要将Hadoop安装目录解压到本地的路径下,然后需要将给定资料中的bin.7z解压到相应的bin目录下,然后双击winutils.exe,如果是出现一个黑窗口一闪而过,则表示没有任何问题。如果双击winutils.exe出错,则需要将msvcr120.dll文件拷贝到C:\Windows\System32目录下,然后再双击winutils.exe。

之后需要配置环境变量:HADOOP_HOME,Path和HADOOP_USER_NAME。

如果在指定输入的时候,指定路径是一个目录,那么MapReduce会处理这个目录下的所有的文件。

问题解决

如果运行过程中出现了null/bin/winutils.exe,那么解决方案如下:

1)先检查环境变量是否配置正确;

2)如果环境变量正确,那么可以在Drivers中添加如下代码:

System.setProperty("hadoop.home.dir","Hadoop的解压路径");

如果运行过程中出现了NativeIO$Windows,那么解决方案如下:

1)先检查环境变量是否配置正确;

2)如果环境变量配置正确,那么可以将bin目录下的hadoop.dll文件拷贝到C:\Windows\System32目录下,再运行代码看是否配置正确;

3)如果上述方案依然无效,那么需要将给定资料中的NativeIO.java文件拷贝到当前工程下,建好对应的包。

练习

练习一:统计一个文件中单词出现的次数(处理文件:words.txt)。

Mapper类:

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {    private final IntWritable once = new IntWritable(1);    @Override    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {        // 拆分单词        String[] arr = value.toString().split(" ");        // 遍历,写出        for (String s : arr) {            context.write(new Text(s), once);        }    }}

Reducer类:

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {    @Override    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {        // 定义变量记录和        int sum = 0;        // 遍历,求和        for (IntWritable value : values) {            sum += value.get();        }        // 写出        context.write(key, new IntWritable(sum));    }}

驱动类:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver {    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(WordCountDriver.class);        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordCountReducer.class);        // 如果Mapper和Reducer的输出类型一致,可以只设置一次        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/words.txt"));        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/word_count"));        job.waitForCompletion(true);    }}

练习二:IP去重(处理文件:ip.txt)。

Mapper类:

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;// 如果不需要值,那么值的类型可以是NullWritablepublic class IPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {    @Override    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {        // 获取IP,写出        context.write(value, NullWritable.get());    }}

Reducer类:

import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class IPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {    @Override    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {        context.write(key, NullWritable.get());    }}

驱动类:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class IPDriver {    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(IPDriver.class);        job.setMapperClass(IPMapper.class);        job.setReducerClass(IPReducer.class);                job.setOutputKeyClass(Text.class);        job.setOutputValueClass(NullWritable.class);        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/ip.txt"));        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/ip"));        job.waitForCompletion(true);    }}

组件

Writable-序列化

在Hadoop的集群工作过程中,一般是利用RPC来进行集群节点之间的通信和消息的传输,所以要求MapReduce处理的对象必须可以进行序列化/反序列操作。Hadoop并没有使用Java原生的序列化,而是底层默认使用的序列化机制是AVRO。MapReduce针对常见的数据类型提供了其序列化形式:

表-1 序列化形式

Java类型

MapReduce类型

byte

Bytewritable

short

ShortWritable

int

IntWritable

long

LongWritable

floa

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/747810.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 Boot Camp 助理查明您的 Mac 需不需要 Windows 安装介质

使用 Boot Camp 助理查明您的 Mac 需不需要 Windows 安装介质 当前的 Mac 机型无需介质即可安装 Windows&#xff0c;也就是说&#xff0c;您不需要用到外置驱动器。较早的 Mac 机型需要用到 USB 驱动器或光盘驱动器。使用 Boot Camp 助理可查明您需要用到什么。 Boot Camp 助…

CXL-Enabled Enhanced Memory Functions——论文阅读

IEEE Micro 2023 Paper CXL论文阅读笔记整理 问题 计算快速链路&#xff08;CXL&#xff09;协议是系统社区的一个重要里程碑。CXL提供了标准化的缓存一致性内存协议&#xff0c;可用于将设备和内存连接到系统&#xff0c;同时保持与主机处理器的内存一致性。CXL使加速器&…

遗传算法及基于该算法的典型问题的求解实践

说明 遗传算法是一个很有用的工具&#xff0c;它可以帮我们解决生活和科研中的诸多问题。最近在看波束形成相关内容时了解到可以用这个算法来优化阵元激励以压低旁瓣&#xff0c;于是特地了解和学习了一下这个算法&#xff0c;觉得蛮有意思的&#xff0c;于是把这两天关于该算法…

SpringMVC 02

这里先附上前一篇的地址,以上系列均为博主的学习路线,仅供参考 初识Spring MVC-CSDN博客 下面我们从SpringMVC传递数组开始讲起 1.传递数组 传递数组的方式和传递普通变量的方式其实是相同的,下面我们附上传递的图片 RequestMapping("/r7")public String r1(String[…

笔记本电脑数据恢复:如何轻松地从笔记本电脑恢复文件

不小心从笔记本电脑中删除了一些重要文件&#xff1f;或者恶意软件和其他不可控因素是否导致您的文件消失&#xff1f;人们很容易认为这些文件已经永远消失&#xff0c;并且无法恢复。但这与事实相差甚远。通过遵循正确的数据恢复礼仪并使用良好的数据恢复工具&#xff0c;您可…

(done) 什么是词嵌入技术?word embedding ?(这里没有介绍词嵌入算法)(没有提到嵌入矩阵如何得到)

参考视频&#xff1a;https://www.bilibili.com/video/BV1sw411S7i1/?spm_id_from333.788&vd_source7a1a0bc74158c6993c7355c5490fc600 词嵌入&#xff08;word embedding&#xff09;&#xff1a;把词汇表中的词或短语 -------- 映射 ----> 固定长度向量 我们可以把 …

FPGA静态时序分析与约束(二)、时序分析

系列文章目录 FPGA静态时序分析与约束&#xff08;一&#xff09;、理解亚稳态 FPGA静态时序分析与约束&#xff08;三&#xff09;、读懂vivado时序报告 文章目录 系列文章目录前言一、时序分析基本概念1.1 时钟抖动1.2 时钟偏斜1.3 时钟不确定性Uncertainty1.4 建立时间和保…

DVWA靶场-CSRF跨站请求伪造

CSRF(跨站请求伪造)简介概念 CSRF&#xff08;Cross—site request forgery&#xff09;&#xff0c;跨站请求伪造&#xff0c;是指利用受害者未失效的身份认证信息&#xff08;cookie&#xff0c;会话等&#xff09;&#xff0c;诱骗其点击恶意链接或者访问包含攻击代码的页面…

nacos2.2.3 适配dm数据库

从github上下载了源码&#xff0c;选择了2.2.3分支后修改 适配后的代码下载&#xff0c;本地install用&#xff1a; nacos2.2.3_dm: 适配dm数据库 (gitee.com) alibba加了很多检查&#xff0c;跳过检查install命令&#xff1a; mvn -Prelease-nacos -Dmaven.test.skiptrue -D…

Django和Mysql数据库

Django学习笔记 Django和Mysql数据库 Django开发操作数据库更简单&#xff0c;内部提供了ORM框架。 1)安装mysqlclient pip3 install mysqlclient2)ORM ORM可以帮助我们做两件事&#xff1a; 1.创建、修改、修改数据库中的表&#xff08;不用写sql语句&#xff09;[不能创…

Linux系统性能优化:七个实战经验

1、影响Linux系统性能的因素一般有哪些&#xff1f; Linux系统的性能受多个因素的影响。以下是一些常见的影响Linux系统性能的因素&#xff1a; CPU负载&#xff1a;CPU的利用率和负载水平对系统性能有直接影响。高CPU负载可能导致进程响应变慢、延迟增加和系统变得不稳定。 内…

运维自动化之——Ansible

目录 一、自动化运维 1、通过xshell实现自动化运维 2、Ansible简介 3、Ansible特点及优势 4、Ansible核心程序 5、Ansible工作原理及流程 6、部署Ansible自动化运维工具 7、Ansible常用模块 ①ansible命令模块 ②command模块 ③shell模块 ④cron模块 ⑤user模块 …

Redis:ClassCastException【bug】

Redis&#xff1a;ClassCastException【bug】 前言版权Redis&#xff1a;ClassCastException【bug】错误产生相关资源控制器&#xff1a;UserController("/user")配置&#xff1a;RedisConfiguration实体类&#xff1a;User数据表&#xff1a;User 解决 最后 前言 2…

Windows蓝牙驱动开发之模拟HID设备(一)(把Windows电脑模拟成蓝牙鼠标和蓝牙键盘等设备)

by fanxiushu 2024-03-14 转载或引用请注明原作者 把Windows电脑模拟成蓝牙鼠标和蓝牙键盘&#xff0c;简单的说&#xff0c;就是把笨重的PC电脑当成鼠标键盘来使用。 这应该是一个挺小众的应用&#xff0c;但有时感觉也应该算比较好玩吧&#xff0c; 毕竟实现一种一般人都感觉…

Docker安装蜜罐Hfish

前言 无意中发现公司的一台服务器被爆破&#xff0c;修改了密码&#xff0c;为了确定内网是否安装需要搭建一个蜜罐来看一下是否存在隐患。 如何安装Docker&#xff0c;请查看我另一篇文章 https://blog.csdn.net/l1677516854/article/details/136751211 一、拉取镜像 dock…

工具类实现导出复杂excel、word

1、加入准备的工具类 package com.ly.cloud.utils.exportUtil;import java.util.Map;public interface TemplateRenderer {Writable render(Map<String, Object> dataSource) throws Throwable;}package com.ly.cloud.utils.exportUtil;import java.util.Map;public int…

Unity中的网格创建和曲线变形

Unity中的网格创建和曲线变形 3D贝塞尔曲线变形贝塞尔曲线基础线性公式二次方公式三次方公式 Unity 实现3D贝塞尔曲线变形准备工作脚本概述变量定义 变量解析函数解析 获取所有子节点GetAllChildren 获取所有子节点UpdateBezierBend 控制点更新CalculateBezier Bezier 曲线公式…

【SQL】PgSQL常用命令

PgSQL常用操作&#xff08;命令&#xff09; 连接数据库 使用username连接一个名为username的数据库&#xff08;数据库与用户名同名&#xff0c;其余默认&#xff09; psql -U username;使用username登录名为dbname的数据库&#xff08;其余默认&#xff09; psql -U user…

【算法杂货铺】二分算法

目录 &#x1f308;前言&#x1f308; &#x1f4c1; 朴素二分查找 &#x1f4c2; 朴素二分模板 &#x1f4c1; 查找区间端点处 细节&#xff08;重要&#xff09; &#x1f4c2; 区间左端点处模板 &#x1f4c2; 区间右端点处模板 &#x1f4c1; 习题 1. 35. 搜索插入位…

AI辅助信息技术发展

2024 年 AI 辅助研发趋势随着人工智能技术的持续发展与突破&#xff0c;2024年AI辅助研发正成为科技界和工业界瞩目的焦点。从医药研发到汽车设计&#xff0c;从软件开发到材料科学&#xff0c;AI正逐渐渗透到研发的各个环节&#xff0c;变革着传统的研发模式。在这一背景下&am…