参考:http://hbase.apache.org/2.0/book.html#cp
Hbase2.0 不支持 1.x版本的RegionObserver ,查看hbase官网更新说明,自己做了测试并通过
- Hbase RegionObserver
import java.io.IOException;
import java.util.List;
import java.util.Optional;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;import com.izhonghong.util.Md5Utils;//注:2.0版本之前使用extends BaseRegionObserver 实现,
public class ObserverTest implements RegionObserver,RegionCoprocessor {private static final Log LOG = LogFactory.getLog(ObserverTest.class);static Connection connection = null; static Table table = null;static{Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","test:2181");try {connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf("tableName"));} catch (Exception e) {e.printStackTrace();}}private RegionCoprocessorEnvironment env = null;private static final String FAMAILLY_NAME = "fn";private static final String UID = "uid";private static final String BIZ = "biz";//2.0加入该方法,否则无法生效@Overridepublic Optional<RegionObserver> getRegionObserver() {// Extremely important to be sure that the coprocessor is invoked as a RegionObserverreturn Optional.of(this);}@Overridepublic void start(CoprocessorEnvironment e) throws IOException {env = (RegionCoprocessorEnvironment) e;}@Overridepublic void stop(CoprocessorEnvironment e) throws IOException {// nothing to do here}@Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {try {List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(UID));if (list == null || list.size() == 0) {return;}Cell cell2 = list.get(0);String uid = Bytes.toString(CellUtil.cloneValue(cell2));Put put2 = new Put(Bytes.toBytes(Md5Utils.getMd5ByStr(uid)));put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(BIZ), Bytes.toBytes(uid));put2.setTTL(1000l * 60 * 60 *24 *30);table.put(put2);table.close();} catch (Exception e1) {LOG.error("异常------->>>>>> "+e1.getMessage());return ;}}}
- 协处理器安装-表级别安装(这里coprocessor是hdfs路径,需提前将jar包发布到hdfs)
disable 'tableName'
alter 'tableName' , METHOD =>'table_att','coprocessor'=>'/hbase/coprocessor/hbase-coprocessor-1.0.0.jar|com.xxx.hbase.coprocessor.xxx|1001'
enable 'tableName'
- 协处理器卸载
disable 'tableName'
alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'tableName'