4 开发MapReduce应用程序


系统参数配置

Configuration类由源来设置,每个源包含以XML形式出现的一系列属性/值对。如:

configuration-default.xml

configuration-site.xml

Configuration conf = new Configuration();

conf.addResource("configuraition-default.xml");

conf.addResource("|configuration-site.xml");

后添加进来的属性取值覆盖前面所添加资源中的属性取值,除非前面的属性值被标记为final。


Hadoop默认使用两个源进行配置,顺序加载core-default.xml和core-site.xml。

前者定义系统默认属性,后者定义在特定的地方重写。


性能调优

在正确完成功能的基础上,使执行的时间尽量短,占用的空间尽量小。


输入采用大文件

1000个2.3M的文件运行33分钟;合并为1个2.2G的文件后运行3分钟。

也可借用Hadoop中的CombineFileInputFormat,它将多个文件打包到一个输入单元中,从而每次执行Map操作就会处理更多的数据。


压缩文件

对Map的输出进行压缩,好处:减少存储文件的空间;加快在网络上的传输速度;减少数据在内存和磁盘间交换的时间。

mapred.compress.map.output设置为true来对Map的输出数据进行压缩;

mapred.map.output.compression.codec设置压缩格式


修改作业属性

在conf目录下修改属性

mapred.tasktracker.map.tasks.maximum

mapred.tasktracker.reduce.tasks.maximum

设置Map/Reduce任务槽数,默认均为2。


MapReduce工作流

如果处理过程变得复杂了,可以通过更加复杂、完善的Map和Reduce函数,甚至更多的MapReduce工作来体现。


复杂的Map和Reduce函数

基本的MapReduce作业仅仅集成并覆盖了基类Mapper和Reducer中的核心函数Map或Reduce。

下面介绍基类中的其他函数,使大家能够编写功能更加复杂、控制更加完备的Map和Reduce函数。


setup函数

源码如下

/**
* Called once at the start of the task
*/
protected void setup( Context context) throws IOException, InterruptedException {//NOTHING
}

此函数在task启动开始时调用一次。

每个task以Map类或Reduce类为处理方法主体,输入分片为处理方法的输入,自己的分片处理完之后task也就销毁了。

setup函数在task启动之后数据处理之前只调用一次,而覆盖的Map函数或Reduce函数会针对输入分片中的每个key调用一次。

可以将Map或Reduce函数中的重复处理放置到setup函数中;

可以将Map或Reduce函数处理过程中可能使用到的全局变量进行初始化;

可以从作业信息中获取全局变量;

可以监控task的启动。


cleanup函数

/**
* Called noce at the end of the task
*/
protected void cleanup(Context context) throws IOException, InterruptedException {//NOTHING
}

和setup相似,不同之处在于cleanup函数是在task销毁之前执行的。


run函数

