MapReduce入门2-流量监控

3、流量监控汇总(使用LongWritable实现)

hdfs文件路径:/tmp/flow.txt
查看文件内容:
13770759991	50	100	25	400
13770759991	800	600	500	100
13770759992	400	300	250	1400
13770759992	800	1200	600	900

字符串含义:
号码	上行	下行	上传	下载
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
 

代码:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowTest {public static void main(String[] args) {// TODO Auto-generated method stubPath fromPath = new Path(args[0]);Path toPath = new Path(args[1]);try {Configuration conf = new Configuration();Job job = Job.getInstance();; job.setJarByClass(FlowTest.class);FileInputFormat.addInputPath(job, fromPath);FileOutputFormat.setOutputPath(job, toPath);job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);try {job.waitForCompletion(true);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}/*
号码	上行	下行	上传	下载
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
13770759991	50L	100L	25L	400L
13770759991	800L	600L	500L	100L
13770759992	400L	300L	250L	1400L
13770759992	800L	1200L	600L	900L
*/
class FlowMapper extends Mapper<LongWritable,Text,Text,Text>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString[] line = value.toString().split("\\W+"); String phoneNum = line[0];long uppackBytes = Long.parseLong(line[1]);long downpackBytes = Long.parseLong(line[2]);long uploadBytes = Long.parseLong(line[3]);long downloadBytes = Long.parseLong(line[4]);context.write(new Text(phoneNum), new Text(uppackBytes+"-"+downpackBytes+"-"+uploadBytes+"-"+downloadBytes));}}class FlowReducer extends Reducer<Text,Text,Text,Text>{@Overrideprotected void reduce(Text phoneNum, Iterable<Text> text, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stublong sumUppack = 0L;long sumDownpack = 0L;long sumUpload = 0L;long sumDownload = 0L;for(Text t : text){String[] line  = t.toString().split("-");sumUppack += Long.parseLong(line[0].toString());sumDownpack += Long.parseLong(line[1].toString());sumUpload += Long.parseLong(line[2].toString());sumDownload += Long.parseLong(line[3].toString());}context.write(phoneNum,new Text(sumUppack+"-"+sumDownpack+"-"+sumUpload+"-"+sumDownload) );}}

输出:

