1. Using Counters
Counters record the progress and status of a job. MapReduce counters (Counter) give us a window into the runtime details of a MapReduce job. They are very helpful for performance tuning: most MapReduce performance evaluation is based on the values these counters report.
MapReduce ships with many built-in counters. In the logs of an MR program you may have noticed output like the following:
Shuffle Errors
BAD_ID=0
CONNECTION=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=89
File Output Format Counters
Bytes Written=86
The built-in counters include:
File System Counters
Job Counters
Map-Reduce Framework counters
Shuffle Errors
File Input Format Counters
File Output Format Counters
Hadoop also supports user-defined counters. In production code it is common to keep a global count of malformed records encountered during data processing; requirements like this can be met with the global counters provided by the MapReduce framework.
Example code:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCount {
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Global counter identified by group name and counter name
            Counter counter = context.getCounter("SelfCounters", "myCounters");
            String[] words = value.toString().split(",");
            for (String word : words) {
                // Count how many times the word "hello" appears across all input
                if ("hello".equals(word)) {
                    counter.increment(1);
                }
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
}
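After the job completes, the driver can read the accumulated counter value back through the Job's `getCounters()` API. The helper below is a minimal sketch (the class name `CounterReport` is hypothetical; the group/counter names match the mapper above):

```java
import org.apache.hadoop.mapreduce.Counters;

public class CounterReport {
    // Reads the "SelfCounters"/"myCounters" value from a Counters object,
    // e.g. the one returned by job.getCounters() after the job has finished.
    public static long readCounter(Counters counters) {
        return counters.findCounter("SelfCounters", "myCounters").getValue();
    }
}
```

In a driver this would typically be called as `CounterReport.readCounter(job.getCounters())` once `job.waitForCompletion(true)` has returned.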
2. Chaining Multiple Jobs
More complex processing logic often requires several MapReduce programs to run in sequence. Chaining multiple jobs can be implemented with the JobControl class provided by the MapReduce framework.
Example code:
ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
controlledJob1.setJob(job1);
ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
controlledJob2.setJob(job2);
controlledJob2.addDependingJob(controlledJob1); // job2 depends on job1
JobControl jc = new JobControl(chainName);
jc.addJob(controlledJob1);
jc.addJob(controlledJob2);
// JobControl implements Runnable, so it is driven by a separate thread
Thread jcThread = new Thread(jc);
jcThread.start();
while (true) {
    if (jc.allFinished()) {
        System.out.println(jc.getSuccessfulJobList());
        jc.stop();
        return 0;
    }
    if (jc.getFailedJobList().size() > 0) {
        System.out.println(jc.getFailedJobList());
        jc.stop();
        return 1;
    }
}
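For context, here is a sketch of how `job1` and `job2` might be wired up before being handed to JobControl, with `job2` reading `job1`'s output directory. The job names and the three-path layout are assumptions for illustration, not from the original post:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainDriver {
    // Builds two jobs where job2's input is job1's output directory.
    public static Job[] buildChain(Configuration conf,
                                   String in, String mid, String out) throws Exception {
        Job job1 = Job.getInstance(conf, "step1"); // hypothetical job name
        FileInputFormat.addInputPath(job1, new Path(in));
        FileOutputFormat.setOutputPath(job1, new Path(mid));

        Job job2 = Job.getInstance(conf, "step2"); // hypothetical job name
        FileInputFormat.addInputPath(job2, new Path(mid)); // consumes job1's output
        FileOutputFormat.setOutputPath(job2, new Path(out));

        return new Job[] { job1, job2 };
    }
}
```

The two jobs returned here are what would be wrapped in `ControlledJob` instances and submitted through `JobControl` as in the snippet above; the dependency declared via `addDependingJob` guarantees job1 has written `mid` before job2 starts reading it.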
Reposted from: https://blog.51cto.com/13587708/2295809