一. 计数器概述
在执行MapReduce程序时,控制台的输出中一般会包含如下内容。
这些输出就是MapReduce的全局计数器的输出信息。计数器是用来记录job的执行进度和状态的,它的作用可以理解为日志,方便用户了解任务的执行状况,辅助用户诊断故障。
常见内置计数器
-
File System Counters:跟踪作业读写的文件系统操作,如HDFS读写字节数。
-
Job Counters:作业相关的统计,如作业的提交数量、耗费的时间。
-
MapReduce Task Counters:Map和Reduce任务的统计,如map/reduce任务的输入输出记录数。
-
File Input | Output Format Counters:跟踪FilelnputFormat读取的字节数或FileOutputFormat输出的字节数。
二. MapReduce自定义计数器
尽管hadoop内置了很多常见的计数器,但是针对一些特定场景,MapReduce也提供了自定义计数器。
自定义计数器的使用分为以下两部:
-
首先通过context.getCounter方法获取一个全局计数器,创建的时候需要指定计数器所属的组名和计数器的名字。
-
在程序中需要使用计数器的地方,调用 counter 提供的方法即可
需求
在wordcount的案例中使用计数器输出文件的行数。
代码实现
package mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountMRCounter {static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 从程序上下文对象获取一个全局计数器,并指定计数器组和计数器名字Counter counter = context.getCounter("own_counter", "line Counter");String[] words = value.toString().split(" ");for (String word: words) {context.write(new Text(word), new IntWritable(1));}// 处理完1行,计数器加1counter.increment(1);}}static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WordCountMRCounter.class);job.setJobName("WordCount");// 设置输入,输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置Mapperjob.setMapperClass(WordCountMRCounter.WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 设置Reducerjob.setReducerClass(WordCountMRCounter.WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setNumReduceTasks(1);boolean waitFor = job.waitForCompletion(true);System.exit(waitFor ? 0 : 1);}
}
运行结果
# 查看输入文件,恰好也是3行
[root@hadoop1 ~]# hdfs dfs -text /test/a.txt
hello world
name hello
world