1. HBase to HBase
The Mapper extends TableMapper; its input key is the row key (an ImmutableBytesWritable) and its input value is the row's Result.
public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
    public TableMapper() {
    }
}
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Get rowKey and reuse it as the map output key
        mapOutputkey.set(key.get());
        Put put = new Put(key.get());
        // Copy only the info:name and info:age cells into the new Put
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    put.add(cell);
                }
                if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    put.add(cell);
                }
            }
        }
        context.write(mapOutputkey, put);
    }
}
The Reducer extends TableReducer.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
    public TableReducer() {
    }
}
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Write each Put straight through to the output table
        for (Put put : values) {
            context.write(null, put);
        }
    }
}
Driver
package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                        // input table
                scan,                          // Scan instance to control CF and attribute selection
                User2BasicMapper.class,        // mapper class
                ImmutableBytesWritable.class,  // mapper output key
                Put.class,                     // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                User2BasicReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1; // exit code 0 signals success to the shell
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicDriver(), args);
        System.exit(status);
    }
}
2. HBase to File
Mapper
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileMapper extends TableMapper<Text, Text> {

    private Text rowKeyText = new Text();
    private Text valueText = new Text();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Get rowKey
        rowKeyText.set(key.get());
        byte[] inforName = null;
        byte[] inforAge = null;
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    inforName = CellUtil.cloneValue(cell);
                }
                if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    inforAge = CellUtil.cloneValue(cell);
                }
            }
        }
        // Emit the two column values tab-separated; note a row missing info:name or info:age would NPE here
        valueText.set(new String(inforName) + "\t" + new String(inforAge));
        context.write(rowKeyText, valueText);
    }
}
No Reducer is needed; the identity reducer passes the mapper output straight through to the output file.
Driver
package com.scb.jason.driver;

import com.scb.jason.mapper.User2FileMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                 // input table
                scan,                   // Scan instance to control CF and attribute selection
                User2FileMapper.class,  // mapper class
                Text.class,             // mapper output key
                Text.class,             // mapper output value
                job);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // output directory comes from the command line
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1; // exit code 0 signals success to the shell
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2FileDriver(), args);
        System.exit(status);
    }
}
3. File to HBase
Driver
package com.scb.jason.driver;

import com.scb.jason.mapper.File2HbaseMapper;
import com.scb.jason.reducer.File2HBaseReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class File2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(File2HbaseMapper.class);
        FileInputFormat.addInputPath(job, new Path("F:\\Workspace\\File")); // local test path
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                File2HBaseReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1; // exit code 0 signals success to the shell
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new File2BasicDriver(), args);
        System.exit(status);
    }
}
Mapper
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by Administrator on 2017/8/17.
 */
public class File2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is: rowkey <whitespace> name <whitespace> age
        String lineValue = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        String rowkey = stringTokenizer.nextToken();
        String name = stringTokenizer.nextToken();
        String age = stringTokenizer.nextToken();

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));

        // Use the HBase row key, not the file offset, as the map output key
        mapOutputkey.set(Bytes.toBytes(rowkey));
        context.write(mapOutputkey, put);
    }
}
Reducer
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/25.
 */
public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Write each Put straight through to the output table
        for (Put put : values) {
            context.write(null, put);
        }
    }
}
4. HBase to RDBMS
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection c = null;

    public void setup(Context context) {
        // create DB connection...
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // do summarization
        // in this example the keys are Text, but this is just an example
    }

    public void cleanup(Context context) {
        // close db connection
    }
}
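The skeleton above only marks where the JDBC work goes: open the connection once in setup(), write in reduce(), close in cleanup(). A minimal sketch of a filled-in version follows, assuming a hypothetical MySQL table word_count(word, cnt); the class name, JDBC URL, credentials, and SQL are placeholders for illustration, not part of the original.

package com.scb.jason.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Hbase2RdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection c = null;
    private PreparedStatement ps = null;

    @Override
    protected void setup(Context context) throws IOException {
        try {
            // hypothetical JDBC endpoint and table; adjust to your environment
            c = DriverManager.getConnection("jdbc:mysql://dbhost:3306/test", "user", "password");
            ps = c.prepareStatement("INSERT INTO word_count (word, cnt) VALUES (?, ?)");
        } catch (SQLException e) {
            throw new IOException("cannot open JDBC connection", e);
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // summarize, then write one row per reduce key
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        try {
            ps.setString(1, key.toString());
            ps.setInt(2, sum);
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new IOException("insert failed for key " + key, e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        try {
            if (ps != null) ps.close();
            if (c != null) c.close();
        } catch (SQLException e) {
            throw new IOException("cannot close JDBC connection", e);
        }
    }
}

The JDBC driver jar must also be on the task classpath (for example via -libjars) or the setup() call will fail on the cluster.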
5. File -> HFile -> HBase Bulk Load
http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
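The linked post walks through the details. As a rough, untested sketch of the driver shape, assuming the same HBase 0.98/1.x-era client API used elsewhere in this post (the class name, table name "basic", and command-line paths are mine for illustration): the MapReduce job writes HFiles via HFileOutputFormat2 instead of sending Puts through the region servers, then LoadIncrementalHFiles moves the finished HFiles into the table's regions.

package com.scb.jason.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class File2HFileDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "file2hfile");
        job.setJarByClass(File2HFileDriver.class);

        // reuse the File2HbaseMapper from section 3: it emits <ImmutableBytesWritable, Put>
        job.setMapperClass(com.scb.jason.mapper.File2HbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFiles land here, not in HBase yet

        // sorts/partitions the map output to match the table's regions and sets the right reducer
        HTable table = new HTable(conf, "basic");
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        if (job.waitForCompletion(true)) {
            // move the generated HFiles into the table's region directories
            new LoadIncrementalHFiles(conf).doBulkLoad(new Path(args[1]), table);
        }
    }
}

Because the HFiles are adopted by the regions directly, this path bypasses the WAL and the normal write path, which is what makes bulk load so much faster than issuing Puts.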