Related dependencies
~~~
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
~~~
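With these dependencies on the classpath, the HBase client API can be used directly. Below is a minimal sketch of a client write, assuming the node1/node2/node3 ZooKeeper quorum used later in these notes; the table name `test`, rowkey and values are only hypothetical placeholders.

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() loads hbase-site.xml in addition to the Hadoop configs
        Configuration conf = HBaseConfiguration.create();
        // assumed ZooKeeper quorum; adjust to your cluster
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             // hypothetical table with column family f1
             Table table = connection.getTable(TableName.valueOf("test"))) {
            Put put = new Put(Bytes.toBytes("0001"));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            table.put(put);
        }
    }
}
~~~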
HBase integration with MapReduce

Read data from one HBase table and write selected columns into another HBase table with MapReduce:
~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseMR {

    public static class HBaseMapper extends TableMapper<Text, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // get the rowkey byte array
            byte[] bytes = key.get();
            String rowkey = Bytes.toString(bytes);
            // build a Put object
            Put put = new Put(bytes);
            // get all cells of the row
            Cell[] cells = value.rawCells();
            for (Cell cell : cells) {
                // column family f1
                if ("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // name column
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // age column
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            if (!put.isEmpty()) {
                context.write(new Text(rowkey), put);
            }
        }
    }

    public static class HbaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            for (Put put : values) {
                // TableOutputFormat ignores the key, so null is acceptable here
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // use HBaseConfiguration.create() so hbase-site.xml is loaded as well
        Configuration conf = HBaseConfiguration.create();
        Scan scan = new Scan();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseMR.class);
        // use the TableMapReduceUtil utility class to initialize the mapper
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf(args[0]), scan, HBaseMapper.class, Text.class, Put.class, job);
        // use the TableMapReduceUtil utility class to initialize the reducer
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);
        // set the number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~
Package it into a jar and submit it to the cluster to run.
Read data from HDFS and write it into an HBase table:

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class Hdfs2Hbase {

    public static class HdfsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class HBASEReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // each input line looks like: rowkey name age, separated by spaces
            String[] split = key.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // use HBaseConfiguration.create() so hbase-site.xml is loaded as well
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Hdfs2Hbase.class);
        job.setInputFormatClass(TextInputFormat.class);
        // input file path
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(HdfsMapper.class);
        // key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // the HBase table the output is written to
        TableMapReduceUtil.initTableReducerJob(args[1], HBASEReducer.class, job);
        // set the number of reduce tasks
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~
Package it into a jar and submit it to the cluster to run.
Bulk load: read data from HDFS and generate HFiles with HFileOutputFormat2:

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HBaseLoad {

    public static class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line looks like: rowkey name age, separated by spaces
            String[] split = value.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH = "hdfs://node1:9000/input";
        final String OUTPUT_PATH = "hdfs://node1:9000/output_HFile";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t4"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseLoad.class);
        job.setMapperClass(LoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // specify HFileOutputFormat2 as the output format
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("t4")));
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~
Package it into a jar and submit it to the cluster to run.
Load the generated HFiles into the HBase table:

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // get the connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        // get the Admin object
        Admin admin = connection.getAdmin();
        // get the Table object
        TableName tableName = TableName.valueOf("t4");
        Table table = connection.getTable(tableName);
        // build a LoadIncrementalHFiles instance to load the HFiles
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"), admin, table, connection.getRegionLocator(tableName));
    }
}
~~~
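After the bulk load finishes, the rows can be read back with a regular Get to confirm the data landed in `t4`. A small sketch; the rowkey "0001" is only a hypothetical example, use a rowkey that actually exists in your input file:

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t4"))) {
            // hypothetical rowkey from the input file
            Result result = table.get(new Get(Bytes.toBytes("0001")));
            String name = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            String age = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
            System.out.println(name + "\t" + age);
        }
    }
}
~~~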
HBase integration with Hive
#### Integration configuration

1. Modify the hive-site.xml file and add the following property:
~~~
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>node1:2181,node2:2181,node3:2181</value>
</property>
~~~
2. Modify the hive-env.sh file and add the HBase dependency jars to Hive's classpath:
~~~
export HIVE_CLASSPATH=$HIVE_CLASSPATH:/hbase/lib/*
~~~
3. Replace the hive-hbase-handler-1.2.1.jar in Hive's lib directory with the recompiled one.

Map an HBase table to a Hive table
Create a Hive table backed by HBase:
~~~
create external table hiveFromHbase(
rowkey string,
f1 map<STRING,STRING>,
f2 map<STRING,STRING>,
f3 map<STRING,STRING>
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")
TBLPROPERTIES ("hbase.table.name" = "hbase_test");
~~~

An external table is used here to map the table in HBase, so dropping the table in Hive does not drop the table in HBase (with a managed table it would be dropped as well). Apart from the rowkey, the other three fields use a Map type to hold each column family of the HBase table.

* hbase.columns.mapping: the field mapping between the Hive table and the HBase table. The first Hive field maps to :key (the rowkey), the second field maps to column family f1, the third to column family f2, and the fourth to column family f3.
* hbase.table.name: the name of the table in HBase.
Query the data in the Hive table:
~~~
select * from hivefromhbase;
~~~

Map a Hive table to an HBase table
1. Create a Hive table that maps to an HBase table:
~~~
create table hive_test(
id string,
name string,
age int,
address string
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:name,f2:age,f3:address")
TBLPROPERTIES ("hbase.table.name" = "hbaseFromhive");
~~~
2. Check whether the mapped HBase table has been created. Since the Hive table has just been created it holds no data yet, and neither does the mapped HBase table.
3. Load data into the Hive table from an existing source table:
~~~
insert into table hive_test select * from hive_source;
~~~

HBase data backup

##### Back up an HBase table with the classes provided by HBase
* Use the classes shipped with HBase to export the data of a table to HDFS (or a local file), and then import it into the backup target table.
* (1) ==Export from the HBase table==
* Export HBase data to HDFS:
~~~
hbase org.apache.hadoop.hbase.mapreduce.Export test /hbase_data/test_bak
~~~
* Export HBase data to a local file:
~~~
hbase org.apache.hadoop.hbase.mapreduce.Export test file:///home/hadoop/test_bak
~~~

(2) ==Import into the backup target table==

* Import the data on HDFS into the backup target table:
~~~
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak /hbase_data/test_bak/*
~~~
* Import the data from the local file into the backup target table:
~~~
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak file:///home/hadoop/test_bak/*
~~~

##### Back up an HBase table with snapshots
* Use snapshots to migrate and copy HBase data. This approach is commonly used and efficient, and is the most recommended way to migrate data.
* An HBase snapshot is essentially a collection of ==metadata== (a list of files); with this metadata, the table can be rolled back to the data as it was at the moment the snapshot was taken.

Create a snapshot:
~~~
snapshot 'tableName', 'snapshotName'
~~~
List all snapshots:
~~~
list_snapshots
~~~
Find snapshots whose names start with test:
~~~
list_snapshots 'test.*'
~~~
Restore a snapshot:
~~~
restore_snapshot 'snapshotName'
~~~
Note: the table must be disabled first (put into an unavailable state) before running restore_snapshot. For example:
~~~
disable 'tableName'
restore_snapshot 'snapshotName'
enable 'tableName'
~~~
Delete a snapshot:
~~~
delete_snapshot 'snapshotName'
~~~
Export a snapshot to another cluster:
~~~
hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
-snapshot snapshotName \
-copy-from hdfs://src-hbase-root-dir/hbase \
-copy-to hdfs://dst-hbase-root-dir/hbase \
-mappers 1 \
-bandwidth 1024
~~~
For example:
~~~
hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
-snapshot test \
-copy-from hdfs://node1:9000/hbase \
-copy-to hdfs://node1:9000/hbase1 \
-mappers 1 \
-bandwidth 1024
~~~
This approach is used when migrating a snapshotted table to another cluster. It copies the data with MapReduce and is very fast; remember to set the bandwidth parameter properly so the network does not get saturated and disrupt online services.

* Import the snapshot with bulkload:
~~~
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
hdfs://dst-hbase-root-dir/hbase/archive/datapath/tablename/filename \
tablename
~~~
For example, create a new table and load into it:
~~~
create 'newTest','f1','f2'
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://node1:9000/hbase1/archive/data/default/test/6325fabb429bf45c5dcbbe672225f1fb newTest
~~~

HBase secondary indexes

To make HBase queries more efficient and fit more scenarios, such as second-level response when searching by non-rowkey fields, or supporting fuzzy queries and multi-field combined queries, a secondary index needs to be built on top of HBase to satisfy more complex and varied business requirements. The essence of an HBase secondary index is to build a mapping between column values in an HBase table and its row keys.
Schemes for building HBase secondary indexes:
* MapReduce
* HBase Coprocessor (see the sketch after this list)
* Solr + HBase
* ES + HBase
* Phoenix + HBase
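As a rough illustration of the coprocessor approach, a RegionObserver can intercept every Put on the data table and write an index row (indexed value -> original rowkey) into a separate index table. This is only a minimal sketch against the HBase 1.x coprocessor API (BaseRegionObserver); the index table name `test_name_idx` and the indexed column `f1:name` are assumptions, and error handling, deletes and batching are ignored:

~~~
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Sketch of a coprocessor-based secondary index: each write to the data
 * table also writes an index row (f1:name value -> original rowkey) into
 * a hypothetical index table.
 */
public class NameIndexObserver extends BaseRegionObserver {

    // assumed index table; it must be created beforehand with column family f1
    private static final TableName INDEX_TABLE = TableName.valueOf("test_name_idx");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> ctx,
                       Put put, WALEdit edit, Durability durability) throws IOException {
        // only index rows that carry the f1:name column
        List<Cell> cells = put.get(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        if (cells == null || cells.isEmpty()) {
            return;
        }
        byte[] name = CellUtil.cloneValue(cells.get(0));
        // index rowkey = indexed value + original rowkey, so a prefix scan
        // on the name finds all matching rowkeys of the data table
        byte[] indexRow = Bytes.add(name, Bytes.toBytes("_"), put.getRow());
        Put indexPut = new Put(indexRow);
        indexPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("rowkey"), put.getRow());
        try (Table indexTable = ctx.getEnvironment().getTable(INDEX_TABLE)) {
            indexTable.put(indexPut);
        }
    }
}
~~~

The class is then packaged into a jar and registered on the data table as a table coprocessor (for example via alter with a coprocessor table attribute in the HBase shell); queries on name become a prefix scan over the index table followed by Gets on the data table.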