java导出hbase表数据_通用MapReduce程序复制HBase表数据

编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。

原始表test1数据如下:

b4d7516b06e845aed13292d5e5c2b89c.png

每个row key都有两个版本的数据,这里只显示了row key为1的数据

在hbase shell 中创建数据表:

create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、无列导出设置的数据

create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、有列导出设置的数据

create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、有列导入设置、无列导出设置的数据

create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、无列导出设置的数据

create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、有列导出设置的数据

create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、无列导出设置的数据

create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、有列导出设置的数据

main函数入口:

package GeneralHBaseToHBase;

import org.apache.hadoop.util.ToolRunner;

public class DriverTest {

public static void main(String[] args) throws Exception {

// 无版本设置、无列导入设置,无列导出设置

String[] myArgs1= new String[]{

"test1", // 输入表

"test2", // 输出表

"0", // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表

"-1", // 列导入设置,如果为-1 ,则没有设置列导入

"-1" // 列导出设置,如果为-1,则没有设置列导出

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs1);

// 无版本设置、有列导入设置,无列导出设置

String[] myArgs2= new String[]{

"test1",

"test3",

"0",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs2);

// 无版本设置,无列导入设置,有列导出设置

String[] myArgs3= new String[]{

"test1",

"test4",

"0",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs3);

// 有版本设置,无列导入设置,无列导出设置

String[] myArgs4= new String[]{

"test1",

"test5",

"2",

"-1",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs4);

// 有版本设置、有列导入设置,无列导出设置

String[] myArgs5= new String[]{

"test1",

"test6",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs5);

// 有版本设置、无列导入设置,有列导出设置

String[] myArgs6= new String[]{

"test1",

"test7",

"2",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs6);

// 有版本设置、有列导入设置,有列导出设置

String[] myArgs7= new String[]{

"test1",

"test8",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs7);

}

}

driver:

package GeneralHBaseToHBase;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import util.JarUtil;

public class HBaseDriver extends Configured implements Tool{

public static String FROMTABLE=""; //导入表

public static String TOTABLE=""; //导出表

public static String SETVERSION=""; //是否设置版本

// args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}

@Override

public int run(String[] args) throws Exception {

if(args.length!=5){

System.err.println("Usage:\n demo.job.HBaseDriver "

+ ""

+"< versions >"

+ " like or "

+ " like or ");

return -1;

}

Configuration conf = getConf();

FROMTABLE = args[0];

TOTABLE = args[1];

SETVERSION = args[2];

conf.set("SETVERSION", SETVERSION);

if(!args[3].equals("-1")){

conf.set("COLUMNFROMTABLE", args[3]);

}

if(!args[4].equals("-1")){

conf.set("COLUMNTOTABLE", args[4]);

}

String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;

Job job = Job.getInstance(conf, jobName);

job.setJarByClass(HBaseDriver.class);

Scan scan = new Scan();

// 判断是否需要设置版本

if(SETVERSION != "0" || SETVERSION != "1"){

scan.setMaxVersions(Integer.parseInt(SETVERSION));

}

// 设置HBase表输入:表名、scan、Mapper类、mapper输出键类型、mapper输出值类型

TableMapReduceUtil.initTableMapperJob(

FROMTABLE,

scan,

HBaseToHBaseMapper.class,

ImmutableBytesWritable.class,

Put.class,

job);

// 设置HBase表输出:表名,reducer类

TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);

// 没有 reducers, 直接写入到 输出文件

job.setNumReduceTasks(0);

return job.waitForCompletion(true) ? 0 : 1;

}

private static Configuration configuration;

public static Configuration getConfiguration(){

if(configuration==null){

/**

* TODO 了解如何直接从Windows提交代码到Hadoop集群

* 并修改其中的配置为实际配置

*/

configuration = new Configuration();

configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务

configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode

configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架

configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager

configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定资源分配器

configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver

configuration.set("hbase.master", "master:16000");

configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");

configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

configuration.set("hbase.zookeeper.property.clientPort", "2181");

//TODO 需export->jar file ; 设置正确的jar包所在位置

configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 设置jar包路径

}

return configuration;

}

}

mapper:

package GeneralHBaseToHBase;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Map.Entry;

import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class HBaseToHBaseMapper extends TableMapper {

Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);

private static int versionNum = 0;

private static String[] columnFromTable = null;

private static String[] columnToTable = null;

private static String column1 = null;

private static String column2 = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

Configuration conf = context.getConfiguration();

versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));

column1 = conf.get("COLUMNFROMTABLE",null);

if(!(column1 == null)){

columnFromTable = column1.split(",");

}

column2 = conf.get("COLUMNTOTABLE",null);

if(!(column2 == null)){

columnToTable = column2.split(",");

}

}

@Override

protected void map(ImmutableBytesWritable key, Result value,

Context context)

throws IOException, InterruptedException {

context.write(key, resultToPut(key,value));

}

/***

* 把key,value转换为Put

* @param key

* @param value

* @return

* @throws IOException

*/

private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {

HashMap fTableMap = new HashMap<>();

HashMap tTableMap = new HashMap<>();

Put put = new Put(key.get());

if(! (columnFromTable == null || columnFromTable.length == 0)){

fTableMap = getFamilyAndColumn(columnFromTable);

}

if(! (columnToTable == null || columnToTable.length == 0)){

tTableMap = getFamilyAndColumn(columnToTable);

}

if(versionNum==0){

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

for (Cell kv : value.rawCells()) {

put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出

}

return put;

} else{

return getPut(put, value, tTableMap); // 无版本、无列导入、有列导出

}

} else {

if(tTableMap.size() == 0){

return getPut(put, value, fTableMap);// 无版本、有列导入、无列导出

} else {

return getPut(put, value, tTableMap);// 无版本、有列导入、有列导出

}

}

} else{

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

return getPut1(put, value); // 有版本,无列导入,无列导出

}else{

return getPut2(put, value, tTableMap); //有版本,无列导入,有列导出

}

}else{

if(tTableMap.size() == 0){

return getPut2(put,value,fTableMap);// 有版本,有列导入,无列导出

}else{

return getPut2(put,value,tTableMap); // 有版本,有列导入,有列导出

}

}

}

}

/***

* 无版本设置的情况下,对于有列导入或者列导出

* @param put

* @param value

* @param tableMap

* @return

* @throws IOException

*/

private Put getPut(Put put,Result value,HashMap tableMap) throws IOException{

for(Cell kv : value.rawCells()){

byte[] family = kv.getFamily();

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

ArrayList columnBy = toByte(columnStr);

if(columnBy.contains(new String(kv.getQualifier()))){

put.add(kv); //没有设置版本,没有设置列导入,有设置列导出

}

}

}

return put;

}

/***

* (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出)

* @param put

* @param value

* @param tTableMap

* @return

*/

private Put getPut2(Put put,Result value,HashMap tableMap){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);

ArrayList columnBy = toByte(columnStr);

NavigableMap> familyMap = map.get(family);//列簇作为key获取其中的列相关数据

for(byte[] column:familyMap.keySet()){ //根据列名循坏

log.info("!!!!!!!!!!!"+new String(column));

if(columnBy.contains(new String(column))){

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){//获取列对应的不同版本数据,默认最新的一个

System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

}

}

return put;

}

/***

* 有版本、无列导入、无列导出

* @param put

* @param value

* @return

*/

private Put getPut1(Put put,Result value){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

NavigableMap> familyMap = map.get(family);//列簇作为key获取其中的列相关数据

for(byte[] column:familyMap.keySet()){ //根据列名循坏

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){ //获取列对应的不同版本数据,默认最新的一个

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

return put;

}

// str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

/***

* 得到列簇名与列名的k,v形式的map

* @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

* @return map => {"cf1" => "c1,c2,c10,c11,c14"}

*/

private static HashMap getFamilyAndColumn(String[] str){

HashMap map = new HashMap<>();

HashSet set = new HashSet<>();

for(String s : str){

set.add(s.split(":")[0]);

}

Object[] ob = set.toArray();

for(int i=0; i

String family = String.valueOf(ob[i]);

String columns = "";

for(int j=0;j < str.length;j++){

if(family.equals(str[j].split(":")[0])){

columns += str[j].split(":")[1]+",";

}

}

map.put(family, columns.substring(0, columns.length()-1));

}

return map;

}

private static ArrayList toByte(String s){

ArrayList b = new ArrayList<>();

String[] sarr = s.split(",");

for(int i=0;i

b.add(sarr[i]);

}

return b;

}

}

程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:

test2:(无版本、无列导入设置、无列导出设置)

663c31cae4e6e0014af4b367ca742ec4.png

test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

8fed56c4365c50b1bf7384c7c3e90809.png

test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

95dd81da9857f9d75e6f182eca8ed7f0.png

test5(有版本、无列导入设置、无列导出设置)

