使用Hadoop MapReduce处理文本文件,Mapper负责将文本分割为单词,然后Reducer对每个单词进行计数,最后将结果写入输出文件。
// 定义WordCount公共类
public class WordCount {// 主入口方法,处理命令行参数public static void main(String[] args) throws Exception {// 创建Hadoop配置对象Configuration conf = new Configuration();// 创建Job实例,设置作业名称Job job = Job.getInstance(conf, "word count");// 设置作业的JAR包,这里使用WordCount类所在的包job.setJarByClass(WordCount.class);// 设置Mapper类job.setMapperClass(TokenizerMapper.class);// 设置Combiner和Reducer类,这里使用同一个类,因为Reduce操作不需要排序job.setCombinerClass(IntSumReducer.class);job.setReduceClass(IntSumReducer.class);// 设置输出键和值的类型job.setOutputKeyClass(Text.class); // 输出键:单词类型,Textjob.setOutputValueClass(IntWritable.class); // 输出值:单词计数,IntWritable// 将输入文件添加到作业FileInputFormat.addInputPath(job, new Path(args[0])); // 第一个参数是输入文件路径// 设置输出文件路径FileOutputFormat.setOutputPath(job, new Path(args[1])); // 第二个参数是输出文件路径// 等待作业完成,返回0表示成功,1表示失败System.exit(job.waitForCompletion(true) ? 0 : 1);}// Reducer类,统计单词的出现次数public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {// 初始化结果值为0private IntWritable result = new IntWritable();// 在reduce函数中,处理键值对,累加值public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get(); // 获取值并累加}result.set(sum); // 设置结果值context.write(key, result); // 将键值对写入输出}}// Mapper类,进行单词分词public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {// 声明全局变量,用于存储单个单词private final static IntWritable one = new IntWritable(1);private Text word = new Text();// map函数,将文本分割成单词,每个单词与1一起写入输出public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken()); // 获取下一个单词context.write(word, one); // 将单词和1写入输出}}}
}