Hbase 协处理器之将数据保存到es (二级索引)

利用Hbase Coprocessor 实现将插入hbase中的数据保存至ElasticSearch中,实现二级索引目的

版本:

Hbase: 2.1

ES:6.3.0

一、Coprocessor代码开发

协处理器类

package wiki.hadoop.coprocessor;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;import wiki.hadoop.es.ESClient;
import wiki.hadoop.es.ElasticSearchBulkOperator;import java.io.IOException;
import java.util.*;public class HbaseDataSyncEsObserver implements RegionObserver , RegionCoprocessor {private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);private String index = "user_test";private String type = "user_test_type";public Optional<RegionObserver> getRegionObserver() {return Optional.of(this);}@Overridepublic void start(CoprocessorEnvironment env) throws IOException {// init ES clientESClient.initEsClient();LOG.info("****init start*****");}@Overridepublic void stop(CoprocessorEnvironment env) throws IOException {ESClient.closeEsClient();// shutdown time taskElasticSearchBulkOperator.shutdownScheduEx();LOG.info("****end*****");}@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {String indexId = new String(put.getRow());try {NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();Map<String, Object> infoJson = new HashMap<>();Map<String, Object> json = new HashMap<>();for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {for (Cell cell : entry.getValue()) {String key = Bytes.toString(CellUtil.cloneQualifier(cell));String value = Bytes.toString(CellUtil.cloneValue(cell));json.put(key, value);}}// set hbase family to esinfoJson.put("info", json);LOG.info(json.toString());ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(index,type, indexId).setDocAsUpsert(true).setDoc(json));LOG.info("**** postPut success*****");} catch (Exception ex) {LOG.error("observer put  a doc, index [ " + "user_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());}}@Overridepublic void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {String indexId = new String(delete.getRow());try {ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(index,type, indexId));LOG.info("**** postDelete 
success*****");} catch (Exception ex) {LOG.error(ex);LOG.error("observer delete  a doc, index [ " + "user_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());}}
}

es工具类

package wiki.hadoop.es;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ElasticSearchBulkOperator {private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);private static final int MAX_BULK_COUNT = 10000;private static BulkRequestBuilder bulkRequestBuilder = null;private static final Lock commitLock = new ReentrantLock();private static ScheduledExecutorService scheduledExecutorService = null;static {// 初始化  bulkRequestBuilderbulkRequestBuilder = ESClient.client.prepareBulk();bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);// 初始化线程池大小为1scheduledExecutorService = Executors.newScheduledThreadPool(1);//创建一个Runnable对象,提交待写入的数据,并使用commitLock锁保证线程安全final Runnable beeper = () -> {commitLock.lock();try {LOG.info("Before submission bulkRequest size : " +bulkRequestBuilder.numberOfActions());//提交数据至esbulkRequest(0);LOG.info("After submission bulkRequest size : " +bulkRequestBuilder.numberOfActions());} catch (Exception ex) {System.out.println(ex.getMessage());} finally {commitLock.unlock();}};//初始化延迟10s执行 runnable方法,后期每隔30s执行一次scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);}public static void shutdownScheduEx() {if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {scheduledExecutorService.shutdown();}}private static void bulkRequest(int threshold) {if (bulkRequestBuilder.numberOfActions() > threshold) {BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();if (!bulkItemResponse.hasFailures()) {bulkRequestBuilder = ESClient.client.prepareBulk();}}}/*** add update builder to bulk* use commitLock to protected bulk as thread-save* @param builder*/public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {commitLock.lock();try {bulkRequestBuilder.add(builder);bulkRequest(MAX_BULK_COUNT);} catch (Exception ex) {LOG.error(" update Bulk " + "gejx_test" + " index error : " + ex.getMessage());} finally {commitLock.unlock();}}/*** add delete builder to 
bulk* use commitLock to protected bulk as thread-save** @param builder*/public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {commitLock.lock();try {bulkRequestBuilder.add(builder);bulkRequest(MAX_BULK_COUNT);} catch (Exception ex) {LOG.error(" delete Bulk " + "gejx_test" + " index error : " + ex.getMessage());} finally {commitLock.unlock();}}
}
package wiki.hadoop.es;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;/*** ES Cleint class*/
public class ESClient {public static Client client;private static final Log log = LogFactory.getLog(ESClient.class);/*** init ES client*/public static void initEsClient() throws UnknownHostException {log.info("初始化es连接开始");System.setProperty("es.set.netty.runtime.available.processors", "false");Settings esSettings = Settings.builder().put("cluster.name", "log_cluster")//设置ES实例的名称.put("client.transport.sniff", true).build();client = new  PreBuiltTransportClient(esSettings).addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300)).addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300)).addTransportAddress(new TransportAddress(InetAddress.getByName("host3"), 9300));log.info("初始化es连接完成");}/*** Close ES client*/public static void closeEsClient() {client.close();log.info("es连接关闭");}
}

二、ES创建索引

简单创建个测试的

PUT user_test
{"settings": {"number_of_replicas": 1, "number_of_shards": 5}
}PUT user_test/_mapping/user_test_type
{"user_test_type":{"properties":{"name":{"type":"text"},"city":{"type":"text"},"province":{"type":"text"},"followers_count":{"type":"long"},"friends_count":{"type":"long"},"statuses_count":{"type":"long"}}}
}

三、协处理器安装

将jar包传至HDFS中,执行命令进行协处理器安装

disable 'es_test'
alter 'es_test' , METHOD =>'table_att','coprocessor'=>'/es-coprocessor-0.0.4-jar-with-dependencies.jar|wiki.hadoop.coprocessor.HbaseDataSyncEsObserver|1001'
enable 'es_test'
desc 'es_test'

