Hadoop学习:深入解析MapReduce的大数据魔力(二)

Hadoop学习:深入解析MapReduce的大数据魔力(二)

    • 3.3 Shuffle 机制
      • 3.3.1 Shuffle 机制
      • 3.3.2 Partition 分区
      • 3.3.3 Partition 分区案例实操
      • 3.3.4 WritableComparable 排序
      • 3.3.5 Combiner 合并
    • 3.4 OutputFormat 数据输出
      • 3.4.1 OutputFormat 接口实现类
      • 3.4.2 自定义OutputFormat案例实操
  • 题外话

🤵‍♂️ 个人主页@老虎爱淘气 个人主页
✍🏻作者简介:Python学习者
🐋 希望大家多多支持我们一起进步!😄
如果文章对你有帮助的话,
欢迎评论 💬点赞👍🏻 收藏 📂加关注

在这里插入图片描述

3.3 Shuffle 机制

3.3.1 Shuffle 机制

Map 方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
在这里插入图片描述

3.3.2 Partition 分区

1、问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机
归属地不同省份输出到不同文件中(分区)
2、默认Partitioner分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {}public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
3、自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 控制分区代码逻辑
… …}}return partition;

(2)在Job驱动中,设置自定义Partitioner

 job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

 job.setNumReduceTasks(5);

4、分区总结
(1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件part-r-00000;
(4)分区号必须从零开始,逐一累加。
5、案例分析
例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

3.3.3 Partition 分区案例实操

1)需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据
在这里插入图片描述
2)期望输出数据
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到
一个文件中。
2)需求分析
1、需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
在这里插入图片描述
3)在案例2.3的基础上,增加一个分区类

package com.atguigu.mapreduce.partitioner; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int numPartitions) 
{ //获取手机号前三位prePhone String phone = text.toString(); String prePhone = phone.substring(0, 3); //定义一个分区号变量partition,根据prePhone设置分区号 int partition; if("136".equals(prePhone)){ partition = 0; }else if("137".equals(prePhone)){ partition = 1; }else if("138".equals(prePhone)){ partition = 2; }else if("139".equals(prePhone)){ partition = 3; }else { partition = 4; } //最后返回分区号partition return partition; } 
} 

4)在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.atguigu.mapreduce.partitioner; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException { //1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 关联本Driver类 job.setJarByClass(FlowDriver.class); //3 关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4 设置Map端输出数据的KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //5 设置程序最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //8 指定自定义分区器 job.setPartitionerClass(ProvincePartitioner.class); //9 同时指定相应数量的ReduceTask job.setNumReduceTasks(5); //6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\inputflow")); FileOutputFormat.setOutputPath(job, new Path("D\\partitionout")); //7 提交Job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } 
} 

3.3.4 WritableComparable 排序

排序概述
排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于
Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是
否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使
用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数
据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大
小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到
一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者
数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完
毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在
处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
(4)二次排序
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序WritableComparable原理分析
bean 对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override 
public int compareTo(FlowBean bean) { 
int result; 
// 按照总流量大小,倒序排列 
if (this.sumFlow > bean.getSumFlow()) { 
result = -1; 
}else if (this.sumFlow < bean.getSumFlow()) { 
result = 1; 
}else { 
result = 0; 
} 
return result; 
} 

3.3.5 Combiner 合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果;
应该跟Reducer的输入kv类型要对应起来。
Mapper
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
在这里插入图片描述

(6)自定义Combiner实现步骤
(a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, 
IntWritable> { 
private IntWritable outV = new IntWritable(); 
@Override 
protected void reduce(Text key, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException { 
int sum = 0; 
for (IntWritable value : values) { 
sum += value.get(); 
} 
outV.set(sum); 
context.write(key,outV); 
} 
} 

(b)在Job驱动类中设置:

job.setCombinerClass(WordCountCombiner.class); 

3.4 OutputFormat 数据输出

3.4.1 OutputFormat 接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat
接口。下面我们介绍几种常见的OutputFormat实现类。
1.OutputFormat实现类
2.默认输出格式TextOutputFormat
3.自定义OutputFormat
3.1 应用场景:
例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。
3.2 自定义OutputFormat步骤
➢ 自定义一个类继承FileOutputFormat。
➢ 改写RecordWriter,具体改写输出数据的方法write()。

3.4.2 自定义OutputFormat案例实操

1)需求
过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
(1)输入数据
在这里插入图片描述
(2)期望输出数据
在这里插入图片描述
2)需求分析
在这里插入图片描述
3)案例实操
(1)编写LogMapper类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.IOException; 
public 
class 
LogMapper 
extends 
Mapper<LongWritable, 
Text,Text, 
NullWritable> { 
@Override 
protected void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException { 
//不做任何处理,直接写出一行log数据 
context.write(value,NullWritable.get()); 
} 
} 

(2)编写LogReducer类

