Table of Contents
- 1. Generating the Data
- 2. Writing the Entity Class
- 3. The Mapper Class
- 4. The Reducer Class
- 5. The Driver Class
- 6. Running the Job
Reference book: 《Hadoop大数据原理与应用》 (Principles and Applications of Hadoop Big Data)
Related article: MapReduce Programming Practice
1. Generating the Data
Supermarket consumer data, one tab-separated record per purchase: id, timestamp, purchase amount, member/non-member.
Generate the synthetic data with Python:
import random
import time
consumer_type = ['会员', '非会员']  # member / non-member
vip, novip = 0, 0
vipValue, novipValue = 0, 0
with open("consumer.txt",'w',encoding='utf-8') as f:for i in range(1000): # 1000条数据random.seed(time.time()+i)id = random.randint(0, 10000)t = time.strftime('%Y-%m-%d %H:%M',time.localtime(time.time()))value = random.randint(1, 500)type = consumer_type[random.randint(0, 1)]f.write(str(id)+'\t'+t+'\t'+str(value)+'\t'+type+'\n')if type == consumer_type[0]:vip += 1vipValue += valueelse:novip += 1novipValue += value
print(consumer_type[0] + ": 人数 " + str(vip) + ", 总金额: " + str(vipValue) +", 平均金额:"+str(vipValue/vip))
print(consumer_type[1] + ": 人数 " + str(novip) + ", 总金额: " + str(novipValue) +", 平均金额:"+str(novipValue/novip))
[dnn@master HDFS_example]$ python test.py
会员: count 510, total: 128744, average: 252.439215686
非会员: count 490, total: 123249, average: 251.528571429
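Since the script prints the true per-group counts, totals, and averages, it is handy to recompute them on the Java side before involving Hadoop at all. Below is a minimal standalone sanity check (plain Java, no MapReduce; the class name and the assumption that consumer.txt sits in the working directory are illustrative) whose output the job's results can later be compared against:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class LocalAvgCheck {
    public static void main(String[] args) throws IOException {
        Map<String, long[]> stats = new HashMap<>(); // member type -> {count, sum}
        for (String line : Files.readAllLines(Paths.get("consumer.txt"), StandardCharsets.UTF_8)) {
            String[] fields = line.split("\t"); // id, timestamp, amount, member type
            long[] s = stats.computeIfAbsent(fields[3], k -> new long[2]);
            s[0]++;
            s[1] += Long.parseLong(fields[2]);
        }
        stats.forEach((type, s) -> System.out.println(
                type + ": count " + s[0] + ", total: " + s[1] + ", average: " + (double) s[1] / s[0]));
    }
}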
2. Writing the Entity Class
- Consumer.java
package com.michael.mapreduce;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Consumer implements Writable {

    private String id;
    private int money;
    private int vip; // 0 = non-member, 1 = member

    public Consumer() {}

    public Consumer(String id, int money, int vip) {
        this.id = id;
        this.money = money;
        this.vip = vip;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVip() {
        return vip;
    }

    public void setVip(int vip) {
        this.vip = vip;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeInt(money);
        dataOutput.writeInt(vip);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // fields must be read in exactly the order they were written
        this.id = dataInput.readUTF();
        this.money = dataInput.readInt();
        this.vip = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.id + "\t" + this.money + "\t" + this.vip;
    }
}
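A note on this class: the job below never actually references Consumer; the Mapper emits Text/IntWritable pairs directly, so the class serves here as a template for custom Writable values. Its one real contract is that readFields() must consume exactly what write() produced, in the same order, which can be verified with a quick round-trip (a sketch; the class name is illustrative):

package com.michael.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ConsumerRoundTrip {
    public static void main(String[] args) throws Exception {
        Consumer in = new Consumer("42", 250, 1);

        // serialize with write(), then deserialize a fresh instance with readFields()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        Consumer out = new Consumer();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(out); // prints the three fields, tab-separated: 42, 250, 1
    }
}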
3. The Mapper Class
- ConsumerMapper.java
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ConsumerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // each input line: id \t timestamp \t amount \t member type
        String line = value.toString();
        String[] fields = line.split("\t");
        String id = fields[0]; // consumer id (not used by this job)
        int money = Integer.parseInt(fields[2]);
        String vip = fields[3];
        // key by member type so the Reducer sees all amounts for each group
        context.write(new Text(vip), new IntWritable(money));
    }
}
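This works as written, but map() runs once per input line and allocates two fresh Writable objects each time. A common optional refinement is to reuse the output instances across calls, which is safe because context.write() serializes their contents immediately. A sketch (the class name is illustrative):

package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ConsumerMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {

    // reused across map() calls instead of being reallocated per record
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outKey.set(fields[3]);                     // member type
        outValue.set(Integer.parseInt(fields[2])); // amount
        context.write(outKey, outValue);
    }
}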
4. The Reducer Class
- ConsumerReducer.java
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ConsumerReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        long sum = 0;
        for (IntWritable v : values) {
            count++;
            sum += v.get();
        }
        // integer division: the fractional part of the average is truncated
        long avg = sum / count;
        context.write(key, new LongWritable(avg));
    }
}
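Two things are worth noting here. First, sum / count is integer division, so the fractional part of each average is discarded; that is why the job's final output below shows 252 and 251 where the Python script printed 252.43... and 251.52.... Second, this Reducer cannot double as a combiner: an average of partial averages is not the overall average (and the types would not line up either, since the map output value is IntWritable while this class emits LongWritable). If the fractional part matters, a variant that emits DoubleWritable looks roughly like this (the class name is illustrative, and the Driver would then need setOutputValueClass(DoubleWritable.class) to match):

package com.michael.mapreduce;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ConsumerAvgReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        long sum = 0;
        for (IntWritable v : values) {
            count++;
            sum += v.get();
        }
        // floating-point division keeps the fractional part of the average
        context.write(key, new DoubleWritable((double) sum / count));
    }
}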
5. The Driver Class
- ConsumerDriver.java
package com.michael.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConsumerDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ConsumerDriver.class);
        job.setMapperClass(ConsumerMapper.class);
        job.setReducerClass(ConsumerReducer.class);

        // the map output value (IntWritable) differs from the final output
        // value (LongWritable), so both pairs of types must be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
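One practical wrinkle: re-running the job with the same output path fails, because MapReduce refuses to overwrite an existing output directory. A common guard is to delete it up front. A sketch of the extra lines inside main(), replacing the setOutputPath call above and assuming an added import of org.apache.hadoop.fs.FileSystem:

Path outputPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // recursively remove a previous run's output
}
FileOutputFormat.setOutputPath(job, outputPath);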
6. Running the Job
- Copy the data to HDFS (on current Hadoop releases, hdfs dfs is the preferred spelling of this command):
[dnn@master Desktop]$ hadoop dfs -copyFromLocal /home/dnn/eclipse-workspace/HDFS_example/consumer.txt /InputDataTest
- Export the jar and run it from the bash command line:
hadoop jar /home/dnn/eclipse-workspace/HDFS_example/consumer_avg.jar com.michael.mapreduce.ConsumerDriver /InputDataTest/consumer.txt /OutputTest2
- Results: the averages match the Python tallies above, with the fractional part dropped by the Reducer's integer division.
[dnn@master Desktop]$ hadoop fs -cat /OutputTest2/part-r-00000
会员 252
非会员 251