Hadoop3教程(十八):MapReduce之MapJoin案例分析

文章目录

  • (118)MapJoin案例需求分析
    • ReduceJoin的问题
    • 如何解决ReduceJoin的问题
    • 如何将一个文件主动缓存到集群的内存里
  • (119)MapJoin案例代码实现
  • 参考文献

(118)MapJoin案例需求分析

ReduceJoin的问题

在ReduceJoin中,合并的操作是在Reduce阶段进行的,所以相比Map阶段,Reduce阶段的处理压力过大。另外,相同的产品ID的数据会进入同一个Reducer中,如果这个产品ID下数据过多,其他产品ID的数据很少,那么会导致前面那个Reducer压力过大,这就是数据倾斜问题。

如何解决ReduceJoin的问题

那如何解决这种问题呢?

比较好的方法是不使用ReduceJoin,使用MapJoin,即在Map阶段实现拼接。

思路简单来说,就是将产品码表放进内存,orders.txt正常切片进入mapper,然后mapper处理的时候,就逐行对orders.txt里的数据进行产品码值的替换。

基于这种方式,MapJoin的适用场景也就很明显了,MapJoin适用于一张或多张表特别小(不能把内存撑爆了),一张表特别大的场景

如何将一个文件主动缓存到集群的内存里

那问题来了,在Hadoop里怎么把一张表主动缓存到内存当中,且还能在map()里调用呢?

首先我们需要在驱动类里,指定将文件加载到缓存:

//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));// MapJoin的话就不需要Reduce阶段了
job.setNumReduceTasks(0);

然后在自定义Mapper类的setup()里,按以下流程编写代码,以读取缓存的文件数据:

//1. 获取缓存的文件;
// 2.循环读取缓存文件中每一行;
// 3. 切割;
// 4. 缓存数据到集合;

setup()执行完成后,才会执行map()

所以我们最后在map()里,获取一行后,截取到pid,从内存中码表拿到产品中文名,拼接给出就可以。

(119)MapJoin案例代码实现

过了一遍教程,其实就是对上一小节的代码实现。

总的来说,就是只有一个Map阶段,在Map阶段中,在map()处理之前,先把码表读进内存中,然后map()在一行一行读取后,直接使用内存中的码表对指定字段进行替换即可。

对我来讲用处不大,所以这里直接跳过,但还是补充一下代码:

在MapJoinDriver驱动类中添加缓存文件:

package com.atguigu.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);// 3 关联mapperjob.setMapperClass(MapJoinMapper.class);// 4 设置Map输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input"));FileOutputFormat.setOutputPath(job, new Path("D:\\output"));// 7 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

在MapJoinMapper类中的setup方法中读取缓存文件,并在map()里进行替换:

package com.atguigu.mapreduce.mapjoin;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private Map<String, String> pdMap = new HashMap<>();private Text text = new Text();//任务开始前将pd数据缓存进pdMap@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据pd.txtURI[] cacheFiles = context.getCacheFiles();Path path = new Path(cacheFiles[0]);//获取文件系统对象,并开流FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(path);//通过包装流转换为reader,方便按行读取BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));//逐行读取,按行处理String line;while (StringUtils.isNotEmpty(line = reader.readLine())) {//切割一行    
//01	小米String[] split = line.split("\t");pdMap.put(split[0], split[1]);}//关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//读取大表数据    
//1001	01	1String[] fields = value.toString().split("\t");//通过大表每行数据的pid,去pdMap里面取出pnameString pname = pdMap.get(fields[1]);//将大表每行数据的pid替换为pnametext.set(fields[0] + "\t" + pname + "\t" + fields[2]);//写出context.write(text,NullWritable.get());}
}

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/108478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLO目标检测——安全帽手套数据集【含对应voc、coco和yolo三种格式标签】

