package com.jojo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Description: HBase bulk load of multiple columns in the same column family.
 */
object HbaseBulkLoadApp {
  val zookeeperQuorum = "cdh01,cdh02,cdh03"                              // ZooKeeper quorum
  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" // source file on HDFS
  val hFilePath = "hdfs://cdh03:8020/tmp/result"                         // where the generated HFiles are written
  val hdfsRootPath = "hdfs://cdh03:8020/"                                // HDFS root path
  val tableName = "sample_07"                                            // HBase table name
  val familyName = "basic"                                               // column family
  val arr = Array("code", "description", "total_emp", "salary")          // column names

  def main(args: Array[String]): Unit = {
    // Build the SparkContext
    val sparkConf = new SparkConf()
      .setAppName(s"${this.getClass.getSimpleName}")
      .setMaster("local")
      // use Kryo instead of the default Java serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // register the classes that need to be serialized with Kryo
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
    val sc = new SparkContext(sparkConf)

    // Hadoop configuration
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    // FileSystem used for the HFile output path
    val fileSystem = FileSystem.get(hadoopConf)

    // HBase configuration
    val hconf = HBaseConfiguration.create()
    // ZooKeeper quorum
    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
    // ZooKeeper client port
    hconf.set("hbase.zookeeper.property.clientPort", "2181")
    // maximum number of HFiles per region and column family for a bulk load
    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "3200")
    // maximum HFile size
    hconf.set("hbase.hregion.max.filesize", "10737418240")
    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Get an HBase connection
    val hbaseConn = ConnectionFactory.createConnection(hconf)
    val admin = hbaseConn.getAdmin

    /**
     * Saving the generated HFiles.
     * Note: the HFiles produced by a bulk load must be written to HDFS first and are then
     * loaded into HBase with LoadIncrementalHFiles.
     *
     * Why the sortBy below is needed:
     * 0. HBase reads are driven by the rowkey, and rowkeys are stored in sorted order;
     *    the rowkey effectively acts as an index, which is one reason HBase queries are fast.
     *    This means every row must be inserted at the position its rowkey belongs.
     * 1. Inserting with Put goes through the WAL, and HBase places each RowKey at the right
     *    position on write, so that path guarantees the ordering by itself.
     * 2. A bulk load skips the WAL; much like Hive's LOAD it simply moves the finished HFiles
     *    into the target HBase directory, so the data must already be sorted because the
     *    HFiles are not rewritten. Sorting by rowkey alone used to be enough, but as of
     *    HBase 2.0.2 the column qualifier must be sorted as well, hence
     *    sortBy(x => (x._1, x._2.getKeyString), true).
     */
    // 0. Prepare the runtime environment
    // Create the HBase table if it does not exist yet
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(familyName)
      desc.addFamily(hcd)
      admin.createTable(desc)
      print("created a new table")
    }
    // If the HFile output path already exists, delete it
    if (fileSystem.exists(new Path(hFilePath))) {
      fileSystem.delete(new Path(hFilePath), true)
      print("deleted the existing HDFS path")
    }

    // 1. Prepare the data that goes into the HFiles; the rowkeys must be sorted, otherwise:
    //    java.io.IOException: Added a key not lexically larger than previous.
    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)
      .map(row => {
        // per-row parsing logic
        val arrs = row.split("\t")
        var kvlist: Seq[KeyValue] = List() // holds one KeyValue per column
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf = familyName.getBytes // column family
        rowkey = Bytes.toBytes(arrs(0)) // arrs(0) ("code") is the rowkey
        for (i <- 1 to (arrs.length - 1)) {
          cn = arr(i).getBytes() // column qualifier
          v = Bytes.toBytes(arrs(i)) // column value
          // Convert the RDD into the shape the HFile needs: the HFile key is an
          // ImmutableBytesWritable, so the RDD key must be an ImmutableBytesWritable as well.
          kv = new KeyValue(rowkey, cf, cn, v) // wraps rowkey, column family, qualifier, value
          kvlist = kvlist :+ kv // append the new KeyValue (do not prepend; the list must stay in order)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data
      .flatMapValues(_.iterator)

    // 2. Save the HFiles on HDFS
    val table = hbaseConn.getTable(TableName.valueOf(tableName))
    val job = Job.getInstance(hconf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoadMap(job, table)
    hfileRDD
      .sortBy(x => (x._1, x._2.getKeyString), true) // keep the whole dataset sorted
      .saveAsNewAPIHadoopFile(hFilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        hconf)
    print("HFiles generated successfully")

    // 3. Load the HFiles into the HBase table
    val bulkLoader = new LoadIncrementalHFiles(hconf)
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()
    sc.stop()
  }
}
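/**
 * A minimal verification sketch added for illustration (not part of the original program):
 * because a bulk load bypasses the normal write path (no WAL, no MemStore), it is worth reading
 * one row back through the regular client API after doBulkLoad returns. The rowkey "00-0000" is
 * an assumed sample value from the sample_07 dataset; substitute one that exists in your data.
 * The connection settings simply mirror the constants used above.
 */
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object HbaseBulkLoadVerify {
  def main(args: Array[String]): Unit = {
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.quorum", "cdh01,cdh02,cdh03")
    hconf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(hconf)
    val table = conn.getTable(TableName.valueOf("sample_07"))
    try {
      // "00-0000" is an assumed rowkey used purely for illustration
      val result = table.get(new Get(Bytes.toBytes("00-0000")))
      // only these qualifiers are written by the bulk load; "code" is the rowkey itself
      Array("description", "total_emp", "salary").foreach { col =>
        val v = result.getValue(Bytes.toBytes("basic"), Bytes.toBytes(col))
        println(s"$col = ${if (v == null) "<null>" else Bytes.toString(v)}")
      }
    } finally {
      table.close()
      conn.close()
    }
  }
}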