package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer<Text, NullWritable,Text, 
NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context 
context) throws IOException, InterruptedException { // 防止有相同的数据,迭代写出 for (NullWritable value : values) { context.write(key,NullWritable.get()); } } 
} 

(3)自定义一个LogOutputFormat类

package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> 
{ @Override public RecordWriter<Text, NullWritable> 
getRecordWriter(TaskAttemptContext job) throws IOException, 
InterruptedException { //创建一个自定义的RecordWriter返回 LogRecordWriter logRecordWriter = new LogRecordWriter(job); return logRecordWriter; } 
} 

(4)编写LogRecordWriter类

package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class LogRecordWriter extends RecordWriter<Text, NullWritable> { private FSDataOutputStream atguiguOut;private FSDataOutputStream otherOut; public LogRecordWriter(TaskAttemptContext job) { try { //获取文件系统对象 FileSystem fs = FileSystem.get(job.getConfiguration()); //用文件系统对象创建两个输出流对应不同的目录 atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log")); otherOut = fs.create(new Path("d:/hadoop/other.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, 
InterruptedException { String log = key.toString(); //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容 if (log.contains("atguigu")) { atguiguOut.writeBytes(log + "\n"); } else { otherOut.writeBytes(log + "\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, 
InterruptedException { //关流 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); } 
} 

(5)编写LogDriver类

package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); ——————————————————————————— 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NullWritable.class); 
//设置自定义的outputformat 
job.setOutputFormatClass(LogOutputFormat.class); 
FileInputFormat.setInputPaths(job, new Path("D:\\input")); 
//虽然我们自定义了 outputformat,但是因为我们的 outputformat 继承自
fileoutputformat 
//而fileoutputformat 要输出一个_SUCCESS文件,所以在这还得指定一个输出目录 
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput")); 
boolean b = job.waitForCompletion(true); 
System.exit(b ? 0 : 1); 
} 
} 

题外话

*我整理了一些资源,如果你也对Python和大数据感兴趣,关注下方公众号免费提取资料。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/24509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HttpServletRequest和HttpServletResponse的获取与使用

相关笔记&#xff1a;【JavaWeb之Servlet】 文章目录 1、Servlet复习2、HttpServletRequest的使用3、HttpServletResponse的使用4、获取HttpServletRequest和HttpServletResponse 1、Servlet复习 Servlet是JavaWeb的三大组件之一&#xff1a; ServletFilter 过滤器Listener 监…

医学图像处理

医学图像处理 opencv批量分片高像素图像病理图像色彩特征提取基于 imgaug、skimage 实现色彩增强基于 Cycle-GAN 完成染色标准化 病理图像细微特征提取自动数据标注分类场景下的医学图像分析分割场景下的医学图像分析检测场景下的医学图像分析 , i ] k 8 < * I opencv批量…

4.DNS和负载均衡

文章目录 coreDNS概念部署croeDNS测试 kubernetes多master集群结构master节点部署 负载均衡配置部署nginx做四层反向代理安装高可用 keepalivednginx监控脚本修改k8s中组件的配置文件 coreDNS 概念 coreDNS是kubernetes的默认DNS实现。可以为集群中的service资源创建一个资源名…

PyTorch中加载模型权重 A匹配B|A不匹配B

在做深度学习项目时&#xff0c;从头训练一个模型是需要大量时间和算力的&#xff0c;我们通常采用加载预训练权重的方法&#xff0c;而我们往往面临以下几种情况&#xff1a; 未修改网络&#xff0c;A与B一致 很简单&#xff0c;直接.load_state_dict() net ANet(num_cla…

Java课设--学生信息管理系统(例1)

文章目录 前提一、运行效果二、Text实现类三、Manage选择类四、StudentWay学生方法类五、StudnetSql数据库类 前题 例1为无使用GUI图形界面&#xff0c;例2使用GUI图形界面&#xff01; 首先自己的JDBC驱动已经接好了&#xff0c;连接自己的数据库没有问题。连接数据库可以看…

《吐血整理》高级系列教程-吃透Fiddler抓包教程(33)-Fiddler如何抓取WebSocket数据包

1.简介 本来打算再写一篇这个系列的文章也要和小伙伴或者童鞋们说再见了&#xff0c;可是有人留言问WebSocket包和小程序的包不会抓&#xff0c;那就关于这两个知识点宏哥就再水两篇文章。 2.什么是Socket&#xff1f; 在计算机通信领域&#xff0c;socket 被翻译为“套接字…

物联网||不一样的点灯实验(2)|通过使用CMSIS库函数实现点灯实验-学习笔记(12)

文章目录 通过使用CMSIS库函数实现点灯实验1 如何使用CMIS库2 如何利用CMSIS库操作IO 两种实现方法的比较课后作业:完整代码&#xff1a;LED.C:test.c:led.h:systick.h:systick.c: 通过使用CMSIS库函数实现点灯实验 1 如何使用CMIS库 #####如何使用此驱动#####[. .](#)启用GPI…

