对于Hbase的Bulkload基本流程这里就不多介绍可以看Bulkload流程介绍,本问主要介绍如何提升Bulkload的方式。
很多时候我们在写HFile的时候都会进行repartition使用的是repartitionAndSortWithinPartitions,其中Spark也提供几种repartition的实现如HashPartitioner、RangePartitioner,但当数据量大的时候就会出现性能问题,就会变慢。那如何解决呢?
思路
Bulkload总体分两步:
- 按照Hbase的文件格式写HFile;
- 把写好的HFile load到Hbase对应分区里。
第一步影响写文件的速度、第二步影响load的速度。在load的时候,是会按照Hbase的分区,把数据放到对应分区里面,这里面就会有两个问题,1.如果一个HFile文件对应多分区数据,在load的时候就会进行文件拆分(关键),2.如果多个文件对应一个分区,那可能进行合并。也就是说load的时候可能会进行拆分和合并操作,关键就在于你写HFile时的方式是如何。如果使用HashPartitioner、RangePartitioner方式基本上都会拆分,这也就会影响load的性能,因为数据并没有和Hbase的分区对齐。
这样一来思路就有了,分区对齐,写入HFile的数据按照Hbase的分区进行写入,数据一一对齐。保证每个写入的HFile文件都和Hbase的分区一一对应,这样在load的时候就不会进行拆分和合并了。具体看代码扩展Partitioner
class BulkLoadPartitioner(startKeys:Array[Array[Byte]])extends Partitioner {override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.lengthoverride def getPartition(rowKey: Any): Int = {val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {Bytes.compareTo(o1, o2)}}var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)if (partition < 0)partition = partition * -1 + -2if (partition < 0)partition = 0partition}
}
注意:
- Hbase的分区需要先进行预分区。
- 如果发现Load的时间远远高于写HFile的时间时候,就可以使用这种方式。
- 提高HFile的写入速度和Hbase的分区数有关。