MapReduce简单应用(三)——高级WordCount

目录

  • 1. 高级WordCount
    • 1.1 IntWritable降序排列
    • 1.2 输入输出格式
    • 1.3 处理流程
  • 2. 代码和结果
    • 2.1 pom.xml中依赖配置
    • 2.2 工具类util
    • 2.3 高级WordCount
    • 2.4 结果
  • 参考

  本文引用的Apache Hadoop源代码基于Apache许可证 2.0,详情请参阅 Apache许可证2.0。

1. 高级WordCount

  文本内容就是下文2.3中的代码,目标是要实现文本计数,并且数量在前,文本在后,同时数量要升序排列。

1.1 IntWritable降序排列

  IntWritable类型中实现一个升序排列的比较器,代码如下。而实现IntWritable降序排序只需要定义一个新类,继承IntWritable.Comparator,并且重载public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2),使其返回值为父类该方法返回值的相反数。此外,如果你想要让作为键的IntWritable类型进行降序排列,还需要在MapReduce任务调度代码中设置Job.setSortComparatorClass(比较器.class)

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/public static class Comparator extends WritableComparator {public Comparator() {super(IntWritable.class);}@Overridepublic int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {int thisValue = readInt(b1, s1);int thatValue = readInt(b2, s2);return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));}}

1.2 输入输出格式

java类名输入/输出功能
org.apache.hadoop.mapreduce.lib.input.TextInputFormatMapReduce默认的输入格式将输入文件按行分割,每一行作为<key, value>对,其中key是行的偏移量(从0开始),value 是行的内容
org.apache.hadoop.mapreduce.lib.output.TextOutputFormatMapReduce默认的输出格式将输出写成文本文件,每个<key, value>对占一行,key和value之间用制表符(\t)分隔
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormatSequenceFile的输入格式读取Hadoop的二进制文件格式SequenceFile
org.apache.hadoop.mapreduce.lib.input.SequenceFileOutputFormatSequenceFile的输出格式将输出写成Hadoop的二进制文件格式SequenceFile

  (Hadoop定义的SequenceFile是一种高效、可分割的二进制文件格式,支持压缩)
  (Hadoop定义了好多输入输出格式,由于我没有详细使用,这里就不介绍了)
  如果要进行多次MapReduce作业,中间结果可以以SequenceFile的形式存储,加速作业的运行。

1.3 处理流程

  首先高级WordCount也要像普通WordCount一样对文本进行计数,因此Reduce函数输入的键值对为<Text,IntWritable>。而最终要求的结果键值对为<IntWritable, Text>,如果把Reduce函数的输出键值对直接变为<IntWritable, Text>并且在该任务中只使用一个作业的话,你会发现无法完成IntWritable降序排列(尽管你可以已经设置SortComparatorClass),那是因为Shuffle过程的排序只会发生在Map结束后Reduce发生前,这时键的类型是Text而非IntWritable。
  为了解决这个任务,需要进行两次作业,第一次作业负责计数,并以SequenceFile的格式输出,Map的输出、Reduce的输入和输出均为<Text, IntWritable>,最终文件输出格式选择SequenceFileOutputFormat;第二次作业负责交换键值对,并以SequenceFile的个数读入,然后再对键进行降序排列,这就需要使用Hadoop自带的org.apache.hadoop.mapreduce.lib.map.InverseMapper,它能交换键值对。这次作业的输入格式选择SequenceFileInputFormat,Map输入和Map输出分别是<Text, IntWritable>、<IntWritable, Text>,这时设置SortComparatorClass就可以实现IntWritable降序排列。

2. 代码和结果

2.1 pom.xml中依赖配置

  <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>

2.2 工具类util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void removeALL(String uri, Configuration conf, String[] pathList) throws Exception {FileSystem fs = getFileSystem(uri, conf);for (String path : pathList) {if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println(String.format("Delete %s? %s", path, isDeleted));}}}public static void showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {System.out.println(file.getPath() + ":");FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}