langchain-ChatGLM源码阅读:参数设置

文章目录 上下文关联对话轮数向量匹配 top k控制生成质量的参数参数设置心得 上下文关联 上下文关联相关参数&#xff1a; 知识相关度阈值score_threshold内容条数k是否启用上下文关联chunk_conent上下文最大长度chunk_size 其主要作用是在所在文档中扩展与当前query相似度较高…

RichTextBox基本用法

作用&#xff1a;富文本编辑器&#xff0c;支持多种文本展示 常用属性&#xff1a; 允许显示多行 自动换行 展示滚动条 ScrollBars属性值&#xff1a; 1、Both&#xff1a;只有当文本超过RichTextBox的宽度或长度时&#xff0c;才显示水平滚动条或垂直滚动条&#xff0c;或两…

基于粒子群优化算法的配电网光伏储能双层优化配置模型[IEEE33节点](选址定容)(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码、数据、讲解 &#x1f4a5;1 概述 由于能源的日益匮乏&#xff0c;电力需求的不断增长等&#xff0c;配电网中分布式能源渗透率不断提高&#xff0c;且逐渐向主动配电网方…

【雕爷学编程】Arduino动手做(184)---快餐盒盖,极低成本搭建机器人实验平台2

吃完快餐粥&#xff0c;除了粥的味道不错之外&#xff0c;我对个快餐盒的圆盖子产生了兴趣&#xff0c;能否做个极低成本的简易机器人呢&#xff1f;也许只需要二十元左右 知识点&#xff1a;轮子&#xff08;wheel&#xff09; 中国词语。是用不同材料制成的圆形滚动物体。简…

Mac端口扫描工具

端口扫描工具 Mac内置了一个网络工具 网络使用工具 按住 Command 空格 然后搜索 “网络实用工具” 或 “Network Utility” 即可 域名/ip转换Lookup ping功能 端口扫描 https://zhhll.icu/2022/Mac/端口扫描工具/ 本文由 mdnice 多平台发布

【具生智能】前沿思考与总结(DALL-E-Bot TinyBot)

1. DALL-E-Bot DALL-E-Bot: Introducing Web-Scale Diffusion Models to Robotics (robot-learning.uk) **&#xff08;2023-05-04&#xff09;**DALL-E-Bot: Introducing Web-Scale Diffusion Models to Robotics DALL-E-Bot&#xff1a;将网络规模的扩散模型引入机器人 第…

C语言----动态内存分配(malloc calloc relloc free)超全知识点

目录 一.动态内存函数 1.malloc 2.free 3.calloc 4.malloc和calloc的区别 5.realloc 二.动态内存分配的常见错误 1.对null进行解引用操作 2.对动态开辟空间的越界访问 3.对非动态开辟内存使用free释放 4.使用free释放动态开辟内存的一部分 5.对同一块动态内存多次…

物联网|按键实验---学习I/O的输入及中断的编程|读取I/O的输入信号|中断的编程方法|轮询实现按键捕获实验-学习笔记(13)

文章目录 实验目的了解擒键的工作原理及电原理图 STM32F407中如何读取I/O的输入信号STM32F407对中断的编程方法通过轮询实现按键捕获实验如何利用已有内工程创建新工程通过轮询实现按键捕获代码实现及分析1 代码的流程分析2 代码的实现 Tips:下载错误的解决 实验目的 了解擒键…

Leetcode-每日一题【剑指 Offer 09. 用两个栈实现队列】

题目 用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数 appendTail 和 deleteHead &#xff0c;分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素&#xff0c;deleteHead 操作返回 -1 ) 示例 1&#xff1a; 输入&#xff1a; [&…

springboot第34集:ES 搜索,nginx

#用search after解决深分页性能问题 #第一页 GET /bank/_search {"size": 10,"sort": [{"account_number": {"order": "asc"}}] }#第二页 GET /bank/_search {"size": 10,"sort": [{"account_numb…

【WEB逆向】前端全报文加密的分析技巧

由于前端全报文加密&#xff0c;无法从变量的全文搜索来快速定位加密函数对加密参数的定位&#xff08;全局搜索还有个弊病是编码混淆的js也不能全局搜到&#xff0c;需要进一步分析判定混淆的编码形式后再全局搜编码后的变量名&#xff09;&#xff0c;因此可利用xhr断点全局拦…

LLM reasoners 入门实验 24点游戏

LLM reasoners Ber666/llm-reasoners 实验过程 实验样例24games&#xff0c;examples/tot_game24&#xff0c;在inference.py中配置使用代理和open ai的api key。 首先安装依赖 git clone https://github.com/Ber666/llm-reasoners cd llm-reasoners pip install -e .然后…

【雕爷学编程】Arduino动手做(187)---1.3寸OLED液晶屏模块2

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…