关于MapReduce中自定义Combine类(一)

MRJobConfig
public static fina COMBINE_CLASS_ATTR
属性COMBINE_CLASS_ATTR= "mapreduce.job.combine.class"
————子接口(F4) JobContent
方法getCombinerClass
————子实现类 JobContextImpl
实现getCombinerClass方法:
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
throws ClassNotFoundException {
return (Class<? extends Reducer<?,?,?,?>>)
conf.getClass(COMBINE_CLASS_ATTR, null);
}
因为JobContextImpl是MRJobConfig子类
所以得到了父类MRJobConfig的COMBINE_CLASS_ATTR属性
————子类Job
public void setCombinerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
因为JobContextImpl是MRJobConfig子类,
而Job是JobContextImpl的子类
所以也有COMBINE_CLASS_ATTR属性
通过setCombinerClass设置了父类MRJobConfig的属性
MRJobConfig
————子接口JobContent
方法getCombinerClass
————子实现类 JobContextImpl
————子类 Job
————子实现类 TaskAttemptContext
继承了方法getCombinerClass
Task   
$CombinerRunner(Task的内部类)   
该内部类有方法create:
public static <K,V> CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer
) throws ClassNotFoundException
{
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
}
其中这一段应该是旧的API
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
而这个是新的API
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
(不知道为什么要写全名,去掉那些包名、向上/下转型和各种泛型的话,看起来就会清晰很多?)
而TaskAttemptContext是JobContent的子实现类,所以继承了getCombinerClass方法
而且,这里用的是多态,其调用的是子实现类TaskAttemptContextImpl的getCombinerClass方法
(TaskAttemptContextImpl继承了JobContextImpl,而JobContextImpl实现了该方法)
所以最终get到了属性COMBINE_CLASS_ATTR,即得到了我们通过job.setCombinerClass的xxxC
而这个xxxC是给了newcls,而newcls是给了NewCombinerRunner的构造函数的reducerClassc参数
NewCombinerRunner(Class reducerClass,
JobConf job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
org.apache.hadoop.mapreduce.TaskAttemptContext context,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer)
{
super(inputCounter, job, reporter);
this.reducerClass = reducerClass;
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
Task
MapTask
$MapOutputBuffer
private CombinerRunner<K,V> combinerRunner;
$SpillThread类($表示内部类)
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
//此时,我们得到了设置好的合并类                            
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
再查看combine函数
在Task的内部类NewCombinerRunner下
public void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector)
throws IOException, InterruptedException,ClassNotFoundException
{
// make a reducer
org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
(org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
}
上面的reducerClass就是我们传入的xxxC
最终是通过反射创建了一个xxxC对象,并将其强制向上转型为Reducer实例对象,
然后调用了向上转型后对象的run方法(当前的xxxC没有run方法,调用的是父类Reduce的run)
在类Reducer中,run方法如下
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
           * control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
}
}
} finally {
cleanup(context);
}
}
有由于多态,此时调用的reduce是子类xxxC中的reduce方法
(多态态性质:子类复写了该方法,则实际上执行的是子类中的该方法)
所以说,我们自定义combine用的类的时候,应该继承Reducer类,并且复写reduce方法
且其输入形式:(以wordcount为例)
       reduce(Text key, Iterable<IntWritable> values, Context context)
       其中key是单词个数,而values是个数列表,也就是value1、value2........
       注意,此时已经是列表,即<键,list<值1、值2、值3.....>>
       (之所以得到这个结论,是因为我当时使用的combine类是WCReduce,
        即Reduce和combine所用的类是一样的,通过对代码的分析,传入值的结构如果是<lkey,value>的话,是不可能做到combine的啊——即所谓的对相同值合并,求计数的累积和,这根本就是两个步骤,对key相同的键值对在map端就进行了一次合并了,合并成了<key,value list>,然后才轮到combine接受直接换个形式的输入,并处理——我们的处理是求和,然后再输出到context,进入reduce端的shuffle过程。
        然后我在reduce中遍历了用syso输出
        结果发现是0,而这实际上是因为经过一次遍历,我的指针指向的位置就不对了啊,
        )
