文章目录
- 1. 实战概述
- 2. 提出任务
-
- 3. 准备数据
- 3.1 在云主机上创建文本文件
- 3.2 上传文件到HDFS指定目录
- 4. 实现步骤
- 4.1 创建Maven项目
- 4.2 添加相关依赖
- 4.3 创建日志属性文件
- 4.4 创建网址去重映射器类
- 4.5 创建网址去重归并器类
- 4.6 创建网址去重驱动器类
- 4.7 启动应用,查看结果
- 5. 拓展练习1 - 显示每个网址重复次数
- 5.1 修改网址去重归并器
- 5.2 修改网址去重驱动器类
- 5.3 启动应用,查看结果
- 6. 拓展练习2 - 按网址重复次数降序排列
- 6.1 创建网址实体类
- 6.2 修改网址去重归并器类
- 6.3 启动应用,查看结果
- 6.4 解决重复次数相同的网址被删问题
- 6.5 启动应用,查看结果
- 7. 实战小结
1. 实战概述
- 本次实战通过Hadoop MapReduce实现了IP地址的去重与统计。首先,使用Mapper读取IP地址并输出,Reducer进行去重操作。接着,扩展功能统计每个IP地址的访问次数,并按访问次数降序排列。通过自定义
IPBean
类和调整Reducer逻辑,解决了排序过程中重复次数相同的IP地址被删除的问题。最终,成功输出去重后的IP地址及其访问次数,并按访问次数降序排列。
2. 提出任务
2.1 原始问题
- 某人今天访问了许多不同的网站,移动或电信日志会记录每次访问的详细信息。有些网站被频繁访问,而有些则访问次数较少。为了分析该用户的访问行为,需要统计他今天访问了多少个不同的网站。通过处理日志数据,去除重复的网站记录,可以准确计算出他访问的唯一网站数量。这一过程不仅有助于了解用户的浏览习惯,还能为网络优化和个性化推荐提供数据支持。
2.2 简单化处理
192.168.234.21
192.168.234.22
192.168.234.21
192.168.234.21
192.168.234.23
192.168.234.21
192.168.234.21
192.168.234.21
192.168.234.25
192.168.234.21
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.29
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.25
192.168.234.25
192.168.234.21
192.168.234.22
192.168.234.21
3. 准备数据
3.1 在云主机上创建文本文件
- 执行命令:
vim ips.txt
,录入IP地址,存盘退出,然后执行命令:cat ips.txt
3.2 上传文件到HDFS指定目录
- 执行命令:
hdfs dfs -mkdir -p /deduplicateips/input
,创建目录
- 执行命令:
hdfs dfs -put ips.txt /deduplicateips/input
,上传文件
4. 实现步骤
4.1 创建Maven项目
- 设置项目名称、位置、构建系统、JDK版本……
- 单击【Create】按钮,生成项目基本骨架
4.2 添加相关依赖
- 修改pom.xml文件,添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.mr</groupId><artifactId>DeduplicateIPs</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
- 刷新项目依赖
4.3 创建日志属性文件
- 在
resources
目录里创建log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/deduplicateips.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.4 创建网址去重映射器类
- 创建
net.huawei.mr
包
- 在
net.huawei.mr
包里创建DeduplicateIPsMapper
类
package net.huawei.mr;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
public class DeduplicateIPsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {context.write(value, NullWritable.get());}
}
- 代码说明:
DeduplicateIPsMapper
是一个Hadoop MapReduce的Mapper类,用于处理IP地址的去重任务。它继承自Mapper<LongWritable, Text, Text, NullWritable>
,表示输入键为行偏移量(LongWritable
),输入值为IP地址(Text
),输出键为IP地址(Text
),输出值为NullWritable
(无实际值)。在map
方法中,直接将输入的IP地址作为键输出,值为NullWritable.get()
,以便在Reducer阶段进行去重操作。
4.5 创建网址去重归并器类
- 在
net.huawei.mr
包里创建DeduplicateIPsReducer
类
package net.huawei.mr;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
public class DeduplicateIPsReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {context.write(key, NullWritable.get());}
}
- 代码说明:
DeduplicateIPsReducer
是一个Hadoop MapReduce的Reducer类,用于实现IP地址的去重功能。它继承自Reducer<Text, NullWritable, Text, NullWritable>
,表示输入键为IP地址(Text
),输入值为NullWritable
,输出键为去重后的IP地址(Text
),输出值为NullWritable
。在reduce
方法中,直接将输入的IP地址作为键输出,值为NullWritable.get()
,确保每个IP地址只输出一次,从而实现去重效果。
4.6 创建网址去重驱动器类
- 在
net.huawei.mr
包里创建DeduplicateIPsDriver
类
package net.huawei.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;
public class DeduplicateIPsDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.set("dfs.client.use.datanode.hostname", "true");Job job = Job.getInstance(conf);job.setJarByClass(DeduplicateIPsDriver.class);job.setMapperClass(DeduplicateIPsMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setReducerClass(DeduplicateIPsReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);String uri = "hdfs://bigdata1:9000";Path inputPath = new Path(uri + "/deduplicateips/input");Path outputPath = new Path(uri + "/deduplicateips/output");FileSystem fs = FileSystem.get(new URI(uri), conf);fs.delete(outputPath, true);FileInputFormat.addInputPath(job, inputPath);FileOutputFormat.setOutputPath(job, outputPath);job.waitForCompletion(true);System.out.println("======统计结果======");FileStatus[] fileStatuses = fs.listStatus(outputPath);for (int i = 1; i < fileStatuses.length; i++) {System.out.println(fileStatuses[i].getPath());FSDataInputStream in = fs.open(fileStatuses[i].getPath());IOUtils.copyBytes(in, System.out, 4096, true);}}
}
- 代码说明:
DeduplicateIPsDriver
是Hadoop MapReduce任务的驱动器类,负责配置和启动IP地址去重任务。它设置了Mapper和Reducer类,并定义了输入输出路径。通过FileSystem
操作HDFS文件系统,删除旧的输出目录并提交作业。作业完成后,读取并输出结果文件内容到控制台。该类实现了从HDFS读取数据、去重处理并将结果写回HDFS的完整流程。
4.7 启动应用,查看结果
- 运行
DeduplicateIPsDriver
类,提示没有访问权限
- 执行命令:
hdfs dfs -chmod -R 775 /deduplicateips
,通过设置权限,确保只有授权用户或组可以修改/deduplicateips
目录中的内容,同时其他用户可以读取和执行。
- 执行命令:
hdfs dfs -chown -R huawei:supergroup /deduplicateips
,更改所有者:将/deduplicateips
目录及其内容的所有者设置为 huawei
,这意味着huawei
用户对该目录及其内容拥有完全控制权。更改组:将/deduplicateips
目录及其内容的组设置为 supergroup
,这意味着supergroup
组的成员可以根据组权限访问该目录及其内容。
- 再次运行
DeduplicateIPsDriver
类
- 可以看到,
26
个IP地址记录去重之后,只有7
个IP地址记录。
5. 拓展练习1 - 显示每个网址重复次数
5.1 修改网址去重归并器
DeduplicateIPsReducer
类的输出值类型改为IntWritable
,reduce
方法里遍历值迭代器统计网址重复次数
package net.huawei.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
public class DeduplicateIPsReducer extends Reducer<Text, NullWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {int count = 0;for (NullWritable value: values) {count++;}context.write(key, new IntWritable(count));}
}
- 代码说明:
DeduplicateIPsReducer
是一个Hadoop MapReduce的Reducer类,用于统计每个IP地址的出现次数。它继承自Reducer<Text, NullWritable, Text, IntWritable>
,表示输入键为IP地址(Text
),输入值为NullWritable
,输出键为IP地址(Text
),输出值为出现次数(IntWritable
)。在reduce
方法中,通过遍历values
迭代器统计每个IP地址的出现次数,并将结果写入上下文。
5.2 修改网址去重驱动器类
- 将输出值类型由
NullWritable
改为IntWritable
5.3 启动应用,查看结果
- 运行
DeduplicateIPsDriver
类
- 目前输出结果默认按照IP地址(key)升序排列。若需根据网址重复次数降序排列,以反映网址的受欢迎程度,可以采取以下方法:首先,自定义一个
WritableComparable
类(如IPBean
),封装IP地址及其重复次数,并实现compareTo
方法以按重复次数降序排序。其次,在Reducer中使用TreeSet
或PriorityQueue
存储结果,确保按重复次数排序后再输出。最后,可在Driver类中设置自定义的排序类(SortComparator
),使MapReduce框架在Shuffle阶段按重复次数降序排序。通过这种方式,输出结果将按重复次数从高到低排列,直观展示受欢迎程度。
6. 拓展练习2 - 按网址重复次数降序排列
6.1 创建网址实体类
- 在
net.huawei.mr
包里创建IPBean
类
package net.huawei.mr;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class IPBean implements WritableComparable<IPBean> {private String ip;private int count;public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic int compareTo(IPBean o) {return o.count - this.count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(ip);out.writeInt(count);}@Overridepublic void readFields(DataInput in) throws IOException {ip = in.readUTF();count = in.readInt();}
}
- 代码说明:
IPBean
是一个自定义的Hadoop WritableComparable
类,用于封装IP地址及其访问次数。它实现了WritableComparable<IPBean>
接口,支持序列化和反序列化操作。compareTo
方法按访问次数降序排序,确保在MapReduce任务中能够按重复次数从高到低排列结果。通过write
和readFields
方法,实现了数据的序列化和反序列化,便于在Hadoop集群中传输和存储。
6.2 修改网址去重归并器类
DeduplicateIPsReducer
通过 TreeSet
实现IP地址去重和按访问次数降序排序。在 reduce
方法中统计IP地址出现次数,封装为 IPBean
并存入 TreeSet
。cleanup
方法遍历 TreeSet
,按访问次数降序输出结果。利用 TreeSet
的自动排序功能,简化了排序逻辑,代码清晰高效,适用于大数据场景下的IP地址分析与处理。
package net.huawei.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.TreeSet;
public class DeduplicateIPsReducer extends Reducer<Text, NullWritable, Text, IntWritable> {private TreeSet<IPBean> treeSet = new TreeSet<>();@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {int count = 0;for (NullWritable value: values) {count++;}IPBean ib = new IPBean();ib.setIp(key.toString()); ib.setCount(count); treeSet.add(ib);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {for (IPBean ib: treeSet) {context.write(new Text(ib.getIp()), new IntWritable(ib.getCount()));}}
}
6.3 启动应用,查看结果
- 运行
DeduplicateIPsDriver
类
- 虽然使用
TreeSet
成功实现了按网址重复次数降序排列,但带来了一个新问题:TreeSet
会自动去重,导致重复次数相同的网址被删除。例如,7个不同网址中,重复次数相同的网址被合并,最终只剩下4个网址。为了解决这个问题,可以采用 List
结合 Collections.sort()
的方式替代 TreeSet
。
6.4 解决重复次数相同的网址被删问题
- 修改网址去重归并器类
package net.huawei.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class DeduplicateIPsReducer extends Reducer<Text, NullWritable, Text, IntWritable> {private List<IPBean> ibList = new ArrayList<>();@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {int count = 0;for (NullWritable value: values) {count++;}IPBean ib = new IPBean();ib.setIp(key.toString()); ib.setCount(count); ibList.add(ib);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Collections.sort(ibList);for (IPBean ib: ibList) {context.write(new Text(ib.getIp()), new IntWritable(ib.getCount()));}}
}
DeduplicateIPsReducer
类通过 List
和 Collections.sort()
实现IP地址的去重和按访问次数降序排序。在 reduce
方法中统计IP地址的访问次数,封装为 IPBean
并存入 List
。在 cleanup
方法中,使用 Collections.sort()
对 List
按访问次数降序排序,并遍历输出结果。这种方式避免了 TreeSet
自动去重的问题,确保重复次数相同的网址都能正确显示。
6.5 启动应用,查看结果
- 运行
DeduplicateIPsDriver
类
- 程序成功实现了IP地址的去重和访问次数的统计。结果按访问次数降序排列,便于分析哪些IP地址访问频率较高。
7. 实战小结
- 本次实战通过Hadoop MapReduce实现了IP地址的去重与统计。首先,使用Mapper读取IP地址并输出,Reducer进行去重操作。接着,扩展功能统计每个IP地址的访问次数,并按访问次数降序排列。通过自定义
IPBean
类和调整Reducer逻辑,解决了排序过程中重复次数相同的IP地址被删除的问题。最终,成功输出去重后的IP地址及其访问次数,并按访问次数降序排列。这一过程不仅展示了Hadoop MapReduce的强大功能,还为大数据处理中的去重和排序提供了实用解决方案。通过本次实战,深入理解了MapReduce的工作机制,并掌握了如何优化数据处理流程以满足实际需求。