Hadoop3教程(十六):MapReduce中的OutputFormat

文章目录

  • (105)OutputFormat概述
  • (106)自定义OutputFormat案例需求分析
  • (107/108)自定义OutputFormat案例实现
    • 自定义Mapper
    • 自定义Reducer
    • 自定义OutputFormat
    • Driver
  • 参考文献

(105)OutputFormat概述

我们之前讲过了Map阶段的InputFormat,对应的,Reduce阶段也有自己的OutputFormat。

Reducer在执行完reduce()之后,接下来就会通过OutputFormat来将处理结果输出至外界环境。

Hadoop里默认使用的是TextOutputFormat,即将reduce()的处理结果,按行输出到文件。

而OutputFormat是MapReduce输出的基类,所有实现了MR输出的程序,都必须实现OutputFormat接口。

OutputFormat有几种官方自带的实现类(具体功能就不展开了):

  • NullOutputFormat
  • FileOutputFormat
    • MapFileOutputFormat
    • SequenceFileOutputFormat
    • TextOutputFormat(默认)
  • FilterOutputFormat
    • LazyOutputFormat
  • DBOutputFormat

OutputFormat类的核心方法:public abstract RecordWriter<K,V> getRecordWriter(...)

最终结果怎么写,以什么形式写,写到哪儿,等等这些,都是在getRecordWriter()里控制的。

当然,这些自带的实现类在日常的生产中肯定是不足以满足各种情况的,所以多数情况下,我们会实现自定义的OutputFormat类

自定义OutputFormat实现类需要:

  • 继承FileOutputFormat;
  • 改写RecordWriter,具体改写输出数据的方法write()

(106)自定义OutputFormat案例需求分析

需求:输入是一个日志文件,即log.txt,里面是罗列了一些访问过的网站,现在需要把其中包含atguigu的网站输出到a.log,不包含atguigu的网站输出到b.log。

输入数据形如:

http://www.baidu.com
http://www.atguibu.com
...

我们需要自定义一个OutputFormat类,即创建一个类LogRecordWriter继承RecordWriter,然后创建两个文件输出流,一个是atguiguOut,一个是otherOut。如果输入数据包含atguigu,就输出到atguiguOut,反之则输出到otherOut流。

最后还需要在驱动类里注册一下:

job.setOutputFormatClass(LogOutputFormat.class);

附注:

其实这个需求从直观上来讲,是可以通过分区来实现类似功能的,但是很遗憾,分区的话无法控制输出文件的名字,所以没法严格符合需求。

(107/108)自定义OutputFormat案例实现

这里直接复制了教程里的代码,来介绍一下,如何针对上一小节提出的需求,自定义OutputFormat。

自定义Mapper

首先需要创建一个自定义的Mapper类,如class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//不做任何处理,直接写出一行log数据context.write(value,NullWritable.get());}
}

自定义Reducer

然后新建一个自定义Reducer类:

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text, NullWritable,Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// 防止有相同的数据,迭代写出for (NullWritable value : values) {context.write(key,NullWritable.get());}}
}

自定义OutputFormat

这里是最重要的一步,就是自定义一个OutputFormat类,继承RecordWriter:

  • 创建两个文件的输出流:atguiguOut、otherOut;
  • 如果输入数据中含有atguigu,则输出至atguiguOut,反之则输出到otherOut;

首先自定义OutputFormat类,重写RecordWriter方法,将我们自定义的LogRecordWriter放进去。

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {//创建一个自定义的RecordWriter返回LogRecordWriter logRecordWriter = new LogRecordWriter(job);return logRecordWriter;}
}