b37a5ec4bcc01f3bd824c1d692a83cc9.png

test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

7acc004cefa773a006865c0612567622.png

test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

4c95796f2116410dbf6804a957d4012a.png

test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

6649a92bc17c809fac6a0ef6dd2fc035.png

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/443070.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java服务器和linux_在Linux下开一个Java服务器(使用CatServer Pro)

引言Linux开服具有快速&#xff0c;高效&#xff0c;性能等特点&#xff0c;而Windows虽然简单&#xff0c;但是不具备Linux良好的性能。本教程就说明一下简单的Linux开服方式(需要教程的人&#xff0c;如果你学会后&#xff0c;请无偿帮助更多的人。)服务器准备首先。先准备一…

我的世界java版游戏崩溃_我的世界全攻略之-游戏崩溃的解决方法

我的世界崩溃怎么办&#xff1f;下面吾爱网小编给大家带来我的世界无法正常启动的解决方法,需要的朋友可以参考下。我的世界作为许多玩家都十分喜爱的模拟经营沙盘类游戏,经常有玩家反映在玩我的世界的时候,游戏总是会出现崩溃或者无法启动的情况,玩家在遇到的时候不知道怎么办…

vue3 新项目 - 搭建路由router

创建router/index 文件 main.ts 安装 router 然后 在 app下面 去 设置 路由出口

java如何获得相反的颜色_javascript – 如何根据当前颜色生成相反的颜色?

更新&#xff1a;GitHub上的生产就绪代码.我就是这样做的&#xff1a;>将HEX转换为RGB>反转R,G和B组件>将每个组件转换回HEX>用零和输出填充每个组件.function invertColor(hex) {if (hex.indexOf(#) 0) {hex hex.slice(1);}// convert 3-digit hex to 6-digits.…

wamp php启动不成功,wamp的mysql 启动失败解决

wamp启动失败&#xff0c;查看原因是mysql 启动失败首先查看mysql的启动日志命令&#xff1a;mysqld --console知道error报错的地方&#xff1a;然后百度了此报错&#xff0c;解决方法在my.ini中添加innodb_force_recovery 1发现这个会影响insert需要设置为 innodb_force_reco…

php复选框样式,如何自定义checkbox样式?附代码

本篇文章给大家带来的内容是关于如何自定义checkbox样式&#xff1f;附代码&#xff0c;有一定的参考价值&#xff0c;有需要的朋友可以参考一下&#xff0c;希望对你有所帮助。修改原生checkbox样式。效果原理1.利用CSS3属性 appearance。该属性(强制)更改(改变)默认(原生)样式…

JAVA用数据留给出师表排序,如果诸葛亮会编程,用Java写出师表...

继上一篇 "如果诸葛亮用C#写出师表..."后&#xff0c;站长想自己的第一语言是Java&#xff0c;虽然平时工作上用的不多&#xff0c;也用Java实现一遍吧&#xff0c;改改就是了&#xff0c;无非就是:C#的Console.WriteLine改为Java的System.out.println&#xff1b;C#…

linux下的安装命令行工具下载,linux系统程序安装(二)yum工具2-yum源及包下载

继续我们的yum工具应用之旅&#xff0c;yum工具之所以方便就是因为有方便的在线云库&#xff0c;实际工作中我们可能没办法链接互联网&#xff0c;或者我们想安装的程序原生源那么我们能不能用其他方式应用方便的yum源呢&#xff1f;一、使用光盘作为yum源1、将光盘挂载到/mnt目…

体积最小桌面linux,Tiny Core Linux - 体积最小的精简 Linux 操作系统发行版之一 (仅10多MB)...

Tiny Core Linux (TCL) 是一款极体积极小且高度可扩展的微型 Linux 发行版&#xff0c;它将一个 Linux 操作系统精简到仅有 10 多 MB 左右的大小&#xff0c;似乎小巧得有点让人叹为观止&#xff01;要知道无论是常见的 Ubuntu、CentOS、Debian 的体积动辄就是几百MB甚至要上GB…

linux新建samba账户,ubuntu上创建账户和samba用户

系统环境&#xff1a;Linux ubuntu152 3.2.0-23-generic #36-Ubuntu SMP Tue Apr 10 20:39:51 UTC 2012 x86_64 x86_64x86_64 GNU/Linux系统用户登录创建linux账户:1. sudo adduser username --home /home/username执行该命令后需要两次输入账户密码&#xff0c;连续回车&…

