参考链接1:https://www.cnblogs.com/ios123/p/6370724.html
参考链接2:http://www.zhyea.com/2017/04/13/using-hbase-coprocessor.html
RegionObserver
注:每次更新协处理器方法,最好加上版本更新,否则可能会出现更新失败
- 协处理器安装-表级别安装
disable 'wechat_article'
alter 'wechat_article' , METHOD =>'table_att','coprocessor'=>'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.6-SNAPSHOT.jar|com.izhonghong.hbase.coprocessor.WechatUserObserver|1001'
enable 'wechat_article'
- 卸载协处理器:
disable 'wechat_article'
alter 'wechat_article', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable
- 查看是否成功
hbase(main):002:0> desc 'wechat_article'
Table wechat_article is ENABLED
wechat_article, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.14-SNAPSHOT.jar|com.izhonghong.hbase.coproces
sor.WechatUserObserver|1001'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'fn', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.3030 seconds
代码开发
代码查看(RegionObserverDemo.java):http://note.youdao.com/noteshare?id=9e8b53139c00840d00308356dda0f203&sub=B241C0BD8E46423EBB48DDC285EC5BC2
- prePut-插入前处理数据
插入数据前判断download_type类型是否在200-300范围中,如果在直接将用户id插入到另外一个表中,并做关联
@Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {LOG.warn("###########################################");// 获取列簇为FAMAILLY_NAME,列名为DOWNLOAD_TYPE的数据List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(DOWNLOAD_TYPE));//判断列名为DOWNLOAD_TYPE是否包含数据,不包含直接return,退出处理if (cells == null || cells.size() == 0) {LOG.warn("download_type 字段不存在退出过滤");return;}// 列名为DOWNLOAD_TYPE已包含数据,进行处理byte[] aValue = null;for (Cell cell : cells) {try {//DOWNLOAD_TYPE转换为数字aValue = CellUtil.cloneValue(cell);Integer valueOf = Integer.valueOf(Bytes.toString(aValue));if(valueOf>=200 &&valueOf<=300) {//如果DOWNLOAD_TYPE范围在200-300之间,获取用户UID信息List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(UID));//判断用户是否包含UIDif (list == null || list.size() == 0) {LOG.warn("用户BIZ不存在,或者为空");return;}//获取用户UIDCell cell2 = list.get(0);LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));//将用户UID设置为rowkey,原数据的rowkey当作列名,download_type当作列值Put put2 = new Put(CellUtil.cloneValue(cell2));put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);//插入数据到table中table.put(put2);table.close();}else {LOG.warn("Download type is not in range.");}} catch (Exception e1) {LOG.error("异常------->>>>>> "+e1.getMessage());return ;}}}
- preGetOp-处理返回结果只对get有效
@Overridepublic void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,final Get get, final List<Cell> results) throws IOException { //判断查询的rowkey是否等于testif(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) { //新增返回数据fn:time,值为当前时间戳给客户端KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));results.add(kv);}}
- preScannerOpen-在客户端打开新扫描仪之前过滤,此方法会覆盖原有filter
@Overridepublic RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,final RegionScanner s) throws IOException {//查询rowkey等于test的行进行过滤(显示不等test的数据),会覆盖原有filterFilter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));scan.setFilter(filter);return s;}
- postScannerNext,对返回结果进行过滤
@Overridepublic boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,final List<Result> results, final int limit, final boolean hasMore) throws IOException {Result result = null;//获取返回结果,如果rowkey等于test则过滤掉Iterator<Result> iterator = results.iterator();while (iterator.hasNext()) {result = iterator.next();if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {iterator.remove();break;}}return hasMore;}
- preDelete,,删除列前进行判断该列是否可以删除
@Overridepublic void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,final Delete delete, final WALEdit edit, final Durability durability)throws IOException {// 判断是否对列簇FAMAILLY_NAME操作List<Cell> cells = delete.getFamilyCellMap().get(Bytes.toBytes(FAMAILLY_NAME));if (cells == null || cells.size() == 0) {//如果没有对指定列簇操作则跳过判断,直接执行语句return;}byte[] qualifierName = null;boolean aDeleteFlg = false;for (Cell cell : cells) {//获取带操作的列名qualifierName = CellUtil.cloneQualifier(cell);// 如果列名等于DOWNLOAD_TYPE ,则抛出异常。注://需查看该值在集群中配置多少,否则重试好几百次性能会很差。//conf.set("hbase.client.retries.number", "1");//client失败重试次数if (Arrays.equals(qualifierName, Bytes.toBytes(DOWNLOAD_TYPE))) {throw new IOException("You cannot delete read-only columns.");}// 检查是否存在对UID列进行删除if (Arrays.equals(qualifierName, Bytes.toBytes(UID))) {aDeleteFlg = true;}}// 如果对于UID列有删除,则需要对DOWNLOAD_TYPE列也要删除if (aDeleteFlg){delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(DOWNLOAD_TYPE));}}