Scala 练习一 将Mysql表数据导入HBase

Scala 练习一 将Mysql表数据导入HBase

续第一篇:Java代码将Mysql表数据导入HBase表

源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase

一、整体介绍

在这里插入图片描述

  1. HBase特质

    连接HBase, 创建HBase执行对象

    1. 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
      Configuration conf = HBaseConfiguration.create()
      conf.set(String, String)
    2. 创建连接:多个连接(池化)
      Connection con = ConnectionFactory.createConnection()
    3. 创建数据表:表名: String
      Table table = con.getTable(TableName)
    def build(): HBase		// 初始化配置信息
    def initPool(): HBase	// 初始化连接池
    def finish(): Executor	// 完成 返回执行对象
    
  2. Executor特质

    对HBase进行操作的方法: 包含如下函数

    def exists(tableName: String): Boolean	// 验证数据表是否存在
    def create(tableName: String, columnFamilies: Seq[String]): Boolean	// 创建数据表
    def drop(tableName: String): Boolean	// 删除数据表
    def put(tableName: String, data: util.List[Put]): Boolean	// 批量插入数据
    
  3. Jdbc 封装

    Jdbc封装

    1. 初始化连接
      driver : com.mysql.cj.jdbc.Driver
      参数:url, username, password
      创建连接
    2. 初始化执行器
      sql, parameters
      创建执行器【初始化参数】
    3. 执行操作并返回【结果】
      DML: 返回影响数据库表行数
      DQL: 返回查询的数据集合
      EX: 出现异常结果
  4. MyHBase用于实现HBaseExecutor特质

  5. 测试数据格式

    mysql表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;DROP TABLE IF EXISTS `test_table_for_hbase`;
    CREATE TABLE `test_table_for_hbase`  (`test_id` int NULL DEFAULT NULL,`test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_age` int NULL DEFAULT NULL,`test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112');
    INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113');
    INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114');
    INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115');
    -- .... 省略以下数据部分
    

    hbase表

    # 创建表  库名:表名, 列族1, 列族2
    create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo"	
    truncate 'hbase_test:tranfer_from_mysql'  # 清空hbase_test命名空间下的tranfer_from_mysql表
    scan 'hbase_test:tranfer_from_mysql'	  # 查看表
    

二、依赖

<dependencies><!-- HBase 驱动 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><!-- Hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.1.3</version></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency>
</dependencies>

三、测试结果

终端有个日志的小警告(无伤大雅hh),输出为 true
在这里插入图片描述

查看hbase表,发现数据正常导入

在这里插入图片描述

四、源码

scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载

Executor

package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean
}

HBase

package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {protected var statusCode: Int = -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out = {available = falsethis}def in = available = true}def initPool(): HBasedef finish(): Executor
}

MyHBase

package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{private lazy val config: Configuration = HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()override def build(): HBase = {if(statusCode == -1){conf.foreach(t => config.set(t._1, t._2))statusCode = 0this}else{throw new HBaseException("build() function must be invoked first")}}override def initPool(): HBase = {if(statusCode == 0){val POOL_SIZE = if (pooled) {if (poolSize <= 0) 3 else poolSize} else 1for (i <- 1 to POOL_SIZE) {pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))}statusCode = 1this}else{throw new HBaseException("initPool() function must be invoked only after build()")}}override def finish(): Executor = {if (statusCode == 1) {statusCode = 2new Executor {override def exists(tableName: String): Boolean = {var pc: PoolCon = nulltry{pc = getConval exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception => e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {if (exists(tableName)) {return false}var pc: PoolCon = nulltry {pc = getConval builder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException("finish() function must be invoked only after initPool()")}}private def getCon = {val left: ArrayBuffer[PoolCon] = pool.filter(_.available)if (left.isEmpty) {throw new HBaseException("no available connection")}left.apply(0).out}private def close(con: PoolCon) = {if (null != con) {con.in}}
}object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}

Jdbc

package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {object Result extends Enumeration {val EX = Value(0) val DML = Value(1) val DQL = Value(2) }// 3种结果(异常,DML,DQL)封装case class ResThree(rst: Result.Value) {def to[T <: ResThree]: T = this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex = new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update = affectedRows}object Dml {def apply(affectedRows: Int): Dml = new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet => T) = {val list: util.List[T] = new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql = new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {def con() = {// 1.1 显式加载 JDBC 驱动程序(只需要一次)Class.forName("com.mysql.cj.jdbc.Driver")// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) = {// 2.1 创建执行对象val pst = con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null != params && params.nonEmpty) {params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))}pst}try {val connect = con()val prepared = pst(connect)sql match {case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")=> Dml(prepared.executeUpdate())case sql if sql.matches("^(select|SELECT) .*")=> Dql(prepared.executeQuery())case _ => Ex(new SQLException(s"illegal sql command : $sql"))}} catch {case e: Exception => Ex(e)}}}

Test

import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.utilobject Test {def main(args: Array[String]): Unit = {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(user = "root",url = "jdbc:mysql://1.94.52.166:3306/test_db_for_bigdata",password = "123456")// 执行SQL查询,并将结果封装在ResThree对象中val toEntity: ResThree = jdbcOpr("select * from test_table_for_hbase where test_id between ? and ?",Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst == Result.EX) {// 如果异常,执行异常结果处理toEntity.to[Ex]println("出现异常结果处理")} else {// 如果没有异常,将查询结果转换为HBase的Put对象列表val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {// 创建一个Put对象,表示HBase中的一行val put = new Put(Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),Bytes.toBytes(rst.getString("test_name")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型)put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),Bytes.toBytes(rst.getString("test_gender")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),Bytes.toBytes(rst.getString("test_phone")))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() > 0) {// 初始化HBase连接池并执行Put操作val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1).build().initPool().finish()// 执行Put操作,并返回是否成功val bool = exe.put("hbase_test:tranfer_from_mysql", puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println("查无数据")}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/22087.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM学习(28)NXP 双coreMCU IMX1160学习

笔者最近接触到一块IMXRT1160的双core板子&#xff0c;特依次来记录学习一下 1、IMXRT1160 板子介绍 介绍一下NXP的Demo板子&#xff0c;是一个双core的板子&#xff0c;Cortex-M7和Cortex-M4&#xff0c;总计1MB的RAM空间&#xff0c;256KB的ROM空间&#xff0c;提供了丰富的…

Window10磁盘的分盘和合并

注意&#xff1a; 当我们c盘不够大需要扩大磁盘空间时&#xff0c;当c盘后面没有未划分的磁盘时候&#xff0c;我们是无法进行扩充c盘的&#xff0c;此时&#xff0c;我们可以先删除后面一个磁盘&#xff0c;再进行扩大。 如下&#xff1a;c盘后没有未分配的空间&#xff0c;…

【项目管理常见问题大揭秘】每个管理者都要Get的「五维思维」~

走上管理岗☸要懂得五维思维 &#x1f4bc;自我管理——做自己的CEO 严于律己&#xff1a;严格要求自己&#xff0c;注重个人品牌建设 宽以待人&#xff1a;接纳不同观点&#xff0c;提升团队凝聚力 尊重事实&#xff1a;鼓励团队成员发挥优势&#xff0c;避免负面评价 坚守诚…

求助:西莫电子期刊 交流互助 传递

点击上方 “机械电气电机杂谈 ” → 点击右上角“...” → 点选“设为星标 ★”&#xff0c;为加上机械电气电机杂谈星标&#xff0c;以后找夏老师就方便啦&#xff01;你的星标就是我更新动力&#xff0c;星标越多&#xff0c;更新越快&#xff0c;干货越多&#xff01; 关注…

Java面经——SpringCloud微服务

SpringCloud SpringCloud的五大组件 注册中心网关远程调用负载均衡熔断降级 谈谈你对SpringCloud的理解 SpringCloud是为了解决微服务架构中出现的一系列服务治理难题的而提出的一套规范&#xff0c;统一了标准。降低了微服务架构的开发难度。有了 Spring Cloud 这样的技术生…

【android 9】【input】【8.发送按键事件2——InputDispatcher线程】

系列文章目录 本人系列文章-CSDN博客 目录 系列文章目录 1.简介 1.1流程介绍 1.2 时序图 2.普通按键消息发送部分源码分析&#xff08;按键按下事件&#xff09; 2.1 开机后分发线程阻塞的地方 2.2 InputDispatcher::dispatchOnceInnerLocked 2.3 InputDispatcher::disp…

使用C语言实现贪吃蛇(超详细)

目录 实现贪吃蛇我们要知道哪些&#xff1f; Easyx图形库 Easyx的安装 游戏思路 游戏实现 头文件的使用 ​编辑和食物以及控制方向的初始化 对于坐标的实现&#xff1a; 食物颜色的实现&#xff1a; 游戏数据的初始化 加载音乐 图形窗口的设置&#xff1a; 蛇身节数…

【动手学深度学习】多层感知机之暂退法问题研究详情

目录 &#x1f30a;问题研究1 &#x1f31e;问题研究2 &#x1f332;问题研究3 &#x1f30d;问题研究4 &#x1f333;问题研究5 &#x1f30c;问题研究6 &#x1f30a;问题研究1 如果更改第一层和第二层的暂退法概率&#xff0c;会发生什么情况&#xff1f;具体地说&am…

深入理解指针(4)--新手小白都能明白的指针解析

深入理解指针(4)–新手小白都能明白的指针解析 文章目录 深入理解指针(4)--新手小白都能明白的指针解析1. 回调函数2. qsort使用举例2.1 冒泡排序2.2 qsort函数介绍2.3 用冒泡排序实现qsort 结语 1. 回调函数 回调函数就是⼀个通过函数指针调用的函数 如果我们把函数的指针&a…

玩转微服务-GateWay

目录 一. 背景二. API网关1. 概念2. API网关定义3. API网关的四大职能4. API网关分类5. 开源API网关介绍6. 开源网关的选择 三. Spring Cloud Gateway1. 文档地址2. 三个核心概念3. 工作流程4. 运行原理4.1 路由原理4.2 RouteLocator 5. Predicate 断言6. 过滤器 Filter6.1. 过…

[图解]建模相关的基础知识-02

1 00:00:01,530 --> 00:00:05,200 第2个概念&#xff0c;谓词&#xff0c;Predicate 2 00:00:07,530 --> 00:00:10,800 或者叫断言&#xff0c;翻译各种各样都有 3 00:00:12,830 --> 00:00:15,050 实际上就是前面命题 4 00:00:15,060 --> 00:00:16,610 相当于常…

记录Nuxt 3 官网项目的一次部署

本来以为就是一次简单的部署&#xff0c;之前也是部署过几次nuxt项目了&#xff0c;所以&#xff0c;并没有要记录的想法。但是过程出现了很多问题&#xff0c;最后考虑还是写下来吧。留个记录&#xff08;完整的配置部署过程&#xff09; 这里我将要说明两种部署方式以供选择&…

开源网安软件安全国产化替代解决方案亮相2024澳门万讯论坛

近日&#xff0c;2024万讯论坛在澳门成功举办。本次论坛由万讯电脑科技主办&#xff0c;旨在引进国内尖端科技厂商&#xff0c;提供全方位的信创解决方案&#xff0c;分享信创化过程中所面临的挑战及阶段性转换经验。开源网安作为拥有软件安全领域全链条产品的厂商&#xff0c;…

Analytical Model(分析模型)和Compact model(紧凑模型)有什么不同

Analytical Model&#xff08;分析模型&#xff09; 和 Compact Model&#xff08;紧凑模型&#xff09; 在电子工程和半导体物理领域有着不同的应用和特点&#xff1a; Analytical Model&#xff08;分析模型&#xff09;: 理论基础&#xff1a;分析模型基于物理原理和数学公…

jeecg dictText字典值

前端列表的字典值回显&#xff0c;配置了数据字典后&#xff0c;在本地测试可以回显中文的数据&#xff0c; 但在线上服务器不能正常回显出来&#xff1b; 原因是在前端拿到records的列表值时可以拿到dictText的字典&#xff0c;但是线上服务器没有dictText的值&#xff1b; …

聚焦 Navicat 17 新特性 | 模型设计优化与创新

随着 Navicat 17 的正式发布&#xff0c;受到了广泛的关注和讨论。Navicat 产品力又一次大跃迁。新引入的特性显著增强了用户的数据库管理和数据分析体验&#xff0c;包括&#xff1a;模型设计与同步、数据字典、数据分析&#xff08;data profiling&#xff09;、用户体验、查…

共享门店模式:快速打造连锁实体店

在数字化浪潮的冲击下&#xff0c;许多线下实体店正面临前所未有的挑战。然而&#xff0c;在这个变革的时代&#xff0c;共享门店模式&#xff0c;也被称为“共享股东”&#xff0c;正以其独特的魅力&#xff0c;为实体店带来新的生机。 一、共享门店模式的崭新定义 共享门店…

​水经微图Web版1.8.0发布

让每一个人都有自己的地图&#xff01; 水经微图&#xff08;简称“微图”&#xff09;新版已上线&#xff0c;在该版本中主要新增了注册登录功能&#xff0c;线与面图层新增矩形、圆或军标等绘制功能&#xff0c;以及其它功能的优化。 现在&#xff0c;为你分享一下本轮迭代…

PostgreSQL调优工具:PGTune

PostgreSQL调优工具&#xff1a;PGTune 1&#xff0c;PGTune网址 https://pgtune.leopard.in.ua/#/ 参数解释&#xff1a; DB Version&#xff1a;数据库版本 OS Type&#xff1a;操作系统 DB Type&#xff1a;数据库类型&#xff0c;一般默认即可 Total Memory (RAM)&#x…

巨详细Linux安装MySQL

巨详细Linux安装MySQL 1、查看是否有自带数据库或残留数据库信息1.1检查残留mysql1.2检查并删除残留mysql依赖1.3检查是否自带mariadb库 2、下载所需MySQL版本&#xff0c;上传至系统指定位置2.1创建目录2.2下载MySQL压缩包 3、安装MySQL3.1创建目录3.2解压mysql压缩包3.3安装解…