本身求平均数很简单的,必须用到combine的话我在两个地方费了很多时间:
一是combine的输入不仅仅是map的输出,还有可能是combine的输出,所以对value的处理得分两种情况吧;
二是结果要保留4位有效数字。。。噗,注意保留4位有效数字不等于小数点后面有四位;其次不能用parse!它只能转成整型。
第三,代码写的实在比较挫,哎。
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Avg {
public static class Map extends
Mapper
{
protected void map(LongWritable
key, Text value, Context context)
throws
IOException, InterruptedException {
String line =
value.toString();
StringTokenizer
words = new StringTokenizer(line, "\n");
//
对每一行进行分割
while
(words.hasMoreElements()) {
StringTokenizer
tokenizerLine = new StringTokenizer(
words.nextToken());
String
strAlp = tokenizerLine.nextToken();
String
strData = tokenizerLine.nextToken();
Text
alphabet = new Text(strAlp);
Text
score = new Text(strData);
context.write(alphabet,
score);
}
}
}
public static class Combine extends
Reducer
{
//将部分和以及出现次数共同作为value
private Text tmp = new
Text();
public void reduce(Text key,
Iterable values, Context context)
throws
IOException, InterruptedException {
//
统计某字母出现次数和部分和
int sum =
0;
int count =
0;
String[]
val_input = null;
for (Text val
: values) {
if((val_input=val.toString().split(":")).length==2){
sum
+= Integer.parseInt(val_input[0].toString());
count
+=
Integer.parseInt(val_input[1].toString()); }
else{
sum
+= Integer.parseInt(val.toString());
count++;
}
} // 设置value值为部分和以及出现次数
tmp.set(sum +
":" + count);
context.write(key, tmp);
}
}
public static class Reduce extends Reducer
{
private String tmp_sum = new
String();
private String tmp_count = new
String();
public void reduce(Text key,
Iterable values, Context context)
throws IOException, InterruptedException {
int sum_all =
0;
int count_all
=
0; for (Text val
: values) {
String
str_tmp=val.toString();
//将部分和以及出现次数分割,分别求和
int
splitIndex = str_tmp.indexOf(":");
tmp_sum
= str_tmp.substring(0, splitIndex);
tmp_count
= str_tmp.substring(splitIndex+1);
int
int_sum = Integer.parseInt(tmp_sum);
int
int_count =
Integer.parseInt(tmp_count); sum_all
+= int_sum;
count_all += int_count;
} double average = (sum_all * 1.0) / (count_all *
1.0); MathContext mathContext = new MathContext(4);
BigDecimal c = new BigDecimal(average);
BigDecimal aver_final = c.round(mathContext);
double aver_4 = aver_final.doubleValue();
String str_aver = String.valueOf(aver_4);
Text aver = new Text();
aver.set(str_aver);
context.write(key, aver);
}
}
public static void main(String[] args) throws
Exception {
Configuration conf = new
Configuration();
Job job = new Job(conf,
"Avg_use_combine");
job.setJarByClass(Avg.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,
new Path(args[0]));
FileOutputFormat.setOutputPath(job,
new Path(args[1]));
System.exit(job.waitForCompletion(true)
? 0 : 1);
}
}