真正帮你实现—MapReduce统计WordCount词频,并将统计结果按出现次数降序排列

项目整体介绍

对类似WordCount案例的词频统计,并将统计结果按出现次数降序排列。

网上有很多帖子,均用的相似方案,重写某某方法然后。。。运行起来可能会报这样那样的错误,这里实现了一种解决方案,分享出来供大家参考:编写两个MapReduce程序,第一个程序进行词频统计,第二个程序进行降序处理,由于是降序,还需要自定义对象,在对象内部实现降序排序。

一、项目背景及数据集说明

现有某电商网站用户对商品的收藏数据,记录了用户收藏的商品id以及收藏日期,名为buyer_favorite1。buyer_favorite1包含:买家id,商品id,收藏日期这三个字段,数据以“\t”分割,样例展现如下:在这里插入图片描述

二、编写MapReduce程序,统计每个买家收藏商品数量。(即统计买家id出现的次数)

前置说明

1.配置好Hadoop集群环境,并开启相应服务、
2.在hdfs对应路径上先上传好文件,可以自己根据文件路径定义,这里是"hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1"。同时再定义好输出路径
3.这里是整个程序(词频降序)的入口,若只是想统计词频,请注释掉WordCountSortDESC.mainJob2();

package mapreduce;import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {public static void main(String[] args) {Configuration conf = new Configuration();conf.set("yarn,resourcemanager", "bym@d2e674ec1e78");try {Job job = Job.getInstance(conf, "111");job.setJobName("WordCount");job.setJarByClass(WordCount.class);job.setMapperClass(doMapper.class); // 这里就是设置下job使用继承doMapper类,与定义的内容保持一致job.setReducerClass(doReducer.class); // 同上,设置Reduce类型job.setMapOutputKeyClass(Text.class); // 如果map的输出和reduce的输出不一样,这里要分别定义好格式job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1");Path out = new Path("hdfs://localhost:9000/mymapreduce1/out");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);if (job.waitForCompletion(true)) {System.out.println("WordCount completition");WordCountSortDESC.mainJob2();System.out.println("diaoyong");}} catch (Exception e) {e.printStackTrace();}// System.exit(job.waitForCompletion(true) ? 0 : 1);}// 第一个Object表示输入key的类型、是该行的首字母相对于文本文件的首地址的偏移量;// 第二个Text表示输入value的类型、存储的是文本文件中的一行(以回车符为行结束标记);// 第三个Text表示输出键的类型;第四个IntWritable表示输出值的类型public static class doMapper extendsMapper<LongWritable, Text, Text, IntWritable> {public static final IntWritable one = new IntWritable(1);public static Text word = new Text();@Override// 前面两个Object key,Text value就是输入的key和value,第三个参数Context// context是可以记录输入的key和value。protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分StringTokenizer tokenizer = new StringTokenizer(value.toString(),"\t");// 返回当前位置到下一个分隔符之间的字符串, 并把字符串设置成Text格式word.set(tokenizer.nextToken());context.write(word, one);}}// 参数依次表示是输入键类型,输入值类型,输出键类型,输出值类型public static class doReducer extendsReducer<Text, IntWritable, Text, Text> {@Override// 输入的是键值类型,其中值类型为归并后的结果,输出结果为Context类型protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new Text(Integer.toString(sum)));}}
}

三、核心问题:再次编写MapReduce程序,将上一步统计的结果降序排列

前置说明

1.这里将上一步统计的结果作为输入,进行第二次mapreduce程序的运行。因此要注意输入路径与上一步的输出路径保持一致。
2.由于是降序排列,只能自定义FlowBean对象,内部实现排序方式。否则,升序可以利用shuffle机制默认的排序策略不用自定义对象排序,这里不再叙述。

package mapreduce;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountSortDESC {public static void mainJob2() {Configuration conf = new Configuration();conf.set("yarn,resourcemanager", "bym@d2e674ec1e78");try {Job job = Job.getInstance(conf, "1111");job.setJobName("WordCountSortDESC");job.setJarByClass(WordCountSortDESC.class);job.setMapperClass(TwoMapper.class); // 这里就是设置下job使用继承doMapper类,与定义的内容保持一致job.setReducerClass(TwoReducer.class); // 同上,设置Reduce类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);Path in = new Path("hdfs://localhost:9000/mymapreduce1/out");Path out = new Path("hdfs://localhost:9000/mymapreduce1/out555");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);if (job.waitForCompletion(true)) {System.out.println("DESC Really Done");}} catch (Exception e) {System.out.println("errormainJob2-----------");}}public static class TwoMapper extends Mapper<Object, Text, FlowBean, Text> {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 由于真实的数据存储在文件块上,这里是因为数据量较小,可以保证只在一个文件块FileSplit fs = (FileSplit) context.getInputSplit();if (fs.getPath().getName().contains("part-r-00000")) {// 1 获取一行数据String line = value.toString();// 2 按照"\t",切割数据String[] split = line.split("\t");// 3 封装outK outVoutK.setNumber(Long.parseLong(split[1]));outV.set(split[0]);// 4 写出outK outVcontext.write(outK, outV);} else {System.out.println("error-part-r-------------------");}}}public static class TwoReducer extendsReducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values,Context context) throws IOException, InterruptedException {// 遍历values集合,循环写出,避免总流量相同的情况for (Text value : values) {// 调换KV位置,反向写出context.write(value, key);}}}public static class FlowBean implements WritableComparable<FlowBean> {private long number;// 提供无参构造public FlowBean() {}public long getNumber() {return number;}public void setNumber(long number) {this.number = number;}// 实现序列化和反序列化方法,注意顺序一定要一致@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(this.number);}@Overridepublic void readFields(DataInput in) throws IOException {this.number = in.readLong();}@Overridepublic String toString() {return number + "\t";}@Overridepublic int compareTo(FlowBean o) {// 按照总流量比较,倒序排列if (this.number > o.number) {return -1;} else if (this.number < o.number) {return 1;} else {return 0;}}}}

四、结果展示:

执行查看文件命令

hadoop fs -cat /mymapreduce1/out555/part-r-00000

在这里插入图片描述
可以发现已经进行了降序排列,其他数据集结果应类似。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/14605.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习中简易FC和CNN搭建

TensorFlow是由谷歌开发的PyTorch是由Facebook人工智能研究院&#xff08;Facebook AI Research&#xff09;开发的 Torch和cuda版本的对应&#xff0c;手动安装较好 全连接FC(Batch*Num) 搭建建议网络&#xff1a; from torch import nnclass Mnist_NN(nn.Module):def __i…

力扣 968. 监控二叉树

题目来源&#xff1a;https://leetcode.cn/problems/binary-tree-cameras/description/ C题解&#xff08;来源代码随想录&#xff09;&#xff1a;节点可以分为3个状态&#xff1a;0无覆盖&#xff1b;1有摄像头&#xff1b;2有覆盖。 要想放的摄像头最少&#xff0c;应当叶子…

SOC FPGA之HPS模型设计(一)

目录 一、建立HPS硬件系统模型 1.1 GHRD 1.2 从0开始搭建HPS 1.2.1 FPGA Interfaces 1.2.1.1 General 1.2.1.2 AXI Bridge 1.2.1.3 FPGA-to-HPS SDRAM Interface 1.2.1.4 DMA Peripheral Request 1.2.1.5 Interrupts 1.2.1.6 EMAC ptp interface 1.2.2 Peripheral P…

seata组件使用期间,获取全局事务状态

GlobalStatus枚举类展示全局事务状态 官网链接&#xff1a;http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html 获得全局事务状态 // 开启全局事务地方获取全局事务xid String xid RootContext.getXID(); // 通过全局事务xid获得GlobalStatus枚举类 …

Unity游戏源码分享-2.5D塔防类游戏

Unity游戏源码分享-2.5D塔防类游戏 项目地址&#xff1a; https://download.csdn.net/download/Highning0007/88118947

电子元器件选型与实战应用—01 电阻选型

大家好, 我是记得诚。 这是《电子元器件选型与实战应用》专栏的第一篇文章,今天的主角是电阻,在每一个电子产品中,都少不了电阻的身影,其重要性不言而喻。 文章目录 1. 入门知识1.1 基础1.2 常用品牌1.3 电阻的种类2. 贴片电阻标识2.1 三位数标注法2.2 四位数标注法2.3 小…

操作系统_进程与线程(二)

目录 2. 处理机调度 2.1 调度的基本概念 2.2 调度的层次 2.3 三级调度的联系 2.4 调度的目标 2.5 调度的实现 2.5.1 调度程序&#xff08;调度器&#xff09; 2.5.2 调度的时机、切换与过程 2.5.3 进程调度方式 2.5.4 闲逛进程 2.5.5 两种线程的调度 2.6 典型的调度…

多旋翼物流无人机节能轨迹规划(Python代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f308;3 Python代码实现 &#x1f389;4 参考文献 &#x1f4a5;1 概述 多旋翼物流无人机的节能轨迹规划是一项重要的技术&#xff0c;可以有效减少无人机的能量消耗&#xff0c;延长飞行时间&#xff0c;提高物流效率…

了解Unity编辑器之组件篇Layout(八)

Layout&#xff1a;用于管理和控制UI元素的排列和自动调整一、Aspect Ratio Fitter&#xff1a;用于根据宽高比自动调整UI元素的大小 Aspect Mode&#xff1a;用于定义纵横比适配的行为方式。Aspect Mode属性有以下几种选项&#xff1a; &#xff08;1&#xff09;None&#xf…

【etcd】解决 go-zero 注册 etcd 出现 “Auto sync endpoints failed.” 的问题

go: v1.20.3 go-zero: v1.5.4 etcd: v3.5.9 问题描述 在 go-zero 中用 etcd 去实现服务注册发现&#xff0c;rpc 服务可以注册到 etcd&#xff0c;同时其他服务可以发现注册的微服务&#xff0c;也可以访问。但是&#xff0c;注册的 rpc 服务的日志&#xff0c;就是一直报以…

bat一键批量、有序启动jar

将脚本文件后缀改为 bat&#xff0c;脚本文件和 jar 包放在同一个目录 echo offstart cmd /c "java -jar register.jar " ping 192.0.2.2 -n 1 -w 10000 > nulstart cmd /c "java -jar admin.jar " ping 192.0.2.2 -n 1 -w 30000 > nulstart cmd /c…

基于ARM+FPGA (STM32+ Cyclone 4)的滚动轴承状态监测系统

状态监测系统能够在故障早期及时发现机械设备的异常状态&#xff0c;避免故障的 进一步恶化造成不必要的损失&#xff0c;滚动轴承是机械设备的易损部件&#xff0c;本文对以滚动 轴承为研究对象的状态监测系统展开研究。现有的监测技术多采用定时上传监 测数据&#xff0c;…

Spring MVC学习笔记,包含mvc架构使用,过滤器、拦截器、执行流程等等

&#x1f600;&#x1f600;&#x1f600;创作不易&#xff0c;各位看官点赞收藏. 文章目录 Spring MVC 习笔记1、Spring MVC demo2、Spring MVC 中常见注解3、数据处理3.1、请求参数处理3.2、响应数据处理 4、RESTFul 风格5、静态资源处理6、HttpMessageConverter 转换器7、过…

Open3D(C++) 根据索引提取点云

目录 一、功能概述1、主要函数2、源码二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。爬虫网站自重,把自己当个人 一、功能概述 1、主要函数 std::shared_ptr<PointCloud> SelectByIn

spring boot 2 配置上传文件大小限制

一、起因&#xff1a;系统页面上传一个文件超过日志提示的文件最大100M的限制&#xff0c;需要更改配置文件 二、经过&#xff1a; 1、在本地代码中找到配置文件&#xff0c;修改相应数值后交给运维更新生产环境配置&#xff0c;但是运维说生产环境没有这行配置&#xff0c;遂…

MODBUS-TCP转Ethernet IP 网关连接空压机 配置案例

本案例是工业现场应用捷米特JM-EIP-TCP的Ethernet/IP转Modbus-TCP网关连接欧姆龙PLC与空压机的配置案例。使用设备&#xff1a;欧姆龙PLC&#xff0c;捷米特JM-EIP-TCP网关&#xff0c; ETHERNET/IP 的电气连接 ETHERNET/IP 采用标准的 T568B 接法&#xff0c;支持直连和交叉接…

[个人笔记] Linux配置NTP时间同步

Linux - 运维篇 第四章 Linux配置NTP时间同步 Linux - 运维篇系列文章回顾Linux配置NTP时间同步Linux配置CST时区 参考来源 系列文章回顾 第一章 Linux扩容LVM分区 第二章 Linux虚拟机安装VMware Tools插件 第三章 ssh-keygen和openssl工具的使用 Linux配置NTP时间同步 仅实验…

Ubuntu通用镜像加速配置

备份 cp -rf /etc/apt/sources.list /etc/apt/sources.list.bak开始配置 阿里云 sed -i shttp://archive.ubuntu.comhttps://mirrors.aliyun.comg /etc/apt/sources.listsed -i shttp://security.ubuntu.comhttps://mirrors.aliyun.comg /etc/apt/sources.list清华源 sed -i …

【etcd】docker 启动单点 etcd

etcd: v3.5.9 etcd-browser: rustyx/etcdv3-browser:latest 本文档主要描述用 docker 部署单点的 etcd&#xff0c; 用 etcd-browser 来查看注册到 etcd 的 key 默认配置启动 docker run -d --name ai-etcd --networkhost --restart always \-v $PWD/etcd.conf.yml:/opt/bitn…

【Linux】线程池

1 线程池的介绍 1.1 线程池 一种线程使用模式。线程过多会带来调度开销&#xff0c;进而影响局部性和整体性能。而线程池维护多个线程&#xff0c;等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用&am…