嗯,自己反复使用以下的代码,不断的组合、注释,去测试吧~就会得出这样的结论了
  1. /reduce
  2.     publicstaticclassWCReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
  3.         private final IntWritableValueOut=newIntWritable();
  4.         @Override
  5.         protectedvoid reduce(Text key,Iterable<IntWritable> values,
  6.                 Context context)  throws IOException,InterruptedException{
  7.             for(IntWritable value : values){
  8.                 System.out.println(value.get()+"--");
  9.             }
  10.  
  11. //            int total = 0 ;
  12. //            for (IntWritable value : values) {
  13. //                total += value.get();
  14. //            }
  15. //            ValueOut.set(total);
  16. //            context.write(key, ValueOut);
  17.         }
  18.  
  19.     }
  20.           
  21. job.setCombinerClass(WCReduce.class);



来自为知笔记(Wiz)



附件列表

 

转载于:https://www.cnblogs.com/xuanlvshu/p/5744445.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/470305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react脚手架配置代理解决跨域问题

一、问题描述&#xff1a; 控制台报错&#xff0c;出现跨域问题 二、解决方案 配置代理&#xff1a; 第一种配置方式&#xff1a; 在package.json中追加如下配置 "proxy":"http://localhost:5000"说明&#xff1a; 优点&#xff1a;配置简单&#xff…

kstools工具是什么牌子_2020年平衡车推荐,电动平衡车哪个牌子好?老司机教你如何选购电动平衡车...

2020年平衡车推荐&#xff0c;电动平衡车哪个牌子好&#xff1f;老司机教你如何选购电动平衡车随着我国科技的发展&#xff0c;生活水平的提高&#xff0c;在很多地方都出现了电动平衡车的身影&#xff0c;人们将电动平衡车当做短距离代步的工具&#xff0c;也是非常实用的。很…

plsql 中的一些好的设置和快捷键总结

1、SQL语句字符全部大写 自认为这是个好习惯&#xff0c;信息系统的核心是数据库&#xff0c;系统出问题时最先要查的就是SQL语句&#xff0c;怎样在浩瀚的日志中快速找到那条SQL语句是件比较痛苦的事情。 SQL语句全部大写并不能彻底解决这一问题&#xff0c;但在一堆代码…

python工控怎么样_搞工控不了解python,好比雄鹰断了翅膀,理由在这里!

这时&#xff0c;距离下班只有30分钟了。无奈&#xff0c;先梳理一下要展示的数据指标&#xff1a;达成率&#xff0c;环比&#xff0c;增长、人均单产、人力成本等数据的演算、推导、分析&#xff0c;还要匹配对应的PPT数据魔方....数据很多&#xff0c;先把每个人的数据调出来…

react父子组件通信案例

父组件&#xff1a;App组件 子组件&#xff1a;Search组件、List组件 案例需求&#xff1a;文本框中输入关键词&#xff0c;点击搜索按钮后&#xff0c;下方列表展示出搜索结果 实现思路&#xff1a; 子组件Search组件向父组件App传递状态&#xff08;状态包括&#xff1a;是否…

模块与包(8)

第8章 复杂程序组织模块&#xff1a;包含函数或者类的Python程序导入模块形式&#xff1a;import 模块名import 模块名 as 新名字 from 模块名 import 函数名from 模块名 import * #导入模块中的所有代码使用import和from导入&#xff0c;调用函数的不同&#xff1a;使…

matlab画线不同颜色_怎样画线框图才有意义

我们常轻忽身边习以为常的事物&#xff0c;觉得没有必要为一些看似简单又可有可无的东西浪费时间——例如线框图。虽然没必要凡事都寻根问底&#xff0c;但当面对复杂问题时&#xff0c;脚踏实地回归基本面也许才是根本解法。本文章深入介绍程序开发界面设计中&#xff0c;最简…

react 消息订阅-发布机制(解决兄弟组件通信问题)

消息订阅-发布机制 工具库: PubSubJS下载: npm install pubsub-js --save使用: 1)import PubSub from ‘pubsub-js’ //引入 2)PubSub.subscribe(‘delete’, function(data){ }); //订阅 3)PubSub.publish(‘delete’, data) //发布消息 App组件&#xff1a; import React,…

运行Myeclipse发生这事这是怎么回事,大神们

转载于:https://www.cnblogs.com/zhuh102/p/5753616.html

非零返回怎么解决_VLOOKUP如何返回多个值?

