hbase常见处理方式

相关依赖

<dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.2.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>

mr

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;import java.io.IOException;public class HBaseMR {public static class HBaseMapper extends TableMapper<Text,Put>{@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {//获取rowkey的字节数组byte[] bytes = key.get();String rowkey = Bytes.toString(bytes);//构建一个put对象Put put = new Put(bytes);//获取一行中所有的cell对象Cell[] cells = value.rawCells();for (Cell cell : cells) {// f1列族if("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){// name列名if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){put.add(cell);}// age列名if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){put.add(cell);}}}if(!put.isEmpty()){context.write(new Text(rowkey),put);}}}public  static  class HbaseReducer extends TableReducer<Text,Put,ImmutableBytesWritable>{@Overrideprotected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {for (Put put : values) {context.write(null,put);}}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Scan scan = new Scan();Job job = Job.getInstance(conf);job.setJarByClass(HBaseMR.class);//使用TableMapReduceUtil 工具类来初始化我们的mapperTableMapReduceUtil.initTableMapperJob(TableName.valueOf(args[0]),scan,HBaseMapper.class,Text.class,Put.class,job);//使用TableMapReduceUtil 工具类来初始化我们的reducerTableMapReduceUtil.initTableReducerJob(args[1],HbaseReducer.class,job);//设置reduce task个数job.setNumReduceTasks(1);System.exit(job.waitForCompletion(true) ? 0 : 1);}}

打成jar包提交到集群中运行

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;public class Hdfs2Hbase {public static class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable> {protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value,NullWritable.get());}}public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable> {protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {String[] split = key.toString().split(" ");Put put = new Put(Bytes.toBytes(split[0]));put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());put.addColumn("f1".getBytes(),"age".getBytes(), split[2].getBytes());context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Hdfs2Hbase.class);job.setInputFormatClass(TextInputFormat.class);//输入文件路径TextInputFormat.addInputPath(job,new Path(args[0]));job.setMapperClass(HdfsMapper.class);//map端的输出的key value 类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//指定输出到hbase的表名TableMapReduceUtil.initTableReducerJob(args[1],HBASEReducer.class,job);//设置reduce个数job.setNumReduceTasks(1);System.exit(job.waitForCompletion(true)?0:1);}
}

打成jar包提交到集群中运行

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class HBaseLoad {public static class LoadMapper  extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {@Overrideprotected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {String[] split = value.toString().split(" ");Put put = new Put(Bytes.toBytes(split[0]));put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());put.addColumn("f1".getBytes(),"age".getBytes(), split[2].getBytes());context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {final String INPUT_PATH=  "hdfs://node1:9000/input";final String OUTPUT_PATH= "hdfs://node1:9000/output_HFile";Configuration conf = HBaseConfiguration.create();Connection connection = ConnectionFactory.createConnection(conf);Table table = connection.getTable(TableName.valueOf("t4"));Job job= Job.getInstance(conf);job.setJarByClass(HBaseLoad.class);job.setMapperClass(LoadMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);//指定输出的类型HFileOutputFormat2job.setOutputFormatClass(HFileOutputFormat2.class);HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("t4")));FileInputFormat.addInputPath(job,new Path(INPUT_PATH));FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));System.exit(job.waitForCompletion(true)?0:1);}
}

打成jar包提交到集群中运行

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;public class LoadData {public static void main(String[] args) throws Exception {Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");//获取数据库连接Connection connection =  ConnectionFactory.createConnection(configuration);//获取表的管理器对象Admin admin = connection.getAdmin();//获取table对象TableName tableName = TableName.valueOf("t4");Table table = connection.getTable(tableName);//构建LoadIncrementalHFiles加载HFile文件LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"), admin,table,connection.getRegionLocator(tableName));}
}

hbase集成hive

#### 整合配置1、修改hive-site.xml文件,添加配置属性<property>      <name>hbase.zookeeper.quorum</name><value>node1:2181,node2:2181,node3:2181</value></property>2、修改 hive-env.sh 文件,添加hbase的依赖包到hive的classpath中
export HIVE_CLASSPATH=$HIVE_CLASSPATH:/hbase/lib/*3、使用编译好的 hive-hbase-handler-1.2.1.jar替换hive之前的lib目录下的该jar包将hbase表映射到hive表中
创建基于hbase的hive表
create external table hiveFromHbase(
rowkey string,
f1 map<STRING,STRING>,
f2 map<STRING,STRING>,
f3 map<STRING,STRING>
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")
TBLPROPERTIES ("hbase.table.name" = "hbase_test");--这里使用外部表映射到HBase中的表,这样,在Hive中删除表,并不会删除HBase中的表,否则,就会删除。另外,除了rowkey,其他三个字段使用Map结构来保存HBase中的每一个列族。--hbase.columns.mapping
Hive表和HBase表的字段映射关系,分别为:Hive表中第一个字段映射:key(rowkey),第二个字段映射列族f1,第三个字段映射列族f2,第四个字段映射列族f3--hbase.table.name
HBase中表的名字
查看hive表的数据
select * from hivefromhbase;将hive表映射到hbase表中
1、创建一张映射hbase的表
create  table hive_test(
id string,
name string,
age int,
address string
)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:name,f2:age,f3:address")
TBLPROPERTIES ("hbase.table.name" = "hbaseFromhive");
* 2、查看hbase映射表是否产生* 这里由于hive表是刚刚构建,目前是没有数据,同样这张hbase表也没有数据3、向hive表加载数据
insert into table hive_test select * from hive_source;hbase的数据备份#####  基于hbase提供的类对hbase中某张表进行备份* 使用hbase提供的类把hbase中某张表的数据导出hdfs,之后再导出到测试hbase表中。* (1)  ==从hbase表导出==
* HBase数据导出到HDFS
hbase org.apache.hadoop.hbase.mapreduce.Export test /hbase_data/test_bakHBase数据导出到本地文件
hbase org.apache.hadoop.hbase.mapreduce.Export test file:///home/hadoop/test_bak
`
将hdfs上的数据导入到备份目标表中将hdfs上的数据导入到备份目标表中
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak /hbase_data/test_bak/*将本地文件上的数据导入到备份目标表中
hbase org.apache.hadoop.hbase.mapreduce.Driver import test_bak file:///home/hadoop/test_bak/*基于snapshot的方式实现对hbase中某张表进行备份* 通过snapshot快照的方式实现HBase数据的迁移和拷贝。这种方式比较常用,效率高,也是最为推荐的数据迁移方式。* HBase的snapshot其实就是一组==metadata==信息的集合(文件列表),通过这些metadata信息的集合,就能将表的数据回滚到snapshot那个时刻的数据。snapshot 'tableName', 'snapshotName'list_snapshots查找以test开头的snapshotlist_snapshots 'test.*'restore_snapshot 'snapshotName'ps:这里需要对表进行disable操作,先把表置为不可用状态,然后在进行进行restore_snapshot的操作例如:disable 'tableName'restore_snapshot 'snapshotName'enable 'tableName'delete_snapshot 'snapshotName'hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \-snapshot snapshotName  \-copy-from hdfs://src-hbase-root-dir/hbase \-copy-to hdfs://dst-hbase-root-dir/hbase \-mappers 1 \-bandwidth 1024例如:hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \-snapshot test  \-copy-from hdfs://node1:9000/hbase \-copy-to hdfs://node1:9000/hbase1 \-mappers 1 \-bandwidth 1024这种方式用于将快照表迁移到另外一个集群的时候使用,使用MR进行数据的拷贝,速度很快,使用的时候记得设置好bandwidth参数,以免由于网络打满导致的线上业务故障。* 将snapshot使用bulkload的方式导入~~~hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \hdfs://dst-hbase-root-dir/hbase/archive/datapath/tablename/filename \tablename例如:创建一个新表create 'newTest','f1','f2'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://node1:9000/hbase1/archive/data/default/test/6325fabb429bf45c5dcbbe672225f1fb newTest~~~为了HBase的数据查询更高效、适应更多的场景,诸如使用非rowkey字段检索也能做到秒级响应,或者支持各个字段进行模糊查询和多字段组合查询等, 因此需要在HBase上面构建二级索引, 以满足现实中更复杂多样的业务需求。hbase的二级索引其本质就是建立hbase表中列与行键之间的映射关系。
构建hbase二级索引方案* MapReduce
* Hbase Coprocessor(协处理器)
* Solr+hbase
* ES+hbase
* Phoenix+hbase

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508770.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flink连接kafka整合hbase,scala

解析kafka当中的json格式的数据&#xff0c;入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.contrib.streaming.state.RocksDBStateBack…

sparkStreaming连接kafka整合hbase和redis

sparkStreaming消费kafka数据&#xff0c;并将数据保存到redis和hbase当中去&#xff0c;实现实时 import org.apache.hadoop.hbase.client.{Admin, Connection} import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.c…

sparksql一些指标

统计指标 select substr(tb.begin_address_code , 1 ,4) as begin_address_code , count(distinct vehicle_license) as dayVehicleCount from (select begin_address_code , vehicle_license from order where date_format(create_time , yyyy-MM-dd) 2020-02-15 ) tb grou…

sparkConf常见参数设置

def getSparkConf():SparkConf {val sparkConf: SparkConf new SparkConf().set("spark.driver.cores","4") //设置driver的CPU核数.set("spark.driver.maxResultSize","2g") //设置driver端结果存放的最大容量&#xff0c;这里设置…

sparkSession常见参数设置

def getSparkSession(sparkConf:SparkConf):SparkSession {val sparkSession: SparkSession SparkSession.builder().config(sparkConf)//调度模式.config("spark.scheduler.mode", "FAIR").config("spark.executor.memoryOverhead", "51…

关于kafka中acks是否可以为all

kafka源码中有这样一段代码&#xff1a; org.apache.kafka.clients.producer.KafkaProducer private static int parseAcks(String acksString) {try {return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());} catch (Numb…

关于统计时间切片标签的一些sql

------当天付费明细表 DROP TABLE IF EXISTS rpt.tmp_mm_rb_daily_ffmx; create table rpt.tmp_mm_rb_daily_ffmx as select a.* FROM (select c.feemsisdn, c.destmsisdn, c.day, c.price/1000 fee, c.contentid, dc.content_name, c.ordernumber, c.cdrtime, c.createtime, c…

hadoop 二次开发DatanodeWriteTimeout设置

int getDatanodeWriteTimeout(int numNodes) {return this.dfsClientConf.confTime > 0 ? this.dfsClientConf.confTime 5000 * numNodes : 0;}int getDatanodeReadTimeout(int numNodes) {return this.dfsClientConf.socketTimeout > 0 ? 5000 * numNodes this.dfsC…

聚类算法

假定样本集 D {X1&#xff0c; 的&#xff0c;…&#xff0c; Xm} 包含 m 个无标记样本&#xff0c; 每个样本 X (X1; X2;… ; Xn) 是一个 n 维特征向量&#xff0c;则聚类算法将样本 集 D 划分为 k 个不相交的簇 {Gl I l 1&#xff0c; 2;… &#xff0c;时&#xff0c;其中…

k-means均值向量

给定样本集 D {Xl) 的&#xff0c;… ,xm}, “k 均值” (k-means )算法针对聚类所 得簇划分 C {C1, C2,…, Ck} 最小化平方误差 ι ELL Ilx 一队IIL il EGi 其中队甘il LEGi X 是簇 q 的均值向量.在一定程度上 刻画了簇内样本围绕簇均值向量的紧密程度&#xff0c; E 值越小则…

学习向量量化

与 k 均值算法类似&#xff0c;“学习向量量化” (Learning Vector Quantization&#xff0c;简 称 LVQ)也是试图找到一组原型向量来刻画聚类结构&#xff0c; 但与一般聚类算法不同 的是&#xff0c; LVQ 假设数据样本带有类别标记&#xff0c;学习过程利用样本的这些监督信息…

k 近邻加权平均

k 近邻(k-Nearest Neighbor&#xff0c;简称 kNN)学习是一种常用的监督学习方法&#xff0c; 其工作机制非常简单: 给定测试样本?基于某种距离度量找出训练集中与其最 靠近的 k 个训练样本&#xff0c;然后基于这 k 个"邻居"的信息来进行预测. 通常&#xff0c; 在分…

k 近邻降维

k 近邻(k-Nearest Neighbor&#xff0c;简称 kNN)学习是一种常用的监督学习方法&#xff0c; 其工作机制非常简单: 给定测试样本?基于某种距离度量找出训练集中与其最 靠近的 k 个训练样本&#xff0c;然后基于这 k 个"邻居"的信息来进行预测. 通常&#xff0c; 在分…

维度建模工具

幵始维度建模工作前&#xff0c;项目组需要理解业务需求&#xff0c;以及作为基础的源数据的实际情况。 通过与、 Ik务代表交流来发现需求&#xff0c;用于理解他们的基于关键性能指标、竞争性商业问题、 决策制定过程、支持分析需求的目标。同时&#xff0c;数据实际情况可以通…

Cube和Grouping 和Rollup

增强的聚合 Cube和Grouping 和Rollup 这几个分析函数通常用于OLAP中&#xff0c;不能累加&#xff0c;而且需要根据不同维度上钻和下钻的指标统计&#xff0c;比如&#xff0c;分小时、天、月的UV数。 GROUPING SETS 在一个GROUP BY查询中&#xff0c;根据不同的维度组合进行聚…

常见维度建模错误

需要避免的常见维度建模错误 错误 10: 在事实表中放入文本属性 要从数据仓库事实表中 挑出这些文本属性&#xff0c;并将它们放入维度表中。 错误 9: 限制使用冗长的描述符以节省空间 维度表从几何上看总是比事实表小很多。 错误 8: 将层次划分为多个维度 以用户看来最自然最 有…

2020-09-21

columns has 234 elements while hbase.columns.mapping has 92 elements (counting the key if implicit)) 根本原因&#xff1a; 对于4000个字符&#xff0c;hive Metastore中SERDE_PARAMS表中PARAM_VALUE字段的字符限制是此问题的根本原因。 此限制可防止Hive创建具有高列数…

2020-09-23

insert into table ads_user_action_convert_day select ‘2019-02-10’, uv.day_count, ua.order_count, cast(ua.order_count/uv.day_count as decimal(10,2)) visitor2order_convert_ratio, ua.payment_count, cast(ua.payment_count/ua.order_count as decimal(10,2)) orde…

2020-09-28

Mybatis中#{}和${} 1、#{}将传入的数据都当成一个字符串&#xff0c;会对自动传入的数据加一个引号&#xff08;单引号&#xff1f;双引号&#xff1f;加了引号就对了&#xff09;如&#xff1a; //传入的值是sex order by #{column} 解析后为 order by “sex” //将会出错 2…

PageUtil

Data NoArgsConstructor public class PageUtil implements Serializable {private static final long serialVersionUID 1L;/*** 总记录数*/private int totalCount;/*** 每页记录数*/private int pageSize;/*** 总页数*/private int totalPage;/*** 当前页数*/private int c…