Hbase 协处理器 RegionObserver

参考链接1:https://www.cnblogs.com/ios123/p/6370724.html

参考链接2:http://www.zhyea.com/2017/04/13/using-hbase-coprocessor.html

RegionObserver

注:每次更新协处理器方法,最好加上版本更新,否则可能会出现更新失败

  • 协处理器安装-表级别安装
disable 'wechat_article'
alter 'wechat_article' , METHOD =>'table_att','coprocessor'=>'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.6-SNAPSHOT.jar|com.izhonghong.hbase.coprocessor.WechatUserObserver|1001'
enable 'wechat_article'
  • 卸载协处理器:
disable 'wechat_article'
alter 'wechat_article', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable
  • 查看是否成功
hbase(main):002:0> desc 'wechat_article'
Table wechat_article is ENABLED                                                                                                                            
wechat_article, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.14-SNAPSHOT.jar|com.izhonghong.hbase.coproces
sor.WechatUserObserver|1001'}                                                                                                                              
COLUMN FAMILIES DESCRIPTION                                                                                                                                
{NAME => 'fn', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                         
1 row(s) in 0.3030 seconds

代码开发

代码查看(RegionObserverDemo.java):http://note.youdao.com/noteshare?id=9e8b53139c00840d00308356dda0f203&sub=B241C0BD8E46423EBB48DDC285EC5BC2

  • prePut-插入前处理数据
插入数据前判断download_type类型是否在200-300范围中,如果在直接将用户id插入到另外一个表中,并做关联
@Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {LOG.warn("###########################################");// 获取列簇为FAMAILLY_NAME,列名为DOWNLOAD_TYPE的数据List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(DOWNLOAD_TYPE));//判断列名为DOWNLOAD_TYPE是否包含数据,不包含直接return,退出处理if (cells == null || cells.size() == 0) {LOG.warn("download_type 字段不存在退出过滤");return;}// 列名为DOWNLOAD_TYPE已包含数据,进行处理byte[] aValue = null;for (Cell cell : cells) {try {//DOWNLOAD_TYPE转换为数字aValue = CellUtil.cloneValue(cell);Integer valueOf = Integer.valueOf(Bytes.toString(aValue));if(valueOf>=200 &&valueOf<=300) {//如果DOWNLOAD_TYPE范围在200-300之间,获取用户UID信息List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(UID));//判断用户是否包含UIDif (list == null || list.size() == 0) {LOG.warn("用户BIZ不存在,或者为空");return;}//获取用户UIDCell cell2 = list.get(0);LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));//将用户UID设置为rowkey,原数据的rowkey当作列名,download_type当作列值Put put2 = new Put(CellUtil.cloneValue(cell2));put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);//插入数据到table中table.put(put2);table.close();}else {LOG.warn("Download type is not in range.");}} catch (Exception e1) {LOG.error("异常------->>>>>> "+e1.getMessage());return ;}}}
  • preGetOp-处理返回结果只对get有效
@Overridepublic void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,final Get get, final List<Cell> results) throws IOException { //判断查询的rowkey是否等于testif(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) { //新增返回数据fn:time,值为当前时间戳给客户端KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));results.add(kv);}}
  • preScannerOpen-在客户端打开新扫描仪之前过滤,此方法会覆盖原有filter
@Overridepublic RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,final RegionScanner s) throws IOException {//查询rowkey等于test的行进行过滤(显示不等test的数据),会覆盖原有filterFilter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));scan.setFilter(filter);return s;}
  • postScannerNext,对返回结果进行过滤
@Overridepublic boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,final List<Result> results, final int limit, final boolean hasMore) throws IOException {Result result = null;//获取返回结果,如果rowkey等于test则过滤掉Iterator<Result> iterator = results.iterator();while (iterator.hasNext()) {result = iterator.next();if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {iterator.remove();break;}}return hasMore;}
  • preDelete,,删除列前进行判断该列是否可以删除
@Overridepublic void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,final Delete delete, final WALEdit edit, final Durability durability)throws IOException {// 判断是否对列簇FAMAILLY_NAME操作List<Cell> cells = delete.getFamilyCellMap().get(Bytes.toBytes(FAMAILLY_NAME));if (cells == null || cells.size() == 0) {//如果没有对指定列簇操作则跳过判断,直接执行语句return;}byte[] qualifierName = null;boolean aDeleteFlg = false;for (Cell cell : cells) {//获取带操作的列名qualifierName = CellUtil.cloneQualifier(cell);// 如果列名等于DOWNLOAD_TYPE ,则抛出异常。注://需查看该值在集群中配置多少,否则重试好几百次性能会很差。//conf.set("hbase.client.retries.number", "1");//client失败重试次数if (Arrays.equals(qualifierName, Bytes.toBytes(DOWNLOAD_TYPE))) {throw new IOException("You cannot delete read-only columns.");}// 检查是否存在对UID列进行删除if (Arrays.equals(qualifierName, Bytes.toBytes(UID))) {aDeleteFlg = true;}}// 如果对于UID列有删除,则需要对DOWNLOAD_TYPE列也要删除if (aDeleteFlg){delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(DOWNLOAD_TYPE));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509705.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搭建zookeeper集群环境详解

第一步:上传zookeeper.jar.gz文件至一台虚拟机&#xff0c;并解压至 /root/apps 目录下&#xff08;如没有该目录则手动创建&#xff09; 第二步&#xff1a;进入在/root/apps/zookeeper目录下 &#xff0c;删除不必要文件 rm -rf .txt .xml docs dist-maven src 第三步: 进入…

顺序表应用2:多余元素删除之建表算法

题目描述 一个长度不超过10000数据的顺序表&#xff0c;可能存在着一些值相同的“多余”数据元素&#xff08;类型为整型&#xff09;&#xff0c;编写一个程序将“多余”的数据元素从顺序表中删除&#xff0c;使该表由一个“非纯表”&#xff08;值相同的元素在表中可能有多个…

OSG框架分析

本文参考<<osg最长一帧>>, <<OpenSceneGraph三维渲染引擎编程指南>>, <<OpenSceneGraph三维渲染引擎设计与实践>> 整理而来, 感谢大牛们的精彩著作. 相比Ogre来说, Ogre代码很规范, 只是入门资料较少,如果能在学习之前能总体上对架构有个…

自动化部署脚本开启所有zookpeer等服务

1.为了方便ssh连接&#xff0c;最好把所有的主机之间实现免密登录 ssh-keygen sh-copy-id 主机名 2.在根目录下新建bin文件夹&#xff0c;并创建部署的脚本start.sh #!/bin/bash for i in 1 2 3 dossh weijie$i "source /etc/profile; /root/apps/zookeeper-3.4.5/bin…

顺序表应用3:元素位置互换之移位算法

题目描述 一个长度为len(1<len<1000000)的顺序表&#xff0c;数据元素的类型为整型&#xff0c;将该表分成两半&#xff0c;前一半有m个元素&#xff0c;后一半有len-m个元素&#xff08;1<m<len)&#xff0c;借助元素移位的方式&#xff0c;设计一个空间复杂度为O…

Hbase 协处理器之将数据保存到es (二级索引)

利用Hbase Coprocessor 实现将插入hbase中的数据保存至ElasticSearch中&#xff0c;实现二级索引目的 版本&#xff1a; Hbase&#xff1a; 2.1 ES&#xff1a;6.3.0 一、Coprocessor代码开发 协处理器类 package wiki.hadoop.coprocessor;import org.apache.hadoop.hbas…

Delta3d框架学习--程序启动过程详解

一个Delta3d程序启动过程详解 一、初始化一个dtGame::GameApplication的实例&#xff0c;dtGame::GameApplication* app new dtGame::GameApplication(); 设置游戏库的名称&#xff0c;SetGameLibraryName("libname")&#xff1b; 调用app->Config("conf…

在Eclipse中如何操作zookpeer

导入jar包 jar包下载链接 代码解析 package com.itcast.zookpeer.zk;import java.io.IOException; import java.util.List;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apac…

顺序表应用4:元素位置互换之逆置算法

题目描述 一个长度为len(1<len<1000000)的顺序表&#xff0c;数据元素的类型为整型&#xff0c;将该表分成两半&#xff0c;前一半有m个元素&#xff0c;后一半有len-m个元素&#xff08;1<m<len)&#xff0c;设计一个时间复杂度为O(N)、空间复杂度为O(1)的算法&am…

Linux 系统进程守护工具 cesi + superviosr

一、安装 Supervisor pip install supervisor 使用 echo_supervisord_conf 命令生成默认配置文件 echo_supervisord_conf > /etc/supervisord.conf 配置文件说明 位置&#xff1a;etc/supervisord.conf内容&#xff1a;# 指定了socket file的位置 [unix_http_server] f…

Delta3d组件以及消息机制

在游戏管理器&#xff08;GameManager&#xff09;中维护一个消息队列std::queue(mSendMessageQueue),在GameManager::SendMessage中将消息放入队列中&#xff0c;如下 void GameManager::SendMessage(const Message& message){mGMImpl->mSendMessageQueue.push(dtCore:…

集合类三种遍历方式

package com.test;import java.util.ArrayList; import java.util.Iterator;//集合三种遍历方式 /** iterator的操作是有限的&#xff0c;只能对元素进行判断、取出、删除操作* 如果进行其他操作用ListIterator* */ public class Array_list {public static void main(String[]…

顺序表应用6:有序顺序表查询

题目描述 顺序表内按照由小到大的次序存放着n个互不相同的整数&#xff08;1<n<20000)&#xff0c;任意输入一个整数&#xff0c;判断该整数在顺序表中是否存在。如果在顺序表中存在该整数&#xff0c;输出其在表中的序号&#xff1b;否则输出“No Found!"。输入 第…

Docker 服务器安装(一)

使用官方安装脚本自动安装 安装命令如下&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 设置docker 加速器 sudo curl -sSL https…

游戏入口点GameEntryPoint

GameStart 通过在我们的库中查找入口点类来调用相应的接口启动我们的应用程序。一旦它找到了入口点&#xff0c;它会调用三个函数来替换掉它自己的应用转向执行我们的游戏循环。游戏入口点对于 GameStart 来说就像ActorPluginRegistry 对于 ActorLibrary 一样。游戏入口点有以下…

Docker 入门使用 (二)

配置国内的源 > /etc/docker/daemon.json{"registry-mirrors" : ["https://mirror.ccs.tencentyun.com","http://registry.docker-cn.com","http://docker.mirrors.ustc.edu.cn","http://hub-mirror.c.163.com"],"…

顺序表应用5:有序顺序表归并

题目描述 已知顺序表A与B是两个有序的顺序表&#xff0c;其中存放的数据元素皆为普通整型&#xff0c;将A与B表归并为C表&#xff0c;要求C表包含了A、B表里所有元素&#xff0c;并且C表仍然保持有序。输入 输入分为三行&#xff1a;第一行输入m、n&#xff08;1<m,n<100…

Delta3d角色注册机制

角色注册主要通过继承自类dtDAL::ActorPluginRegistry类来实现&#xff0c;重写其中的RegisterActorTypes()即可&#xff1b;在对象工厂ObjectFactory中保存了“角色类型到负责创建角色对象的全局函数”的Map&#xff1b; 关键函数有&#xff1a; dtCore::RefPtr<BaseActor…

Docker 使用Dockerfile构建自己的docker服务(三)

先介绍一下DockerFile文件的一些指令说明 DockerFile的指令 FROM 基础镜镜像&#xff0c;一切从这里开始构建 MAINTAINER 镜像是谁写的&#xff0c;姓名邮箱 RUN 镜像构建的时候需要运行的$令 ADD 步骤&#xff0c;tomcat镜像&#xff0c; 这个tomcat压缩包!添加内容 WORKDI…

数据结构实验之栈一:进制转换

题目描述 输入一个十进制整数&#xff0c;将其转换成对应的R&#xff08;2<R<9)进制数,并输出。输入 第一行输入需要转换的十进制数&#xff1b;第二行输入R。输出 输出转换所得的R进制数。示例输入 1279 8 示例输出 2377 #include <stdio.h> #include <stdlib.…