关于MapReduce中自定义Combine类(一)

MRJobConfig
public static fina COMBINE_CLASS_ATTR
属性COMBINE_CLASS_ATTR= "mapreduce.job.combine.class"
————子接口(F4) JobContent
方法getCombinerClass
————子实现类 JobContextImpl
实现getCombinerClass方法:
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
throws ClassNotFoundException {
return (Class<? extends Reducer<?,?,?,?>>)
conf.getClass(COMBINE_CLASS_ATTR, null);
}
因为JobContextImpl是MRJobConfig子类
所以得到了父类MRJobConfig的COMBINE_CLASS_ATTR属性
————子类Job
public void setCombinerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
因为JobContextImpl是MRJobConfig子类,
而Job是JobContextImpl的子类
所以也有COMBINE_CLASS_ATTR属性
通过setCombinerClass设置了父类MRJobConfig的属性
MRJobConfig
————子接口JobContent
方法getCombinerClass
————子实现类 JobContextImpl
————子类 Job
————子实现类 TaskAttemptContext
继承了方法getCombinerClass
Task   
$CombinerRunner(Task的内部类)   
该内部类有方法create:
public static <K,V> CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer
) throws ClassNotFoundException
{
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
}
其中这一段应该是旧的API
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
而这个是新的API
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
(不知道为什么要写全名,去掉那些包名、向上/下转型和各种泛型的话,看起来就会清晰很多?)
而TaskAttemptContext是JobContent的子实现类,所以继承了getCombinerClass方法
而且,这里用的是多态,其调用的是子实现类TaskAttemptContextImpl的getCombinerClass方法
(TaskAttemptContextImpl继承了JobContextImpl,而JobContextImpl实现了该方法)
所以最终get到了属性COMBINE_CLASS_ATTR,即得到了我们通过job.setCombinerClass的xxxC
而这个xxxC是给了newcls,而newcls是给了NewCombinerRunner的构造函数的reducerClassc参数
NewCombinerRunner(Class reducerClass,
JobConf job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
org.apache.hadoop.mapreduce.TaskAttemptContext context,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer)
{
super(inputCounter, job, reporter);
this.reducerClass = reducerClass;
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
Task
MapTask
$MapOutputBuffer
private CombinerRunner<K,V> combinerRunner;
$SpillThread类($表示内部类)
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
//此时,我们得到了设置好的合并类                            
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
再查看combine函数
在Task的内部类NewCombinerRunner下
public void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector)
throws IOException, InterruptedException,ClassNotFoundException
{
// make a reducer
org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
(org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
}
上面的reducerClass就是我们传入的xxxC
最终是通过反射创建了一个xxxC对象,并将其强制向上转型为Reducer实例对象,
然后调用了向上转型后对象的run方法(当前的xxxC没有run方法,调用的是父类Reduce的run)
在类Reducer中,run方法如下
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
           * control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
}
}
} finally {
cleanup(context);
}
}
有由于多态,此时调用的reduce是子类xxxC中的reduce方法
(多态态性质:子类复写了该方法,则实际上执行的是子类中的该方法)
所以说,我们自定义combine用的类的时候,应该继承Reducer类,并且复写reduce方法
且其输入形式:(以wordcount为例)
       reduce(Text key, Iterable<IntWritable> values, Context context)
       其中key是单词个数,而values是个数列表,也就是value1、value2........
       注意,此时已经是列表,即<键,list<值1、值2、值3.....>>
       (之所以得到这个结论,是因为我当时使用的combine类是WCReduce,
        即Reduce和combine所用的类是一样的,通过对代码的分析,传入值的结构如果是<lkey,value>的话,是不可能做到combine的啊——即所谓的对相同值合并,求计数的累积和,这根本就是两个步骤,对key相同的键值对在map端就进行了一次合并了,合并成了<key,value list>,然后才轮到combine接受直接换个形式的输入,并处理——我们的处理是求和,然后再输出到context,进入reduce端的shuffle过程。
        然后我在reduce中遍历了用syso输出
        结果发现是0,而这实际上是因为经过一次遍历,我的指针指向的位置就不对了啊,
        )