 四、添加数据进行测试

在kibana中进行查询es数据发现成功插入,在regionServer中查询日志,也发现了我们代码中打印的日志。

 

五、可能遇到的问题

5.1 提示netty jar包冲突

2020-08-12 09:24:55,842 INFO wiki.hadoop.es.ESClient: 初始化es连接开始
2020-08-12 09:24:56,145 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: HBase metrics system started
2020-08-12 09:24:56,260 ERROR org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost: Failed to load coprocessor wiki.hadoop.coprocessor.HbaseDataSyncEsObserver
java.lang.NoSuchMethodError: io.netty.util.AttributeKey.newInstance(Ljava/lang/String;)Lio/netty/util/AttributeKey;at org.elasticsearch.transport.netty4.Netty4Transport.<clinit>(Netty4Transport.java:232)at org.elasticsearch.transport.Netty4Plugin.getSettings(Netty4Plugin.java:56)at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:144)at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:280)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)at wiki.hadoop.es.ESClient.initEsClient(ESClient.java:35)at wiki.hadoop.coprocessor.HbaseDataSyncEsObserver.start(HbaseDataSyncEsObserver.java:85)at org.apache.hadoop.hbase.coprocessor.BaseEnvironment.startup(BaseEnvironment.java:72)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.checkAndLoadInstance(CoprocessorHost.java:263)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:226)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:185)at 
org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.loadTableCoprocessors(RegionCoprocessorHost.java:378)at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.<init>(RegionCoprocessorHost.java:274)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:800)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:702)at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at org.apache.hadoop.hbase.regionserver.HRegion.newHRegion(HRegion.java:6785)at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:6985)

查看后发现hbase-server中包含netty包，它和es依赖的netty版本冲突了，我们直接在依赖中把它排除掉

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><exclusions><exclusion><groupId>io.netty</groupId><artifactId>netty-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><exclusions><exclusion><groupId>org.apache.htrace</groupId><artifactId>htrace-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency>

重新打包安装,查看hbase regionserver 中发现运行正常。去es中查询也运行正常

2020-08-12 16:08:28,215 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=跟我一起学知识}
2020-08-12 16:08:28,216 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=教你做点心}
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****

 

具体代码

https://github.com/zhangshenghang/hbase-coprocessors

参考:https://blog.csdn.net/weixin_42348946/article/details/97941152

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Delta3d框架学习--程序启动过程详解

一个Delta3d程序启动过程详解 一、初始化一个dtGame::GameApplication的实例&#xff0c;dtGame::GameApplication* app new dtGame::GameApplication(); 设置游戏库的名称&#xff0c;SetGameLibraryName("libname")&#xff1b; 调用app->Config("conf…

在Eclipse中如何操作zookpeer

导入jar包 jar包下载链接 代码解析 package com.itcast.zookpeer.zk;import java.io.IOException; import java.util.List;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apac…

顺序表应用4:元素位置互换之逆置算法

题目描述 一个长度为len(1<len<1000000)的顺序表&#xff0c;数据元素的类型为整型&#xff0c;将该表分成两半&#xff0c;前一半有m个元素&#xff0c;后一半有len-m个元素&#xff08;1<m<len)&#xff0c;设计一个时间复杂度为O(N)、空间复杂度为O(1)的算法&am…

Linux 系统进程守护工具 cesi + superviosr

一、安装 Supervisor pip install supervisor 使用 echo_supervisord_conf 命令生成默认配置文件 echo_supervisord_conf > /etc/supervisord.conf 配置文件说明 位置&#xff1a;etc/supervisord.conf内容&#xff1a;# 指定了socket file的位置 [unix_http_server] f…

Delta3d组件以及消息机制

在游戏管理器&#xff08;GameManager&#xff09;中维护一个消息队列std::queue(mSendMessageQueue),在GameManager::SendMessage中将消息放入队列中&#xff0c;如下 void GameManager::SendMessage(const Message& message){mGMImpl->mSendMessageQueue.push(dtCore:…

集合类三种遍历方式

package com.test;import java.util.ArrayList; import java.util.Iterator;//集合三种遍历方式 /** iterator的操作是有限的&#xff0c;只能对元素进行判断、取出、删除操作* 如果进行其他操作用ListIterator* */ public class Array_list {public static void main(String[]…

顺序表应用6:有序顺序表查询

