目前,我正在接受Coursera的培训“ 挖掘海量数据集 ”。 我对MapReduce和Apache Hadoop感兴趣已有一段时间了,通过本课程,我希望对何时以及如何MapReduce可以帮助解决一些现实世界中的业务问题有更多的了解(我在这里介绍了另一种解决方法)。 该Coursera课程主要侧重于使用算法的理论,而较少涉及编码本身。 第一周是关于PageRanking以及Google如何使用它来对页面进行排名。 幸运的是,与Hadoop结合可以找到很多关于该主题的信息。 我到这里结束并决定仔细看一下这段代码。
我所做的就是获取这段代码 (将其分叉)并重新编写了一下。 我创建的映射器单元测试和减速器跟我描述这里 。 作为测试用例,我使用了课程中的示例。 我们有三个相互链接和/或彼此链接的网页:
此链接方案应解析为以下页面排名:
- Y 7/33
- 5/33
- M 21/33
由于MapReduce示例代码期望输入“ Wiki页面” XML ,因此我创建了以下测试集:
<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.10/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.10/ http://www.mediawiki.org/xml/export-0.10.xsd" version="0.10" xml:lang="en"><page><title>A</title><id>121173</id><revision>...<text xml:space="preserve" bytes="6523">[[Y]] [[M]]</text></revision></page><page><title>Y</title><id>121173</id><revision>...<text xml:space="preserve" bytes="6523">[[A]] [[Y]]</text></revision></page><page><title>M</title><id>121173</id><revision>...<text xml:space="preserve" bytes="6523">[[M]]</text></revision></page>
</mediawiki>
原始页面本身已经很好地解释了它的全局工作方式。 我将仅描述我创建的单元测试。 有了原始的解释和我的单元测试,您应该能够解决问题并了解发生了什么。
如上所述,整个工作分为三个部分:
- 解析
- 计算
- 订购
在解析部分中,将原始XML提取,拆分成多个页面并进行映射,以便我们将页面作为键和它具有传出链接的页面的值作为输出获得。 因此,单元测试的输入将是三个“ Wiki”页面XML,如上所示。 预期带有链接页面的页面的“标题”。 单元测试如下所示:
package net.pascalalma.hadoop.job1;...public class WikiPageLinksMapperTest {MapDriver<LongWritable, Text, Text, Text> mapDriver;String testPageA = " <page>\n" +" <title>A</title>\n" +" ..." +" <text xml:space=\"preserve\" bytes=\"6523\">[[Y]] [[M]]</text>\n" +" </revision>";String testPageY = " <page>\n" +" <title>Y</title>\n" +" ..." +" <text xml:space=\"preserve\" bytes=\"6523\">[[A]] [[Y]]</text>\n" +" </revision>\n" +" </page>";String testPageM = " <page>\n" +" <title>M</title>\n" +" ..." +" <text xml:space=\"preserve\" bytes=\"6523\">[[M]]</text>\n" +" </revision>\n" +" </page>";@Beforepublic void setUp() {WikiPageLinksMapper mapper = new WikiPageLinksMapper();mapDriver = MapDriver.newMapDriver(mapper);}@Testpublic void testMapper() throws IOException {mapDriver.withInput(new LongWritable(1), new Text(testPageA));mapDriver.withInput(new LongWritable(2), new Text(testPageM));mapDriver.withInput(new LongWritable(3), new Text(testPageY));mapDriver.withOutput(new Text("A"), new Text("Y"));mapDriver.withOutput(new Text("A"), new Text("M"));mapDriver.withOutput(new Text("Y"), new Text("A"));mapDriver.withOutput(new Text("Y"), new Text("Y"));mapDriver.withOutput(new Text("M"), new Text("M"));mapDriver.runTest(false);}
}
映射器的输出将成为我们的reducer的输入。 那个的单元测试如下:
package net.pascalalma.hadoop.job1;
...
public class WikiLinksReducerTest {ReduceDriver<Text, Text, Text, Text> reduceDriver;@Beforepublic void setUp() {WikiLinksReducer reducer = new WikiLinksReducer();reduceDriver = ReduceDriver.newReduceDriver(reducer);}@Testpublic void testReducer() throws IOException {List<Text> valuesA = new ArrayList<Text>();valuesA.add(new Text("M"));valuesA.add(new Text("Y"));reduceDriver.withInput(new Text("A"), valuesA);reduceDriver.withOutput(new Text("A"), new Text("1.0\tM,Y"));reduceDriver.runTest();}
}
如单元测试所示,我们期望reducer将输入减少到“初始”页面等级1.0的值,该等级与(关键)页面具有传出链接的所有页面连接。 这是该阶段的输出,将用作“计算”阶段的输入。
在计算部分中,将对传入的页面排名进行重新计算,以实现“ 幂迭代 ”方法。 将多次执行此步骤,以获得给定页面集的可接受页面排名。 如前所述,前一步的输出是该步骤的输入,正如我们在此映射器的单元测试中所看到的:
package net.pascalalma.hadoop.job2;
...
public class RankCalculateMapperTest {MapDriver<LongWritable, Text, Text, Text> mapDriver;@Beforepublic void setUp() {RankCalculateMapper mapper = new RankCalculateMapper();mapDriver = MapDriver.newMapDriver(mapper);}@Testpublic void testMapper() throws IOException {mapDriver.withInput(new LongWritable(1), new Text("A\t1.0\tM,Y"));mapDriver.withInput(new LongWritable(2), new Text("M\t1.0\tM"));mapDriver.withInput(new LongWritable(3), new Text("Y\t1.0\tY,A"));mapDriver.withOutput(new Text("M"), new Text("A\t1.0\t2"));mapDriver.withOutput(new Text("A"), new Text("Y\t1.0\t2"));mapDriver.withOutput(new Text("Y"), new Text("A\t1.0\t2"));mapDriver.withOutput(new Text("A"), new Text("|M,Y"));mapDriver.withOutput(new Text("M"), new Text("M\t1.0\t1"));mapDriver.withOutput(new Text("Y"), new Text("Y\t1.0\t2"));mapDriver.withOutput(new Text("A"), new Text("!"));mapDriver.withOutput(new Text("M"), new Text("|M"));mapDriver.withOutput(new Text("M"), new Text("!"));mapDriver.withOutput(new Text("Y"), new Text("|Y,A"));mapDriver.withOutput(new Text("Y"), new Text("!"));mapDriver.runTest(false);}
}
源页面中说明了此处的输出。 “额外”项目带有“!” 和'|' 在减少步骤中对于计算是必需的。 减速器的单元测试如下:
package net.pascalalma.hadoop.job2;
...
public class RankCalculateReduceTest {ReduceDriver<Text, Text, Text, Text> reduceDriver;@Beforepublic void setUp() {RankCalculateReduce reducer = new RankCalculateReduce();reduceDriver = ReduceDriver.newReduceDriver(reducer);}@Testpublic void testReducer() throws IOException {List<Text> valuesM = new ArrayList<Text>();valuesM.add(new Text("A\t1.0\t2"));valuesM.add(new Text("M\t1.0\t1"));valuesM.add(new Text("|M"));valuesM.add(new Text("!"));reduceDriver.withInput(new Text("M"), valuesM);List<Text> valuesA = new ArrayList<Text>();valuesA.add(new Text("Y\t1.0\t2"));valuesA.add(new Text("|M,Y"));valuesA.add(new Text("!"));reduceDriver.withInput(new Text("A"), valuesA);List<Text> valuesY = new ArrayList<Text>();valuesY.add(new Text("Y\t1.0\t2"));valuesY.add(new Text("|Y,A"));valuesY.add(new Text("!"));valuesY.add(new Text("A\t1.0\t2"));reduceDriver.withInput(new Text("Y"), valuesY);reduceDriver.withOutput(new Text("A"), new Text("0.6\tM,Y"));reduceDriver.withOutput(new Text("M"), new Text("1.4000001\tM"));reduceDriver.withOutput(new Text("Y"), new Text("1.0\tY,A"));reduceDriver.runTest(false);}
}
如图所示,映射器的输出被重新创建为输入,我们检查reducer的输出是否与页面等级计算的第一次迭代相匹配。 每次迭代将导致相同的输出格式,但可能具有不同的页面等级值。
最后一步是“订购”部分。 这非常简单,单元测试也是如此。 这部分仅包含一个映射器,该映射器获取上一步的输出并将其“重新格式化”为所需的格式:pagerank +按pagerank的页面顺序。 当将映射器结果提供给化简器步骤时,按键进行排序是由Hadoop框架完成的,因此该排序不会反映在Mapper单元测试中。 此单元测试的代码是:
package net.pascalalma.hadoop.job3;
...
public class RankingMapperTest {MapDriver<LongWritable, Text, FloatWritable, Text> mapDriver;@Beforepublic void setUp() {RankingMapper mapper = new RankingMapper();mapDriver = MapDriver.newMapDriver(mapper);}@Testpublic void testMapper() throws IOException {mapDriver.withInput(new LongWritable(1), new Text("A\t0.454545\tM,Y"));mapDriver.withInput(new LongWritable(2), new Text("M\t1.90\tM"));mapDriver.withInput(new LongWritable(3), new Text("Y\t0.68898\tY,A"));//Please note that we cannot check for ordering here because that is done by Hadoop after the Map phasemapDriver.withOutput(new FloatWritable(0.454545f), new Text("A"));mapDriver.withOutput(new FloatWritable(1.9f), new Text("M"));mapDriver.withOutput(new FloatWritable(0.68898f), new Text("Y"));mapDriver.runTest(false);}
}
因此,在这里,我们只检查映射器是否接受输入并正确格式化输出。
总结了单元测试的所有示例。 通过这个项目,您应该能够自己对其进行测试,并且对原始代码的工作方式有更深入的了解。 它肯定有助于我理解它!
- 包括单元测试在内的完整代码版本可以在这里找到。
翻译自: https://www.javacodegeeks.com/2015/02/calculate-pageranks-apache-hadoop.html