MR源码解析和join案例

MR源码解析

  1. new Job(): 读取本地文件, xml配置
  2. job.start(): 启动线程
  3. job的run():线程方法
    • runTasks(): 传入对应的接口,启动map或者reduce
    • MapTask类的run(): 设置map阶段的参数,初始化任务,创建上下文对象
      • 创建读取器LineRecordReader
      • 判断是否压缩 compressFactory
      • 如果没有压缩,使用seek方法
      • mapTask的write(),进行溢写
      • mapper类的init()方法,设置溢写百分比和缓冲区大小
      • collector收集器:进行map阶段数据类型检查和分数数量检查
      • keySerializer: 进行数据的序列化,调用自己写的bean对象
      • kvmeta.put(): 写入环形缓冲区
      • mapPhase结束
    • 数据量达到缓冲区的80%,对索引进行快速排序
    • input.close():关闭输入
    • 关闭输出并同时将缓冲区数据按照分区写入磁盘。
      • 如果开启了combine,进行数据合并
    • mergePart:归并分区
    • combine第二次合并,如果溢写次数小于3就不合并了
    • collector.close():关闭环形缓冲区
  4. reduceTask的run方法
    • submit: 5个reduce并行提交
    • cLeanTask:初始化
    • shuffle类:map的排序,recuce中的归并排序
    • Merger合并器:两次归并排序,先内存归并,后磁盘归并
    • 抓取数据:可以从本地或者网络中抓取
    • sort :归并排序
    • reduce阶段:
      • 创建上下文对象
      • 调用reducer的run方法
      • real.write(): LineRecordWrite写入HDFS

使用MR来进行拷贝去重

  1. 拷贝:values写入上下文时需要迭代遍历
  2. 去重:values写入上下文时不遍历

使用MR来实现join操作

在这里插入图片描述
在这里插入图片描述

  1. 实现TableBean类,四个属性,空参构造器,get-set方法
    • write():序列化
      • out.writeUTF():该方法有换行,不会连在一起
    • readFields(): 反序列化
  2. 实现mapper类
    • setup()
      • 使用context上下文对象获取InputSplit类
      • 强制类型转换为FileSplit类
      • getPath().getName()获取文件名称
    • map()
      • 切分split
      • 封装
      • context写出
