This example is written in Scala. It demonstrates two ways of writing RDD data into HBase, then reads the data back from HBase and prints it.
build.sbt
name := "SparkSbt"version := "0.1"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"
First create the tables by running these commands in the hbase shell:
create 'account', 'cf'
create 'account2', 'cf'
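If you prefer to create the tables from code rather than the shell, a minimal sketch using the HBase Admin API is shown below (the CreateTables object name is made up for this example, and the ZooKeeper address is the same one assumed by the main program; adjust it to your cluster):

package com.whq.test

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.91.144")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      for (name <- Seq("account", "account2")) {
        val tableName = TableName.valueOf(name)
        if (!admin.tableExists(tableName)) {
          // One table with a single column family 'cf', matching the shell commands above
          val desc = new HTableDescriptor(tableName)
          desc.addFamily(new HColumnDescriptor("cf"))
          admin.createTable(desc)
        }
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}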
Source code
package com.whq.test

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._

object HBaseTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
    val sc = new SparkContext(sparkConf)

    // Please ensure HBASE_CONF_DIR is on the classpath of the Spark driver.
    val conf = HBaseConfiguration.create()
    // Set the ZooKeeper quorum address. You could also put hbase-site.xml on the
    // classpath instead, but setting it in code is recommended here.
    conf.set("hbase.zookeeper.quorum", "192.168.91.144")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Write method 1: saveAsHadoopDataset
    println("————————————Write method 1")
    var tablename = "account"
    conf.set(TableInputFormat.INPUT_TABLE, tablename)

    // Initialize the JobConf. TableOutputFormat must be the one from the
    // org.apache.hadoop.hbase.mapred package (old MapReduce API)!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Data to be written
    val indataRDD = sc.makeRDD(Array("11,whq,30", "12,wanghongqi,29", "13,xiaoming,15"))

    // Convert the data into an RDD[(ImmutableBytesWritable, Put)] that HBase can ingest
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      /* One Put object is one row; the row key is passed to the constructor.
       * Every value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, column qualifier, value. */
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // Only an RDD[(ImmutableBytesWritable, Put)] can call saveAsHadoopDataset
      (new ImmutableBytesWritable, put)
    }
    // Write to HBase
    rdd.saveAsHadoopDataset(jobConf)

    // Write method 2: saveAsNewAPIHadoopDataset
    println("————————————Write method 2")
    tablename = "account2"
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job2 = Job.getInstance(conf)
    job2.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job2.setOutputValueClass(classOf[Result])
    job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

    val rdd2 = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }
    rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())

    // Read the data back
    println("————————————Read data")
    conf.set(TableInputFormat.INPUT_TABLE, tablename)
    // Read the table and turn it into an RDD
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.collect().foreach { case (_, result) =>
      // Get the row key
      val key = Bytes.toString(result.getRow)
      // Get a cell by column family and column qualifier
      val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }

    sc.stop()
  }
}
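As a small extension (not part of the program above), the Result rows that are read back can also be mapped into a plain tuple RDD for further processing instead of being printed directly. The sketch below is assumed to sit inside main, right after hBaseRDD is created; the filter threshold is arbitrary and only for illustration:

// Map each (rowkey, Result) pair to a (key, name, age) tuple; only the tuples are
// shipped back to the driver, the Result objects stay on the executors
val accountRDD = hBaseRDD.map { case (_, result) =>
  val key  = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
  val age  = Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age")))
  (key, name, age)
}
// Example: keep only rows with age > 20 and print them on the driver
accountRDD.filter(_._3 > 20).collect().foreach(println)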
Submit the job
spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar
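The jar name in the command matches the sbt settings above (Scala 2.10.5, version 0.1), so a plain sbt package build produces it under target/scala-2.10/. The --jars list below is only an assumption about where the HBase jars live ($HBASE_HOME stands for your HBase installation directory); adjust the paths, or put the HBase libraries on the Spark classpath in whatever way your cluster already provides:

sbt package
spark-submit --master yarn --deploy-mode client \
  --class com.whq.test.HBaseTest \
  --jars $HBASE_HOME/lib/hbase-common-1.2.0.jar,$HBASE_HOME/lib/hbase-client-1.2.0.jar,$HBASE_HOME/lib/hbase-server-1.2.0.jar,$HBASE_HOME/lib/hbase-protocol-1.2.0.jar \
  target/scala-2.10/sparksbt_2.10-0.1.jar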
Check the data in the hbase shell
scan 'account'
scan 'account2'