实际项目应用&#xff1a;主要应用于监控视频中工作人员是否佩戴安全帽或手套的场景。数据集说明&#xff1a;YOLO目标检测数据集&#xff0c;类别有&#xff1a;手套、头盔、非头盔、人、鞋、背心、赤膊&#xff0c;真实场景的高质量图片数据&#xff0c;数据场景丰富。使用la…

The given SOAPAction http__xxxxx_xx does not match an operation

这是在客户端调用服务端接口时报出的错误&#xff0c;主要是客户端在调用时设置了SOAPAction&#xff0c;参考如下&#xff1a; 解决方案 在注解WebMethod() 中加上action注解&#xff0c;设置上一模一样的SOAPAction即可&#xff0c;如下&#xff1a; WebMethod(action &qu…

PostgreSQL中E‘string‘ 的使用

在PostgreSQL中&#xff0c;E’string’ 是一种特殊的字符串表示方式&#xff0c;其中的E代表"ESCAPE STRING"&#xff0c;即转义字符串。 使用E表示法时&#xff0c;可以在字符串中使用转义字符来表示特殊字符&#xff0c;如换行符&#xff08;\n&#xff09;&…

小程序设计基本微信小程序的旅游社系统

项目介绍 现今市面上有关于旅游信息管理的微信小程序还是比较少的&#xff0c;所以本课题想对如今这么多的旅游景区做一个收集和分类。这样可以给身边喜欢旅游的朋友更好地推荐分享适合去旅行的地方。 前端采用HTML架构&#xff0c;遵循HTMLss JavaScript的开发方式&#xff0…

【CANoe】TX Self-ACK自应答配置与CPAL实现

一、引言 在测试CAN&CANFD通信或者网络管理的时候&#xff0c;我们经常遇到使用报文&#xff08;网络管理报文或者通信报文&#xff09;唤醒被测件这个测试点&#xff0c;如果测试比较多的情况下&#xff0c;我们就会发现&#xff0c;如果CANoe没有接被测件或者被测件没有…

CSS 效果:多列文字,第一行对齐,flex方式元素被挤压

如图效果&#xff1a;2列&#xff0c;第一列只有一行&#xff0c;第二列多行。要求第一行对齐 实现&#xff1a;使用flex 如果不配置flex-shrink的话&#xff0c;第一列会被挤压 给第一列&#xff1a;备注配置压缩属性&#xff1a; flex-shrink&#xff1a;0。 <!DOCTYPE…

发现了一个牛逼的网站 可以免费使用chatGPT(看到最后)

最近,打工人们集体陷入了一场焦虑。“一觉醒来,我感觉自己快要失业了……”“身为文字工作者,我该如何自保饭碗?”“人工智能到底会不会完全取代人类的工作?” ChatGPT最近都听说了吧?据说,改论文、敲代码、写文案……只有你想不到的,没有人家办不成的! 它是什么? …

谜题(Puzzle, ACM/ICPC World Finals 1993, UVa227)rust解法

有一个5*5的网格&#xff0c;其中恰好有一个格子是空的&#xff0c;其他格子各有一个字母。一共有4种指令&#xff1a;A, B, L, R&#xff0c;分别表示把空格上、下、左、右的相邻字母移到空格中。输入初始网格和指令序列&#xff08;以数字0结束&#xff09;&#xff0c;输出指…

arthas the number of matched classs is 65

以上是运行trace命令后&#xff0c;得到的提示。 原因就是匹配到的类太多了&#xff0c;可以在命令后加个-m 数量&#xff0c;这个数量只能大于等于匹配的数量&#xff0c;而不能小于&#xff0c;这里指定的数量就是要大于等于65。 参考命令&#xff1a;trace xxxx -m 65。

【数据结构】二叉树链式存储及遍历

二叉树链式存储及遍历 文章目录 二叉树链式存储及遍历前言实现过程代码实现源代码总结 前言 本文章中的内容参考于王道数据结构考研书&#xff0c;如果你对该部分的内容的记忆有所模糊&#xff0c;可以阅读我的文章再加深印象 实现过程 1.定义二叉树结构体 2.初始化二叉树的根结…

