Spark 连接 HBase 入库及查询操作

本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出。

build.sbt

 

name := "SparkSbt"version := "0.1"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"

 

先hbase shell执行命令创建表:

create 'account' , 'cf'

create 'account2' , 'cf'

 

源码

package com.whq.testimport org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._object HBaseTest {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("HBaseTest")val sc = new SparkContext(sparkConf)// please ensure HBASE_CONF_DIR is on classpath of spark driverval conf = HBaseConfiguration.create()//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置conf.set("hbase.zookeeper.quorum","192.168.91.144")conf.set("hbase.zookeeper.property.clientPort", "2181")入库方式一saveAsHadoopDatasetprintln("————————————入库方式一")var tablename = "account"conf.set(TableInputFormat.INPUT_TABLE, tablename)//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!val jobConf = new JobConf(conf)jobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)//待入库数据val indataRDD = sc.makeRDD(Array("11,whq,30","12,wanghongqi,29","13,xiaoming,15"))//数据转换为可入库的RDD[(ImmutableBytesWritable,Put)]val rdd = indataRDD.map(_.split(',')).map{arr=>{/*一个Put对象就是一行记录,在构造方法中指定主键* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换* Put.add方法接收三个参数:列族,列名,数据*/val put = new Put(Bytes.toBytes(arr(0).toInt))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset(new ImmutableBytesWritable, put)}}//入库写入rdd.saveAsHadoopDataset(jobConf)入库方式二saveAsNewAPIHadoopDatasetprintln("————————————入库方式二")tablename = "account2"conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)val job2 = Job.getInstance(conf)job2.setOutputKeyClass(classOf[ImmutableBytesWritable])job2.setOutputValueClass(classOf[Result])job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])val rdd2 = indataRDD.map(_.split(',')).map{arr=>{val put = new Put(Bytes.toBytes(arr(0)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))(new ImmutableBytesWritable, put)}}rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())读取数据println("————————————读取数据")conf.set(TableInputFormat.INPUT_TABLE, tablename)//读取数据并转化成rddval hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count = hBaseRDD.count()println(count)hBaseRDD.collect().foreach{case (_,result) =>{//获取行键val key = Bytes.toString(result.getRow)//通过列族和列名获取列val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))println("Row key:"+key+" Name:"+name+" Age:"+age)}}sc.stop()}
}

执行命令

spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar

 

查看数据情况

scan 'account'

scan 'account2'

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/416954.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++程序设计语言(特别版) -- 一个桌面计算器

前言 这里要介绍各种语句和表达式,将通过一个桌面计算器的程序做些事情,该计算器提供四种座位浮点数的中缀运算符的标准算术运算。这个计算器由四个部分组成:一个分析器,一个输入函数,一个符号表和一个驱动程序。分析器…

前端学习(2364):图片的上传

message.vue <template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" mode""></image></view> </template>…

superset docker方式安装

superset是优秀的数据可视化开源项目&#xff0c;为用户提供了丰富的图表视觉效果&#xff0c;基于python开发。但仅可以以图表、表格形式展示结果&#xff0c;不支持页面组装等高级BI功能。 准备环境docker 然后执行命令 docker pull amancevice/superset docker run --det…

※交换排序(1)——快速排序(quick sort)

快速排序使用分治策略(Divide and Conquer)来把一个序列分为两个子序列。步骤为&#xff1a; 从序列中挑出一个元素&#xff0c;作为"基准"(pivot). 把所有比基准值小的元素放在基准前面&#xff0c;所有比基准值大的元素放在基准的后面&#xff08;相同的数可以到任…

前端学习(2365):图片的预览

message.vue <template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image></view>…

superset 图标调整

superset docker方式安装 系统为debian 9 安装路径为 /usr/local/lib/python3.6/site-packages/superset/ web页面左上角图标路径为&#xff1a; static/assets/images/superset-logo2x.png 采用Flask框架 页面在views/中&#xff0c;core.py中 #欢迎页面 expose(/welco…

第五周思维导图

转载于:https://www.cnblogs.com/java1765415329/p/6686733.html

superset 时区问题Timestamp subtraction must have the same timezones or no timezones

superset绑定数据源后&#xff0c;切换到图表展示时报错&#xff1a; Timestamp subtraction must have the same timezones or no timezones File "/usr/local/lib/python3.6/site-packages/superset/utils/core.py", line 362, in datetime_to_epoch return (dtt…

使用Kotlin开发Android应用 - 环境搭建 (1)

一. 在Android Studio上安装Kotlin插件 按快捷键Command, -> 在Preferences界面找到Plugins -> 点击Browse repositories ... -> 输入Kotlin -> 安装Kotlin (老版本需要安装 Kotlin 和Kotlin Extensions For Android两个插件, 最新版本的Kotlin插件包含了Android E…

前端学习(2367):两种方式导航跳转和传参

<template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image><navigator url"../…

sqlserver启用登陆审计

客户端管理工具进入后&#xff1a;安全性——审核——新建审核——审核目标&#xff08;就是存储位置&#xff09;选application log&#xff08;windows系统应用日志&#xff09;或者选File&#xff08;文件目录中会产生一堆日志文件&#xff0c;设置最大滚动更新文件数、最大…

java基础——equals及==的区别

①数值比较【必须为整数&#xff0c;小数因为精度问题不能通过这个比较&#xff0c;只能依靠d1-d2<0.0000000001这样的比较两者的不同】&#xff0c;值相等就行。 ②引用比较&#xff0c;引用的对象有父子关系或者是同一类的才能比较&#xff0c;只有当指向同一个引用才相等…

前端学习(2368):编程式导航

<template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image><navigator url"../…

centos7 firewall防火墙实现映射其他机器的端口

vi /etc/sysctl.conf 增加一行 net.ipv4.ip_forward 1 使上述修改生效 sysctl -p firewall-cmd --add-port5432/tcp --permanent #开放端口 firewall-cmd --add-forward-portport5432:prototcp:toaddr10.0.197.189:toport5432 --permanent #添加端口映射 10.0.197.189的54…

Navicat Premium 未保存和已执行SQL存储位置

未保存备份存放目录地址&#xff1a; C:\Users\{登录用户名}\Documents\Navicat\MySQL\servers\{MySQL连接名称}\{数据库名称} SQL执行历史文件地址&#xff1a; C:\Users\{登录用户名}\Documents\Navicat\Premium\logs\LogHistory.txt

正则表达式之反向引用

1.概述 捕获组捕获到的内容&#xff0c;不仅可以在正则表达式外部通过程序进行引用&#xff0c;也可以在正则表达式内部进行引用&#xff0c;这种引用方式就是反向引用。要了解反向引用&#xff0c;首先要了解捕获组&#xff0c;关于捕获组&#xff0c;参考 正则基础之——捕获…

MySQL 表一列逗号分隔字段,按逗号切割

直接查询 ---切割前 select id,content from test_split; 1 12,13,14 2 21,25 3 33 --切割后 1 12 1 13 1 14 2 21 2 25 3 33 --执行sql SELECT a.id,SUBSTRING_INDEX(SUBSTRING_INDEX(a.content,,,b.help_topic_id1),,,-1) AS num FROM test_split a join mys…