Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

章节内容

上一节我们完成了:

  • MapReduce的介绍
  • Hadoop序列化介绍
  • Mapper编写规范
  • Reducer编写规范
  • Driver编写规范
  • WordCount功能开发
  • WordCount本地测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!
请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!

但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:

  • 2C4G 编号 h121
  • 2C4G 编号 h122
  • 2C2G 编号 h123

在这里插入图片描述

业务需求

平常我们在业务上,有很多时候表都是分开的,通过一些 id 或者 code 来进行关联。
在大数据的情况下,也有很多这种情况,我们需要进行联表操作。

表1

项目编码projectCode 项目名projectName

表2

项目编码projectCode 项目类型projectType 项目分类projectFrom

SQL 中,可以通过 LEFT JOIN 来实现字段补齐。大数据下,也需要进行这样的操作,我们需要借助 MapReduce

表1测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"社区项目1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"社区项目2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"智慧社区"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"公交站点"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"街道布建"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"街道公交站点"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"未命名项目1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"未命名项目2"

表2测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"重要类型"	"种类1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"重要类型"	"种类1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"重要类型"	"种类1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"普通类型"	"种类1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"普通类型"	"种类2"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"普通类型"	"种类2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"一般类型"	"种类2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"一般类型"	"种类2"

SQL连表

假设我们使用SQL的方式联表:

SELECT*
FROMt_project
LEFT JOINt_project_info
ONt_project.projectCode=t_project_info.projectCode

Reduce JOIN

有时候,表可能过大,无法支持我们使用 SQL 进行连表查询。
这里我们编写一个程序来完成操作。

ProjectBean

这里是最终的Bean类,里边是两个表把字段补齐的结果,一会儿我们将使用这个类进行表的连接。

package icu.wzk.demo03;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class ProjectBean implements Writable {private String projectCode;private String projectName;private String projectType;private String projectFrom;private String flag;@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(projectCode);dataOutput.writeUTF(projectName);dataOutput.writeUTF(projectType);dataOutput.writeUTF(projectFrom);dataOutput.writeUTF(flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.projectCode = dataInput.readUTF();this.projectName = dataInput.readUTF();this.projectType = dataInput.readUTF();this.projectFrom = dataInput.readUTF();this.flag = dataInput.readUTF();}public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) {this.projectCode = projectCode;this.projectName = projectName;this.projectType = projectType;this.projectFrom = projectFrom;this.flag = flag;}public ProjectBean() {}@Overridepublic String toString() {return "ProjectBean{" +"projectCode='" + projectCode + '\'' +", projectName='" + projectName + '\'' +", projectType='" + projectType + '\'' +", projectFrom='" + projectFrom + '\'' +", flag=" + flag + '\'' +'}';}public String getProjectCode() {return projectCode;}public void setProjectCode(String projectCode) {this.projectCode = projectCode;}public String getProjectName() {return projectName;}public void setProjectName(String projectName) {this.projectName = projectName;}public String getProjectType() {return projectType;}public void setProjectType(String projectType) {this.projectType = projectType;}public String getProjectFrom() {return projectFrom;}public void setProjectFrom(String projectFrom) {this.projectFrom = projectFrom;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}
}

Reduce Driver

package icu.wzk.demo03;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReducerJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// String inputPath = args[0];// String outputPath = args[1];// === 测试环境 ===String inputPath = "project_test";String outputPath = "project_test_output";// === ===Configuration configuration = new Configuration();Job job = Job.getInstance(configuration, "ReducerJoinDriver");job.setJarByClass(ReducerJoinDriver.class);job.setMapperClass(ReducerJoinMapper.class);job.setReducerClass(ReducerJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(ProjectBean.class);job.setOutputKeyClass(ProjectBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}

ReduceMapper