导出成flow.jar并上传至服务器的/opt目录
执行:
hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"再执行:
hadoop fs -ls /tmp/flow/out/*  查看输出的文件:

 

 

4、流量监控汇总(使用自定义的writable类NetflowWritable实现)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class NetflowTest {public static void main(String[] args) {// TODO Auto-generated method stubPath fromPath = new Path(args[0]);Path toPath = new Path(args[1]);try {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(NetflowTest.class);FileInputFormat.addInputPath(job, fromPath);FileOutputFormat.setOutputPath(job, toPath);job.setMapperClass(NetflowMapper.class);job.setReducerClass(NetflowReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NetflowWritable.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(NetflowWritable.class);try {job.waitForCompletion(true);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}class NetflowWritable implements Writable{private long uppackBytes;private long downpackBytes;private long uploadBytes;private long downloadBytes;//创建一个无参的构造方法,不加的话会执行报错public NetflowWritable(){}public NetflowWritable(long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {//this.phoneNum=phoneNum;this.uppackBytes = uppackBytes;this.downpackBytes = downpackBytes;	this.uploadBytes = uploadBytes;this.downloadBytes = downloadBytes;}public long getUppackBytes() {return uppackBytes;}public long getDownpackBytes() {return downpackBytes;}public long getUploadBytes() {return uploadBytes;}public long getDownloadBytes() {return downloadBytes;}public void set( long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {this.uppackBytes = uppackBytes;this.downpackBytes = downpackBytes;	this.uploadBytes = uploadBytes;this.downloadBytes = downloadBytes;}@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.uppackBytes = in.readLong();this.downpackBytes = in.readLong();this.uploadBytes = in.readLong();this.downloadBytes = in.readLong();}@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(uppackBytes);out.writeLong(downpackBytes);out.writeLong(uploadBytes);out.writeLong(downloadBytes);}@Override//重写toString方法public String toString() {// TODO Auto-generated method stubreturn "NetflowWritable [uppackBytes="+uppackBytes+",downpackBytes="+downpackBytes+",uploadBytes="+uploadBytes+",downloadBytes="+downloadBytes+"]" ;}
}class NetflowMapper extends Mapper<LongWritable,Text,Text,NetflowWritable>{private String phoneNum;private long uppackBytes;private long downpackBytes;private long uploadBytes;private long downloadBytes;NetflowWritable nf = new NetflowWritable();//Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NetflowWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString[] line = value.toString().split("\\t");phoneNum  = line[0];uppackBytes = Long.parseLong(line[1]);downpackBytes = Long.parseLong(line[2]);uploadBytes = Long.parseLong(line[3]);downloadBytes = Long.parseLong(line[4]);nf.set( uppackBytes, downpackBytes, uploadBytes, downloadBytes);context.write(new Text(phoneNum), nf);}}class NetflowReducer extends Reducer<Text,NetflowWritable,Text,NetflowWritable>{private NetflowWritable nf;@Overrideprotected void reduce(Text arg0, Iterable<NetflowWritable> arg1,Reducer<Text, NetflowWritable, Text, NetflowWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stublong uppackBytes = 0L;long downpackBytes = 0L;long uploadBytes = 0L;long downloadBytes = 0L;for(NetflowWritable nw : arg1){uppackBytes += nw.getUppackBytes();downpackBytes += nw.getDownpackBytes();uploadBytes  += nw.getUploadBytes();downloadBytes += nw.getDownloadBytes();}nf = new NetflowWritable(uppackBytes,downpackBytes,uploadBytes,downloadBytes);context.write(arg0, nf);}}

  

输出:

导出成netflow.jar并上传至服务器的/opt目录
执行:
hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"再执行:
hadoop fs -ls /tmp/netflow/out/*  查看输出的文件:

 


 

转载于:https://www.cnblogs.com/cangos/p/6422144.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/255323.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【fiddler学习问题记录】——手机端证书下载页打不开、无法将此证书安装(已解决)

目录 1、手机端下载页打不开 解决方法1&#xff09;电脑端——将网络设置成公用&#xff08;亲测有效&#xff09; 解决方法2&#xff09;手机端将fiddler设置为信任应用&#xff0c;不被拦截 &#xff08;未试&#xff09; 2、无法将此证书安装 方法一&#xff1a;修改证书…

HALCON示例程序ball电路板焊点识别、检测、测量程序剖析

HALCON示例程序ball电路板焊点识别、检测、测量程序剖析 示例程序源码&#xff08;加注释&#xff09; 介绍&#xff1a; ball.hdev: Inspection of Ball Bonding *halcon窗口实时更新关闭 dev_update_window (‘off’) *halcon关闭所有窗口 dev_close_window () *halcon打开…

舵机的原理和控制

控制信号由接收机的通道进入信号调制芯片&#xff0c;获得直流偏置电压。它内部有一个基准电路&#xff0c;产生周期为20ms&#xff0c;宽度为1.5ms的基准信号&#xff0c;将获得的直流偏置电压与电位器的电压比较&#xff0c;获得电压差输出。最后&#xff0c;电压差的正负输出…

HDFS清理坏块

报错 Failed with exception java.io.IOException:org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1921057509-192.168.57.129-1517160177567:blk_1073741930_1106 file/user/hive/warehouse/db_hive.db/student/student.txt Time taken: 0.104 se…

如何在PowerDesigner将PDM导出生成WORD文档或者html文件

a) 使用PowerDesigner打开pdm文件 b) 点击Report Temlates 制作模板 点击PowerDesigner菜单栏“Report” -> “Report Templates” c) 选择模板数据项 完成步骤a&#xff09;&#xff0c;得到如下界面&#xff0c;左右2个区&#xff0c;Aavailable区…

【软件测试】——基础篇(软件测试技术体系、过程管理)

目录 软件测试技术体系 软件测试过程管理​编辑 接口测试用例设计 手机端测试流程​编辑 软件测试技术体系 软件测试过程管理 接口测试用例设计 手机端测试流程

vue 的常用模块安指令(持续记录)

# 全局安装 vue-cli $ cnpm install --global vue-cli# 创建一个基于 webpack 模板的新项目 $ vue init webpack my-project # 路由管理模块 $ npm install vue-router --save # 状态管理模块 $ npm install vuex --save # 网络请求模块 $ npm install vue-resource --save # 停…

HALCON基于形状匹配详解

HALCON基于形状的模板匹配详细说明 很早就想总结一下前段时间学习HALCON的心得&#xff0c;但由于其他的事情总是抽不出时间。去年有过一段时间的集中学习&#xff0c;做了许多的练习和实验&#xff0c;并对基于HDevelop的形状匹配算法的参数优化进行了研究&#xff0c;写了一…

俄罗斯:自由软件在这里生根

2008年6月15日&#xff0c;我在“俄罗斯&#xff1a;自由软件在这里起飞“一文中介绍了自由软件在俄罗斯的发展情况。现在&#xff0c;许多年过去了&#xff0c;实际情况如何呢&#xff1f; 回顾以往&#xff0c;俄罗斯境内自由软件的精英们&#xff0c;在2001年组建了ALTLinux…

小米出招黑科技,5S或成全球首款”Under glass“指纹识别手机

这一次&#xff0c;小米PK苹果&#xff0c;小米胜。 不得不说&#xff0c;最近的手机圈真是热闹&#xff0c;继三星Note 7爆炸、苹果iPhone 7发布一度成为新闻热点之后&#xff0c;小米又来暗戳戳地抢风头了。 最近小米即将发布的两款新旗舰消息扎堆&#xff0c;其中基本已经确…

sql中实现取得某字段中数字值

ALTER function [dbo].[GetNum](a nvarchar(4000)) returns nvarchar(4000) as begin while patindex(%[^0-9]%,a)>0 begin set astuff(a,patindex(%[^0-9]%,a),1,) end--select a --299 return a end 例如: 转载于:https://www.cnblogs.com/fish-ycq/p/6433562.ht…

Java 中 String 的常用方法(二)

本文介绍剩下的一些常用的 String 中的方法。 1、replace 方法 、replaceFirst 方法和 replaceAll 方法 replace(char oldChar, char newChar)Returns a string resulting from replacing all occurrences of oldChar in this string with newChar. replace(CharSequence targe…

【adb错误修复】adb version(39) doesn‘t match the client(40),killing...

问题原因&#xff1a; 由于服务端的adb版本和客户端的adb版本不一样【哪个是客户端哪个是服务端我也不清楚】 解决方法&#xff1a; adb kill-serveradb start-server

HALCON示例程序ball电路板焊点识别、检测、测量程序2剖析(与上篇文章使用了不同方法)

HALCON示例程序ball电路板焊点识别、检测、测量程序2 示例程序源码&#xff08;加注释&#xff09; *这是关于显示的函数&#xff0c;已经介绍过了 dev_update_off () *定义一个字符串变量ImageNames&#xff0c;ImageNames[0]的含义为’die/die_02’以此类推 ImageNames : ‘…

arduino 程序的机制

从一个简单的 arduino 程序说起&#xff1a; /*BlinkTurns on an LED on for one second, then off for one second, repeatedly.This example code is in the public domain.*/// Pin 13 has an LED connected on most Arduino boards. // give it a name: int led 13;// the…