嗯,自己反复使用以下的代码,不断的组合、注释,去测试吧~就会得出这样的结论了
  1. /reduce
  2.     publicstaticclassWCReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
  3.         private final IntWritableValueOut=newIntWritable();
  4.         @Override
  5.         protectedvoid reduce(Text key,Iterable<IntWritable> values,
  6.                 Context context)  throws IOException,InterruptedException{
  7.             for(IntWritable value : values){
  8.                 System.out.println(value.get()+"--");
  9.             }
  10.  
  11. //            int total = 0 ;
  12. //            for (IntWritable value : values) {
  13. //                total += value.get();
  14. //            }
  15. //            ValueOut.set(total);
  16. //            context.write(key, ValueOut);
  17.         }
  18.  
  19.     }
  20.           
  21. job.setCombinerClass(WCReduce.class);



来自为知笔记(Wiz)



附件列表

 

转载于:https://www.cnblogs.com/xuanlvshu/p/5744445.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/470305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react脚手架配置代理解决跨域问题

一、问题描述&#xff1a; 控制台报错&#xff0c;出现跨域问题 二、解决方案 配置代理&#xff1a; 第一种配置方式&#xff1a; 在package.json中追加如下配置 "proxy":"http://localhost:5000"说明&#xff1a; 优点&#xff1a;配置简单&#xff…

kstools工具是什么牌子_2020年平衡车推荐,电动平衡车哪个牌子好?老司机教你如何选购电动平衡车...

2020年平衡车推荐&#xff0c;电动平衡车哪个牌子好&#xff1f;老司机教你如何选购电动平衡车随着我国科技的发展&#xff0c;生活水平的提高&#xff0c;在很多地方都出现了电动平衡车的身影&#xff0c;人们将电动平衡车当做短距离代步的工具&#xff0c;也是非常实用的。很…

plsql 中的一些好的设置和快捷键总结

1、SQL语句字符全部大写 自认为这是个好习惯&#xff0c;信息系统的核心是数据库&#xff0c;系统出问题时最先要查的就是SQL语句&#xff0c;怎样在浩瀚的日志中快速找到那条SQL语句是件比较痛苦的事情。 SQL语句全部大写并不能彻底解决这一问题&#xff0c;但在一堆代码…

react父子组件通信案例

父组件&#xff1a;App组件 子组件&#xff1a;Search组件、List组件 案例需求&#xff1a;文本框中输入关键词&#xff0c;点击搜索按钮后&#xff0c;下方列表展示出搜索结果 实现思路&#xff1a; 子组件Search组件向父组件App传递状态&#xff08;状态包括&#xff1a;是否…

matlab画线不同颜色_怎样画线框图才有意义

我们常轻忽身边习以为常的事物&#xff0c;觉得没有必要为一些看似简单又可有可无的东西浪费时间——例如线框图。虽然没必要凡事都寻根问底&#xff0c;但当面对复杂问题时&#xff0c;脚踏实地回归基本面也许才是根本解法。本文章深入介绍程序开发界面设计中&#xff0c;最简…

react 消息订阅-发布机制(解决兄弟组件通信问题)

消息订阅-发布机制 工具库: PubSubJS下载: npm install pubsub-js --save使用: 1)import PubSub from ‘pubsub-js’ //引入 2)PubSub.subscribe(‘delete’, function(data){ }); //订阅 3)PubSub.publish(‘delete’, data) //发布消息 App组件&#xff1a; import React,…

运行Myeclipse发生这事这是怎么回事,大神们

转载于:https://www.cnblogs.com/zhuh102/p/5753616.html

非零返回怎么解决_VLOOKUP如何返回多个值?

今天我来谈谈大家最熟悉的函数&#xff0c;也是使用频率最高的函数&#xff0c;基本是每天都在使用-VLOOKUP大家都知道VLOOKUP可以根据条件&#xff0c;查找并返回满足条件对应列的值&#xff0c;但是他的设定只是只能返回第一个满足条件的值如果我们要返回满足条件的多个值&am…

Fetch发送网络请求

1. 文档 https://github.github.io/fetch/https://segmentfault.com/a/1190000003810652 2. 特点 fetch: 原生函数&#xff0c;不再使用XmlHttpRequest对象提交ajax请求老版本浏览器可能不支持 3. 相关API GET请求 fetch(url).then(function(response) {return response.…

JSX详解React的事件绑定事件参数的传递