今天我来谈谈大家最熟悉的函数&#xff0c;也是使用频率最高的函数&#xff0c;基本是每天都在使用-VLOOKUP大家都知道VLOOKUP可以根据条件&#xff0c;查找并返回满足条件对应列的值&#xff0c;但是他的设定只是只能返回第一个满足条件的值如果我们要返回满足条件的多个值&am…

Fetch发送网络请求

1. 文档 https://github.github.io/fetch/https://segmentfault.com/a/1190000003810652 2. 特点 fetch: 原生函数&#xff0c;不再使用XmlHttpRequest对象提交ajax请求老版本浏览器可能不支持 3. 相关API GET请求 fetch(url).then(function(response) {return response.…

JSX详解React的事件绑定事件参数的传递

一、认识JSX 这段element变量的声明右侧赋值的标签语法是什么呢&#xff1f; 它不是一段字符串&#xff08;因为没有使用引号包裹&#xff09;&#xff0c;它看起来是一段HTML原生&#xff0c;但是我们能在js中直接给一个变量赋值html吗&#xff1f;其实是不可以的&#xff0c…

剑指Offer 从尾到头打印链表

题目描述 输入一个链表&#xff0c;从尾到头打印链表每个节点的值。 输入描述: 输入为链表的表头 输出描述: 输出为需要打印的“新链表”的表头 思路&#xff1a; 用容器vector&#xff0c;递归到最后一个元素&#xff0c;push_back到dev中。 AC代码&#xff1a; 1 /**2 * str…

python排序元组两个元素_在python中对具有3个元素的元组列表进行排...

只需对列表进行排序&#xff1b;默认排序功能可以满足您的需求.比较两个元组时,将根据它们的内容对其进行排序&#xff1b;首先对第一个元素进行排序,如果相等,则对第二个元素进行排序,依此类推.演示&#xff1a;>>> L [(14, 2, 3), (1, 14, 0), (14, 1, 1), (1, 14,…

叠积木

【题目描述】 约翰和贝西在叠积木。共有30000块积木&#xff0c;编号为1到30000。一开始&#xff0c;这些积木放在地上&#xff0c;自然地分成N堆。贝西接受约翰的指示&#xff0c;把一些积木叠在另一些积木的上面。一旦两块积木相叠&#xff0c; 彼此就再也不会分开了&#xf…

总谐波失真计算公式_新能源汽车技术|车用轮毂电机转矩谐波协同控制策略

点击 电机与控制应用 关注我们轮毂电机因结构简单、驱动灵活的特点广泛应用于轻型电动车辆。电机运行中存在的齿槽效应、逆变器非线性效应及电流谐波等问题&#xff0c;导致电机电磁转矩波动&#xff0c;影响车辆运行的平顺性。通过电磁转矩谐波分析发现其主要成分为低阶谐波。…

React条件渲染列表渲染

一、React条件渲染 某些情况下&#xff0c;界面的内容会根据不同的情况显示不同的内容&#xff0c;或者决定是否渲染某部分内容&#xff1a; 在vue中&#xff0c;我们会通过指令来控制&#xff1a;比如v-if、v-show&#xff1b;在React中&#xff0c;所有的条件判断都和普通的J…

解决VirtualBox错误:“FATAL:No bootable medium found!”

VirtualBox错误&#xff1a;“FATAL&#xff1a;No bootable medium found!”  用VirtualBox安装系统出现这个错误的几率极高&#xff0c;因为当哥出现同样问题的时候股沟了下”FATAL&#xff1a;No bootable medium found!“出现很多内容&#xff0c;但没一个把问题解决了的…

React事件总线

通过Context主要实现的是数据的共享&#xff0c;但是在开发中如果有跨组件之间的事件传递&#xff0c;应该如何操作呢&#xff1f; 一、安装events 在Vue中我们可以通过Vue的实例&#xff0c;快速实现一个事件总线&#xff08;EventBus&#xff09;&#xff0c;来完成操作&…

字符串最长公共子序列python_求解两个字符串的最长公共子序列

一&#xff0c;问题描述给定两个字符串&#xff0c;求解这两个字符串的最长公共子序列(Longest Common Sequence)。比如字符串1&#xff1a;BDCABA&#xff1b;字符串2&#xff1a;ABCBDAB则这两个字符串的最长公共子序列长度为4&#xff0c;最长公共子序列是&#xff1a;BCBA二…