Spark实战之读写HBase

1 配置

1.1 开发环境:

  • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
  • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
  • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
  • Spark:spark-2.1.0-bin-hadoop2.6

1.2 Spark的配置

  • Jar包:需要HBase的Jar如下(经过测试,正常运行,但是是否存在冗余的Jar并未证实,若发现多余的jar可自行进行删除)

jars

  • spark-env.sh
    添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
    注:如果使用spark-shell的yarn模式进行测试的话,那么最好每个NodeManager节点都有配置jars和hbase-site.xml
  • spark-default.sh
spark.yarn.historyServer.address=slave11:18080
spark.history.ui.port=18080
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///tmp/spark/events
spark.history.fs.logDirectory=hdfs:///tmp/spark/events
spark.driver.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

1.3 数据

1)格式: barCode@item@value@standardValue@upperLimit@lowerLimit

01055HAXMTXG10100001@KEY_VOLTAGE_TEC_PWR@1.60@1.62@1.75@1.55
01055HAXMTXG10100001@KEY_VOLTAGE_T_C_PWR@1.22@1.24@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_BC_PWR@1.16@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_11@1.32@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_RC_PWR@1.24@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_VCC_5V@1.93@1.90@1.95@1.65
01055HAXMTXG10100001@KEY_VOLTAGE_T_VDD3V3@1.59@1.62@1.75@1.55

2 代码演示

2.1 准备动作

1)既然是与HBase相关,那么首先需要使用hbase shell来创建一个表

创建表格:create ‘data’,’v’,create ‘data1’,’v’

2)使用spark-shell进行操作,命令如下:

bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2

代码演示环境

3)import 各种类

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64,Bytes}
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils

2.2 代码实战

创建conf和table

val conf= HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"data1")
val table = new HTable(conf,"data1")

2.2.1 数据写入

格式:

val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")

使用for来插入5条数据

for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

到hbase shell中查看结果

hbase_data1表中的数据

2.2.2 数据读取

val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

1)take

hbaseRdd take 1

take_result

2)scan

var scan = new Scan();
scan.addFamily(Bytes.toBytes(“v”));
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN,scanToString)val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))

scan_result

2.3 批量插入

2.3.1 普通插入

1)代码

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

2)执行时间如下:7.6 min

执行时间

2.3.2 Bulkload

1) 代码:

val conf = HBaseConfiguration.create();
val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

2) 执行时间:7s

执行时间_BulkLoad
3)执行结果:
到hbase shell 中查看 list “data1”

结果查询

通过对比我们可以发现bulkload批量导入所用时间远远少于普通导入,速度提升了60多倍,当然我没有使用更大的数据量测试,但是我相信导入速度的提升是非常显著的,强烈建议使用BulkLoad批量导入数据到HBase中。

关于Spark与Hbase之间操作就写到这里,如果有什么地方写得不对或者运行不了,欢迎指出,谢谢

转载于:https://www.cnblogs.com/simple-focus/p/6879971.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/416706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

greenplum vacuum清理删除数据命令

官方文档 http://docs.greenplum.org/6-4/ref_guide/sql_commands/VACUUM.html greenplum数据删除后不会释放存储空间&#xff0c;需要命令VACUUM清理释放 命令格式 VACUUM [({ FULL | FREEZE | VERBOSE | ANALYZE } [, ...])] [table [(column [, ...] )]] VACUUM [FULL] …

vitualbox命令操作VBoxManage

进入本地virtualbox管理 运行服务器上的virtualbox 本地直接启动了virtualbox界面 这个很牛啊&#xff0c;直接本地图形化管理了&#xff0c;不用纠结服务器端没有显卡&#xff0c;进不去图形界面的问题了。 研究了VBoxManage startvm 最后才发现可能这样也行&#xff0c;哈…

Hadoop2之NameNode HA详解

在Hadoop1中NameNode存在一个单点故障问题&#xff0c;如果NameNode所在的机器发生故障&#xff0c;整个集群就将不可用(Hadoop1中虽然有个SecorndaryNameNode&#xff0c;但是它并不是NameNode的备份&#xff0c;它只是NameNode的一个助理&#xff0c;协助NameNode工作&#x…

GCC10.1.0最新版编译

官网地址&#xff1a;GCC, the GNU Compiler Collection- GNU Project 下载编译 wget http://mirror.linux-ia64.org/gnu/gcc/releases/gcc-10.1.0/gcc-10.1.0.tar.gztar zxvf gcc-10.1.0.tar.gzcd gcc-10.1.0/mkdir buildcd build/../configure 报错&#xff0c;提示信息co…

python 一句话校验软件 hash值

转载&#xff1a; 我是如何 Python 一句话校验软件哈希值的 原创 2017-05-21 余弦 懒人在思考MD5 python -c "import hashlib,sys;print hashlib.md5(open(sys.argv[1],rb).read()).hexdigest()" ***.exe 校验 下载软件是否被“中间人动过手脚” 例如&#xff1a;校验…

Apache Nifi 实战:多表导入实现及填坑 GitChat连接

NiFi 是美国国家安全局开发并使用了 8 年的可视化数据集成产品&#xff0c;2014 年 NAS 将其贡献给了 Apache 社区&#xff0c;2015 年成为 Apache 顶级项目。 大数据平台都需要进行数据流转&#xff0c;Apache Nifi 作为一款强大的数据流开源软件&#xff0c;支持大量的输入输…

