文章目录
- 第1关:WordCount词频统计
- 第2关:HDFS文件读写
- 第3关:倒排索引
- 第4关: 网页排序——PageRank算法
第1关:WordCount词频统计
测试说明
以下是测试样例:
测试输入样例数据集:文本文档test1.txt和test2.txt
文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast
预期输出result.txt文档中的内容为:
and 2
as 4
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1
import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
第2关:HDFS文件读写
编程要求
本关的编程任务是补全右侧代码片段中的代码,具体要求及说明如下:
在主函数main中已获取hadoop的系统设置,并在其中创建HDFS文件。在main函数中,指定创建文档路径(必须设置为/user/hadoop/myfile才能评测),输入内容必须是本关要求内容才能评测。
添加读取文件输出部分
本关只要求在指定区域进行代码编写,其他区域仅供参考请勿改动。
测试说明
本关无测试样例,直接比较文件内容确定输出是否为“china cstor cstor cstor china”
import java.io.IOException;
import java.sql.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Creates {@code /user/hadoop/myfile} on the configured file system if it does not
 * exist yet, reads its content back, and prints the file's status and content.
 */
public class hdfs {

  /**
   * Runs the write/read round trip.
   *
   * <p>Output order: file-system URI, {@code "File exists."} when the file was already
   * present, the status fields of the file, the content read from it, and the file name.
   *
   * @param args not used
   * @throws IOException if any file-system operation fails
   */
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // try-with-resources closes the file system and both streams even when an
    // operation in between throws; the original closed them only on the success path.
    try (FileSystem fs = FileSystem.get(conf)) {
      System.out.println(fs.getUri());

      Path file = new Path("/user/hadoop/myfile");
      if (fs.exists(file)) {
        System.out.println("File exists.");
      } else {
        try (FSDataOutputStream outStream = fs.create(file)) {
          outStream.writeUTF("china cstor cstor cstor china");
        }
      }

      // The content is written with writeUTF, so it has to be read back with readUTF.
      String data;
      try (FSDataInputStream inStream = fs.open(file)) {
        data = inStream.readUTF();
      }

      FileSystem statusFs = file.getFileSystem(conf);
      FileStatus[] fileStatus = statusFs.listStatus(file);
      for (FileStatus status : fileStatus) {
        // NOTE(review): "FileOwer" looks like a typo for "FileOwner"; kept verbatim
        // because the printed text may be compared by a grader.
        System.out.println("FileOwer:" + status.getOwner());
        System.out.println("FileReplication:" + status.getReplication());
        System.out.println("FileModificationTime:" + new Date(status.getModificationTime()));
        System.out.println("FileBlockSize:" + status.getBlockSize());
      }

      System.out.println(data);
      System.out.println("Filename:" + file.getName());
    }
  }
}
第3关:倒排索引
编程要求
本关的编程任务是补全右侧代码片段中map和reduce函数中的代码,具体要求及说明如下:
在主函数main中已初始化hadoop的系统设置,包括hadoop运行环境的连接。
在main函数中,已经设置好了待处理文档路径(即input),以及结果输出路径(即output)。
在main函数中,已经声明了job对象,程序运行的工作调度已经设定好。
本关只要求在map和reduce函数的指定区域进行代码编写,其他区域请勿改动。
测试说明
测试输入样例数据集:文本文档test1.txt, test2.txt
文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast
预期输出文件result.txt的内容为:
import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit)context.getInputSplit();String fileName = fileSplit.getPath().getName();String word;IntWritable frequence=new IntWritable();int one=1;Hashtable<String,Integer> hashmap=new Hashtable();StringTokenizer itr = new StringTokenizer(value.toString());for(;itr.hasMoreTokens(); ) { word=itr.nextToken();if(hashmap.containsKey(word)){hashmap.put(word,hashmap.get(word)+1);}else{hashmap.put(word, one);}}for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){word=it.next();frequence=new IntWritable(hashmap.get(word));Text fileName_frequence = new Text(fileName+"@"+frequence.toString()); context.write(new Text(word),fileName_frequence);}}}public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException ,InterruptedException{ String fileName="";int sum=0;String num;String s;for (Text val : values) {s= val.toString();fileName=s.substring(0, val.find("@"));num=s.substring(val.find("@")+1, val.getLength());sum+=Integer.parseInt(num);}IntWritable frequence=new IntWritable(sum);context.write(key,new Text(fileName+"@"+frequence.toString()));}}public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> { @Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { Iterator<Text> it = values.iterator();StringBuilder all = new StringBuilder();if(it.hasNext()) all.append(it.next().toString());for(;it.hasNext();) {all.append(";");all.append(it.next().toString()); }context.write(key, new Text(all.toString()));}}public static void main(String[] args) {if(args.length!=2){System.err.println("Usage: InvertedIndex <in> <out>");System.exit(2);}try 
{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();Job job = new Job(conf, "invertedindex");job.setJarByClass(InvertedIndex.class);job.setMapperClass(InvertedIndexMapper.class);job.setCombinerClass(InvertedIndexCombiner.class);job.setReducerClass(InvertedIndexReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} catch (Exception e) { e.printStackTrace();}}
}
第4关: 网页排序——PageRank算法
测试说明
输入文件格式如下:
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
注:为了简化运算,已经对网页集关系进行了规整,并且给出了相应的初始PR值。
以第一行为例:行首的1表示网址编号,其后以tab键与后面的内容隔开;1.0为给定的初始PR值;2、3、4、5、6、7、8为网址1所指向的网址,它们与PR值之间以及彼此之间均以空格分隔。
输出文件格式:
The origin result
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
The 1th result
1 0.150 1.121 _2 3 4 5 6 7 8
2 0.150 1.243 _3 4 5 6 7 8
3 0.150 1.526 _4 5 6 7 8
4 0.150 2.036 _5 6 7 8
5 0.150 2.886 _6 7 8
6 0.150 4.303 _7 8
7 0.150 6.853 _8
8 0.150 11.831 _1 2 3 4 5 6 7
The 2th result
1 0.150 1.587 _2 3 4 5 6 7 8
2 0.150 1.723 _3 4 5 6 7 8
3 0.150 1.899 _4 5 6 7 8
4 0.150 2.158 _5 6 7 8
5 0.150 2.591 _6 7 8
6 0.150 3.409 _7 8
7 0.150 5.237 _8
8 0.150 9.626 _1 2 3 4 5 6 7
The 3th result
1 0.150 1.319 _2 3 4 5 6 7 8
2 0.150 1.512 _3 4 5 6 7 8
3 0.150 1.756 _4 5 6 7 8
4 0.150 2.079 _5 6 7 8
5 0.150 2.537 _6 7 8
6 0.150 3.271 _7 8
7 0.150 4.720 _8
8 0.150 8.003 _1 2 3 4 5 6 7
The 4th result
1 0.150 1.122 _2 3 4 5 6 7 8
2 0.150 1.282 _3 4 5 6 7 8
3 0.150 1.496 _4 5 6 7 8
4 0.150 1.795 _5 6 7 8
5 0.150 2.236 _6 7 8
6 0.150 2.955 _7 8
7 0.150 4.345 _8
8 0.150 7.386 _1 2 3 4 5 6 7
The 5th result
1 0.150 1.047 _2 3 4 5 6 7 8
2 0.150 1.183 _3 4 5 6 7 8
3 0.150 1.365 _4 5 6 7 8
4 0.150 1.619 _5 6 7 8
5 0.150 2.000 _6 7 8
6 0.150 2.634 _7 8
7 0.150 3.890 _8
8 0.150 6.686 _1 2 3 4 5 6 7
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class PageRank {public static class MyMapper extends Mapper<Object, Text, Text, Text>{private Text id = new Text();public void map(Object key, Text value, Context context ) throws IOException, InterruptedException{String line = value.toString();
//判断是否为输入文件if(line.substring(0,1).matches("[0-9]{1}")){boolean flag = false;if(line.contains("_")){line = line.replace("_","");flag = true;}
//对输入文件进行处理String[] values = line.split("\t");Text t = new Text(values[0]);String[] vals = values[1].split(" ");String url="_";//保存url,用作下次计算double pr = 0;int i = 0;int num = 0;if(flag){i=2;pr=Double.valueOf(vals[1]);num=vals.length-2;}else{i=1;pr=Double.valueOf(vals[0]);num=vals.length-1;}for(;i<vals.length;i++){url=url+vals[i]+" ";id.set(vals[i]);Text prt = new Text(String.valueOf(pr/num));context.write(id,prt);}context.write(t,new Text(url));}}}public static class MyReducer extends Reducer<Text,Text,Text,Text>{private Text result = new Text();private Double pr = new Double(0);public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException{double sum=0;String url="";//****请通过url判断否则是外链pr,作计算前预处理****//
/*********begin*********/for(Text val:values) { //发现_标记则表明是url,否则是外链pr,要参与计算 if(!val.toString().contains("_")) { sum=sum+Double.valueOf(val.toString()); } else { url=val.toString(); } } pr=0.15+0.85*sum; String str=String.format("%.3f",pr); result.set(new Text(str+" "+url)); context.write(key,result); /*********end**********/ //****请补全用完整PageRank计算公式计算输出过程,q取0.85****//
/*********begin*********//*********end**********/ }}public static void main(String[] args) throws Exception{String paths="file:///tmp/input/Wiki0";//输入文件路径,不要改动String path1=paths;String path2="";for(int i=1;i<=5;i++)//迭代5次{System.out.println("This is the "+i+"th job!");System.out.println("path1:"+path1);System.out.println("path2:"+path2);Configuration conf = new Configuration();Job job = new Job(conf, "PageRank");path2=paths+i; job.setJarByClass(PageRank.class);job.setMapperClass(MyMapper.class);//****请为job设置Combiner类****//
/*********begin*********/
job.setCombinerClass(MyReducer.class); /*********end**********/ job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(path1));FileOutputFormat.setOutputPath(job, new Path(path2));path1=path2; job.waitForCompletion(true);System.out.println(i+"th end!");}} }