public class TableMapper extends Mapper<LongWritable, Text, Text,TableBean> {private String filename;private Text outK;private TableBean outV;//初始化,每个文件开始一次maptask,并进行一次初始化//获取到文件的名称@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {//拿到切片信息FileSplit split = (FileSplit) context.getInputSplit();filename = split.getPath().getName();outK = new Text();outV = new TableBean();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {//1. 获取一行String line = value.toString();//2.判断是哪个文件的if(filename.contains("order")){//处理的是订单表String[] split = line.split("\t");//封装outK.set(split[1]);//pid作为keyoutV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setTableName("order");outV.setPname("");}else{//处理的是商品表String[] split = line.split(" ");
//            System.out.println("=========> " + Arrays.toString(split)+" <=========");
//            System.out.println("=========> " + split[1] +" <=========");//封装outK.set(split[0]);//pid作为keyoutV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setTableName("pd");outV.setPname(split[1]);}//写出context.write(outK, outV);}
}
  1. 实现reduce类
    • 为了分辨map传递过来的数据是哪个表,给bean对象添加一个表名属性
    • 在mapper类中给对应表的抓取过程中添加标记
    • 在获取到value时不能直接使用等于号进行赋值,values是Iterable集合,比较特殊
    • 属性赋值工具类BeanUtils.copyProperties(dest, src);
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {private ArrayList<TableBean> orderBeans;private TableBean pdBean;@Overrideprotected void setup(Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {//1.创建集合orderBeans = new ArrayList<>();pdBean = new TableBean();}@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {orderBeans.clear();//清空集合//2.遍历赋值for (TableBean value : values) {if ("order".equals(value.getTableName())) {TableBean temp = new TableBean();try {BeanUtils.copyProperties(temp,value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}orderBeans.add(temp);} else {//商品表try {BeanUtils.copyProperties(pdBean, value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}}}//循环遍历orderBeans,赋值pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());context.write(orderBean,NullWritable.get());}}
}

总结:这种写法,在reduce阶段创建了对象和集合,这些方式都是比较消耗资源的,容易造成数据倾斜问题。

MR在环形缓冲区快排时倒排索引,反向溢写,会导致数据反向输出,类似栈结构的的先进后出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/73887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Visual Studio 新建类从默认internal改为public

前言 之前一直用的Resharp辅助编写C#代码&#xff0c;Resharp用起来的确方便不少&#xff0c;但是太消耗开发机内存了。重装电脑后&#xff0c;还是决定使用Visual Studio内置的功能。 默认情况下&#xff0c;Visual Studio 中生成一个类或接口是internal类型的&#xff0c;而…

EagleSDR USB HAT FT600

给EagleSDR做了个USB 3.0的子卡&#xff0c;采用FT600方案&#xff0c;实物如下&#xff1a; 用FT600DataStreamerDemoApp测试&#xff0c;速度如下&#xff1a; 由于FT600是16bit的接口&#xff0c;如果用FT601的32bit接口&#xff0c;性能应该还会有大幅提升。 测试代码很简…

Notepad++下载安装

自己在 找Notepad发现网上的网址参差不齐&#xff0c;自己找到的一个不错下载链接见文末&#xff01; Notepad 是一个免费的代码编辑器&#xff0c;专为在微软 Windows 环境下使用。它是一个开源项目&#xff0c;采用 GPL 许可证&#xff0c;并使用 C 编程语言结合 Win32 API 和…

数据驱动的数字营销与消费者运营

引言&#xff1a;基于海洋馆文旅企业在推广宣传中&#xff0c;如何通过指标体系量化分析广告收益对业务带来的收益价值的思考&#xff1f; 第一部分:前链路引流投放的策略与实战 1.1 动态广告的实现: 偶然与必然 动态广告是一种基于实时数据和用户行为的广告形式&#xff0c;它…

Mysql--事务

事务 开始之前&#xff0c;让我们先想一个场景&#xff0c;有的时候&#xff0c;为了完成某个工作&#xff0c;需要完成多种sql操作 比如转账 再比如下单 第一步 我的账户余额减少 第二步 商品的库存要减少 第三步 订单表中要新增一项 事务的本质&#xff0c;就是为了把多个操…

Mac 多版本jdk安装与切换

macOS上可以安装多个版本的jdk&#xff0c;方法如下&#xff1a; 1.下载jdk 在Oracle官网上下载不同版本的jdk&#xff1a; https://www.oracle.com/java/technologies/downloads/#java17 方案一 1.查看本机所有的jdk /usr/libexec/java_home -V3. 配置环境变量 打开bash_…

Excel文件生成与下载(SpringBoot项目)(easypoi)

说明 通过接口&#xff0c;导出表格。 使用SpringBoot框架和easypoi表格解析框架&#xff0c;生成Excel表格&#xff0c;并通过接口下载。 表格示例 依赖 版本 <easypoi.version>4.4.0</easypoi.version>依赖 <!-- easypoi --> <dependency><…

python知识:有效使用property装饰器

一、说明 Python是唯一有习语&#xff08;idioms&#xff09;的语言。这增强了它的可读性&#xff0c;也许还有它的美感。装饰师遵循Python的禅宗&#xff0c;又名“Pythonic”方式。装饰器从 Python 2.2 开始可用。PEP318增强了它们。下面是一个以初学者为中心的教程&#xff…

安科瑞精密配电多回路监控装置在轨道交通项目上的应用

行业背景 轨道交通作为城市公共交通系统的一部分&#xff0c;在过去几十年中得到了广泛的发展和扩张。它在解决城市交通拥堵、减少环境污染、提高城市可持续性等方面发挥了重要作用。随着科技的进步&#xff0c;轨道交通系统也在不断引入新的技术和创新&#xff0c;以提高运行…

WPF Material Design 初次使用

文章目录 前言相关资源快速开始快速开始说明地址 吐槽一下 前言 MD全称MaterialDesignInXamlToolkit&#xff0c;MaterialDesign和Bootstrap一样&#xff0c;都是一个UI风格库。相当于衣服中的休闲服&#xff0c;汉服&#xff0c;牛仔裤一样&#xff0c;就是风格不一样的Ui框架…

VR钢铁实训 | 铁前事业部虚拟仿真培训软件

随着科技的发展&#xff0c;虚拟现实技术在各个行业中的应用越来越广泛。在钢铁冶炼行业中&#xff0c;VR技术也逐渐得到了应用&#xff0c;其中铁前事业部虚拟仿真培训软件就是一项非常有优势的技术。 铁前事业部虚拟仿真培训软件是广州华锐互动打造的《钢铁生产VR虚拟培训系统…

msvcp110.dll是什么意思与msvcp110.dll丢失的解决方法

电脑突然提示msvcp110.dll丢失&#xff0c;无法执行此代码。导致软件无法打开运行&#xff0c;这个怎么办呢&#xff1f;我在网上找了一天的资料&#xff0c;终于把这个问题彻底处理好&#xff0c;也弄清楚了msvcp110.dll丢失的原因及msvcp110.dll丢失修复方法&#xff1f;现在…

docker从零部署jenkins保姆级教程(上)

jenkins&#xff0c;基本是最常用的持续集成工具。在实际的工作中&#xff0c;后端研发一般没有jenkins的操作权限&#xff0c;只有一些查看权限&#xff0c;但是我们的代码是经过这个工具构建出来部署到服务器的&#xff0c;所以我觉着有必要了解一下这个工具的搭建过程以及简…

网络通信深入解析:探索TCP/IP模型

http协议访问web 你知道在我们的网页浏览器的地址当中输入url&#xff0c;未必是如何呈现的吗&#xff1f; web浏览器根据地址栏中指定的url&#xff0c;从web服务器获取文件资源&#xff08;resource&#xff09;等信息&#xff0c;从而显示出web页面。web使用HTTP&#xff08…

vs code调试rust乱码问题解决方案

在terminal中 用chcp 65001 修改一下字符集&#xff0c;就行了。有的博主推荐 修改 区域中的设置&#xff0c;这会引来很大的问题。千万不要修改如下设置&#xff1a;

八个针对高级职位的高级 JavaScript 面试题

JavaScript 是一种功能强大的语言&#xff0c;是网络的主要构建块之一。这种强大的语言也有一些怪癖。例如&#xff0c;您是否知道 0 -0 的计算结果为 true&#xff0c;或者 Number("") 的结果为 0&#xff1f; 问题是&#xff0c;有时这些怪癖会让你摸不着头脑&…

同步推送?苹果计划本月推出 iOS17和iPadOS17,你的手机支持吗?

据报道&#xff0c;苹果公司计划在本月推出 iOS 17 和 iPadOS 17 正式版更新。与去年不同的是&#xff0c;这次更新将同时发布&#xff0c;而不是分别发布。根据彭博社的一位消息人士马克・古尔曼的说法&#xff0c;苹果公司认为 iOS 17 和 iPadOS 17 的第八个测试版已经非常接…

动态库的制作与使用及 动态库加载失败解决

加载动态库时有时会出现error while loading shared libraries&#xff1a;libcalc.so:可以通过lld命令查看动态库的依赖关系&#xff0c;发现libcalc.so时not found 原因 查找的优先级是DT_RPATH->LD_LIBRARY_PATH->/etc/ld.so.cache->/lib/,/usr/lib 找不到一个优…

文心一言api接入如何在你的项目里使用文心一言

文心一言api接入在项目里接入文心一言 一、百度文心一言API二、使用步骤1、接口2、请求参数3、请求参数示例4、接口 返回示例 三、 如何获取appKey和uid1、申请appKey:2、获取appKey和uid 四、重要说明 一、百度文心一言API 基于百度文心一言语言大模型的智能文本对话AI机器人…

java读取服务器数据包并下载至本地目录

jsch包如果没有的话&#xff0c;可评论联系我&#xff0c;我私发给你&#xff0c;或者通过https://mvnrepository.com/artifact/com.jcraft/jsch/0.1.55进行下载&#xff0c;添加至工程目录 package com.hbisdt.dqbasic.modular.util;import com.jcraft.jsch.*;import java.i…