电池技术多年没有较大发展,成为移动设备最大制约

各大厂商致力于发展闪充技术以此解决电池续航问题。 对于电子产品来说&#xff0c;不管是什么&#xff0c;我们最头疼的无疑就是电池问题了。而电池的重要性更是不用多说&#xff0c;但是&#xff0c;以目前的科技水平来看&#xff0c;就拿智能手机的CPU等功能来说&#xff0c…

什么样的数据应该放入缓存

把数据放入缓存,有三个标准: 1.数据量不大 2.访问频率高 3.数据更改频率低转载于:https://www.cnblogs.com/hwgok/p/5494915.html

VMWare 安装 Eclipse

由于之前已经安装了 OpenJDK 所以 这次我们可以直接下载 eclipse来安装。 Eclipse 下载&#xff1a;http://www.eclipse.org/downloads/?osTypelinux&releaseundefined 如果下载后 在 下载目录&#xff0c;运行解压 &#xff08;我下载的&#xff1a;eclipse-inst-lin…

C语言条件编译及编译预处理阶段

一、C语言由源代码生成的各阶段如下&#xff1a; C源程序&#xff0d;>编译预处理&#xff0d;>编译&#xff0d;>优化程序&#xff0d;>汇编程序&#xff0d;>链接程序&#xff0d;>可执行文件其中 编译预处理阶段&#xff0c;读取c源程序&#xff0c;对其中…

HALCON示例程序board.hdev检测电路板焊锡有无程序剖析

HALCON示例程序board.hdev检测电路板焊锡有无程序剖析 示例程序源码&#xff08;加注释&#xff09; *这是关于系统设置的函数&#xff0c;剪辑区域&#xff0c;设置剪辑区域设置为使能。为clip_region做的设置&#xff0c;后文会介绍 get_system (‘clip_region’, Informat…