然后编写LogRecordWriter类,:

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream atguiguOut;private FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {try {//获取文件系统对象FileSystem fs = FileSystem.get(job.getConfiguration());//用文件系统对象创建两个输出流对应不同的目录atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));otherOut = fs.create(new Path("d:/hadoop/other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容if (log.contains("atguigu")) {atguiguOut.writeBytes(log + "\n");} else {otherOut.writeBytes(log + "\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {//关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}

Driver

最后编写LogDriver驱动类,把我们前面自定义的的类统统在驱动类里注册上:

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

至此需求完成。

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jetpack:012-Jetpack中的弹出菜单

文章目录 1. 概念介绍2. 使用方法2.1 DropdownMenu2.2 DropdownMenuItem 3. 示例代码3.1 代码和注释3.2 代码难点3.3 运行效果 4. 内容总结 我们在上一章回中介绍了Jetpack中标题栏相关的内容&#xff0c;本章回中主要 弹出菜单。闲话休提&#xff0c;让我们一起Talk Android …

AI系统ChatGPT源码+详细搭建部署教程+支持GPT4.0+支持ai绘画(Midjourney)/支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统AI绘画系统&#xff0c;支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署…

【vSphere 8 自签名证书】企业 CA 签名证书替换 vSphere Machine SSL 证书Ⅱ—— 创建和添加证书模板

目录 博文摘要3. 使用 Microsoft 证书颁发机构创建 Machine SSL 和 Solution User 证书模板3.1 打开 Certificate Template Console3.2 复制模板3.3 修改 Compatibility 选项卡3.4 修改 General 选项卡3.5 修改 Extensions 选项卡3.6 修改 Subject Name 选项卡3.7 确认新模板 4…

软件工程与计算总结(十六)详细设计的设计模式

一.设计模式基础 某种意义上来说&#xff0c;设计模式就是设计经验的总结~ 设计模式不是简单的经验总结&#xff0c;更不是无中生有&#xff0c;它是经过实践反复检验、能解决关键技术难题、有广泛应用前景和能够显著提高软件质量的有效的经验总结。 每个模式都不是独立的&a…

Docker安装GitLab及使用图文教程

作者&#xff1a; 宋发元 GitLab安装及使用教程 官方教程 https://docs.gitlab.com/ee/install/docker.html Docker安装GitLab 宿主机创建容器持久化目录卷 mkdir -p /docker/gitlab/{config,data,logs}拉取GitLab镜像 docker pull gitlab/gitlab-ce:15.3.1-ce.0运行GitLa…

Linux性能优化--性能追踪:受CPU限制的应用程序(GIMP)

10.0 概述 本章包含了一个例子&#xff1a;如何用Linux性能工具在受CPU限制的应用程序中寻找并修复性能问题。 阅读本章后&#xff0c;你将能够&#xff1a; 在受CPU限制的应用程序中明确所有的CPU被哪些源代码行使用。用1trace和oprofile弄清楚应用程序调用各种内部与外部函…

Jmeter接口测试 —— jmeter对图片验证码的处理

jmeter对图片验证码的处理 在web端的登录接口经常会有图片验证码的输入&#xff0c;而且每次登录时图片验证码都是随机的&#xff1b;当通过jmeter做接口登录的时候要对图片验证码进行识别出图片中的字段&#xff0c;然后再登录接口中使用&#xff1b; 通过jmeter对图片验证码…

在启智平台上安装anconda

安装Anaconda3-5.0.1-Linux-x86_64.sh python版本是3.6 在下面的网站上找到要下载的anaconda版本&#xff0c;把对应的.sh文件下载下来 https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 把sh文件压缩成.zip文件&#xff0c;拖到启智平台的调试页面 上传到平台上 un…

ndk 编译报错 find_library called with incorrect number of arguments

ndk 编译报错 C build system [configure] failed while executing: echo off "C:\Users\Administrator\AppData\Local\Android\Sdk\cmake\3.22.1\bin\cmake.exe" ^ "-HF:\jupter\jupiter_offline_cv2\libts" ^ "-DCMAKE_SYSTEM_NAMEAndroid" ^…

什么是内存泄漏?JavaScript 垃圾回收机制原理及方式有哪些?哪些操作会造成内存泄漏?

1、什么是内存泄漏&#xff1f; 内存泄漏是前端开发中的一个常见问题&#xff0c;可能导致项目变得缓慢、不稳定甚至崩溃。内存泄漏是指不再用到的内存没有及时被释放&#xff0c;从而造成内存上的浪费。 2、 JavaScript 垃圾回收机制 1&#xff09; 原理&#xff1a; JavaS…

《从菜鸟到大师之路 正则表达式 篇》

《从菜鸟到大师之路 正则表达式 篇》 正则表达式是一个强大的文本匹配工具。但是&#xff0c;对于前端初学者来说&#xff0c;众多的符号和规则可能让人难以理解。其实&#xff0c;你不需要记住所有的正则表达式语法&#xff01;本文将分享一些简单而实用的技巧&#xff0c;帮…

【C++ 学习 ㉘】- 详解 C++11 的列表初始化

目录 一、C11 简介 二、列表初始化 2.1 - 统一初始化 2.2 - 列表初始化的使用细节 2.2.1 - 聚合类型的定义 2.2.2 - 注意事项 2.3 - initializer_list 2.3.1 - 基本使用 2.3.2 - 源码剖析 一、C11 简介 1998 年&#xff0c;C 标准委员会发布了第一版 C 标准&#xff0…

vm虚拟机克隆ubuntu

1. 使用vm虚拟机自带的克隆功能 2. 选择完整克隆&#xff0c;然后选择您克隆到哪里的目录 3. 点击编辑你克隆后的虚拟机&#xff0c;点网络适配器&#xff0c;然后点高级&#xff0c;点击生成mac地址&#xff08;由于唯一&#xff0c;所以需要重新生成&#xff09; 4. 开启虚拟…

asw ec2 ssh 登录设置

使用 ssh 登录 aws ec2 主机&#xff0c;需要创建 密钥对&#xff0c;设置好安全组规则。 密钥对 ec2 -> 网络与安全 -> 密钥对 -> 创建密钥对 名称: [输入一个名称] 创建密钥对 浏览器会下载一个后缀为 .pem 的文件&#xff0c;使用 ssh 时将会用到。 安全组设置…

vue 动态数字效果 vue-animate-number

安装 vue-animate-number 插件 npm install vue-animate-number &#xff08;注&#xff1a;是npm、cnpm还是yarn根据具体项目要求&#xff09; 在 main.js 中引入 import Vue from vue import VueAnimateNumber from vue-animate-number Vue.use(VueAnimateNumber)动态使用…

【MySQL】分析SQL的几种方式

文章目录 一、查看SQL执行频率二、定位低效率执行SQL1. show processlist2. 慢查询日志 三、explain分析执行计划1. id2. select_type3. type4. key5. extra 四、show profile 一、查看SQL执行频率 show session status&#xff1a;显示 session 级的统计结果&#xff08;不写…

openGauss学习笔记-100 openGauss 数据库管理-管理数据库安全-客户端接入之用SSL进行安全的TCP/IP连接

文章目录 openGauss学习笔记-100 openGauss 数据库管理-管理数据库安全-客户端接入之用SSL进行安全的TCP/IP连接100.1 背景信息100.2 前提条件100.3 注意事项100.4 操作步骤100.5 相关参考 openGauss学习笔记-100 openGauss 数据库管理-管理数据库安全-客户端接入之用SSL进行安…

excel+requests管理测试用例接口自动化框架

背景&#xff1a; 某项目有多个接口&#xff0c;之前使用的unittest框架来管理测试用例&#xff0c;将每个接口的用例封装成一个py文件&#xff0c;接口有数据或者字段变动后&#xff0c;需要去每个py文件中找出变动的接口测试用例&#xff0c;维护起来不方便&#xff0c;为了…

双目视觉实战--单视图测量方法

目录 一.简介 二、2D变换 1. 等距变换&#xff08;欧式变换&#xff09; 2. 相似变换 3. 仿射变换 4. 射影变换&#xff08;透视变换&#xff09; 5. 结论 三、影消点与影消线 1. 平面上的线 2. 直线的交点 3. 2D无穷远点 4. 无穷远直线 5. 无穷远点的透视变换与仿…

Spring Cloud Gateway 使用 Redis 限流使用教程

从本文开始&#xff0c;笔者将总结 spring cloud 相关内容的教程 版本选择 为了适应 java8&#xff0c;笔者选择了下面的版本&#xff0c;后续会出 java17的以SpringBoot3.0.X为主的教程 SpringBoot 版本 2.6.5 SpringCloud 版本 2021.0.1 SpringCloudAlibaba 版本 2021.0.1.…