hosts 文件与 ipv6

ipv6 的项目地址&#xff1a;ipv6-hosts 正如文档中所说&#xff0c;用于在大陆地区加快 Google、YouTube、Facebook、Wikipedia 等的访问&#xff1b;&#xff08;twitter 不支持&#xff09;使用说明&#xff08;windows&#xff09;&#xff1a; 解压到的 hosts 文件对于 wi…

工作总结3:axios里面的主要参数

<template><div class"axios"></div> </template> <script> import axios from axiosexport default {name: axios3-2,created() {// 实际开发// 两种请求接口&#xff1a;// http://localhost:9090// http://localhost:9091const …

快速入门系列之 Scala 语言 GitChat连接

Scala 是一门多范式的编程语言&#xff0c;设计初衷是要集成面向对象编程和函数式编程的各种特性。目前常应用于 Spark、后端开发等&#xff0c;Twitter 等公司也选择其作为后端语言。 本文以实例为导向&#xff0c;讲解 Scala 这门语言&#xff0c;适合有一定其他面向对象语言…

jsp页面,使用Struts2标签,传递和获取Action类里的参数,注意事项。s:a actions:iterators:paramognl表达式...

在编写SSH2项目的时候&#xff0c;除了使用<s:form>表单标签向Action类跳转并传递参数之外&#xff0c;很更多时候还需要用到<s:a action"XXX.action">这个链接标签进行跳转&#xff0c;此时&#xff0c;传递需要的参数必不可少。此外&#xff0c;在jsp页…

工作总结4:拦截器的使用

1.拦截器综述 拦截器的功能是定义在Java拦截器规范。 拦截器规范定义了三种拦截点: 业务方法拦截, 生命周期回调侦听, 超时拦截(EJB)方法。 在容器的生命周期中进行拦截 public class DependencyInjectionInterceptor {PostConstructpublic void injectDependencies(Invoca…

快速入门系列之 Rust 语言 GitChat连接

Rust 是一枚新星&#xff0c;兼顾开发效率和执行效率的语言。本文以实例为导向&#xff0c;讲解 Rust 这门语言&#xff0c;适合有一定其他面向对象语言基础的人员快速入门。 本文将讲解如下内容&#xff1a; - Hello World 从头起 - 各种类型各种算 - 各式流程来控制 - 数组…

java -jar maven项目打包提示.jar中没有主清单属性

mvn package java -jar target/java_bottom_level_learning-1.0-SNAPSHOT.jar这里报错了&#xff1a; target/java_bottom_level_learning-1.0-SNAPSHOT.jar中没有主清单属性 我们打开 jar 中的 /META_INF/ MANIFEST.MF缺少项目启动项&#xff0c;即没有Main-Class 怎么处理呢…

工作总结5:插槽的使用

什么是插槽&#xff1f; 插槽就是子组件中的提供给父组件使用的一个占位符&#xff0c;用<slot></slot> 表示&#xff0c;父组件可以在这个占位符中填充任何模板代码&#xff0c;如 HTML、组件等&#xff0c;填充的内容会替换子组件的<slot></slot>标…

pycharm 破解

亲测有效&#xff01; http://blog.lanyus.com/archives/174.html 备注&#xff1a; 注册码破解链接&#xff1a;http://idea.lanyus.com/ 将下载的jar包放入PyCharm.exe所在路径 如D:\pycharm\pycharm2017\PyCharm 2017.1.2\bin pycharm.exe.vmoptions 和pycharm64.exe.vmopti…

Java JVM 汇编代码入门 GitChat链接

为什么 new Integer(151)151&#xff1f;我来带你们一起学习下 JVM 汇编代码吧&#xff0c;窥探下神奇的 Java 中间语言到底什么样子的&#xff0c;能帮你更深入的理解 Java。 本文包含以下内容 工具介绍 JVM 汇编代码初见 汇编初步分析 局部变量生命周期 基础类型 大于 5 的…

Streaming 101

开宗明义&#xff01;本文根据Google Beam大神Tyler Akidau的系列文章《The world beyond batch: Streaming 101》(批处理之外的流式世界)整理而成&#xff0c; 主要讨论流式数据处理。在大数据领域&#xff0c;流式数据处理越发地重要了。原因有以下几点&#xff1a; 人们越来…

工作总结6:token问题

1.使用请求拦截器&#xff0c;拦截vue所有请求&#xff0c;增加token参数 使用倒数计时&#xff0c;假如token有效期60分钟&#xff0c;会在59分钟的时候去重新拿着refresh_Token&#xff0c;去请求新的token. 注意&#xff1a;如果一个账号允许多人登录使用&#xff0c;上述方…

从底层重学 Java 之四大整数 GitChat链接

从底层&#xff0c;从原理&#xff0c;我们来重学一次 Java。四大 Java 整数类 Byte、Short、Integer、Long 是我们比较常用的对象&#xff0c;他们的源码及实现是怎样的呢&#xff1f; 本系列秉承所有结论尽量从源码中来&#xff0c;没有源码的尽量标明出处。相关源码会附着在…

二重循环

一、回顾3种循环结构 1、while 语法 条件表达式的初始值&#xff1b; while(条件表达式){ 循环操作&#xff1b; 更改条件表达式的语句&#xff1b; } 特点&#xff1a;先判断&#xff0c;再执行&#xff0c;有可能一次循环都没有 适用的场合&#xff1a;循环次数未知 表现形式…