二级索引实现方式:Hbase + Key-Value Store Indexer + Solr
同步int数据时提示异常
异常如下
2019-12-16 17:39:18,346 WARN com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte value 101 to int
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)at com.ngdata.hbaseindexer.parse.ByteArrayValueMappers$1.mapInternal(ByteArrayValueMappers.java:37)at com.ngdata.hbaseindexer.parse.ByteArrayValueMappers$AbstractByteValueMapper.map(ByteArrayValueMappers.java:157)at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$Mapping.extractWithSingleOutputField(ExtractHBaseCellsBuilder.java:204)at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$Mapping.apply(ExtractHBaseCellsBuilder.java:197)at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$ExtractHBaseCells.doProcess(ExtractHBaseCellsBuilder.java:83)at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:186)at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:230)at com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper.map(MorphlineResultToSolrMapper.java:145)at com.ngdata.hbaseindexer.indexer.Indexer$RowBasedIndexer.calculateIndexUpdates(Indexer.java:289)at com.ngdata.hbaseindexer.indexer.Indexer.indexRowData(Indexer.java:144)at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:98)at com.ngdata.sep.impl.SepEventExecutor$1.run(SepEventExecutor.java:97)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
服务调用的是Hbase jar包下的 Bytes.toInt方法,我们再idea中调用验证一下,提示异常和hbase-solr提示异常相同
int a = Bytes.toInt(Bytes.toBytes(111));
System.out.println(a);
System.out.println("-------------------");
int b = Bytes.toInt(Bytes.toBytes("101"));
System.out.println(b);输出如下:
111
-------------------
Exception in thread "main" java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)at com.example.demo.Test.main(Test.java:23)
查看调用的方法 Bytes.toInt 中 , 我们发现方法会判断如果 offset + length > bytes.length 抛出异常,也就是我们传入数据的字节数小于 4 则会抛出 explainWrongLengthOrOffset 异常
public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE; //SIZEOF_INT = 4/*** Converts a byte array to an int value* @param bytes byte array* @return the int value*/public static int toInt(byte[] bytes) {return toInt(bytes, 0, SIZEOF_INT);}public static int toInt(byte[] bytes, int offset, final int length) {if (length != SIZEOF_INT || offset + length > bytes.length) {throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);}if (UNSAFE_UNALIGNED) {return UnsafeAccess.toInt(bytes, offset);} else {int n = 0;for(int i = offset; i < (offset + length); i++) {n <<= 8;n ^= bytes[i] & 0xFF;}return n;}}private static IllegalArgumentExceptionexplainWrongLengthOrOffset(final byte[] bytes,final int offset,final int length,final int expectedLength) {String reason;if (length != expectedLength) {reason = "Wrong length: " + length + ", expected " + expectedLength;} else {reason = "offset (" + offset + ") + length (" + length + ") exceed the"+ " capacity of the array: " + bytes.length;}return new IllegalArgumentException(reason);}
我们知道在java中 int 占用字节数为4 ,字符串"101"占用字节为3 ,所以当字符串小于 4 的时候,我们同步数据会报错
System.out.println("占用字节->"+"101".getBytes().length);输出如下:占用字节->3
这时可以猜想是不是传入字符串长度 >= 4 时候,就正常了
我们通过hbase put进去数据
put 'tableName','test1','fn:comments_count','1111'
put 'tableName','test2','fn:comments_count','11111'
我们发现日志中没有报错,并且solr中数据已经成功同步过去;但是发现同步过去的数据不对,并不是我们插入的
Cloudera 文档中提出可以实现 com.ngdata.hbaseindexer.parse.ByteArrayValueMapper 接口自定义类型
文档链接:https://docs.cloudera.com/documentation/enterprise/latest/topics/search_hbase_batch_indexer.html#id_dfb_g24_3db
实现该接口,自定义类型解决上述问题
com.ngdata.hbaseindexer.parse.ByteArrayValueMapper 接口在jar包 hbase-indexer-engine-1.*-cdh*.*.*.jar 包中
该jar包可以从 /opt/cloudera/parcels/CDH-6.*.*-1.cdh6.*.*.p0.590678/lib/hbase-solr/lib 目录下找到,
> ls hbase-indexer-engine-1.*-cdh*.*.*.jar
hbase-indexer-engine-1.5-cdh6.0.1.jar
实现该接口
import com.google.common.collect.ImmutableList;
import com.ngdata.hbaseindexer.parse.ByteArrayValueMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;import java.util.Collection;public class Jastint implements ByteArrayValueMapper {private static Log log = LogFactory.getLog(Jastint.class);public Collection<? extends Object> map(byte[] input) {try {return ImmutableList.of(mapInternal(Bytes.toString(input)));} catch (IllegalArgumentException e) {log.warn(String.format("Error mapping byte value %s to %s", Bytes.toStringBinary(input),int.class.getName()), e);return ImmutableList.of();}}private int mapInternal(String toString) {return Integer.parseInt(toString);}}
打成 jar 包,将jar传到 /opt/cloudera/parcels/CDH-6.*.*-1.cdh*.*.*.p0.590678/lib/hbase-solr/lib/ 目录下,Key-Value Store Indexer 服务启动时会读取该目录下的jar包
修改 Morphlines 配置文件如下,对 hbase_indexer_vt_fn_comments_count 字段 使用我们自定义的 com.jast.hbaseindexer.Jastint 类,注意这里要引用 包名+类名 否则会提示找不到类
SOLR_LOCATOR : {# Name of solr collectioncollection : hbaseindexer# ZooKeeper ensemblezkHost : "$ZK_HOST"
}morphlines : [
{
id : WeiBoTableMap
importCommands : ["org.kitesdk.**", "com.ngdata.**"]commands : [ {extractHBaseCells {mappings : [{inputColumn : "fn:name"outputField : "hbase_indexer_vt_fn_name" type : string source : value},{inputColumn : "fn:comments_count"outputField : "hbase_indexer_vt_fn_comments_count" type : "com.jast.hbaseindexer.Jastint"source : value},{inputColumn : "fn:text"outputField : "hbase_indexer_vt_fn_text" type : stringsource : value}]}}{ logDebug { format : "output record: {}", args : ["@{}"] } }
]
}
]
重启 Key-Value Store Indexer 服务,再次插入数据
put 'tableName','test1','fn:comments_count','2587'
put 'tableName','test2','fn:comments_count','2587'
插入成功