package icu.wzk.demo03;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {String name;ProjectBean projectBean = new ProjectBean();Text k = new Text();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {// 获取路径信息name = context.getInputSplit().toString();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {String line = value.toString();if (name.contains("layout_project")) {// layout_projectString[] fields = line.split("\t");projectBean.setProjectCode(fields[0]);projectBean.setProjectName(fields[1]);projectBean.setProjectType("");projectBean.setProjectFrom("");projectBean.setFlag("layout_project");// projectCode 关联k.set(fields[0]);} else {// project_infoString[] fields = line.split("\t");projectBean.setProjectCode(fields[0]);projectBean.setProjectName("");projectBean.setProjectType(fields[1]);projectBean.setProjectFrom(fields[2]);projectBean.setFlag("project_info");// projectCode 关联k.set(fields[0]);}context.write(k, projectBean);}
}

ReduceReducer

package icu.wzk.demo03;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException {List<ProjectBean> dataList = new ArrayList<>();ProjectBean deviceProjectBean = new ProjectBean();for (ProjectBean pb : values) {if ("layout_project".equals(pb.getFlag())) {// layout_projectProjectBean projectProjectBean = new ProjectBean(pb.getProjectCode(),pb.getProjectName(),pb.getProjectType(),pb.getProjectFrom(),pb.getFlag());dataList.add(projectProjectBean);} else {// project_infodeviceProjectBean = new ProjectBean(pb.getProjectCode(),pb.getProjectName(),pb.getProjectType(),pb.getProjectFrom(),pb.getFlag());}}for (ProjectBean pb : dataList) {pb.setProjectType(deviceProjectBean.getProjectType());pb.setProjectFrom(deviceProjectBean.getProjectFrom());context.write(pb, NullWritable.get());}}
}

运行结果

ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社区项目2"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社区"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站点"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名项目1"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名项目2"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社区项目1"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站点"', projectType='"普通类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}

在这里插入图片描述

方案缺点

JOIN 操作是在 reduce 阶段完成的,reduce端处理压力过大map节点的运算负载很低,资源利用不高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/39234.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件扫描pdf怎么弄?5个简易高效的文件扫描方法

在繁忙的工作中&#xff0c;我们常常需要将纸质文件快速转换为电子文档&#xff0c;以便于编辑、存储或分享。 无论是合同、报告还是笔记&#xff0c;将这些纸质文件转换为Word格式&#xff0c;不仅能提高工作效率&#xff0c;还能确保信息的安全备份。然而&#xff0c;面对市…

Redis 7.x 系列【16】持久化机制之 AOF

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 执行原理2.1 Redis 6.x2.1.1 直接写2.1.2 重写 2.2 Redis 7.x2.2.1 Redis 6…

Spring Ioc学习

第二章 Spring IOC 章节内容 Spring IOC技术实现Spring IOC设值注入Spring IOC构造注入 章节目标 掌握Spring IOC技术实现掌握Spring IOC设置注入掌握Spring IOC构造注入 第一节 Spring简介 1. Spring 简介 Spring 是目前主流的 Java 开发框架&#xff0c;是 Java 世界最…

基于Springboot+Vue+mysql仓库管理系统仓库进销存管理系统

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Php和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…

【Python】Python中的数据类型

数据类型 导读一、数据类型的分类1.1 Number&#xff08;数字&#xff09;1.1.1 静态数据类型1.1.2 动态数据类型 1.2 String&#xff08;字符串&#xff09;1.3 bool&#xff08;布尔类型&#xff09; 二、数据类型的转换2.1 隐式类型转换2.2 显式类型转换2.2.1 int(x[,base])…

系统运维面试总结(shell编程)

SYNDDOS攻击&#xff0c;需要判断这个访问是正常访问还是信包攻击&#xff0c;当前这个信包发起的访问数量是多少&#xff0c;例如看到30个信包同时再访问时设置监控报警。 一般选用/dev/urandom生成&#xff0c;但其生成的随机数带有二进制乱码&#xff0c;所以需要tr命令…

CASS中按指定距离和方向移动图形

1、绘制一个图形 打开软件&#xff0c;随意绘制一个矩形&#xff0c;并量取左下角点的坐标值&#xff0c;具体如下&#xff1a; 2、按距离移动原理讲解 例1&#xff1a;将图形沿着y轴负方向移动100米&#xff0c;如何实现&#xff1f; 如上图所示&#xff0c;测绘中的坐标系…

多载波调制与OFDM原理讲解以及MATLAB实现GUI设计

前言 基于MATLAB设计并实现了一个OFDM调制的图形用户界面&#xff08;GUI&#xff09;系统。该系统旨在简化OFDM调制过程的仿真&#xff0c;提供友好的用户交互界面。设计目标是通过GUI实现参数化的OFDM仿真&#xff0c;包括子载波数、符号数、IFFT长度、循环前缀长度、循环后…

模拟退火算法2—优缺点

模拟退火算法优点 1、以一定的概率接受恶化解 模拟退火算法(SA)在搜索策略上与传统的随机搜索方法不同,它不仅引入了适当的随机因素,而且还引入了物理系统退火过程的自然机理。这种自然机理的引入使模拟退火算法在迭代过程中不仅接受使目标函数变“好”的试探点,而且还能以一…

【单片机毕业设计选题24034】-基于STM32的手机智能充电系统

系统功能: 系统可以设置充电时长&#xff0c;启动充电后按设置的充电时长充电&#xff0c;充电时间到后自动 停止充电&#xff0c;中途检测到温度过高也会结束充电并开启风扇和蜂鸣器报警。 系统上电后&#xff0c;OLED显示“欢迎使用智能充电系统请稍后”&#xff0c;两秒钟…

哨兵1SAR空间数据包协议数据单元文档(五)

《哨兵1SAR空间数据包协议数据单元》文档对数据包的结构进行了详细描述&#xff0c;并提供了用户数据的格式和解码算法。 原文链接: 哨兵1SAR空间数据包协议数据单元文档英文版 同系列中的其他文章篇链接: 哨兵1SAR空间数据包协议数据单元文档&#xff08;一&#xff09; 哨兵1…

保存在FinalShell服务器登录密码忘记了,如何快速获取到

一、从FinalShell获取服务器基本信息 如图操作会导出一个json文件&#xff0c;可以直接保存在桌面&#xff0c;或者其他位置 json格式如下&#xff1a; {"forwarding_auto_reconnect":false ,"custom_size":false ,"delete_time":0 ,"sec…

Python数据分析-旧金山犯罪预测分析(San Francisco Crime Classification)

一、研究背景 旧金山是一个人口稠密、旅游业发达的城市&#xff0c;同时也是美国犯罪率较高的城市之一。随着城市的不断发展&#xff0c;犯罪行为的类型和频率也在不断变化&#xff0c;这对城市的治安管理和社会稳定构成了巨大的挑战。近年来&#xff0c;数据科学技术的迅猛发…

C# 编程中互斥锁的使用

C# 中的互斥锁 互斥锁是 C# 中使用的同步原语&#xff0c;用于控制多个线程或进程对共享资源的访问。其目的是确保在任何给定时间只有一个线程或进程可以获取互斥锁&#xff0c;从而提供互斥。 C# 中互斥锁的优点 可以使用互斥锁 (Mutex) 并享受其带来的好处。 1. 共享资源…

德国威步的技术演进之路(下):从云端许可管理到硬件加密狗的创新

从单机用户许可证到WkNET网络浮点授权的推出&#xff0c;再到引入使用次数和丰富的时间许可证管理&#xff0c;德国威步产品不断满足市场对灵活性和可扩展性的需求。TCP/IP浮动网络许可证进一步展示了威步技术在网络时代的创新应用。借助于2009年推出的借用许可证以及2015年推出…

mac磁盘工具如何合并分区 macos 磁盘工具 无法抹除 磁盘管理软件哪个使用率最高

一、什么是NTFS格式分区 NTFS格式分区是微软公司开发的诸多文件系统中的一种。NTFS格式分区是一种文件系统&#xff0c;磁盘只有在安装了文件系统后才能被正常使用&#xff0c;文件系统的格式有非常多&#xff0c;常见的有FAT 32和NTFS。 作为常见文件系统&#xff0c;NTFS格式…

无人机集群协同搜索研究综述

源自&#xff1a;指挥控制与仿真 作者&#xff1a;刘圣洋, 宋婷, 冯浩龙, 孙玥, 韩飞 注&#xff1a;若出现无法显示完全的情况&#xff0c;可 V 搜索“人工智能技术与咨询”查看完整文章 摘要 无人机集群协同区域搜索能够有效地获取任务区域地面信息,降低环境不确定度。基…

买卖股票的最佳时期含冷冻期(leetcode)

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 也就有这样的状态转移方程&#xff1a; 买入&#xff1a;dp[i][0] max(dp[i-1][1] - prices[i], dp[i-1][0]); 可买入&#xff1a;dp[i][1] max(dp[i-1][1], dp[i-1][2]); 冷冻期&#xff1a;dp[i][2] dp[i-1][0] prices…

使用ChatGPT自动生成测试用例思维导图

使用ChatGPT自动生成测试用例思维导图 引言ChatGPT在测试用例编写中的应用全面覆盖测试场景边界测试避免测试用例重复 借助ChatGPT生成测试用例思维导图准备工作步骤一&#xff1a;与ChatGPT对话步骤二&#xff1a;生成思维导图代码 结语 引言 在编写测试用例时&#xff0c;测…

基于Python Django的房价数据分析平台,包括大屏和后台数据管理,有线性、向量机、梯度提升树、bp神经网络等模型

背景 随着城市化进程的加速和房地产市场的快速发展&#xff0c;房价已成为经济学、社会学等多学科交叉研究的热点问题。为了更精确地分析和预测房价&#xff0c;数据分析和机器学习技术被广泛应用。在此背景下&#xff0c;开发一个基于Python Django的房价数据分析平台具有重要…