Hadoop3:MapReduce中的Reduce Join和Map Join

一、概念说明

学过MySQL的都知道,join和left join
这里的join含义和MySQL的join含义一样
就是对两张表的数据,进行关联查询

Hadoop的MapReduce阶段,分为2个阶段
一个Map,一个Reduce
那么,join逻辑,就可以在这两个阶段实现。

两者有什么区别了?
我们都知道,一般情况下,MapTaskReduceTask线程数更多。
所以,当两张表,有一个表数据量非常大,一个表非常小的时候
我们建议放在Map阶段进行join,这样可以提高性能。

二、需求说明

有两张表数据
在这里插入图片描述
将商品信息表中数据根据商品pid合并到订单数据表中
在这里插入图片描述

三、代码实现

1、Reduce Join

TableBean

package com.atguigu.mapreduce.reduceJoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable {private String id; // 订单idprivate String pid; // 商品idprivate int amount; // 商品数量private String pname;// 商品名称private String flag; // 标记是什么表 order pd// 空参构造public TableBean() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();this.flag = in.readUTF();}@Overridepublic String toString() {// id	pname	amountreturn  id + "\t" +  pname + "\t" + amount ;}
}

TableMapper
源数据,是多个文件的时候,我们要在setup方法里,获取文件信息
这样才能在map方法里知道,当前读取的是哪个文件,从而实现区别处理。

package com.atguigu.mapreduce.reduceJoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {private String fileName;private Text outK  = new Text();private TableBean outV = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 初始化  order  pdFileSplit split = (FileSplit) context.getInputSplit();fileName = split.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 判断是哪个文件的if (fileName.contains("order")){// 处理的是订单表String[] split = line.split("\t");// 封装k  voutK.set(split[1]);outV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setPname("");outV.setFlag("order");}else {// 处理的是商品表String[] split = line.split("\t");outK.set(split[0]);outV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setPname(split[1]);outV.setFlag("pd");}// 写出context.write(outK, outV);}
}

TableReducer

这里要注意
for循环处理bean list的时候,我们要在循环里面,new一个bean,存入list中
因为,Hadoop中,Iterable里存放的是地址,所以,不在循环内new一个bean来存放
会导致数据覆盖,最终只是存了一个bean

package com.atguigu.mapreduce.reduceJoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class TableReducer extends Reducer<Text, TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//        01 	1001	1   order
//        01 	1004	4   order
//        01	小米   	     pd// 准备初始化集合ArrayList<TableBean> orderBeans = new ArrayList<>();TableBean pdBean = new TableBean();// 循环遍历for (TableBean value : values) {if ("order".equals(value.getFlag())){// 订单表TableBean tmptableBean = new TableBean();try {BeanUtils.copyProperties(tmptableBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}orderBeans.add(tmptableBean);}else {// 商品表try {BeanUtils.copyProperties(pdBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}// 循环遍历orderBeans,赋值 pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());context.write(orderBean,NullWritable.get());}}
}

TableDriver

package com.atguigu.mapreduce.reduceJoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration());job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("E:\\workspace\\data\\inputtable"));FileOutputFormat.setOutputPath(job, new Path("E:\\workspace\\data\\join1"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}

测试

在这里插入图片描述在这里插入图片描述

数据变化

1、源数据

在这里插入图片描述

2、Map方法中,按行读取数据

在这里插入图片描述

3、Shuffle阶段排序

因为,map方法中,用pid作为key,所以,这里对pid进行排序
在这里插入图片描述

4、Reduce方法,按key读取数据

这里的key只有3个,所以,reduce被调用了3次
每封装好一条数据,就write一次
reduce方法执行完毕后,进行归并排序,得到最终数据文件,输出到磁盘
在这里插入图片描述

2、Map Join

关键技术:
采用DistributedCache,在map阶段缓存小表数据
并且,取消reduce阶段

MapJoinDriver
关键代码:

        // 加载缓存数据job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));//缓存普通文件到Task运行节点。//job.addCacheFile(new URI("file:///e:/cache/pd.txt"));//如果是集群运行,需要设置HDFS路径//job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);
package com.atguigu.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);// 3 关联mapperjob.setMapperClass(MapJoinMapper.class);// 4 设置Map输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputtable2"));FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output8888"));// 7 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}

