MapReduce 读写数据库

MapReduce 读写数据库

经常听到小伙伴吐槽 MapReduce 计算的结果无法直接写入数据库,
实际上 MapReduce 是有操作数据库实现的
本案例代码将实现 MapReduce 数据库读写操作和将数据表中数据复制到另外一张数据表中

准备数据表

create database htu;
use htu;
create table word(name varchar(255) comment '单词',count int comment '数量'
);
create table new_word(name varchar(255) comment '单词',count int comment '数量'
);

数据库持久化类

package com.lihaozhe.db;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;/*** @author 李昊哲* @version 1.0.0* @create 2023/11/7*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Word implements DBWritable {/*** 单词*/private String name;/*** 单词数量*/private int count;@Overridepublic String toString() {return this.name + "\t" + this.count;}@Overridepublic void write(PreparedStatement pst) throws SQLException {pst.setString(1, this.name);pst.setInt(2, this.count);}@Overridepublic void readFields(ResultSet rs) throws SQLException {this.name = rs.getString(1);this.count = rs.getInt(2);}
}

MapReduce 将数据写入数据库

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Write {public static class WordMapper extends Mapper<LongWritable, Text, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Word, NullWritable>.Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");Word word = new Word();word.setName(split[0]);word.setCount(Integer.parseInt(split[1]));context.write(word, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "write db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Write.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(WordMapper.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
//        Path dst = new Path("/video/ods");
//        // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
//        DistributedFileSystem dfs = new DistributedFileSystem();
//        String nameService = conf.get("dfs.nameservices");
//        String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
//        dfs.initialize(URI.create(hdfsRPCUrl), conf);
//        if (dfs.exists(dst)) {
//            dfs.delete(dst, true);
//        }//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }
//        FileOutputFormat.setOutputPath(job, dst);// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "word", "name", "count");// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 从数据库读取数据

注意:
由于集群环境 导致 MapTask数量不可控可导致最终输出文件可能不止一个,
可以在代码使用 conf.set(“mapreduce.job.maps”, “1”) 设置 MapTask 数量

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Read {public static class WordMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量conf.set("mapreduce.job.maps", "1");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Read.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(Word.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 定义 map 输入的路径 注意:该路径默认为hdfs路径// FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径Path dst = new Path("/wordcount/db");// 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录DistributedFileSystem dfs = new DistributedFileSystem();String nameService = conf.get("dfs.nameservices");String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;dfs.initialize(URI.create(hdfsRPCUrl), conf);if (dfs.exists(dst)) {dfs.delete(dst, true);}//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }FileOutputFormat.setOutputPath(job, dst);// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 实现数据库表复制

MapReduce 实现将数据库一张数据表的数据复制到另外一张数据表中

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Copy {public static class RWMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {System.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量// conf.set("mapreduce.job.maps", "1");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");job.setJarByClass(Read.class);job.setMapperClass(Read.WordMapper.class);job.setNumReduceTasks(0);job.setMapOutputValueClass(Word.class);job.setMapOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "new_word", "name", "count");System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/142034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android手机平板拓展电脑音频

&#xff08;1&#xff09;首先确保电脑上有声卡&#xff0c;就是电脑右下角小喇叭能调音量&#xff0c;不管电脑会不会响&#xff0c;如果小喇叭标记了个错误&#xff0c;说明没有声卡&#xff0c;安装图上的虚拟声卡软件。 &#xff08;2&#xff09;图上第一个PC免安装及局…

vscode launch.json

有时新的服务器进行调试时&#xff0c;需要设置调试的launch.json的结果 然后就可以打开一个launch.json 其内容如下 {// 使用 IntelliSense 了解相关属性。 // 悬停以查看现有属性的描述。// 欲了解更多信息&#xff0c;请访问: https://go.microsoft.com/fwlink/?linkid83…

JAVA 版小程序商城免费搭建 多商家入驻 直播带货 商城系统 B2B2C 商城源码之 B2B2C产品概述

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

Go语言error错误处理

error Go语言内置错误接口类型&#xff0c;任何类型只要是实现了Error()string方法&#xff0c;都可以传递error接口类型变量。Go语言典型的错误处理方式是将error作为函数最后一个返回值。在调用函数时&#xff0c;通过监测其返回的error值是否为nil来进行错误处理 Go语言标准…

[单片机课程设计报告汇总] 单片机设计报告常用硬件元器件描述

[单片机课程设计必看] 单片机设计报告常用描述 硬件设计 AT89C51最小系统 AT89C51是美国ATMEL公司生产的低电压&#xff0c;高性能CMOS16位单片机&#xff0c;片内含4k bytes的可反复擦写的只读程序存储器和128 bytes的随机存取数据存储器&#xff0c;期间采用ATMEL公司的高…

数据结构—队列的实现

前言&#xff1a;上次我们已经学习了数据结构中一个重要的线性表—栈&#xff0c;那么我们这一次就来学习另外一个重要的线性表—队列。 目录&#xff1a; 一、 队列的概念 二、 队列的实现&#xff1a; 1.队列的创建 三、 队列的操作 1.初始化队列 2.队尾入队列 3.队头出队列…

兼容iphone(ios)圆角(border-radius)不起作用的问题

一、出现场景&#xff1a;使用mosowe-swiper&#xff1a;适用于uni-app的轮播图插件&#xff0c;圆弧无效 ios手机会在transform的时候导致border-radius失效解决方法&#xff1a;在使用动画效果带transform的元素的上一级div元素的css加上下面语句&#xff1a; transform: rot…

创建自己的nas服务,从远端拉取所需文件

一、前言 创建一个nas文件存储&#xff0c;然后需要的时候随时从远端或者其他终端拉取所需文件是不是一件很帅气的工作。 二、准备工作 一台服务器&#xff08;云的更好&#xff09;&#xff0c;没了。 首先安装docker和docker-compose 此处省略docker的安装(改天更新)&…

CSRF和XSS漏洞结合实战案例

文章目录 CSRF和XSS漏洞结合实战案例实验原理实验步骤信息收集构造CSRF和XSS代码xss注入 CSRF和XSS漏洞结合实战案例 实验环境为csm 实验原理 攻击者利用JavaScript可以构造请求的功能在留言面板构造一个存储型xss注入&#xff0c;里面的内容为js请求。请求新添加用户&…

python matlplotlib/seaborn 绘制曲线的平均值标准差阴影图

1. seaborn 旧版本(0.8.1)中使用tsplot&#xff0c;新版本中使用lineplot 直线代表均值&#xff0c;阴影代表meanstd&#xff08;带有置信区间&#xff0c;参数ci&#xff09; import seaborn as sns import matplotlib.pyplot as plt import numpy as np import pandas as p…

11月14日星期二今日早报简报微语报早读

11月14日星期二&#xff0c;农历十月初二&#xff0c;早报微语早读。 1、江西南城县&#xff1a;限时发放购房补贴政策&#xff0c;三孩家庭每平方米最高补贴500元&#xff1b; 2、2023年中国内地电影市场累计票房突破500亿元&#xff1b; 3、市场监管总局&#xff1a;在全国…

Javaweb开发 利用servlet+jsp+jdbc+tomcat数据库实现登录功能

前言&#xff1a;很久没更新了&#xff0c;今天给大家分享一个Java web的小案例&#xff0c;是一个登录页面&#xff0c;利用Login控制类和JDBC连接数据库&#xff0c;并判断用户名密码是否正确&#xff0c;项目最终部署在Tomcat上。 先看效果 正文 一、前期工作 1.首先我们…

ubuntu上安装edge浏览器

1下载edge浏览器 官网下载 edge浏览器的linux版本可在上面的官网中寻找。 我选择的是Linux(.deb)。 2 安装 可在终端的edge安装包所在的路径下输入下面命令安装。 sudo dpkg -i edge安装包的名称.deb3 安装可能存在的问题 1dpkg:依赖关系问题使得edge-stable的配置工作不…

Docker的安装配置与使用

1、docker安装与启动 首先你要保证虚拟机所在的盘要有至少20G的空间&#xff0c;因为docker开容器很吃空间的&#xff0c;其次是已经安装了yum依赖 yum install -y epel-release yum install docker-io # 安装docker配置文件 /etc/sysconfig/docker chkconfig docker on # 加…

uniapp 小程序 身份证 和人脸视频拍摄

使用前提&#xff1a; 已经在微信公众平台的用户隐私协议&#xff0c;已经选择配置“摄像头&#xff0c;录像”等权限 开发背景&#xff1a;客户需要使用带有拍摄边框的摄像头 &#xff0c;微信小程序的方法无法支持&#xff0c;使用camera修改 身份证正反面&#xff1a; <…

介绍美国服务器的优缺点

美国服务器的优点&#xff1a; 网络连接速度快&#xff1a;美国服务器的网络连接速度非常快&#xff0c;可以满足用户对快速访问的需求。 大规模数据存储&#xff1a;美国有很多大型数据中心&#xff0c;可以提供大规模的数据存储服务&#xff0c;方便用户管理海量数据。 支持…

面试:linux相关

文章目录 什么是 LinuxUnix 和 Linux 有什么区别&#xff1f;什么是 Linux 内核&#xff1f;Linux 的基本组件是什么&#xff1f;Linux 的体系结构BASH 和 DOS 之间的基本区别是什么&#xff1f;Linux 开机启动过程&#xff1f;Linux 系统缺省的运行级别&#xff1f;Linux 使用…

【unity】网格描边方法

【unity】网格描边方法 介绍对模型四边网格的三种描边方法&#xff1a;包括纯Shader方法、创建网格方法和后处理方法。于增强场景中3D模型的轮廓&#xff0c;使其在视觉上更加突出和清晰。这种效果可以用于增强三维场景中的物体、角色或环境&#xff0c;使其在视觉上更加吸引人…

网易有道开源语音合成引擎“易魔声”

概述 11 月 10 日&#xff0c;网易有道正式上线“易魔声”开源语音合成&#xff08;TTS&#xff09;引擎&#xff0c;所有用户可免费在开源社区 GitHub 进行下载使用&#xff0c;通过其提供的 web 界面及批量生成结果的脚本接口&#xff0c;轻松实现音色的情感合成与应用。 据…

前端---CSS的盒模型

文章目录 什么是盒模型&#xff1f;设置边框设置内边距设置外边距块级元素水平居中 什么是盒模型&#xff1f; 页面上的每个HTML元素都是一个一个的“盒子”&#xff0c;这些盒子由&#xff1a;内容、内边距、边框、外边距组成。 我们可以和住的房子联系起来&#xff0c;更好…