使用spark进行hbase的bulkload

使用spark进行hbase的bulkload

一、 背景

HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。
HBase 擅长于海量数据的实时读取,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。

二、HBase Bulkload
在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

  • Extract,异构数据源数据导入到 HDFS 之上。
  • Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
  • Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

 三、实践

hive表


 

 hbase表

 依赖

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

        <maven.compiler.source>1.8</maven.compiler.source>

        <maven.compiler.target>1.8</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

        <log4j.version>1.7.30</log4j.version>

        <zk.version>3.4.5-cdh5.16.2</zk.version>

        <scala.version>2.12.10</scala.version>

        <scala.tools.version>2.12</scala.tools.version>

        <spark.version>3.2.0</spark.version>

        <hbase.version>1.2.0-cdh5.16.2</hbase.version>

        <config.version>1.4.0</config.version>

    </properties>

     

    <repositories>

        <repository>

            <id>nexus-aliyun</id>

            <url>http://maven.aliyun.com/nexus/content/groups/public</url>

        </repository>

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.version}</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${log4j.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>${zk.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

         

    </dependencies>

spark 代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

 * Description:Hbase批量加载   同一列族多列

 */

object HbaseBulkLoadApp {

  val zookeeperQuorum = "cdh01,cdh02,cdh03"//zookeeper信息

  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" //源文件

  val hFilePath = "hdfs://cdh03:8020/tmp/result"//hfile的存储路径

  val hdfsRootPath = "hdfs://cdh03:8020/"//根路径

  val tableName = "sample_07"//表名

  val familyName = "basic"//列族

  val arr = Array("code","description""total_emp","salary")//列的名字集合

  def main(args: Array[String]): Unit = {

    //获取content

    val sparkConf = new SparkConf()

      .setAppName(s"${this.getClass.getSimpleName}")

      .setMaster("local")

      //指定序列化格式,默认是java序列化

      .set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

      //告知哪些类型需要序列化

      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置

    val hadoopConf = new Configuration()

    hadoopConf.set("fs.defaultFS", hdfsRootPath)

    //获取输出路径

    val fileSystem = FileSystem.get(hadoopConf)

    //获取hbase配置

    val hconf = HBaseConfiguration.create()

    //设置zookeeper集群

    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

    //设置端口

    hconf.set("hbase.zookeeper.property.clientPort""2181");

    //设置hfile最大个数

    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

    //设置hfile的大小

    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    //获取hbase连接

    val hbaseConn = ConnectionFactory.createConnection(hconf)

    val admin = hbaseConn.getAdmin

    /**

     * 保存生成的HFile文件

     * 注:bulk load  生成的HFile文件需要落地

     * 然后再通过LoadIncrementalHFiles类load进Hbase

     * 此处关于  sortBy 操作详解:

     * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序,

     * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因,

     * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。

     * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序

     * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行

     * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true)

     *

     * @param hfileRDD

     */

    // 0. 准备程序运行的环境

    // 如果 HBase 表不存在,就创建一个新表

    if (!admin.tableExists(TableName.valueOf(tableName))) {

      val desc = new HTableDescriptor(TableName.valueOf(tableName))

      val hcd = new HColumnDescriptor(familyName)

      desc.addFamily(hcd)

      admin.createTable(desc)

      print("创建了一个新表")

    }

    // 如果存放 HFile文件的路径已经存在,就删除掉

    if(fileSystem.exists(new Path(hFilePath))) {

      fileSystem.delete(new Path(hFilePath), true)

      print("删除hdfs上存在的路径")

    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:

    // java.io.IOException: Added a key not lexically larger than previous.

    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

      .map(row => {

        // 处理数据的逻辑

        val arrs = row.split("\t")

        var kvlist: Seq[KeyValue] = List()//存储多个列

        var rowkey: Array[Byte] = null

        var cn: Array[Byte] = null

        var v: Array[Byte] = null

        var kv: KeyValue = null

        val cf = familyName.getBytes //列族

        rowkey = Bytes.toBytes(arrs(0)) //key

        for (i <- 1 to (arrs.length - 1)) {

          cn = arr(i).getBytes() //列的名称

          v = Bytes.toBytes(arrs(i)) //列的值

          //将rdd转换成HFile需要的格式,上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

        }

        (new ImmutableBytesWritable(rowkey), kvlist)

      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

      .flatMapValues(_.iterator)

    // 2. Save Hfiles on HDFS

    val table = hbaseConn.getTable(TableName.valueOf(tableName))

    val job = Job.getInstance(hconf)

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    hfileRDD

      .sortBy(x => (x._1, x._2.getKeyString), true//要保持 整体有序

      .saveAsNewAPIHadoopFile(hFilePath,

        classOf[ImmutableBytesWritable],

        classOf[KeyValue],

        classOf[HFileOutputFormat2],

        hconf)

    print("成功生成HFILE")

    val bulkLoader = new LoadIncrementalHFiles(hconf)

    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()

    sc.stop()

  }

}

 其中可能遇到的问题:

1

EndOfStreamException: Unable to read additional data from server sessionid 0x17f44ca01833e45, likely server has closed socket

 解决:

  主要是zk的版本不匹配,在依赖选择匹配的zk版本。

输出结果

https://www.cnblogs.com/huangguoming/articles/12967868.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/6579.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTP 请求走私漏洞(HTTP Request Smuggling)

一、什么是Http 请求走私漏洞&#xff1f; HTTP请求走私漏洞&#xff08;HTTP Request Smuggling&#xff09;是一种安全漏洞&#xff0c;利用了HTTP协议中请求和响应的解析和处理方式的不一致性。攻击者通过构造特定的恶意请求&#xff0c;以欺骗服务器和代理服务器&#xff0…

Godot 4 源码分析 - 增加管道通信

学习研究Godot 4&#xff0c;很爽&#xff0c;虽然很庞杂&#xff0c;但相对于自己的水平来说&#xff0c;很强大&#xff0c;尤其是vulkan这块直接打包可用&#xff0c;省得自己从头琢磨。 一点一点地消化、优化与完善&#xff0c;最终才能成为自己的。 这段时间就在Godot的…

Pytorch迁移学习使用Resnet50进行模型训练预测猫狗二分类

目录 1.ResNet残差网络 1.1 ResNet定义 1.2 ResNet 几种网络配置 1.3 ResNet50网络结构 1.3.1 前几层卷积和池化 1.3.2 残差块&#xff1a;构建深度残差网络 1.3.3 ResNet主体&#xff1a;堆叠多个残差块 1.4 迁移学习猫狗二分类实战 1.4.1 迁移学习 1.4.2 模型训练 1.…

华为数通HCIP-ISIS基础

IS-IS的基本概念 isis&#xff08;中间系统到中间路由协议&#xff09; 链路状态路由协议、IGP、无类路由协议&#xff1b; IS-IS是一种链路状态路由协议&#xff0c;IS-IS与OSPF在许多方面非常相似:运行IS-IS协议的直连设备之间通过发送Hello报文发现彼此&#xff0c;然后建…

从零开始搭建vue3 + ts + pinia + vite +element-plus项目

前言&#xff1a;据说vue2将于 2023 年 12 月 31 日停止维护&#xff0c;最近打算搭建一个vue3项目来学习一下&#xff0c;以防忘记&#xff0c;记录一下搭建过程。 一、使用npm创建项目 前提条件&#xff1a;已安装 16.0 或更高版本的 Node.js 执行 “npm init vuelatest”…

【Java基础教程】(四十三)多线程篇 · 下:深入剖析Java多线程编程:同步、死锁及经典案例——生产者与消费者,探究sleep()与wait()的差异

Java基础教程之多线程 下 &#x1f539;本节学习目标1️⃣ 线程的同步与死锁1.1 同步问题的引出2.2 synchronized 同步操作2.3 死锁 2️⃣ 多线程经典案例——生产者与消费者&#x1f50d;分析sleep()和wait()的区别&#xff1f; &#x1f33e; 总结 &#x1f539;本节学习目标…

谷歌插件(Chrome扩展) “Service Worker (无效)” 解决方法

问题描述&#xff1a; 写 background 文件的时候报错了&#xff0c;说 Service Worker 设置的 background 无效。 解决&#xff08;检查&#xff09;方法&#xff1a; 检查配置文件&#xff08;manifest.json&#xff09; 中的 manifest_version 是否为 3。 background 中的…

如何动态修改 spring aop 切面信息?让自动日志输出框架更好用

业务背景 很久以前开源了一款 auto-log 自动日志打印框架。 其中对于 spring 项目&#xff0c;默认实现了基于 aop 切面的日志输出。 但是发现一个问题&#xff0c;如果切面定义为全切范围过大&#xff0c;于是 v0.2 版本就是基于注解 AutoLog 实现的。 只有指定注解的类或…

DataWhale AI夏令营——机器学习

DataWhale AI夏令营——机器学习 学习记录一1. 异常值分析2. 单变量箱线图可视化3. 特征重要性分析 学习记录一 锂电池电池生产参数调控及生产温度预测挑战赛 已配置环境&#xff0c;跑通baseline&#xff0c;并在此基础上对数据进行了简单的分析。 1. 异常值分析 对训练集…

K8S初级入门系列之八-网络

一、前言 本章节我们将了解K8S的相关网络概念&#xff0c;包括K8S的网络通讯原理&#xff0c;以及Service以及相关的概念&#xff0c;包括Endpoint&#xff0c;EndpointSlice&#xff0c;Headless service&#xff0c;Ingress等。 二、网络通讯原理和实现 同一K8S集群&…

PMP 数据收集工具与技术

数据收集工具与技术 (9个) 标杆对照 标杆对照是指将实际或计划的产品、流程和实践与其他可比组织的 做法进行比较&#xff0c;以便识别最佳实践、形成改进意见&#xff0c;并为绩效考核 提供依据。 头脑风暴 头脑风暴是一种数据收集和创意技术&#xff0c;主要用于在短时间…

三维点云中的坐标变换(只讲关键部分)

一、坐标旋转 坐标旋转包含绕x、y、z轴旋转&#xff0c;在右手坐标系中&#xff0c;x-翻滚(roll)&#xff0c;y-俯仰(pitch)&#xff0c;z-航向(yaw)。如果想详细了解&#xff0c;可以网络搜索 在PCL中&#xff0c;从baseLink到map的转换关系为:先绕x轴旋转,在绕y轴旋转,最后绕…

【软件工程中的各种图】

1、用例图&#xff08;use case diagrams&#xff09; 【概念】描述用户需求&#xff0c;从用户的角度描述系统的功能 【描述方式】椭圆表示某个用例&#xff1b;人形符号表示角色 【目的】帮组开发团队以一种可视化的方式理解系统的功能需求 【用例图】 2、静态图(Static …

【数据结构】C--单链表(小白入门基础知识)

前段时间写了一篇关于顺序表的博客&#xff0c;http://t.csdn.cn/0gCRp 顺序表在某些时候存在着一些不可避免的缺点: 问题&#xff1a; 1. 中间 / 头部的插入删除&#xff0c;时间复杂度为 O(N) 2. 增容需要申请新空间&#xff0c;拷贝数据&#xff0c;释放旧空间。会有不…

前端 | ( 十一)CSS3简介及基本语法(上) | 尚硅谷前端html+css零基础教程2023最新

学习来源&#xff1a;尚硅谷前端htmlcss零基础教程&#xff0c;2023最新前端开发html5css3视频 系列笔记&#xff1a; 【HTML4】&#xff08;一&#xff09;前端简介【HTML4】&#xff08;二&#xff09;各种各样的常用标签【HTML4】&#xff08;三&#xff09;表单及HTML4收尾…

2023/07/23

1. 必须等待所有请求结束后才能执行后续操作的处理方式 方式一&#xff1a; async func () {const p1 await api1();const p2 await api2();const p3 await api3();Promise.all([p1, p2, p3]).then(res > {后续操作...}) }方式二&#xff1a;待补充 2. flex 弹性盒子布…

FPGA实现串口回环

文章目录 前言一、串行通信1、分类1、同步串行通信2、异步串行通信 2、UART串口通信1、UART通信原理2、串口通信时序图 二、系统设计1、系统框图2.RTL视图 三、源码1、串口发送模块2、接收模块3、串口回环模块4、顶层模块 四、测试效果五、总结六、参考资料 前言 环境&#xff…

【计算机视觉 | 目标检测】arxiv 计算机视觉关于目标检测的学术速递(7 月 21 日论文合集)

文章目录 一、检测相关(15篇)1.1 Representation Learning in Anomaly Detection: Successes, Limits and a Grand Challenge1.2 AlignDet: Aligning Pre-training and Fine-tuning in Object Detection1.3 Cascade-DETR: Delving into High-Quality Universal Object Detectio…

《Docker与持续集成/持续部署:构建高效交付流程,打造敏捷软件交付链》

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

c语言修炼之指针和数组笔试题解析(1.2)

前言&#xff1a; 书接上回&#xff0c;让我们继续开始今天的学习叭&#xff01;废话不多说&#xff0c;还是字符数组的内容上代码&#xff01; char *p是字符指针&#xff0c;*表示p是个指针&#xff0c;char表示p指向的对象类型是char型&#xff01; char*p"abcdef&q…