6、統計每個月份中,最高的三個溫度。
輸入格式:年月日 空格 時分秒 TAB 溫度
inputfile:
1949-10-01 14:21:02 34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WRunner {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJobName("weather");job.setJarByClass(WRunner.class);job.setMapperClass(WMapper.class);job.setReducerClass(WReducer.class);job.setMapOutputKeyClass(MyKey.class);job.setMapOutputValueClass(DoubleWritable.class);job.setPartitionerClass(MyPartitioner.class);job.setSortComparatorClass(MySort.class);job.setGroupingComparatorClass(MyGroup.class);job.setInputFormatClass(KeyValueTextInputFormat.class);job.setNumReduceTasks(3);Path in = new Path("/home/jinzhao/mrtest/input");FileInputFormat.setInputPaths(job, in);Path out = new Path("/home/jinzhao/mrtest/output");FileSystem fs = FileSystem.get(conf);if (fs.exists(out))fs.delete(out, true);FileOutputFormat.setOutputPath(job, out);job.waitForCompletion(true);}static class WMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");NullWritable nw = NullWritable.get();@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {try {Date date = sdf.parse(key.toString());Calendar c = Calendar.getInstance();c.setTime(date);int year = c.get(Calendar.YEAR);int month = c.get(Calendar.MONTH);int day = c.get(Calendar.DAY_OF_MONTH);String h = value.toString().trim();double hot = Double.parseDouble(h.substring(0, h.length()-1));context.write(new MyKey(year, month, day, hot), new DoubleWritable(hot));} catch (ParseException e) {e.printStackTrace();}}}static class WReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{@Overrideprotected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context)throws IOException, InterruptedException {int i=0;for(DoubleWritable v : values){++i;String msg = key.getYear() + "\t" + (key.getMonth() + 1) + "\t" + (key.getDay()+1) + "\t" + v.get();context.write(new Text(msg), NullWritable.get());if (i == 3)break;}}}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** 序列化所传输的对象*/
public class MyKey implements WritableComparable<MyKey> {private int year;private int month;private int day;private double hot;public MyKey(){super();}public MyKey(int year, int month, int day, double hot){this.year = year;this.month = month;this.day = day;this.hot = hot;}public int getYear() {return year;}public void setYear(int year) {this.year = year;}public int getMonth() {return month;}public void setMonth(int month) {this.month = month;}public int getDay() {return day;}public void setDay(int day) {this.day = day;}public double getHot() {return hot;}public void setHot(double hot) {this.hot = hot;}@Overridepublic void readFields(DataInput arg0) throws IOException {this.year = arg0.readInt();this.month = arg0.readInt();this.hot = arg0.readDouble();this.day = arg0.readInt();}@Overridepublic void write(DataOutput arg0) throws IOException {arg0.writeInt(year);arg0.writeInt(month);arg0.writeDouble(hot); arg0.writeInt(day);}/*** 判断是否是同一个对象,当对象作为key时。*/@Overridepublic int compareTo(MyKey arg0) {int r1 = Integer.compare(this.year, arg0.getYear());if (r1 == 0){int r2 = Integer.compare(this.month, arg0.getMonth());if (r2 == 0){return Double.compare(this.hot, arg0.getHot());}else{return r2;}}elsereturn r1;}}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 分组,将具有相同年份和月份的MyKey作为一组,即传递给一个reduce函数进行处理。*/
public class MyGroup extends WritableComparator{public MyGroup(){super(MyKey.class, true);}public int compare (WritableComparable a, WritableComparable b){MyKey k1 = (MyKey)a;MyKey k2 = (MyKey)b;int r1 = Integer.compare(k1.getYear(), k2.getYear());if (r1 == 0){return Integer.compare(k1.getMonth(), k2.getMonth());}elsereturn r1;}
}
package hadoop.wheather;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 自定义的排序,先分组,再排序*/
public class MySort extends WritableComparator{public MySort(){super(MyKey.class, true);}public int compare (WritableComparable a, WritableComparable b){MyKey k1 = (MyKey)a;MyKey k2 = (MyKey)b;int r1 = Integer.compare(k1.getYear(), k2.getYear());if (r1 == 0){int r2 = Integer.compare(k1.getMonth(), k2.getMonth());if (r2 == 0){return -Double.compare(k1.getHot(), k2.getHot());}elsereturn r2;}elsereturn r1;}
}
package hadoop.wheather;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;/*** 分区,每个分区由一个reduce进程来处理*/
public class MyPartitioner extends Partitioner<MyKey, DoubleWritable>{@Overridepublic int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {return(key.getYear() - 1949)%numReduceTasks;}}
7、社交網路的朋友推薦算法
格式:用戶 TAB 朋友1 空格 朋友2 空格 ...
inputfile:
小明 老王 如花 林志玲
老王 小明 凤姐
如花 小明 李刚 凤姐
林志玲 小明 李刚 凤姐 郭美美
李刚 如花 凤姐 林志玲
郭美美 凤姐 林志玲
凤姐 如花 老王 林志玲 郭美美
第一次輸出:
格式:用戶1 空格 用戶2 TAB 次數
第二次輸出:
格式:用戶 TAB 推薦1 空格 推薦2 空格...
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Friends {static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>{@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {String user = key.toString();String[] friends = value.toString().split(" ");for (int i = 0; i < friends.length; ++i){context.write(new Fof(user, friends[i]), new IntWritable(0));for (int j = i + 1; j < friends.length; ++j)context.write(new Fof(friends[i], friends[j]), new IntWritable(1));}}}static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>{@Overrideprotected void reduce(Fof key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;boolean flag = true;for (IntWritable i : values){if (i.get() == 0){flag = false;break;}else{sum = sum + i.get();}}if (flag)context.write(key, new IntWritable(sum));}}public static void main(String[] args){try {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Friends.class);job.setJobName("friend-I");job.setMapperClass(FofMapper.class);job.setReducerClass(FofReducer.class);job.setMapOutputKeyClass(Fof.class);job.setMapOutputValueClass(IntWritable.class);job.setInputFormatClass(KeyValueTextInputFormat.class);Path in = new Path("/home/jinzhao/mrtest/input");FileInputFormat.setInputPaths(job, in);Path out = new Path("/home/jinzhao/mrtest/output");FileSystem fs = FileSystem.get(conf);if (fs.exists(out))fs.delete(out, true);FileOutputFormat.setOutputPath(job, out);if ( job.waitForCompletion(true)){Job job2 = Job.getInstance(conf);job2.setJarByClass(Friends.class);job2.setJobName("friend-II");job2.setMapperClass(SortMapper.class);job2.setReducerClass(SortReducer.class);job2.setMapOutputKeyClass(User.class);job2.setMapOutputValueClass(User.class);job2.setInputFormatClass(KeyValueTextInputFormat.class);job2.setSortComparatorClass(FSort.class);job2.setGroupingComparatorClass(FGroup.class);Path in2 = new Path("/home/jinzhao/mrtest/output");FileInputFormat.setInputPaths(job2, in2);Path out2 = new Path("/home/jinzhao/mrtest/output2");if (fs.exists(out2))fs.delete(out2, true);FileOutputFormat.setOutputPath(job2, out2);job2.waitForCompletion(true);}} catch (Exception e){e.printStackTrace();}}static class SortMapper extends Mapper<Text, Text, User, User>{@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {String[] friends = key.toString().split(" ");int count = Integer.parseInt(value.toString());context.write(new User(friends[0], count), new User(friends[1], count));context.write(new User(friends[1], count), new User(friends[0], count));}}static class SortReducer extends Reducer<User, User, Text, Text>{@Overrideprotected void reduce(User key, Iterable<User> values, Context context)throws IOException, InterruptedException {StringBuilder sb = new StringBuilder();for (User i : values)sb.append(i.getUsername() + "," + i.getCount() + " ");context.write(new Text(key.getUsername()), new Text(sb.toString().trim()));}}
}
import org.apache.hadoop.io.Text;public class Fof extends Text{public Fof(){super();}public Fof(String a, String b){super(getFof(a, b));}public static String getFof(String a, String b){int r = a.compareTo(b);if (r < 0)return a + " " + b;else return b + " " + a;}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class User implements WritableComparable<User>{private String username;private int count;public User(){}public User(String username, int count){this.username = username;this.count = count;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(username);out.writeInt(count);}@Overridepublic void readFields(DataInput in) throws IOException {this.username = in.readUTF();this.count = in.readInt();}@Overridepublic int compareTo(User arg0) {int c1 = this.username.compareTo(arg0.username);if (c1 == 0){return this.count - arg0.getCount();} elsereturn c1;}}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class FGroup extends WritableComparator{public FGroup(){super(User.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {User u1 = (User)a;User u2 = (User)b;return u1.getUsername().compareTo(u2.getUsername());}}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class FSort extends WritableComparator{public FSort(){super(User.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {User u1 = (User)a;User u2 = (User)b;int c1 = u1.getUsername().compareTo(u2.getUsername());if (c1==0){return u2.getCount() - u1.getCount();} elsereturn c1;}
}