Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// http://www.baidu.com//http://www.google.com// (http://www.google.com, NullWritable)// 不做任何处理context.write(value, NullWritable.get());}
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// http://www.baidu.com// http://www.baidu.com// 防止有相同数据,丢数据for (NullWritable value : values) {context.write(key, NullWritable.get());}}
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private  FSDataOutputStream atguiguOut;private  FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {// 创建两条流try {FileSystem fs = FileSystem.get(job.getConfiguration());atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));otherOut = fs.create(new Path("D:\\hadoop\\other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();// 具体写if (log.contains("atguigu")){atguiguOut.writeBytes(log+"\n");}else {otherOut.writeBytes(log+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {LogRecordWriter lrw = new LogRecordWriter(job);return lrw;}
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/31692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQLite 3 优化批量数据存储操作---事务transaction机制

0、事务操作 事务的目的是为了保证数据的一致性和完整性。 事务&#xff08;Transaction&#xff09;具有以下四个标准属性&#xff0c;通常根据首字母缩写为 ACID&#xff1a; 原子性&#xff08;Atomicity&#xff09;&#xff1a;确保工作单位内的所有操作都成功完成&…

八、yolov8模型预测和模型导出(目标检测)

模型查看 模型预测 模型导出 模型训练完成后&#xff0c;找到训练文件生成文件夹&#xff0c;里面包含wights、过程图、曲线图。 模型预测 1、在以下文件夹中放入需要预测的图&#xff1b; 2、找到detect文件下的predict.py文件&#xff0c;修改以下内容。 3、右键点击…

【NLP练习】Transformer实战-单词预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 任务&#xff1a;自定义输入一段英文文本进行预测 一、定义模型 from tempfile import TemporaryDirectory from typing import Tuple from torch import nn…

Date(时间类)

第一章 Date类 1.1 Date概述 java.util.Date类 表示特定的瞬间&#xff0c;精确到毫秒。 继续查阅Date类的描述&#xff0c;发现Date拥有多个构造函数&#xff0c;只是部分已经过时&#xff0c;我们重点看以下两个构造函数 public Date()&#xff1a;从运行程序的此时此刻到…

AI时代:硬件狂欢,软件落寞 华为开发者大会2024

内容提要 分析师表示&#xff0c;目前AI行业大多数的支出都流向用于训练大模型的硬件或云基础设备。相较之下&#xff0c;软件应用商们在AI时代显得停滞不前。尽管软件应用商们十分热衷于构建AI工具&#xff0c;然而其收入状况却并不乐观。 文章正文 AI浪潮之下&#xff0c;英…

数据安全“星熠”案例 | 一体化数据安全平台uDSP助力工银安盛“多快好省”落地数据安全

近年来&#xff0c;国家从顶层规划和政策细则层面&#xff0c;频频出台相关政策文件&#xff0c;强化了数据安全作为数字中国建设核心底座的地位&#xff0c;数据安全产业迎来了黄金发展期。自2022年起&#xff0c;数据安全共同体计划已开展两届数据安全“星熠”案例征集工作&a…

AIGC时代,重塑人的核心竞争力?

随着人工智能技术的飞速发展&#xff0c;AIGC&#xff08;人工智能生成内容&#xff09;的时代已经悄然而至。在这个时代&#xff0c;AI不再仅仅是一个技术概念&#xff0c;而是深入到我们生活的方方面面&#xff0c;从创作到生产&#xff0c;从娱乐到工作&#xff0c;AI都在以…

RabbitMQ 相关概念

引言 什么是消息中间件 消息是指在应用间传送的数据&#xff0c;包含文本字符串、JSON等。消息队列中间件&#xff08;MQ&#xff09;指利用高效可靠的消息传递机制进行平台无关的数据交流&#xff0c;并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型…

什么是nginx到底怎么配置,什么是网关到底怎么配置?

那使用upstream里面具体哪个服务器是怎么决定和区分的呢? AI生成 在Nginx中,使用upstream里面具体哪个服务器是通过负载均衡算法来决定的。upstream块定义了一组服务器,Nginx会根据配置的负载均衡算法来选择一个服务器来处理当前的请求。常见的负载均衡算法包括轮询(round-…

剑指offer 算法题(搜索二维矩阵)

剑指offer 第二题 去力扣里测试算法 思路一&#xff1a; 直接暴力遍历二维数组。 class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {for (unsigned int i{ 0 }; i < matrix.size(); i){for (unsigned int j{ 0 };…

Shell脚本:条件语句(if、case)

目录 硬编码 硬编码的缺点 条件判断 $? 命令行语句 判断指定目录是否存在 判断指定文件是否存在 判断指定对象是否存在 表达式形式语句 判断对象是否存在 判断对象是否有权限 与、或、非 运算 与运算 或运算 非运算 比较大小 判断磁盘利用率实验步骤 字符串…

Spring Data JPA介绍与CRUD实战演练

文章目录 一、Spring Data JPA 简介二、Spring Data JPA 与 MyBatis Plus 比较设计哲学和抽象层次SQL 控制学习曲线和技术要求性能与优化综合考虑 三、SpringDataJpa实战演练1. 创建user表2. 搭建Spring Boot开发环境3. pom.xml文件内容4. application.yml文件内容5. Applicati…

Java基础之练习(2)

需求: 键盘录入一个字符串,使用程序实现在控制台遍历该字符串 package String;import java.util.Scanner;public class StringDemo5 {public static void main(String[] args) {//录入一个字符串Scanner sc new Scanner(System.in);System.out.println("请输入一个字符串…

1. 基础设计流程(以时钟分频器的设计为例)

1. 准备工作 1. 写有vcs编译命令的run_vcs.csh的shell脚本 2. 装有timescale&#xff0c;设计文件以及仿真文件的flish.f&#xff08;filelist文件&#xff0c;用于VCS直接读取&#xff09; vcs -R -full64 -fsdb -f flist.f -l test.log 2. 写代码&#xff08;重点了解代码…

2024年最新消防设施操作员(高级)题库

46.手提式干粉灭火器1~2kg近似有效喷射时间为&#xff08; &#xff09;s。 A.10 B.15 C.18 D.20 答案:A 解析:根据初级教材191页&#xff0c;手提式干粉灭火器1~2kg近似有效喷射时间为10s。 47.手提式干粉灭火器8kg近似有效喷射时间为&#xff08; &#xff09;s。 A.…

如何将办公文档压缩成rar格式文件?

压缩包格式是我们生活工作中常用到的文件格式&#xff0c;那么如何得到一个rar格式的压缩文件&#xff1f;或者说如何将文件压缩成rar格式而不是zip格式呢&#xff1f;今天我们来了解一下如何压缩为rar格式文件。 首先&#xff0c;下载并安装WinRAR&#xff0c;然后用鼠标选择需…

【第23章】Vue实战篇之文章

文章目录 前言一、搭建界面二、加载文章列表1.界面2.请求脚本3. 加载事件4. 搜索和重置5. 分页事件 三、添加文章1. 富文本编辑器1.1 安装1.2 导入组件1.3 使用1.4 样式美化 2. 页面搭建2.1 数据绑定2.2 界面 3. 图片上传3.1 数据绑定3.2 界面 4. 表单校验4.1 脚本4.2 表单 5. …

【Python】成功解决TypeError: missing 1 required positional argument

【Python】成功解决TypeError: missing 1 required positional argument 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1…

React的服务器端渲染(SSR)和客户端渲染(CSR)有什么区别?

React的服务器端渲染&#xff08;SSR&#xff09;和客户端渲染&#xff08;CSR&#xff09;是两种不同的页面渲染方式&#xff0c;它们各自有不同的特点和适用场景&#xff1a; 服务器端渲染&#xff08;SSR&#xff09; 页面渲染: 页面在服务器上生成&#xff0c;然后将完整的…

MySQL集合运算联结

集合的运算 & 联结&#xff08;内连接&#xff0c;左连接等等&#xff09; 假如我们有两张表&#xff0c;第一个表名为 students&#xff0c;如下所示&#xff1a; ------------------------------- | student_id | name | class_id | ------------------------------…