题目描述 顺序表内按照由小到大的次序存放着n个互不相同的整数&#xff08;1<n<20000)&#xff0c;任意输入一个整数&#xff0c;判断该整数在顺序表中是否存在。如果在顺序表中存在该整数&#xff0c;输出其在表中的序号&#xff1b;否则输出“No Found!"。输入 第…

Docker 服务器安装(一)

使用官方安装脚本自动安装 安装命令如下&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 设置docker 加速器 sudo curl -sSL https…

游戏入口点GameEntryPoint

GameStart 通过在我们的库中查找入口点类来调用相应的接口启动我们的应用程序。一旦它找到了入口点&#xff0c;它会调用三个函数来替换掉它自己的应用转向执行我们的游戏循环。游戏入口点对于 GameStart 来说就像ActorPluginRegistry 对于 ActorLibrary 一样。游戏入口点有以下…

Docker 入门使用 (二)

配置国内的源 > /etc/docker/daemon.json{"registry-mirrors" : ["https://mirror.ccs.tencentyun.com","http://registry.docker-cn.com","http://docker.mirrors.ustc.edu.cn","http://hub-mirror.c.163.com"],"…

顺序表应用5:有序顺序表归并

题目描述 已知顺序表A与B是两个有序的顺序表&#xff0c;其中存放的数据元素皆为普通整型&#xff0c;将A与B表归并为C表&#xff0c;要求C表包含了A、B表里所有元素&#xff0c;并且C表仍然保持有序。输入 输入分为三行&#xff1a;第一行输入m、n&#xff08;1<m,n<100…

Delta3d角色注册机制

角色注册主要通过继承自类dtDAL::ActorPluginRegistry类来实现&#xff0c;重写其中的RegisterActorTypes()即可&#xff1b;在对象工厂ObjectFactory中保存了“角色类型到负责创建角色对象的全局函数”的Map&#xff1b; 关键函数有&#xff1a; dtCore::RefPtr<BaseActor…

Docker 使用Dockerfile构建自己的docker服务(三)

先介绍一下DockerFile文件的一些指令说明 DockerFile的指令 FROM 基础镜镜像&#xff0c;一切从这里开始构建 MAINTAINER 镜像是谁写的&#xff0c;姓名邮箱 RUN 镜像构建的时候需要运行的$令 ADD 步骤&#xff0c;tomcat镜像&#xff0c; 这个tomcat压缩包!添加内容 WORKDI…

数据结构实验之栈一:进制转换

题目描述 输入一个十进制整数&#xff0c;将其转换成对应的R&#xff08;2<R<9)进制数,并输出。输入 第一行输入需要转换的十进制数&#xff1b;第二行输入R。输出 输出转换所得的R进制数。示例输入 1279 8 示例输出 2377 #include <stdio.h> #include <stdlib.…

Delta3d动态角色层

DAL 采用一种灵活的、非侵入式的机制来暴露游戏角色的属性信息。 其中两大基础组件就是角色代理和角色属性。角色代理组件就是对底层游戏角色的一个封装&#xff0c;维护单个游戏角色的所有属性信息。而属性组件通过提供对单个游戏角色的所有属性的属性数据访问器来暴露角色的属…

Hbase Shell Filter 过滤

Get 和 Scan 操作都可以使用过滤器来设置输出的范围&#xff0c;类似于 SQL 里面的 Where 查询条件。使用 show_filters 命令可以查看当前 HBase 支持的 过滤器类型。 show_filters使用过滤器的语法格式&#xff1a; scan 表名,{Filter > ”过滤器(比较运算符,’比较器’)…

equals 和 == 的区别?知乎转载

作者&#xff1a;知乎用户 链接&#xff1a;https://www.zhihu.com/question/26872848/answer/34364603 来源&#xff1a;知乎 著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 简单易懂 Java 语言里的 equals方法其实是交给开发者去覆写…

ElasticSearch sql 插件安装

PS&#xff1a;6.3 开始 ElasticSearch 自身已经支持SQL查询。 github地址&#xff1a;https://github.com/NLPchina/elasticsearch-sql 一、在线安装 直接执行 ./bin/elasticsearch-plugin install https://github.com/NLPchina/elasticsearch-sql/releases/download/6.3.…

Delta3d组件机制

dtGame::GMComponent主要用于处理系统中的消息&#xff0c;给系统动态的添加功能&#xff0c;增加系统的可维护性&#xff0c; 简单来说&#xff0c;一个游戏管理器组件就是一个由游戏管理器管理的可以处理和发送消息的对象。它不像游戏角色&#xff0c;游戏管理器组件接受系统…

zookpeer实现对服务器动态上下线的监听

服务器动态上下线程序的工作机制 服务器代码&#xff1a; 补充&#xff1a;volatile关键字&#xff1a;java中一切都是对象&#xff0c;当多个线程操作同一个对象时候&#xff0c;该对象会放在堆内存中&#xff0c;而多个线程相当于在多个栈中&#xff0c;当A线程想要去除对…