MapJoinMapper
setup方法中,使用driver中配置的小表文件路径,创建流,并将数据缓存起来,供map方法使用。

package com.atguigu.mapreduce.mapjoin;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private HashMap<String, String> pdMap = new HashMap<>();private Text outK = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 获取缓存的文件,并把文件内容封装到集合 pd.txtURI[] cacheFiles = context.getCacheFiles();FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));// 从流中读取数据BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));String line;while (StringUtils.isNotEmpty(line = reader.readLine())) {// 切割String[] fields = line.split("\t");// 赋值pdMap.put(fields[0], fields[1]);}// 关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 处理 order.txtString line = value.toString();String[] fields = line.split("\t");// 获取pidString pname = pdMap.get(fields[1]);// 获取订单id 和订单数量// 封装outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);context.write(outK, NullWritable.get());}
}

测试

在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端开发的工厂设计模式

在前端开发中&#xff0c;工厂设计模式&#xff08;Factory Pattern&#xff09;是一种非常有用的设计模式&#xff0c;能够帮助我们在创建对象时减少代码的重复性和复杂性。 一、工厂设计模式概述 工厂设计模式是一种创建型设计模式&#xff0c;主要目的是定义一个用于创建对…

2024年建筑八大员(资料员)考试题库,省心高效,轻松通过!

1.插入的图片无法显示&#xff0c;或者显示失真&#xff0c;正确做法是&#xff08;&#xff09;。 A.插人图片是应选中【自动调整图片大小】 B.在下拉【菜单】中选中【按单元格式大小】插入 C.在【格式】下拉中【图片】处打钩 D.在【属性】下拉中选中【工具显示】 答案&a…

两张图片怎样拼在一起?将两张图片拼在一起的几种方法介绍

两张图片怎样拼在一起&#xff1f;拼接两张图片是一种常见的编辑技巧&#xff0c;能够将不同的视觉元素融合成一个整体&#xff0c;从而创造出更加生动和丰富的图像效果。无论是为了设计创意作品、制作社交媒体内容&#xff0c;还是简单地为个人相册增添趣味&#xff0c;掌握如…

Element-UI 并排显示多个 disabled按钮的时候, 不生效问题解决

目录 Element-UI 并排显示多个 disabled按钮的时候&#xff0c; 不生效问题解决 解决方法&#xff1a; 运行结果&#xff1a; Element-UI 并排显示多个 disabled按钮的时候&#xff0c; 不生效问题解决 解决方法&#xff1a; Element-UI 并排显示多个 disabled按钮的时候&a…

spring boot 3.0.1多模块项目使用nacos动态配置

根pom文件增加&#xff0c;spring-cloud-alibaba包管理&#xff0c;注意版本spring-boot 3.0.3&#xff0c;spring-cloud-alibaba 2022.0.0.0-RC1 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0…

我的创作纪念日学期总结

&#x1f525;个人主页&#xff1a; Forcible Bug Maker &#x1f525;专栏&#xff1a; 关于博主 目录 &#x1f308;前言&#x1f525;我的期末考试&#x1f525;我的学期总结&#x1f525;对未来的展望&#x1f308;结语 &#x1f308;前言 本篇博客主要内容&#xff1a;博…

归并排序和计数排序

目录 1.归并排序1.1递归1.1基本思想1.2算法描述1.3画图解释1.4代码实现 1.2非递归 2.计数排序2.1基本思想2.2算法描述3.画图解释 1.归并排序 1.1递归 1.1基本思想 归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法&#xff08;Divide and Conquer&#xf…

【C++】动态内存管理new和delete

文章目录 一、C的内存管理方式二、new和delete的用法1.操作内置类型2.操作自定义内置类型 三、new和delete的底层实现1.operator new和operator delete函数2.new和delete的实现原理 四、定位new表达式五、malloc/free和new/delete的区别 一、C的内存管理方式 之前在C语言的动态…

kafka(四)消息类型