/**
* Expert users can override this method for more complete control over the execution of the Mapper.
*@param context
*@throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {setup (context);while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}cleanup(context);
}

此函数是map函数或Reduce函数的启动方法。

如果想更完备地控制Map或者Reduce,可以覆盖此函数。


MapReduce中全局共享数据方法

1、读写HDFS文件

利用Hadoop的Java API来实现。

需要注意:多个Map或Reduce的写操作会产生冲突,覆盖原有数据。

优点:能够实现读写,比较直观;

缺点:要贡献一些很小的全局数据也需要使用IO,这将占用系统资源,增加作业完成的资源消耗。


2、配置Job属性

在任务启动之初利用Configuration类中的set(String name, String value)将一些简单的全局数据封装到作业的配置属性中;

然后在task中利用Configuration类中的get(String name)获取配置到属性中的全局数据。

优点:简单,资源消耗小;

缺点:对量比较大的共享数据显得比较无力。


3、使用DistributedCache

为应用提供缓存文件的只读工具,可以缓存文本文件、压缩文件、jar文件。

在使用时,用户可以在作业配置时使用本地或HDFS文件的UCRL来将其设置成共享缓存文件。

在作业启动之后和task启动之前,MapReduce框架会将可能需要的缓存文件复制到执行任务结点的本地。

优点:每个Job共享文件只会在启动之后复制一次,适用于大量的共享数据;

缺点:只读。

//配置
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/myapp/lookup"), conf);
//在Map函数中使用:
public static class Map extends Mapper<...>{private Path[] localArchives;private Paht[] localFiles;public void setup (Context context) throws IOException, InterruptedException{Configuration conf = context.getConfiguration();localArchives = DistributedCache.getLocalCacheArchives(conf);localFiles = DistributedCache.getLocalCacheFiles(conf);}public void map(K key, V value, Context context) throws IOException {//使用从缓存文件中获取的数据context.collect(k, v);}
} 



链接MapReduce Job

如果问题不是一个MapReduce作业就能解决,就需要在工作流中安排多个MapReduce作业,让它们配合起来自动完成一些复杂的任务,而不需要用户手动启动每一个作业。


1、线性MapReduce Job流

最简单的办法是设置多个有一定顺序的Job,每个Job以前一个Job的输入作为输入,经过处理,将数据再输入到下一个Job中。

这种办法的实现非常简单,将每个Job的启动代码设置成只有上一个Job结束之后才执行,然后将Job的输入设置成上一个Job的输出路径。


2、复杂MapReduce Job流

第一种方法在某些复杂任务下仍然不能满足需求。

如Job3需要将Job1和Job2的输出结果组合起来进行处理。这种情况下Job3的启动依赖于Job1和Job2的完成,但Job1和Job2之间没有关系。

针对这种复杂情况,MapReduce框架提供了让用户将Job组织成复杂Job流的API--ControlledJob类和JobControl类。这两个类属于org.apache.hadoop.mapreduce.lib.jobcontrol包。

具体做法:

先按照正常情况配置各个Job;

配置完成后再将各个Job封装到对应的ControlledJob对象中;

然后使用ControlledJob的addDependingJob()设置依赖关系;

接着再实例化一个JobControl对象,并使用addJob()方法将所有的Job注入JobControl对象中;

最后使用JobControl对象的run方法启动Job流。


3、Job设置预处理和后处理过程

org.apache.hadoop.mapred.lib包下的ChainMapper和ChainReducer两个静态类来实现。


The ChainMapper class allows to use multiple Mapper classes within a single Map task.

The Mapper classes are invoked in a chained (or piped) fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.


The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.

For each record output by the Reducer, the Mapper classes are invoked in a chained (or piped) fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.  


The key functionality of this feature is that the Mappers in the chain do not need to be aware that they are executed in a chain. This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.

Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed all Mappers and the Reduce in the chain use maching output and input key and value classes as no conversion is done by the chaining code.

Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like[MAP+ / REDUCE MAP*]. And immediate benefit of this pattern is a dramatic reduction in disk IO.

IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper, this is done by the addMapper for the last mapper in the chain.

ChainMapper usage pattern:

 ...conf.setJobName("chain");conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);

JobConf mapAConf = new JobConf(false);...ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,Text.class, Text.class, true, mapAConf);

JobConf mapBConf = new JobConf(false);...ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,LongWritable.class, Text.class, false, mapBConf);

JobConf reduceConf = new JobConf(false);...ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,Text.class, Text.class, true, reduceConf);

ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,LongWritable.class, Text.class, false, null);

ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,LongWritable.class, LongWritable.class, true, null);

FileInputFormat.setInputPaths(conf, inDir);FileOutputFormat.setOutputPath(conf, outDir);...

JobClient jc = new JobClient(conf);RunningJob job = jc.submitJob(conf);...


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387169.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实用的HTML5的上传图片方法

<input type"file" accept"video/*;capturecamcorder"> <input type"file" accept"audio/*;capturemicrophone"><input type"file" accept"image/*;capturecamera">直接调用相机<input type…

3.11 列出完数

完数&#xff1a;一个数恰好等于不包括自身的所有不同因子之和。如6123。 输入&#xff1a;每一行含有一个整数n。 输出&#xff1a;对每个整数n&#xff0c;输出所有不大于n的完数。输出格式为&#xff1a;整数n&#xff0c;冒号&#xff0c;空格&#xff0c;完数&#xff0…

angularjs 上传

xxx.module.ts模块 import { NgModule} from “angular/core”; import { FileUploadModule } from “ng2-file-upload” ; import { XXXComponent } from “./xxx.component”; NgModule({ imports:[ FileUploadModule ], declarations:[ XXXComponent &#xff0c;/component…

PHPCMS的产品筛选功能

如下图所示功能&#xff1a; 首先&#xff0c;用下面这些代码替换掉phpcms/libs/functions/extention.func.php的内容 <?php /*** extention.func.php 用户自定义函数库** copyright (C) 2005-2010 PHPCMS* license http://www.phpcms.cn/licen…

框架使用SpringBoot + Spring Security Oauth2 +PostMan

框架使用SpringBoot Spring Security Oauth2 主要完成了客户端授权 可以通过mysql数据库读取当前客户端表信息进行验证&#xff0c;token存储在数据库中 1.引入依赖 oauth2 依赖于spring security&#xff0c;需要引入spring&#xff0c; mysql&#xff0c;redis&#xff0c; …

3.12 12!配对

找出输入数据中所有两两相乘的积为12!的个数。 输入样例&#xff1a; 1 10000 159667200 9696 38373635 1000000 479001600 3 1 479001600 输出样例&#xff1a; 3 有3对&#xff1a; 1 479001600 1 479001600 3 159667200 #include<iostream> #include<fstre…

程序员自身价值值这么多钱么?

xx 网络公司人均奖金 28 个月…… xx 科技公司人均奖金 35 个月…… 每到年底&#xff0c;这样的新闻在互联网业内简直是铺天盖地。那些奖金不高的程序员们一边羡慕嫉妒&#xff0c;一边暗暗比较一下自己的身价&#xff0c;考虑是不是该跳槽了。 不同水平的程序员&#xff0c;薪…

3.13 判读是否是对称素数

输入&#xff1a;11 101 272 输出&#xff1a; Yes Yes No #include<fstream> #include<iostream> #include<sstream> #include<string> #include<cmath> using namespace std;bool isPrime(int); bool isSymmetry(int);int main(){ifstream…

Spring MVC中使用 Swagger2 构建Restful API

0.Spring MVC配置文件中的配置[java] view plaincopy<!-- 设置使用注解的类所在的jar包&#xff0c;只加载controller类 --> <span style"white-space:pre"> </span><context:component-scan base-package"com.jay.plat.config.contro…

Go语言规范汇总

目录 统一规范篇合理规划目录GOPATH设置import 规范代码风格大小约定命名篇基本命令规范项目目录名包名文件名常量变量变量申明变量命名惯例全局变量名局部变量名循环变量结构体(struct)接口名函数和方法名参数名返回值开发篇包魔鬼数字常量 & 枚举结构体运算符函数参数返回…

3.14 01串排序

将01串首先按照长度排序&#xff0c;其次按1的个数的多少排序&#xff0c;最后按ASCII码排序。 输入样例&#xff1a; 10011111 00001101 10110101 1 0 1100 输出样例&#xff1a; 0 1 1100 1010101 00001101 10011111 #include<fstream> #include<iost…

platform(win32) 错误

运行cnpm install后&#xff0c;出现虽然提示不适合Windows&#xff0c;但是问题好像是sass loader出问题的。所以只要执行下面命令即可&#xff1b;方案一&#xff1a;cnpm rebuild node-sass #不放心可以重新安装下 cnpm install方案二&#xff1a;npm update npm install no…

Error: Program type already present: okhttp3.Authenticator$1

在app中的build.gradle中加入如下代码&#xff0c; configurations {all*.exclude group: com.google.code.gsonall*.exclude group: com.squareup.okhttp3all*.exclude group: com.squareup.okioall*.exclude group: com.android.support,module:support-v13 } 如图 转载于:ht…

3.15 排列对称串

筛选出对称字符串&#xff0c;然后将其排序。 输入样例&#xff1a; 123321 123454321 123 321 sdfsdfd 121212 \\dd\\ 输出样例 123321 \\dd\\ 123454321 #include<fstream> #include<iostream> #include<string> #include<set> using …

ES6规范 ESLint

在团队的项目开发过程中&#xff0c;代码维护所占的时间比重往往大于新功能的开发。因此编写符合团队编码规范的代码是至关重要的&#xff0c;这样做不仅可以很大程度地避免基本语法错误&#xff0c;也保证了代码的可读性&#xff0c;毕竟&#xff1a;程序是写给人读的&#xf…

前端 HTML 常用标签 head标签相关内容 script标签

script标签 定义JavaScript代码 <!--定义JavaScript代码--> <script type"text/javascript"></script> 引入JavaScript文件 src""引入的 js文件路径 <!-- 引入JavaScript文件 --> <script src"./index.js"></s…

3.16 按绩点排名

成绩60分及以上的课程才予以计算绩点 绩点计算公式&#xff1a;[(课程成绩-50) / 10 ] * 学分 学生总绩点为所有绩点之和除以10 输入格式&#xff1a; 班级数 课程数 各个课程的学分 班级人数 姓名 各科成绩 输出格式&#xff1a; class 班级号: 姓名&#xff08;占1…

iview日期控件,双向绑定日期格式

日期在双向绑定之后格式为&#xff1a;2017-07-03T16:00:00.000Z 想要的格式为2017-07-04调了好久&#xff0c;几乎一天&#xff1a;用一句话搞定了 on-change”addForm.Birthday$event”<Date-picker placeholder"选择日期" type"datetime" v-model&…

移除html,jsp中的元素

移除html&#xff0c;jsp中的元素 某些时候&#xff0c;需要移除某个元素&#xff0c;比如移除表中的某一行 $("#tbody").children().eq(i).remove();或者 $("#tr").remove();PS&#xff1a;获取表中的tr的数量&#xff1a; $("#tbody").childre…

ACM001 Quicksum

本题的重点在于数据的读入。 可采用cin.getlin()一行一行读入数据&#xff1b;也可采用cin.get()一个一个读入字符。 cin会忽略回车、空格、Tab跳格。 cin.get()一个一个字符读&#xff0c;不忽略任何字符。 cin.getline()一行一行读入。 #include<fstream> #include…