2.3 高级WordCount

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;public class App {public static class IntWritableDecreaseingComparator extends IntWritable.Comparator {@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return -super.compare(b1, s1, l1, b2, s2, l2);}}public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] splitStr = value.toString().split("\\s+");for (String str : splitStr) {context.write(new Text(str), new IntWritable(1));}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String tempPath = "hdfs://localhost:9000/user/developer/Temp";String[] myArgs = {"file:///home/developer/CodeArtsProjects/advanced-word-count/AdvancedWordCount.txt","hdfs://localhost:9000/user/developer/AdvancedWordCount/output"};util.removeALL("hdfs://localhost:9000", conf, new String[] { tempPath, myArgs[myArgs.length - 1] });Job job = Job.getInstance(conf, "AdvancedWordCount");job.setJarByClass(App.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setNumReduceTasks(2);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.addInputPath(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(tempPath));int res1 = job.waitForCompletion(true) ? 0 : 1;if (res1 == 0) {Job sortJob = Job.getInstance(conf, "Sort");sortJob.setJarByClass(App.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setInputFormatClass(SequenceFileInputFormat.class);sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setSortComparatorClass(IntWritableDecreaseingComparator.class);FileInputFormat.addInputPath(sortJob, new Path(tempPath));FileOutputFormat.setOutputPath(sortJob, new Path(myArgs[myArgs.length - 1]));int res2 = sortJob.waitForCompletion(true) ? 0 : 1;if (res2 == 0) {System.out.println("高级WordCount结果为:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res2);}System.exit(res1);}
}

2.4 结果

在这里插入图片描述
  结果文件内容如下:

64
14      {
13      }
12      import
8       int
7       public
7       =
4       static
4       class
4       -
4       new
4       @Override
3       for
3       :
3       void
3       throws
3       extends
2       l1,
2       1;
2       0;
2       String[]
2       s2,
2       s1,
2       i
2       context.write(new
2       context)
2       conf,
2       InterruptedException
2       key,
2       IntWritable,
2       return
2       IOException,
2       b2,
2       sum
2       Context
2       protected
2       myArgs[myArgs.length
2       Text,
2       1]);
1       };
1       values,
1       values)
1       value.toString().split("\\s+");
1       value,
1       val.get();
1       val
1       util.showResult("hdfs://localhost:9000",
1       util.removeALL("hdfs://localhost:9000",
1       str
1       splitStr)
1       splitStr
1       res
1       reduce(Text
1       org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
1       org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
1       org.apache.hadoop.mapreduce.Reducer;
1       org.apache.hadoop.mapreduce.Mapper;
1       org.apache.hadoop.mapreduce.Job;
1       org.apache.hadoop.io.WritableComparable;
1       org.apache.hadoop.io.Text;
1       org.apache.hadoop.io.LongWritable;
1       org.apache.hadoop.io.IntWritable;
1       org.apache.hadoop.fs.Path;
1       org.apache.hadoop.conf.Configuration;
1       myArgs.length
1       myArgs
1       map(LongWritable
1       main(String[]
1       l2);
1       l2)
1       key);
1       job.waitForCompletion(true)
1       job.setSortComparatorClass(IntWritableDecreaseingComparator.class);
1       job.setReducerClass(MyReducer.class);
1       job.setOutputValueClass(Text.class);
1       job.setOutputKeyClass(IntWritable.class);
1       job.setMapperClass(MyMapper.class);
1       job.setJarByClass(App.class);
1       job.setCombinerClass(MyReducer.class);
1       job
1       java.io.IOException;
1       if
1       i++)
1       compare(byte[]
1       compare(WritableComparable
1       byte[]
1       b1,
1       b);
1       b)
1       args)
1       a,
1       WritableComparable
1       Text>
1       Text(str),
1       Text
1       System.out.println("高级WordCount结果为:");
1       System.exit(res);
1       Reducer<Text,
1       Path(myArgs[myArgs.length
1       Path(myArgs[i]));
1       MyReducer
1       MyMapper
1       Mapper<LongWritable,
1       Job.getInstance(conf,
1       Job
1       Iterable<IntWritable>
1       IntWritableDecreaseingComparator
1       IntWritable>
1       IntWritable.Comparator
1       IntWritable(sum),
1       IntWritable(1));
1       FileOutputFormat.setOutputPath(job,
1       FileInputFormat.addInputPath(job,
1       Exception
1       Configuration();
1       Configuration
1       App
1       ?
1       ==
1       <
1       1]));
1       0)
1       0
1       -super.compare(b1,
1       -super.compare(a,
1       +=
1       (res
1       (int
1       (String
1       (IntWritable
1       "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
1       "file:///home/developer/CodeArtsProjects/AdvancedWordCount.txt",
1       "AdvancedWordCount");
1       conf

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/895174.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧机房解决方案(文末联系,领取整套资料,可做论文)

智慧机房解决方案-软件部分 一、方案概述 本智慧机房解决方案旨在通过硬件设备与软件系统的深度整合&#xff0c;实现机房的智能化管理与服务&#xff0c;提升机房管理人员的工作效率&#xff0c;优化机房运营效率&#xff0c;确保机房设备的安全稳定运行。软件部分包括机房管…

(五)Spring Boot学习——spring security +jwt使用(前后端分离模式)

一定要熟悉spring security原理和jwt无状态原理&#xff0c;理解了才知道代码作用。 在 Spring Security JWT 认证流程中&#xff0c;通常的做法是&#xff1a; 用户提交用户名和密码Spring Security 认证管理器 (AuthenticationManager) 进行认证如果认证成功&#xff0c;生…

清华DeepSeek手册:从入门到精通(网页版便于阅读)

目录 一、产品概述二、清华DeepSeek从入门到精通三、PDF文件路径 一、产品概述 DeepSeek是国产领先的人工智能技术平台&#xff0c;提供从数据分析到大型语言模型的全栈解决方案。其核心产品包括网页端数据分析工具[1] 、视觉语言模型(DeepSeek-VL)[2] 和670亿参数大型语言模型…

阿里云百炼初探DeepSeek模型调用

阿里云百炼初探DeepSeek模型调用 阿里云百炼为什么选择百炼开始使用百炼方式一&#xff1a;文本对话方式二&#xff1a;文本调试方式三&#xff1a;API调用 DeepSeek调用1、搜索模型2、查看API调用3、开始调用安装依赖查看API Key运行以下代码 4、流式输出 总结 阿里云百炼 阿…

【网络安全】服务器安装Docker及拉取镜像教程

文章目录 1. 安装 Docker2. 拉取镜像3. 运行 Ubuntu 容器4. 执行相关操作5. 退出并停止容器1. 安装 Docker # 更新软件包索引 sudo apt update# 安装必要的依赖 sudo apt install -y ca-certificates curl gnupg

AI刷题-子数组和的最大值问题

目录 问题描述 输入格式 输出格式 输入样例 输出样例 说明 数据范围 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 具体步骤 代码实现&#xff1a; 1.特判&#xff1a; 不需要删除元素的时候 2.在前面的判断结束后&#xff1a;k1&#xff0c;&#xff…

【语法】C++的内存管理 模板

内存管理&#xff1a; 在C语言中&#xff0c;动态开辟空间可以用malloc&#xff0c;calloc&#xff0c;realloc这三个函数&#xff0c;下面先来复习一下这三者的区别 malloc和calloc都是用来开辟新空间&#xff0c;calloc在malloc的基础上还会初始化该空间为0&#xff0c;用法…

30~32.ppt

目录 30.导游小姚-介绍首都北京❗ 题目​ 解析 31.小张-旅游产品推广文章 题目 解析 32.小李-水的知识❗ 题目​ 解析 30.导游小姚-介绍首都北京❗ 题目 解析 新建幻灯片-从大纲-重置-检查设计→主题对话框→浏览主题&#xff1a;考生文件夹&#xff08;注意&#x…

uniapp实现人脸识别(不使用三方插件)

uniapp实现人脸识别 内容简介功能实现上传身份证进行人脸比对 遇到的问题 内容简介 1.拍摄/相册将身份证照片上传到接口进行图片解析 2.使用live-pusher组件拍摄人脸照片&#xff0c;上传接口与身份证人脸进行比对 功能实现 上传身份证 先看下效果 点击按钮调用chooseImage…

Evaluating Very Long-Term Conversational Memory of LLM Agents 论文

Abstract : 长期开放域对话的现有作品着重于评估不超过五个聊天会议的上下文中的模型响应。尽管LongContext大语言模型&#xff08;LLM&#xff09;和检索增强发电&#xff08;RAG&#xff09;技术的进步&#xff0c;但在长期对话中的功效仍未得到探索。为了解决这一研究差距&a…

相对收益-固定收益组合归因-Campisi模型

固定收益组合归因-Campisi模型 1 Campisi模型11.1 Campisi归因框架1.2 Campisi模型绝对收益分解1.2.1 票息收益1. 2.2 收敛收益1. 2.3 骑乘收益1. 2.4 平移收益1. 2.5 扭曲收益1. 2.6 利差收益1. 2.7 残差收益 1.3 Campisi模型超额收益分解 2 Campisi模型22.1 分解框架2.2 模型…

IntelliJ IDEA使用经验(十三):使用Git克隆github的开源项目

文章目录 问题背景办法1、设置git代理&#xff1b;2、再次克隆项目&#xff1b;3、再次按常规方式进行git克隆即可。 问题背景 由于github在国外&#xff0c;很多时候我们在使用idea克隆开源项目的时候&#xff0c;没办法检出&#xff0c;提示 连接重置。 办法 1、设置git代…

JAVA安全之Java Agent打内存马

基本介绍 Java Agent是一种特殊的Java程序&#xff0c;它允许开发者在Java虚拟机(JVM)启动时或运行期间通过java.lang.instrument包提供的Java标准接口进行代码插桩&#xff0c;从而实现在Java应用程序类加载和运行期间动态修改已加载或者未加载的类&#xff0c;包括类的属性、…

RabbitMQ 消息顺序性保证

方式一&#xff1a;Consumer设置exclusive 注意条件 作用于basic.consume不支持quorum queue 当同时有A、B两个消费者调用basic.consume方法消费&#xff0c;并将exclusive设置为true时&#xff0c;第二个消费者会抛出异常&#xff1a; com.rabbitmq.client.AlreadyClosedEx…

【MQ】Spring3 中 RabbitMQ 的使用与常见场景

一、初识 MQ 传统的单体架构&#xff0c;分布式架构的同步调用里&#xff0c;无论是方法调用&#xff0c;还是 OpenFeign 难免会有以下问题&#xff1a; 扩展性差&#xff08;高耦合&#xff0c;需要依赖对应的服务&#xff0c;同样的事件&#xff0c;不断有新需求&#xff0…

EasyExcel 导出合并层级单元格

EasyExcel 导出合并层级单元格 一、案例 案例一 1.相同订单号单元格进行合并 合并结果 案例二 1.相同订单号的单元格进行合并2.相同订单号的总数和总金额进行合并 合并结果 案例三 1.相同订单号的单元格进行合并2.相同订单号的商品分类进行合并3.相同订单号的总数和总金额…

cs106x-lecture3(Autumn 2017)

打卡cs106x(Autumn 2017)-lecture3 1、streamErrors Suppose an input file named streamErrors-data.txt contains the following text: Donald Knuth M 76 Stanford U. The code below attempts to read the data from the file, but each section has a bug. Correct th…

C++模板编程——typelist的实现

文章最后给出了汇总的代码&#xff0c;可直接运行 1. typelist是什么 typelist是一种用来操作类型的容器。和我们所熟知的vector、list、deque类似&#xff0c;只不过typelist存储的不是变量&#xff0c;而是类型。 typelist简单来说就是一个类型容器&#xff0c;能够提供一…

windows通过网络向Ubuntu发送文件/目录

由于最近要使用树莓派进行一些代码练习&#xff0c;但是好多东西都在windows里或虚拟机上&#xff0c;就想将文件传输到树莓派上&#xff0c;但试了发现u盘不能简单传送&#xff0c;就在网络上找到了通过windows 的scp命令传送 前提是树莓派先开启ssh服务&#xff0c;且Window…

字节跳动后端一面

&#x1f4cd;1. Gzip压缩技术详解 Gzip是一种流行的无损数据压缩格式&#xff0c;它使用DEFLATE算法来减少文件大小&#xff0c;广泛应用于网络传输和文件存储中以提高效率。 &#x1f680; 使用场景&#xff1a; • 网站优化&#xff1a;通过压缩HTML、CSS、JavaScript文件来…