数据预处理
- 引入
- 一.配置java , hadoop , maven的window本机环境变量
- 1.配置
- 2.测试是否配置成功
- 二.创建一个Maven项目
- 三.导入hadoop依赖
- 四.数据清洗
- 1.数据清洗的java代码
- 2.查看数据清洗后的输出结果
引入
做数据预处理 需要具备的条件 : java,hadoop,maven环境以及idea软件
一.配置java , hadoop , maven的window本机环境变量
1.配置
- 本机的设置/高级系统设置/环境变量
- 在系统变量中配置文件的路径
- 我是在用户变量和系统变量中都做配置了
- 用户变量和系统变量的区别是什么呢?
环境变量包括了用户变量和系统变量
它俩的关系简言之就是 系统变量包括用户变量 , 也就是 , 如果你配置了系统变量 , 那么其配置在用户变量中也是有效的
而我们都知道 , 一个系统可以同时有多个用户 , 所以用户变量是只在当前用户环境下有效的
一般,在没有特殊要求的情况下,只配置系统变量就够了
例如我的maven文件的路径如下 :
双击Path
新建
输入 : %HADOOP_HOME%\bin
这个格式的(注意名称与上面配置路径的名称相同即可)
下图只有hadoop和maven的
2.测试是否配置成功
win+R
输入 cmd
输入下面命令 , 有版本号显示 , 则说明环境配置成功
java -version
hadoop version
mvn -version
二.创建一个Maven项目
三.导入hadoop依赖
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.3</version>
</dependency>
把刚刚环境配置的hadoop文件中的hadoop/etc/hadoop/log4j.propertities文件移动到resources中
四.数据清洗
在 /usr/data文件下创建一个新文件夹log(用来存储日志文件)
[root@hadoop ~]# cd /usr/
[root@hadoop usr]# ls
bin data etc games include lib lib64 libexec local sbin share soft src tmp
[root@hadoop usr]# cd data/
[root@hadoop data]# ls
student.txt
[root@hadoop data]# mkdir log
[root@hadoop data]# ls
log student.txt
[root@hadoop data]# cd log/
[root@hadoop log]# ls
access_2013_05_30.log access_2013_05_31.log
[root@hadoop log]# hdfs dfs -put access_2013_05_30.log /
//上传到hdfs中
1.数据清洗的java代码
//日志解析类
package com.stu.log;import sun.rmi.runtime.Log;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Locale;/*** 日志解析类*/
public class LogParser {public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);// yyyyMMddHHmmsspublic static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");// 解析英文时间字符串private Date parseDateFormat(String string){Date parse = null;try {parse = FORMAT.parse(string);} catch (ParseException e) {e.printStackTrace();}return parse;}/*** 解析日志的行记录* @param line* @return*/public String[] parse(String line){String s = parseIp(line);String s1 = parseTime(line);String s2 = parseURL(line);String s3 = parseStatus(line);String s4 = parseTraffic(line);return new String[]{s,s1,s2,s3,s4};}private String parseTraffic(String line){String trim = line.substring(line.lastIndexOf("\"") + 1).trim();String traffic = trim.split(" ")[1];return traffic;}private String parseStatus(String line){String substring = line.substring(line.lastIndexOf("\"") + 1).trim();String status = substring.split(" ")[0];return status;}private String parseURL(String line){int i = line.indexOf("\"");int i1 = line.lastIndexOf("\"");String substring = line.substring(i + 1, i1);return substring;}private String parseTime(String line){int i = line.indexOf("[");int i1 = line.indexOf("+0800");String trim = line.substring(i + 1, i1).trim();Date date = parseDateFormat(trim);return dateformat1.format(date);}private String parseIp(String line){String trim = line.split("- -")[0].trim();return trim;}}
map和reduce
package com.stu.log;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogMapper extends Mapper<LongWritable,Text,LongWritable, Text> {private LogParser lp = new LogParser();private Text outPutValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] parse = lp.parse(value.toString());// Step1. 过滤掉静态资源访问请求if(parse[2].startsWith("GET /static/") || parse[2].startsWith("GET /uc_server")){return;}// Step2. 过滤掉开头为GET 和 POSTif(parse[2].startsWith("GET /")){parse[2] = parse[2].substring("GET /".length());}else if(parse[2].startsWith("POST /")){parse[2] = parse[2].substring("POST /".length());}// Step3 过滤掉http协议if(parse[2].endsWith(" HTTP/1.1")){parse[2] = parse[2].substring(0,parse[2].length() - " HTTP/1.1".length());}outPutValue.set(parse[0] + "\t"+ parse[1] +"\t"+parse[2]);context.write(key,outPutValue);}
}class LogReducer extends Reducer<LongWritable, Text,Text, NullWritable>{@Overrideprotected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(),NullWritable.get());}
}
job
job类的代码需要做如下修改 :
package com.stu.log;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;public class LogJob extends Configured implements Tool {public static void main(String[] args) {try {new LogJob().run(args);} catch (Exception e) {e.printStackTrace();}}public int run(String[] args) throws Exception {Configuration configuration = new Configuration();FileSystem fileSystem = FileSystem.get(configuration);Job job = Job.getInstance(configuration);job.setJarByClass(LogJob.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPath(job,new Path(args[0]));Path path = new Path(args[1]);if(fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);boolean b = job.waitForCompletion(true);System.out.println(b);return 0;}
}
2.查看数据清洗后的输出结果
[root@hadoop log]# hdfs dfs -cat /logresult/part-r-00000 | head -100
//通过管道查看100条数据(清洗过的)