在c语言中优先级最低的是6,C语言中 *,<<,= ,->哪个优先级最低

满意答案nishiwodezmx推荐于 2016.03.13采纳率&#xff1a;46% 等级&#xff1a;12已帮助&#xff1a;6812人最高的是->(指向运算符)再到*再到>>(右移运算符)再到(赋值运算符)c语言运算符号:1级优先级 左结合() 圆括号[] 下标运算符-> 指向结构体成员运算符. 结…

队列的顺序数组c语言代码,队列-队列的顺序表示和实现

队列-队列的顺序表示和实现和顺序栈相类似&#xff0c;在利用顺序分配存储结构实现队列时&#xff0c;除了用一维数组描述队列中数据元素的存储区域之外&#xff0c;尚需设立两个指针front和rear分别指示“队头”和“队尾”的位置。为了在C语言中描述方便&#xff0c;在此我们约…

android 链接分享到朋友圈,android 分享到微信朋友圈或微信好友

一、首先创建一个数字签名(keystore文件)这里不再讲述keystore的创建过程&#xff01;二、用keystore给app签名&#xff0c;注意最后如下图所示图中的md5就是 申请apkid时所需的 签名&#xff0c;这里需要注意MD5需要将其中的“&#xff1a;”去掉并将其中的大写字母改为小写三…

android studio开源代码,Android Studio Set of source 代码源集

一、源集1、定义&#xff1a;Android Studio 按逻辑关系将每个模块的源代码和资源进行分组&#xff0c;这个分组叫做源集。2、main Module 源集包括其所有构建变体共用的代码和资源。这句话很关键&#xff0c;意思是&#xff0c;所有的其他构建变体&#xff0c;src/main是其共同…

android nougat和安卓7.1,Android Nougat 7.1.2 先睹为快

Android Nougat 的下一个维护版本 7.1.2 即将发布&#xff01;为了让广大开发者有机会抢先尝鲜&#xff0c;我们从今天开始向已注册 Android Beta 计划、符合条件的设备(包括 Pixel 和 Pixel XL、Nexus 5X、Nexus Player 和 Pixel C 设备)推出公众测试版本。我们还在准备 Nexus…

android app 移植到pc,微软开发新应用把Android app“移植”到Win Phone

依据微软2010年公布在Tweet上的一份专利申请书&#xff0c;微软正在开发一种新服务能够在不同的手机操作系统中“移植”应用和应用数据。这是说Android&#xff0c;iOS应用以后可以跨平台“移植”到 Win Phone Win8系统上&#xff1f;因为Win Phone 著名的“应用问题”&#xf…

鸿蒙系统能否推广,鸿蒙系统凭实力占市场,无需通过禁止安卓系统来推广

有人提议中国应该全面禁用安卓系统&#xff0c;以推广鸿蒙系统(HarmonyOS)&#xff0c;甚至还给出了比如安全等理由。确实有这个必要吗&#xff1f;答案是否定的。一、安卓系统是开源系统&#xff0c;不存在被谷歌完全操控而影响到中国安全的问题。实际上安卓系统诸如碎片化、应…

android ut接口介绍,CMCC UT接口(IMS SS)规范说明

OverviewCMCC 关于 UT 部分的说明及规范请参考 "中国移动VoLTE终端技术规范" 中的第6节说明.写在前面: 由于 CMCC 的XCAP server(UT) 和 CS Server(HLR)暂不支持 SS 业务配置SYNC. 因此, 针对支持 UT 的卡, 以及所在城市提供 UT 服务, 则如果某项 SS 业务不支持通过 …

五年级信息技术上册教案计算机主机探秘,第1课信息与信息技术探秘教案

第1课信息与信息技术探秘教案&#xff3b;教学目标&#xff3d;1、知识与技能(1)了解什么是信息、信息传递和信息处理的基本知识。(2)了解信息技术及其应用、发展的基本情况。(3)了解计算机在信息处理中的作用和地位。2、过程与方法以直观的手段让学生初步了解信息和信息技术。…

pc端html轮播带滑块,swiper.js简单快速实现轮播滑动(兼容PC端、移动端)

swiper是一款免费以及轻量级轮播滑动的js框架&#xff0c;适用于PC端跟移动端&#xff0c;官方地址&#xff1a;(https://www.swiper.com.cn/)效果演示&#xff1a;PC端移动端(在浏览器将设备切换为手机&#xff0c;这里切换为iphone)&#xff0c;swiper支持移动端触控左右滑动…