【linux API分析】module_init

linux版本&#xff1a;4.19 module_init()与module_exit()用于驱动的加载&#xff0c;分别是驱动的入口与退出函数 module_init()&#xff1a;内核启动时或动态插入模块时调用module_exit()&#xff1a;驱动移除时调用 本篇文章介绍module_init() module_init() module_init…

数据库第一、二章作业

只为记录与分享 第1,2章作业.xls 题量: 34 满分: 100 一. 单选题&#xff08;共34题&#xff09; 1. (单选题)在数据库中&#xff0c;下列说法&#xff08; &#xff09;是不正确的。 A. 数据库避免了一切数据的重复B. 若系统是完全可以控制的&#xff0c;则系统可确保更新…

java 两个list比较,删除相同的元素

概述 在Java开发中&#xff0c;经常需要比较两个List并删除相同的元素。本文将介绍整个流程&#xff0c;并提供相应的代码示例&#xff0c;帮助新手开发者完成这个任务。 流程 下面是比较两个List并删除相同元素的流程&#xff1a; 代码示例 创建两个List 我们首先需要创建两…

从旅游发展大会,看长沙的“落子”与“棋道”

文&#xff5c;新熔财经 作者&#xff5c;石榴 中秋国庆假期作为今年的最后一个小长假&#xff0c;全国各地果断祭出自己的“杀手锏”&#xff0c;不过&#xff0c;虽是百花齐放&#xff0c;但星城长沙仍然是最亮眼的存在之一。 全省文化旅游统计监测系统显示&#xff0c;中…

6-k8s-控制器版本管理

文章目录 一、概念介绍二、配置介绍三、版本生成测试四、版本回滚测试 一、概念介绍 什么是控制器&#xff1a;在k8s中&#xff0c;控制器是一种用于控制和管理Pod的管理器&#xff0c;包括Deployment、ReplicaSet、StatefulSet等。 什么是控制器版本管理&#xff1a;是指对于…

用Java包com.sun.net.httpserver下面的类实现一个简单的http服务器demo

java的com.sun.net.httpserver包下的类提供了一个高层级的http服务器API&#xff0c;可以用来构建内嵌的http服务器。支持http和https。这些API提供了一个RFC 2616 (HTTP 1.1)和RFC 2818 (HTTP over TLS)的部分实现。 https://docs.oracle.com/en/java/javase/19/docs/api/jdk.…

Kotlin中的变量与常量

在Kotlin中&#xff0c;val和var是用于声明变量的关键字。 val用于声明不可变的变量&#xff0c;也就是说一旦初始化之后&#xff0c;其值就不能再被修改。它类似于Java中的final关键字。val声明的变量可以通过类型推断自动推断其类型&#xff0c;也可以显式定义类型。 以下是…

SpringBoot面试题6:Spring Boot 2.X 有什么新特性?与 1.X 有什么区别?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:Spring Boot 2.X 有什么新特性?与 1.X 有什么区别? Spring Boot是一种用于简化Spring应用程序开发的框架,它提供了自动配置、起步依赖和快速开…

opensl学习——base16编码解码、base64编码解码、ASCII码表、扩展ASCII码

文章目录 ASCII表概述base家族简单说明 Hex(十六进制)编码、Base32编码、Base64编码、base256编码base16编码与解码base64编码概述转换过程不足 3 字节处理方法例子一,不足3字节&#xff0c;只有一个字节例子二,不足3字节&#xff0c;只有两个字节 base64示例代码1代码分析 acl…

【数据结构】排序--选择排序(堆排序)

目录 一 堆排序 二 直接选择排序 一 堆排序 堆排序(Heapsort)是指利用堆积树&#xff08;堆&#xff09;这种数据结构所设计的一种排序算法&#xff0c;它是选择排序的一种。它是 通过堆来进行选择数据。 需要注意的是排升序要建大堆&#xff0c;排降序建小堆。 直接选择排…