必需要提前说明下:不建议使用自定义的Filter。所有的Filter都是在服务端生效:就是说需要将自定义的Filter封装为jar,上传到HBase的类路径下,并重启HBase使之生效。对于生产环境的HBase来说,重启通常是不能接受的。
Filter的设置是在客户端完成的,而Filter的逻辑是在HBase的服务端完成的,中间需要一次序列化。我试过几种序列化方案,不过protobuffer以外的其他几种效果不算好。HBase自带的Filter也是用protobuffer进行的序列化,因此使用protobuffer还可以少传几个包。
需要提前说明的已经说完了,开始进入正题。这次从一个案例开始说起:在HBase中存储着用户行为记录,行键设计为“uid(6位)+etime(时间戳/1000)+tid(7位)+顺序号(8位)”。其中uid为用户ID、etime为事件时间、tid为行为标签。目标是检索出某个用户在指定时间范围内的几种行为数据。
针对这个案例我们自定义一个CustomRowKeyFilter,并将一个用户ID、事件起止时间以及多个行为ID作为CustomRowKeyFilter的成员变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
packagecom.zhyea.dev.hbase.filter;
importorg.apache.hadoop.hbase.Cell;
importorg.apache.hadoop.hbase.filter.FilterBase;
importorg.apache.hadoop.hbase.util.Bytes;
importjava.io.IOException;
publicclassCustomRowKeyFilterextendsFilterBase{
privatelongpid;
privatelongeventTime;
privateStringtids;
privatebooleanfilterOutRow=false;
publicCustomRowKeyFilter(long_pid,long_eventTime,String_tids){
this.pid=_pid;
this.eventTime=_eventTime;
this.tids=_tids;
}
@Override
publicbooleanfilterRowKey(byte[]data,intoffset,intlength){
StringrowKey=Bytes.toString(data,offset,length);
this.filterOutRow=check(rowKey);
returnthis.filterOutRow;
}
publicReturnCodefilterKeyValue(Cellv)throwsIOException{
if(this.filterOutRow){
returnReturnCode.NEXT_ROW;
}
returnReturnCode.INCLUDE;
}
privatebooleancheck(StringrowKey){
try{
if(rowKey.length()<7){
returntrue;
}
long_pid=Long.valueOf(rowKey.substring(0,6));
long_eTime=Long.valueOf(rowKey.substring(6,16));
long_tid=Long.valueOf(rowKey.substring(16,23));
if(this.pid!=_pid){
returntrue;
}
if(this.eventTime>_eTime){
returntrue;
}
if(!this.tids.contains(_tid+"")){
returntrue;
}
}catch(Exceptione){
returntrue;
}
returnfalse;
}
}
代码中继承了FilterBase类,可以减少一些结构性的代码工作。至于Filter是如何工作的,在网上找到的这张图应该描述得很清楚了:
前面的代码只是实现了Filter的处理逻辑。要想使用这个Filter还需要做一些序列化处理。如前面所说序列化方案选择的是protobuffer,这里需要先定义一个描述文件CustomRowKeyFilterProto.proto,内容如下:
1
2
3
4
5
6
7
8
9
10
packagefilter;
optionjava_package="com.zhyea.dev.hbase.filter.proto";
optionjava_outer_classname="CustomRowKeyFilterProto";
messageCustomRowKeyFilter{
requiredint64pid=1;
requiredint64eventTime=2;
requiredstringtids=3;
}
定义完成后,执行protoc命令:
1
protoc-I=./--java_out=../src/main/javaCustomRowKeyFilterProto.proto
其中“-I”指定了proto描述文件的父目录, “—java_out”指定了java类的类路径,具体请根据自己的情况进行设置。执行命令后会在包com.zhyea.dev.hbase.filter.proto下生成序列化工具类CustomRowKeyFilterProto.java。
接下来在CustomRowKeyFilter中重写Filter类的toByteArray()方法和parseFrom()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
publicbyte[]toByteArray()throwsIOException{
CustomRowKeyFilterProto.CustomRowKeyFilter.Builderbuilder=CustomRowKeyFilterProto.CustomRowKeyFilter.newBuilder();
builder.setPid(this.pid);
builder.setEventTime(this.eventTime);
builder.setCids(this.tids);
returnbuilder.build().toByteArray();
}
publicstaticFilterparseFrom(finalbyte[]pbBytes)throwsDeserializationException{
CustomRowKeyFilterProto.CustomRowKeyFilterproto;
try{
proto=CustomRowKeyFilterProto.CustomRowKeyFilter.parseFrom(pbBytes);
}catch(InvalidProtocolBufferExceptione){
thrownewDeserializationException(e);
}
long_pid=proto.getPid();
long_eventTime=proto.getEventTime();
String_tids=proto.getCids();
returnnewCustomRowKeyFilter(_pid,_eventTime,_tids);
}
这样自定义Filter就完成了。剩下的事情就是将之打包并上传到HBase(每个RegionServer)的类路径下。然后就可以在程序中使用了。
现在再仔细想想这个程序,是否一定需要一个自定义Filter呢!我们已经将查询需要的所有元素都定义在行键里了。那么可以使用“uid+起始时间”作为startRow,“uid+结束时间”作为stopRow完成时间范围的匹配,使用RegexStringComparator来处理tid的匹配,这样直接使用HBase提供的RowFilter就能解决问题了。唯一需要注意的事情就是在设计表时多花些心思在行键上罢了。
就是这样。
参考文档