Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// http://www.baidu.com//http://www.google.com// (http://www.google.com, NullWritable)// 不做任何处理context.write(value, NullWritable.get());}
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// http://www.baidu.com// http://www.baidu.com// 防止有相同数据,丢数据for (NullWritable value : values) {context.write(key, NullWritable.get());}}
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private  FSDataOutputStream atguiguOut;private  FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {// 创建两条流try {FileSystem fs = FileSystem.get(job.getConfiguration());atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));otherOut = fs.create(new Path("D:\\hadoop\\other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();// 具体写if (log.contains("atguigu")){atguiguOut.writeBytes(log+"\n");}else {otherOut.writeBytes(log+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {LogRecordWriter lrw = new LogRecordWriter(job);return lrw;}
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/31692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQLite 3 优化批量数据存储操作---事务transaction机制

0、事务操作 事务的目的是为了保证数据的一致性和完整性。 事务&#xff08;Transaction&#xff09;具有以下四个标准属性&#xff0c;通常根据首字母缩写为 ACID&#xff1a; 原子性&#xff08;Atomicity&#xff09;&#xff1a;确保工作单位内的所有操作都成功完成&…

八、yolov8模型预测和模型导出(目标检测)

模型查看 模型预测 模型导出 模型训练完成后&#xff0c;找到训练文件生成文件夹&#xff0c;里面包含wights、过程图、曲线图。 模型预测 1、在以下文件夹中放入需要预测的图&#xff1b; 2、找到detect文件下的predict.py文件&#xff0c;修改以下内容。 3、右键点击…

【NLP练习】Transformer实战-单词预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 任务&#xff1a;自定义输入一段英文文本进行预测 一、定义模型 from tempfile import TemporaryDirectory from typing import Tuple from torch import nn…

AI时代:硬件狂欢,软件落寞 华为开发者大会2024

内容提要 分析师表示&#xff0c;目前AI行业大多数的支出都流向用于训练大模型的硬件或云基础设备。相较之下&#xff0c;软件应用商们在AI时代显得停滞不前。尽管软件应用商们十分热衷于构建AI工具&#xff0c;然而其收入状况却并不乐观。 文章正文 AI浪潮之下&#xff0c;英…

AIGC时代,重塑人的核心竞争力?

随着人工智能技术的飞速发展&#xff0c;AIGC&#xff08;人工智能生成内容&#xff09;的时代已经悄然而至。在这个时代&#xff0c;AI不再仅仅是一个技术概念&#xff0c;而是深入到我们生活的方方面面&#xff0c;从创作到生产&#xff0c;从娱乐到工作&#xff0c;AI都在以…

RabbitMQ 相关概念

引言 什么是消息中间件 消息是指在应用间传送的数据&#xff0c;包含文本字符串、JSON等。消息队列中间件&#xff08;MQ&#xff09;指利用高效可靠的消息传递机制进行平台无关的数据交流&#xff0c;并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型…

剑指offer 算法题(搜索二维矩阵)

剑指offer 第二题 去力扣里测试算法 思路一&#xff1a; 直接暴力遍历二维数组。 class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {for (unsigned int i{ 0 }; i < matrix.size(); i){for (unsigned int j{ 0 };…

Shell脚本:条件语句(if、case)

目录 硬编码 硬编码的缺点 条件判断 $? 命令行语句 判断指定目录是否存在 判断指定文件是否存在 判断指定对象是否存在 表达式形式语句 判断对象是否存在 判断对象是否有权限 与、或、非 运算 与运算 或运算 非运算 比较大小 判断磁盘利用率实验步骤 字符串…

Java基础之练习(2)

需求: 键盘录入一个字符串,使用程序实现在控制台遍历该字符串 package String;import java.util.Scanner;public class StringDemo5 {public static void main(String[] args) {//录入一个字符串Scanner sc new Scanner(System.in);System.out.println("请输入一个字符串…

1. 基础设计流程(以时钟分频器的设计为例)

1. 准备工作 1. 写有vcs编译命令的run_vcs.csh的shell脚本 2. 装有timescale&#xff0c;设计文件以及仿真文件的flish.f&#xff08;filelist文件&#xff0c;用于VCS直接读取&#xff09; vcs -R -full64 -fsdb -f flist.f -l test.log 2. 写代码&#xff08;重点了解代码…

如何将办公文档压缩成rar格式文件?

压缩包格式是我们生活工作中常用到的文件格式&#xff0c;那么如何得到一个rar格式的压缩文件&#xff1f;或者说如何将文件压缩成rar格式而不是zip格式呢&#xff1f;今天我们来了解一下如何压缩为rar格式文件。 首先&#xff0c;下载并安装WinRAR&#xff0c;然后用鼠标选择需…

【Python】成功解决TypeError: missing 1 required positional argument

【Python】成功解决TypeError: missing 1 required positional argument 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1…

React的服务器端渲染(SSR)和客户端渲染(CSR)有什么区别?

React的服务器端渲染&#xff08;SSR&#xff09;和客户端渲染&#xff08;CSR&#xff09;是两种不同的页面渲染方式&#xff0c;它们各自有不同的特点和适用场景&#xff1a; 服务器端渲染&#xff08;SSR&#xff09; 页面渲染: 页面在服务器上生成&#xff0c;然后将完整的…

复盘最近的面试

这个礼拜一直在面试&#xff0c;想着看看能否拿到不错的offer前去实习&#xff0c;从周一到周四&#xff0c;面了将近10家&#xff0c;特整理此份面经&#xff0c;希望对秋招的各位有所帮助 A公司 一面 面试官人很好&#xff0c;我回答的时候不会他会笑笑然后提醒我 自我介绍~…

数据通信与网络(三)

物理层概述&#xff1a; 物理层是网络体系结构中的最低层 它既不是指连接计算机的具体物理设备&#xff0c;也不是指负责信号传输的具体物理介质&#xff0c; 而是指在连接开放系统的物理媒体上为上一层(指数据链路层)提供传送比特流的一个物理连接。 物理层的主要功能——为…

项目中eventbus和rabbitmq配置后,不起作用

如下&#xff1a;配置了baseService层和SupplyDemand层得RabbitMQ和EventBus 但是在执行订阅事件时&#xff0c;发送得消息在base项目中没有执行&#xff0c;后来发现是虚拟机使用得不是一个&#xff0c;即上图中得EventBus下得VirtualHost&#xff0c;修改成一直就可以了

肆拾玖坊三级众筹模式玩法揭秘,白酒体验馆运作模式

发展至今&#xff0c;肆拾玖坊已积累了数百万忠实用户&#xff0c;拥有100多家分销商、5000多个新零售终端&#xff0c;覆盖全国34个省级行政区域、200余地市、1500个县区。成为中国创业界和酒行业的“现象级”企业。 今天&#xff0c;我们就来深入解析肆拾玖坊的营销模式&…

Linux入门攻坚——26、Web Service基础知识与httpd配置-2

http协议 URL&#xff1a;Uniform Resource Locator&#xff0c;统一资源定位符 URL方案&#xff1a;scheme&#xff0c;如http://&#xff0c;https:// 服务器地址&#xff1a;IP&#xff1a;port 资源路径&#xff1a; 示例&#xff1a;http://www.test.com:80/bbs/…

ios18计算器大更新使用指南,一招掌握新计算器使用技巧!

苹果推出iOS 18系统中&#xff0c;变化较大的之一就是以多年没有更新的计算器应用程序&#xff0c;新增了多个使用的功能&#xff0c;经过小编几天的使用&#xff0c;总结了几个iOS 18计算器的使用技巧和更新点分享给大家。 一、界面布局变化 与iOS 17相比&#xff0c;iOS18的…

Java学习笔记(二)变量原理、常用编码、类型转换

Hi i,m JinXiang ⭐ 前言 ⭐ 本篇文章主要介绍Java变量原理、常用编码、类型转换详细使用以及部分理论知识 🍉欢迎点赞 👍 收藏 ⭐留言评论 📝私信必回哟😁 🍉博主收将持续更新学习记录获,友友们有任何问题可以在评论区留言 1、变量原理 1.1、变量的介绍 变量是程…