一、同步消息 1、生产者 同步发送的意思就是&#xff0c;一条消息发送之后&#xff0c;会阻塞当前线程&#xff0c;直至返回 ack。 由于 send 方法返回的是一个 Future 对象&#xff0c;根据 Futrue 对象的特点&#xff0c;我们也可以实现同 步发送的效果&#xff0c;只需在调…

【数据结构】计数排序等排序

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

Ubuntu系统中创建桌面快捷方式和添加Favorites

一. Ubuntu系统中创建软件的桌面快捷方式 Ubuntu桌面创建某个软件的桌面快捷方式&#xff0c;一个直观的方法。 方法1. 在图像界面下&#xff0c;一层一层地打开文件目录软件快捷方式/usr/share/applications/ 方法2. 或者在终端运行$ nautilus /usr/share/applications/ …

MQ - RabbitMQ、SpringAMQP --学习笔记

什么是MQ&#xff1f; MQ 是消息队列&#xff08;Message Queue&#xff09;的缩写&#xff0c;它是一种应用程序间异步通信的技术。消息队列允许应用程序或服务间通过发送消息来交换数据&#xff0c;而不是直接调用对方&#xff0c;从而实现解耦、异步处理和负载均衡等目的。…

零成本打造精品宣传册

​随着互联网的发展&#xff0c;企业和个人对宣传册的需求日益增长&#xff0c;然而&#xff0c;高质量的宣传册制作往往需要不菲的成本。那么&#xff0c;如何零成本打造精品宣传册呢&#xff1f; 一、明确定位和目标群体 在制作宣传册之前&#xff0c;首先要明确其定位和目标…

qt pro文件常用配置

概述 记录一下常用的项目pro文件的一些常用配置 常用配置 QT core gui concurrent#添加concurrent并行处理模块 CONFIG windeployqt#打包部署&#xff0c;项目->构建步骤->Make参数 添加windeployqt&#xff0c;编译自动打包greaterThan(QT_MAJOR_VERSION, 4):…

Kafka入门到精通(三)-Kafka

Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台&#xff0c;由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 这种动作&#xff08;网页浏览&#xff0c;搜索和其他用户的行动&#xf…

JeecgBoot新建模块

引言 jeecg-boot设置了demo, system等默认模块。在二次开发中&#xff0c;常常需要进行模块扩展。比如新增一个订单模块或支付模块。如何准确的新增模块&#xff0c;在此文进行记录。 步骤 新建模块 在项目点击右键&#xff0c;新建模块。 如下图。 注意&#xff1a;报名需…

鸿蒙NEXT开发知识:工具常用命令—ohpm config

设置ohpm用户级配置项。 命令格式 ohpm config set <key> <value> ohpm config get <key> ohpm config delete <key> ohpm config list 说明 配置文件中信息以键值对<key> <value>形式存在。 功能描述 ohpm 从命令行和 .ohpmrc 文件中…

Linux命令----wc,uniq,sort的用法

1.wc的用法&#xff1a;wc 命令用于计算文件中的行数、单词数和字节数。 常用选项 -l&#xff1a;只显示行数-w&#xff1a;只显示单词数-c&#xff1a;只显示字节数-m&#xff1a;只显示字符数&#xff08;与 -c 类似&#xff0c;但处理多字节字符&#xff09;-L&#xff1a…

day22--77. 组合+216.组合总和III+17.电话号码的字母组合

一、77. 组合 题目链接&#xff1a;https://leetcode.cn/problems/combinations/ 文章讲解&#xff1a;https://programmercarl.com/0077.%E7%BB%84%E5%90%88.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1ti4y1L7cv 1.1 初见思路 组合问题用回溯学会使用剪…

SpringBoot:SpringBoot中调用失败如何重试

一、引言 在实际的应用中&#xff0c;我们经常需要调用第三方API来获取数据或执行某些操作。然而&#xff0c;由于网络不稳定、第三方服务异常等原因&#xff0c;API调用可能会失败。为了提高系统的稳定性和可靠性&#xff0c;我们通常会考虑实现重试机制。 Spring Retry为Spri…