3. Traffic monitoring aggregation (implemented with LongWritable)
HDFS file path: /tmp/flow.txt
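If the file is not on HDFS yet, it can be uploaded and checked with the standard shell commands (a sketch, assuming a local copy named flow.txt in the current directory):
hadoop fs -put flow.txt /tmp/flow.txt
hadoop fs -cat /tmp/flow.txt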
File contents:
13770759991 50 100 25 400
13770759991 800 600 500 100
13770759992 400 300 250 1400
13770759992 800 1200 600 900
Field meanings:
phone number / upstream packet bytes / downstream packet bytes / uploaded bytes / downloaded bytes
phoneNum uppackBytes downpackBytes uploadBytes downloadBytes
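For example, aggregating the sample data per phone number should give 13770759991 the totals 850 / 700 / 525 / 500 (50+800, 100+600, 25+500, 400+100), and 13770759992 the totals 1200 / 1500 / 850 / 2300.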
Code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowTest {

    public static void main(String[] args) {
        // Input file and output directory are taken from the command line.
        Path fromPath = new Path(args[0]);
        Path toPath = new Path(args[1]);
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowTest.class);
            FileInputFormat.addInputPath(job, fromPath);
            FileOutputFormat.setOutputPath(job, toPath);
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            // Both the map output and the final output use a Text key (phone number)
            // and a Text value holding the four counters joined with "-".
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*
phone number / upstream packet bytes / downstream packet bytes / uploaded bytes / downloaded bytes
phoneNum uppackBytes downpackBytes uploadBytes downloadBytes
13770759991 50L 100L 25L 400L
13770759991 800L 600L 500L 100L
13770759992 400L 300L 250L 1400L
13770759992 800L 1200L 600L 900L
*/
class FlowMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on runs of non-word characters (whitespace in the sample data).
        String[] line = value.toString().split("\\W+");
        String phoneNum = line[0];
        long uppackBytes = Long.parseLong(line[1]);
        long downpackBytes = Long.parseLong(line[2]);
        long uploadBytes = Long.parseLong(line[3]);
        long downloadBytes = Long.parseLong(line[4]);
        // Emit the phone number as the key and the four counters joined with "-" as the value.
        context.write(new Text(phoneNum),
                new Text(uppackBytes + "-" + downpackBytes + "-" + uploadBytes + "-" + downloadBytes));
    }
}

class FlowReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text phoneNum, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sumUppack = 0L;
        long sumDownpack = 0L;
        long sumUpload = 0L;
        long sumDownload = 0L;
        // Split each "-"-joined value back into its four counters and accumulate them.
        for (Text t : values) {
            String[] line = t.toString().split("-");
            sumUppack += Long.parseLong(line[0]);
            sumDownpack += Long.parseLong(line[1]);
            sumUpload += Long.parseLong(line[2]);
            sumDownload += Long.parseLong(line[3]);
        }
        context.write(phoneNum,
                new Text(sumUppack + "-" + sumDownpack + "-" + sumUpload + "-" + sumDownload));
    }
}
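To make the shuffle step concrete: for the sample input, the mapper emits one pair per line, keyed by phone number, for example
13770759991	50-100-25-400
13770759991	800-600-500-100
so the reducer receives all of the "-"-joined strings for one phone number together, splits them, and sums each position.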
Output:
Export the project as flow.jar and upload it to the /opt directory on the server.
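One way to copy the jar to the server is scp (a sketch; the user and host below are only placeholders):
scp flow.jar root@<server>:/opt/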
Run:
hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"
Then run:
hadoop fs -ls /tmp/flow/out/* to view the output files:
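Based on the sample input, the result (e.g. hadoop fs -cat /tmp/flow/out/part-r-00000, assuming the default single reducer) should look roughly like:
13770759991	850-700-525-500
13770759992	1200-1500-850-2300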
4. Traffic monitoring aggregation (implemented with a custom Writable class, NetflowWritable)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NetflowTest {

    public static void main(String[] args) {
        Path fromPath = new Path(args[0]);
        Path toPath = new Path(args[1]);
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(NetflowTest.class);
            FileInputFormat.addInputPath(job, fromPath);
            FileOutputFormat.setOutputPath(job, toPath);
            job.setMapperClass(NetflowMapper.class);
            job.setReducerClass(NetflowReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NetflowWritable.class);
            // The reducer emits the phone number (Text) as the key and the summed record as the value.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NetflowWritable.class);
            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class NetflowWritable implements Writable {

    private long uppackBytes;
    private long downpackBytes;
    private long uploadBytes;
    private long downloadBytes;

    // A no-argument constructor is required: Hadoop creates empty instances via reflection
    // and then fills them with readFields(); without it the job fails at runtime.
    public NetflowWritable() {
    }

    public NetflowWritable(long uppackBytes, long downpackBytes, long uploadBytes, long downloadBytes) {
        this.uppackBytes = uppackBytes;
        this.downpackBytes = downpackBytes;
        this.uploadBytes = uploadBytes;
        this.downloadBytes = downloadBytes;
    }

    public long getUppackBytes() {
        return uppackBytes;
    }

    public long getDownpackBytes() {
        return downpackBytes;
    }

    public long getUploadBytes() {
        return uploadBytes;
    }

    public long getDownloadBytes() {
        return downloadBytes;
    }

    public void set(long uppackBytes, long downpackBytes, long uploadBytes, long downloadBytes) {
        this.uppackBytes = uppackBytes;
        this.downpackBytes = downpackBytes;
        this.uploadBytes = uploadBytes;
        this.downloadBytes = downloadBytes;
    }

    // Deserialization: fields must be read in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.uppackBytes = in.readLong();
        this.downpackBytes = in.readLong();
        this.uploadBytes = in.readLong();
        this.downloadBytes = in.readLong();
    }

    // Serialization: write the four counters in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(uppackBytes);
        out.writeLong(downpackBytes);
        out.writeLong(uploadBytes);
        out.writeLong(downloadBytes);
    }

    // toString() determines how the record appears in the text output file.
    @Override
    public String toString() {
        return "NetflowWritable [uppackBytes=" + uppackBytes + ",downpackBytes=" + downpackBytes
                + ",uploadBytes=" + uploadBytes + ",downloadBytes=" + downloadBytes + "]";
    }
}

class NetflowMapper extends Mapper<LongWritable, Text, Text, NetflowWritable> {

    // One reusable value instance; it is serialized immediately on context.write().
    private final NetflowWritable nf = new NetflowWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fields are assumed to be tab-separated here; use "\\W+" as in FlowMapper
        // if the input file is separated by spaces.
        String[] line = value.toString().split("\\t");
        String phoneNum = line[0];
        long uppackBytes = Long.parseLong(line[1]);
        long downpackBytes = Long.parseLong(line[2]);
        long uploadBytes = Long.parseLong(line[3]);
        long downloadBytes = Long.parseLong(line[4]);
        nf.set(uppackBytes, downpackBytes, uploadBytes, downloadBytes);
        context.write(new Text(phoneNum), nf);
    }
}

class NetflowReducer extends Reducer<Text, NetflowWritable, Text, NetflowWritable> {

    @Override
    protected void reduce(Text phoneNum, Iterable<NetflowWritable> values, Context context)
            throws IOException, InterruptedException {
        long uppackBytes = 0L;
        long downpackBytes = 0L;
        long uploadBytes = 0L;
        long downloadBytes = 0L;
        // Sum the four counters over all records belonging to this phone number.
        for (NetflowWritable nw : values) {
            uppackBytes += nw.getUppackBytes();
            downpackBytes += nw.getDownpackBytes();
            uploadBytes += nw.getUploadBytes();
            downloadBytes += nw.getDownloadBytes();
        }
        context.write(phoneNum, new NetflowWritable(uppackBytes, downpackBytes, uploadBytes, downloadBytes));
    }
}
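The no-arg constructor and the matching field order in write()/readFields() are what let Hadoop rebuild the object on the reduce side. A minimal stand-alone sketch to check this (plain Java, no cluster needed; the class name NetflowWritableRoundTrip is only illustrative, and it is assumed to sit in the same source tree and package as NetflowWritable):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NetflowWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        NetflowWritable original = new NetflowWritable(850L, 700L, 525L, 500L);

        // write() serializes the four counters, as the framework does for map output.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // The framework then calls the no-arg constructor and readFields() to rebuild the record.
        NetflowWritable copy = new NetflowWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Should print the same four counters that went in.
        System.out.println(copy);
    }
}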
Output:
Export the project as netflow.jar and upload it to the /opt directory on the server.
Run:
hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"
Then run:
hadoop fs -ls /tmp/netflow/out/* to view the output files:
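Assuming the input fields are tab-separated as NetflowMapper expects, each result line should hold the phone number followed by the toString() form of the summed record, roughly:
13770759991	NetflowWritable [uppackBytes=850,downpackBytes=700,uploadBytes=525,downloadBytes=500]
13770759992	NetflowWritable [uppackBytes=1200,downpackBytes=1500,uploadBytes=850,downloadBytes=2300]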