MapReduce入门2-流量监控

3、流量监控汇总(使用LongWritable实现)

hdfs文件路径:/tmp/flow.txt
查看文件内容:
13770759991	50	100	25	400
13770759991	800	600	500	100
13770759992	400	300	250	1400
13770759992	800	1200	600	900

字符串含义:
号码	上行	下行	上传	下载
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
 

代码:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowTest {public static void main(String[] args) {// TODO Auto-generated method stubPath fromPath = new Path(args[0]);Path toPath = new Path(args[1]);try {Configuration conf = new Configuration();Job job = Job.getInstance();; job.setJarByClass(FlowTest.class);FileInputFormat.addInputPath(job, fromPath);FileOutputFormat.setOutputPath(job, toPath);job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);try {job.waitForCompletion(true);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}/*
号码	上行	下行	上传	下载
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
13770759991	50L	100L	25L	400L
13770759991	800L	600L	500L	100L
13770759992	400L	300L	250L	1400L
13770759992	800L	1200L	600L	900L
*/
class FlowMapper extends Mapper<LongWritable,Text,Text,Text>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString[] line = value.toString().split("\\W+"); String phoneNum = line[0];long uppackBytes = Long.parseLong(line[1]);long downpackBytes = Long.parseLong(line[2]);long uploadBytes = Long.parseLong(line[3]);long downloadBytes = Long.parseLong(line[4]);context.write(new Text(phoneNum), new Text(uppackBytes+"-"+downpackBytes+"-"+uploadBytes+"-"+downloadBytes));}}class FlowReducer extends Reducer<Text,Text,Text,Text>{@Overrideprotected void reduce(Text phoneNum, Iterable<Text> text, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stublong sumUppack = 0L;long sumDownpack = 0L;long sumUpload = 0L;long sumDownload = 0L;for(Text t : text){String[] line  = t.toString().split("-");sumUppack += Long.parseLong(line[0].toString());sumDownpack += Long.parseLong(line[1].toString());sumUpload += Long.parseLong(line[2].toString());sumDownload += Long.parseLong(line[3].toString());}context.write(phoneNum,new Text(sumUppack+"-"+sumDownpack+"-"+sumUpload+"-"+sumDownload) );}}

输出:

导出成flow.jar并上传至服务器的/opt目录
执行:
hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"再执行:
hadoop fs -ls /tmp/flow/out/*  查看输出的文件:

 

 

4、流量监控汇总(使用自定义的writable类NetflowWritable实现)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class NetflowTest {public static void main(String[] args) {// TODO Auto-generated method stubPath fromPath = new Path(args[0]);Path toPath = new Path(args[1]);try {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(NetflowTest.class);FileInputFormat.addInputPath(job, fromPath);FileOutputFormat.setOutputPath(job, toPath);job.setMapperClass(NetflowMapper.class);job.setReducerClass(NetflowReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NetflowWritable.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(NetflowWritable.class);try {job.waitForCompletion(true);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}class NetflowWritable implements Writable{private long uppackBytes;private long downpackBytes;private long uploadBytes;private long downloadBytes;//创建一个无参的构造方法,不加的话会执行报错public NetflowWritable(){}public NetflowWritable(long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {//this.phoneNum=phoneNum;this.uppackBytes = uppackBytes;this.downpackBytes = downpackBytes;	this.uploadBytes = uploadBytes;this.downloadBytes = downloadBytes;}public long getUppackBytes() {return uppackBytes;}public long getDownpackBytes() {return downpackBytes;}public long getUploadBytes() {return uploadBytes;}public long getDownloadBytes() {return downloadBytes;}public void set( long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {this.uppackBytes = uppackBytes;this.downpackBytes = downpackBytes;	this.uploadBytes = uploadBytes;this.downloadBytes = downloadBytes;}@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.uppackBytes = in.readLong();this.downpackBytes = in.readLong();this.uploadBytes = in.readLong();this.downloadBytes = in.readLong();}@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(uppackBytes);out.writeLong(downpackBytes);out.writeLong(uploadBytes);out.writeLong(downloadBytes);}@Override//重写toString方法public String toString() {// TODO Auto-generated method stubreturn "NetflowWritable [uppackBytes="+uppackBytes+",downpackBytes="+downpackBytes+",uploadBytes="+uploadBytes+",downloadBytes="+downloadBytes+"]" ;}
}class NetflowMapper extends Mapper<LongWritable,Text,Text,NetflowWritable>{private String phoneNum;private long uppackBytes;private long downpackBytes;private long uploadBytes;private long downloadBytes;NetflowWritable nf = new NetflowWritable();//Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NetflowWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString[] line = value.toString().split("\\t");phoneNum  = line[0];uppackBytes = Long.parseLong(line[1]);downpackBytes = Long.parseLong(line[2]);uploadBytes = Long.parseLong(line[3]);downloadBytes = Long.parseLong(line[4]);nf.set( uppackBytes, downpackBytes, uploadBytes, downloadBytes);context.write(new Text(phoneNum), nf);}}class NetflowReducer extends Reducer<Text,NetflowWritable,Text,NetflowWritable>{private NetflowWritable nf;@Overrideprotected void reduce(Text arg0, Iterable<NetflowWritable> arg1,Reducer<Text, NetflowWritable, Text, NetflowWritable>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stublong uppackBytes = 0L;long downpackBytes = 0L;long uploadBytes = 0L;long downloadBytes = 0L;for(NetflowWritable nw : arg1){uppackBytes += nw.getUppackBytes();downpackBytes += nw.getDownpackBytes();uploadBytes  += nw.getUploadBytes();downloadBytes += nw.getDownloadBytes();}nf = new NetflowWritable(uppackBytes,downpackBytes,uploadBytes,downloadBytes);context.write(arg0, nf);}}

  

输出:

导出成netflow.jar并上传至服务器的/opt目录
执行:
hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"再执行:
hadoop fs -ls /tmp/netflow/out/*  查看输出的文件:

 


 

转载于:https://www.cnblogs.com/cangos/p/6422144.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/255323.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【fiddler学习问题记录】——手机端证书下载页打不开、无法将此证书安装(已解决)

目录 1、手机端下载页打不开 解决方法1&#xff09;电脑端——将网络设置成公用&#xff08;亲测有效&#xff09; 解决方法2&#xff09;手机端将fiddler设置为信任应用&#xff0c;不被拦截 &#xff08;未试&#xff09; 2、无法将此证书安装 方法一&#xff1a;修改证书…

舵机的原理和控制

控制信号由接收机的通道进入信号调制芯片&#xff0c;获得直流偏置电压。它内部有一个基准电路&#xff0c;产生周期为20ms&#xff0c;宽度为1.5ms的基准信号&#xff0c;将获得的直流偏置电压与电位器的电压比较&#xff0c;获得电压差输出。最后&#xff0c;电压差的正负输出…

HDFS清理坏块

报错 Failed with exception java.io.IOException:org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1921057509-192.168.57.129-1517160177567:blk_1073741930_1106 file/user/hive/warehouse/db_hive.db/student/student.txt Time taken: 0.104 se…

如何在PowerDesigner将PDM导出生成WORD文档或者html文件

a) 使用PowerDesigner打开pdm文件 b) 点击Report Temlates 制作模板 点击PowerDesigner菜单栏“Report” -> “Report Templates” c) 选择模板数据项 完成步骤a&#xff09;&#xff0c;得到如下界面&#xff0c;左右2个区&#xff0c;Aavailable区…

【软件测试】——基础篇(软件测试技术体系、过程管理)

目录 软件测试技术体系 软件测试过程管理​编辑 接口测试用例设计 手机端测试流程​编辑 软件测试技术体系 软件测试过程管理 接口测试用例设计 手机端测试流程

小米出招黑科技,5S或成全球首款”Under glass“指纹识别手机

这一次&#xff0c;小米PK苹果&#xff0c;小米胜。 不得不说&#xff0c;最近的手机圈真是热闹&#xff0c;继三星Note 7爆炸、苹果iPhone 7发布一度成为新闻热点之后&#xff0c;小米又来暗戳戳地抢风头了。 最近小米即将发布的两款新旗舰消息扎堆&#xff0c;其中基本已经确…

sql中实现取得某字段中数字值

ALTER function [dbo].[GetNum](a nvarchar(4000)) returns nvarchar(4000) as begin while patindex(%[^0-9]%,a)>0 begin set astuff(a,patindex(%[^0-9]%,a),1,) end--select a --299 return a end 例如: 转载于:https://www.cnblogs.com/fish-ycq/p/6433562.ht…

C语言条件编译及编译预处理阶段

一、C语言由源代码生成的各阶段如下&#xff1a; C源程序&#xff0d;>编译预处理&#xff0d;>编译&#xff0d;>优化程序&#xff0d;>汇编程序&#xff0d;>链接程序&#xff0d;>可执行文件其中 编译预处理阶段&#xff0c;读取c源程序&#xff0c;对其中…

网上书店 买方数据库

买方表 属性 字段名 类型 键值 是否空 用户ID UserId char(5) 主键 用户名称 UserName nvarchar(50) 用户密码 UserPwd nvarchar(50) 用户真实姓名 UserRealName nvarchar(50) 用户地址 UserAddress nvarchar(100) …

Web开发模式(MVC设计模式)

1.MVC&#xff1a;(Model-View-Controller)操作流程 显示层View:主要负责接收Servlet传递的内容&#xff0c;并调用JavaBean把内容显示给用户。 控制层Controller:负责所有的用户请求参数&#xff0c;判断请求参数是否合法&#xff0c;根据请求方式调用JavaBean进行处理&#x…

Arduino IDE 配置文件

最近学习Arduino。 Arduino开源硬件和Arduino IDE是一个很容易上手的系统。 目前arduino已经支持很多种板类型&#xff0c;甚至已经支持了部分arm芯片。比如arduino ng、arduino uno、arduino mini、pro mini等。但是大多数情况&#xff0c;都是使用的atmega8/at…

vue封装axios接口

一、安装axios axios安装命令&#xff1a;cnpm install axios 二、在文件中引用axios 一开始我是放在src下的main.js这个文件里面&#xff0c;后来发现mounted钩子读取接口方法为undefined&#xff0c;百度了才发现是vue生命周期的原因&#xff0c;最好的解决办法是把axios单独…

编写Arduino支持的C++类库

以下为摘抄的例子&#xff0c;已经亲自验证过&#xff0c;例子是正确的 我们在上一讲中实现了一个TN901红外温度传感器51程序到Arduino程序的转换&#xff0c;如果代码越来越多这样程序的可维护性会随之降低&#xff0c;也不适合团度开发。我们应该把常用的文件封装成C库&#…

【机器学习实战】——常见函数积累

目录 第二章 k近邻算法 1、array.sum(axies 1) : 2、array.argsort(axies0/1) 3、array.tile(mat,(m,n)) 4、dict.get(key,value) 5、sorted函数 6、string.strip()函数 7、string.split() 8、scatter&#xff08;&#xff09;函数 9、min()&max() 10、enumera…

安装oracle 11g 客户端,检查过程中报物理内存不足的解决

今早接到同事电话&#xff0c;说安装oracle 11g客户端的时候&#xff0c;在检查先决条件的时候&#xff0c;报错&#xff0c;说内存不足&#xff0c;但是本机的内存是2G&#xff0c;肯定够用&#xff1a;如图&#xff1a; 找了一圈&#xff0c;原来Oracle执行先决条件检查是依赖…

Codeforces Round #401 (Div. 2) D. Cloud of Hashtags

题目链接&#xff1a;D. Cloud of Hashtags 题意&#xff1a; 给你n个字符串&#xff0c;让你删后缀&#xff0c;使得这些字符串按字典序排列&#xff0c;要求是删除的后缀最少 题解&#xff1a; 由于n比较大&#xff0c;我们可以将全部的字符串存在一个数组里面&#xff0c;然…

史陶比尔机器人的 LLI (Low Level Interface)

史陶比尔机器人的 LLI &#xff08;Low Level Interface&#xff09; 史陶比尔机器人拥有 Low Level Interface (LLI)接口选项. 在CS8C控制器的时代&#xff0c;LLI 接口仍然可用。这是一个选项接口。.这是除了VAL3编程语言之外的替代操作系统。通过C程序替代你的程序。 这里的…

HALCON示例程序check_bottle_crate.hdev啤酒箱内酒瓶数检测

HALCON示例程序check_bottle_crate.hdev啤酒箱内酒瓶数检测 示例程序源码&#xff08;加注释&#xff09; 获取系统关于“空白区域储存的设置” get_system (‘store_empty_region’, StoreEmptyRegion)系统“空白区域储存”设置为 ‘false’ set_system (‘store_empty_regi…

单片机平台的最小偏差圆弧插补算法

在CNC机床的G代码中&#xff0c;最常见的有G0、G1、G2、G3代码&#xff0c;分别表示直线和圆弧插补&#xff0c;直线插补对于单片机来说&#xff0c;比较容易实现&#xff0c;只需要将位移增量转换为脉冲增量然后输出给步进电机就可以了&#xff0c;但对于圆弧插补&#xff0c;…

【转】JS跨域(ajax跨域、iframe跨域)解决方法及原理详解(jsonp)

这里说的js跨域是指通过js在不同的域之间进行数据传输或通信&#xff0c;比如用ajax向一个不同的域请求数据&#xff0c;或者通过js获取页面中不同域的框架中(iframe)的数据。只要协议、域名、端口有任何一个不同&#xff0c;都被当作是不同的域。 下表给出了相对http://store.…