一、认识JSX 这段element变量的声明右侧赋值的标签语法是什么呢&#xff1f; 它不是一段字符串&#xff08;因为没有使用引号包裹&#xff09;&#xff0c;它看起来是一段HTML原生&#xff0c;但是我们能在js中直接给一个变量赋值html吗&#xff1f;其实是不可以的&#xff0c…

总谐波失真计算公式_新能源汽车技术|车用轮毂电机转矩谐波协同控制策略

点击 电机与控制应用 关注我们轮毂电机因结构简单、驱动灵活的特点广泛应用于轻型电动车辆。电机运行中存在的齿槽效应、逆变器非线性效应及电流谐波等问题&#xff0c;导致电机电磁转矩波动&#xff0c;影响车辆运行的平顺性。通过电磁转矩谐波分析发现其主要成分为低阶谐波。…

React条件渲染列表渲染

一、React条件渲染 某些情况下&#xff0c;界面的内容会根据不同的情况显示不同的内容&#xff0c;或者决定是否渲染某部分内容&#xff1a; 在vue中&#xff0c;我们会通过指令来控制&#xff1a;比如v-if、v-show&#xff1b;在React中&#xff0c;所有的条件判断都和普通的J…

解决VirtualBox错误:“FATAL:No bootable medium found!”

VirtualBox错误&#xff1a;“FATAL&#xff1a;No bootable medium found!”  用VirtualBox安装系统出现这个错误的几率极高&#xff0c;因为当哥出现同样问题的时候股沟了下”FATAL&#xff1a;No bootable medium found!“出现很多内容&#xff0c;但没一个把问题解决了的…

React事件总线

通过Context主要实现的是数据的共享&#xff0c;但是在开发中如果有跨组件之间的事件传递&#xff0c;应该如何操作呢&#xff1f; 一、安装events 在Vue中我们可以通过Vue的实例&#xff0c;快速实现一个事件总线&#xff08;EventBus&#xff09;&#xff0c;来完成操作&…

React中使用ref

一、如何使用ref 在React的开发模式中&#xff0c;通常情况下不需要、也不建议直接操作DOM原生&#xff0c;但是某些特殊的情况&#xff0c;确实需要获取到DOM进行某些操作&#xff1a; 管理焦点&#xff0c;文本选择或媒体播放&#xff1b;触发强制动画&#xff1b;集成第三方…

React中的受控组件和非受控组件

一、认识受控组件 在React中&#xff0c;HTML表单的处理方式和普通的DOM元素不太一样&#xff1a;表单元素通常会保存在一些内部的state。 比如下面的HTML表单元素&#xff1a; 这个处理方式是DOM默认处理HTML表单的行为&#xff0c;在用户点击提交时会提交到某个服务器中&…

JS动画 | 用TweenMax实现收集水滴效果

之前在CodePen上接触了TweenMax, 被它能做到的酷炫效果震撼了. (文末放了5个GSAP的效果GIF) 最近要做一个"收集水滴"的动效, 于是就试用了一下TweenMax实现这个效果. 什么是TweenMax TweenMax是GSAP(GreenSock Animation Platform)创作的动画工具库. GSAP的产品除了T…

React中的组件通信——父传子、子传父、Context

0、认识组件间的通信 在开发过程中&#xff0c;我们会经常遇到需要组件之间相互进行通信&#xff1a; 比如App可能使用了多个Header&#xff0c;每个地方的Header展示的内容不同&#xff0c;那么我们就需要使用者传递给Header一些数据&#xff0c;让其进行展示&#xff1b;又比…

React ref的转发

在前面学习ref时讲过&#xff0c;ref不能应用于函数式组件&#xff1a; 因为函数式组件没有实例&#xff0c;所以不能获取到对应的组件对象 但是&#xff0c;在开发中我们可能想要获取函数式组件中某个元素的DOM&#xff0c;这个时候我们应该如何操作呢&#xff1f; 方式一&…

电脑桌面 IE 图标删除不了的解决方法

电脑换了系统之后想把桌面的IE浏览器给删掉&#xff0c;可是直接删除又删不掉&#xff0c;杀毒软件查杀也没有问题。找了很多方法&#xff0c;终于才把它给解决了。下面&#xff0c;就把我的方法分享给桌面ie图标删除不了的解决方法&#xff0c;希望能对大家有所帮助! 方法 1 1…