8、PageRank
Page-rank源于Google,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。
Page-rank实现了将链接价值概念作为排名因素。
算法原理
– 入链 == 投票
• Page-rank 让链接来“ 投票 “ ,到一个页面的超链接相当于对该页投一票。
– 入链数量
• 如果一个页面节点接收到的其他网页指向的入链数量越多,那么这个页面越重要。
– 入链质量
• 指向页面A的入链质量不同,质量高的页面会通过链接向其他页面传递更多的权重。所以越是质量高的页面指向页面A ,则页面
A 越重要。
– 初始值
• 每个页面设置相同的PR值
• Google的page-rank算法给每个页面的PR初始值为1。
– 迭代递归计算(收敛)
• Google不断的重复计算每个页面的Page-rank。那么经过不断的重复计算,这些页面的PR值会趋向于稳定,也就是收敛的状态。
• 在具体企业应用中怎么样确定收敛标准?
– 1、每个页面的PR值和上一次计算的PR相等
– 2、设定一个差值指标(0.0001 )。当所有页面和上一次计算的PR差值平均小于该标准时,则收敛。
– 3、设定一个百分比(99% ),当99%的页面和上一次计算的PR相等
– 修正Page-rank计算公式
• 由于存在一些出链为0,也就是那些不链接任何其他网页的网,也称为孤立网页,使得很多网页不能被访问到。因此需要对Page-rank公式进行修正,即在简单公式的基础上增加了阻尼系数(damping factor ) q , q 一般取值q=0.85。
– 完整Page-rank计算公式
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PageRankRun {public static enum MyCounter{counter}public static class PRMapper extends Mapper<Text, Text, Text, Text>{//webname:weight linkout1 linkout2 linkoutn//webname weight:linkout1 linkout2 linkoutn//linkout1 weight//linkout2 weight//linkoutn weight@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {String[] links = value.toString().split(" ");int num = links.length;String[] pageRank = key.toString().split(":");context.write(new Text(pageRank[0]), new Text(pageRank[1] + ":" + value.toString()));double weight = Double.parseDouble(pageRank[1].trim());for(String s : links)context.write(new Text(s), new Text(weight/num + ""));}}public static class PRReducer extends Reducer<Text, Text, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {String links = "";double oldPR = 0;int totalWebNum = context.getConfiguration().getInt("totalWebNum", 1);double newPR = (1-0.85)/totalWebNum;for(Text t : values){String[] s = t.toString().trim().split(":");if (s.length > 1){links = s[1].trim();oldPR = Double.parseDouble(s[0]);}else{newPR += Double.parseDouble(s[0])*0.85;}}int i = (int)Math.abs((oldPR - newPR)*10000);context.getCounter(MyCounter.counter).increment(i);context.write(new Text(key.toString() + ":" + newPR), new Text(links));}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();conf.setInt("totalWebNum", 4);boolean flag = true;double limit = 0.0001;while(true){Job job = Job.getInstance(conf);job.setJarByClass(PageRankRun.class);job.setInputFormatClass(KeyValueTextInputFormat.class);job.setMapperClass(PRMapper.class);job.setReducerClass(PRReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);Path in;Path out;if (flag){in = new Path("/home/jinzhao/mrtest/input");out = new Path("/home/jinzhao/mrtest/output");flag = false;}else{out = new Path("/home/jinzhao/mrtest/input");in = new Path("/home/jinzhao/mrtest/output");flag = true;}FileInputFormat.setInputPaths(job, in);FileSystem fs = FileSystem.get(conf);if (fs.exists(out))fs.delete(out, true);FileOutputFormat.setOutputPath(job, out);if (job.waitForCompletion(true)){long sum= job.getCounters().findCounter(MyCounter.counter).getValue();double avgd= sum*1.0/(conf.getInt("totalWebNum", 1)*10000);if(avgd < limit){fs.delete(in, true);break;}}}}
}