HBase模糊查询优化 - 并发查询
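本文是HBase查询优化的续集:上次优化之后,查询速度仍然很慢。
这次我们改用并发查询:参考HBase表中各region的拆分情况(region边界),把待查询rowkey前缀之后的键空间切分成多段,每一段构造一个独立的Scan(各自设置startRow和stopRow),交给线程池并发执行。如果各段落在不同的region上,这些Scan就能同时在多个region中进行,从而提升查询速度。(下面的代码中,切分点由调用者以"起止字符"区间的形式传入。)
经测试,优化后查询速度大幅提升!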
代码如下
package query;import main.TaskExecutors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;/*** <p>* 用来作为并发查询HBase使用,原理是使用scan来设置startrow和endrow。* 因为HBase存储rowkey是字典序排序;* 所以如果是单线程查询我们一般设置startrow为rowkey+"0" , endrow为rowkey+"z"。* 我们并发scan可以将startrow和endrow拆分成多份,例如:* 分为rowkey+'0' -> rowkey+'9' 和 rowkey+'A' -> rowkey+'Z' 和 rowkey+'a' -> rowkey+'z',这样就可以分为三个线程并发查询* <p/>** @author chun* @date 2022/7/21 16:48*/public class ConcurrentScanHBase {private ExecutorService pool = null;private String rowKey;//Pair自定了,也可以使用javafx.util.Pair,需要看服务器上的jdk是oracle还是openjdk,openjdk没有,可以把oracleJDK的Pair类直接复制过来使用private List<Pair<Character, Character>> rowkeyRanges;private List<Scan> scanList = new ArrayList<>();private SetScan setScan = null;private CountDownLatch countDownLatch;private static final String format = "yyyyMMddHH";public ConcurrentScanHBase(ExecutorService pool, String rowkey, List<Pair<Character, Character>> rowkeyRanges, SetScan setScan) {if (setScan == null) {throw new NullPointerException("SetScan is NULL");}this.pool = pool;this.rowKey = rowkey;this.rowkeyRanges = rowkeyRanges;this.setScan = setScan;init();}private void init() {for (Pair<Character, Character> rowkeyRange : rowkeyRanges) {scanList.add(getScann(rowKey.getBytes(), (rowKey + rowkeyRange.getKey()).getBytes(), (rowKey + rowkeyRange.getValue()).getBytes()));}countDownLatch = new CountDownLatch(rowkeyRanges.size());}//如果需要主线程等待此次任务结束,调用await()方法;public void await() {try {this.countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}public void exec(ExecScan execScan) {for (Scan scan : scanList) {pool.execute(() -> {execScan.exec(scan);countDownLatch.countDown();});}}//只设置startrow和endrow,其他设置需要返回后自己设置private Scan getScann(byte[] rowkey, byte[] startRow, byte[] endRow) {Scan scan = new Scan();scan.withStartRow(startRow);scan.withStopRow(endRow);PrefixFilter prefixFilter = new PrefixFilter(rowkey);scan.setFilter(prefixFilter);setScan.initScan(scan);return scan;}
}
回调接口(调用者可用匿名内部类或 lambda 实现)
package query;

import org.apache.hadoop.hbase.client.Scan;

/**
 * Callback that performs the actual table scan for one {@link Scan} built by
 * {@code ConcurrentScanHBase}.
 *
 * <p>Callers implement it, with an anonymous inner class or a lambda, to supply how the table is
 * read: open a scanner, iterate the results, and so on. {@code ConcurrentScanHBase#exec} invokes it
 * once per sub-range scan, each time from a task submitted to the executor. Implementations may
 * therefore run concurrently and must be safe to call from several threads.
 *
 * @author chun
 * @date 2022/7/21 17:52
 */
@FunctionalInterface
public interface ExecScan {

    /**
     * Executes one scan.
     *
     * @param scan a configured scan for one sub-range; each invocation receives a different scan
     */
    void exec(Scan scan);
}
package query;

import org.apache.hadoop.hbase.client.Scan;

/**
 * Callback passed to the {@code ConcurrentScanHBase} constructor so each caller can customize the
 * {@link Scan}s it builds.
 *
 * <p>Use it, via an anonymous inner class or a lambda, to set every scan option other than the
 * start and stop rows, e.g. column families, caching, or batching. It is called once per sub-range
 * scan, after the start row, stop row and prefix filter have been set. Calling {@code setFilter}
 * here therefore replaces the prefix filter.
 *
 * @author chun
 * @date 2022/7/21 17:52
 */
@FunctionalInterface
public interface SetScan {

    /**
     * Applies caller-specific settings to a freshly built scan.
     *
     * @param scan the scan to configure in place
     */
    void initScan(Scan scan);
}
Pair类
package query;import javafx.beans.NamedArg;import java.io.Serializable;/*** @author chun* @date 2022/7/22 10:10*/
public class Pair<K, V> implements Serializable {/*** Key of this <code>Pair</code>.*/private K key;/*** Gets the key for this pair.** @return key for this pair*/public K getKey() {return key;}/*** Value of this this <code>Pair</code>.*/private V value;/*** Gets the value for this pair.** @return value for this pair*/public V getValue() {return value;}/*** Creates a new pair** @param key The key for this pair* @param value The value to use for this pair*/public Pair(@NamedArg("key") K key, @NamedArg("value") V value) {this.key = key;this.value = value;}/*** <p><code>String</code> representation of this* <code>Pair</code>.</p>** <p>The default name/value delimiter '=' is always used.</p>** @return <code>String</code> representation of this <code>Pair</code>*/@Overridepublic String toString() {return key + "=" + value;}/*** <p>Generate a hash code for this <code>Pair</code>.</p>** <p>The hash code is calculated using both the name and* the value of the <code>Pair</code>.</p>** @return hash code for this <code>Pair</code>*/@Overridepublic int hashCode() {// name's hashCode is multiplied by an arbitrary prime number (13)// in order to make sure there is a difference in the hashCode between// these two parameters:// name: a value: aa// name: aa value: areturn key.hashCode() * 13 + (value == null ? 
0 : value.hashCode());}/*** <p>Test this <code>Pair</code> for equality with another* <code>Object</code>.</p>** <p>If the <code>Object</code> to be tested is not a* <code>Pair</code> or is <code>null</code>, then this method* returns <code>false</code>.</p>** <p>Two <code>Pair</code>s are considered equal if and only if* both the names and values are equal.</p>** @param o the <code>Object</code> to test for* equality with this <code>Pair</code>* @return <code>true</code> if the given <code>Object</code> is* equal to this <code>Pair</code> else <code>false</code>*/@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o instanceof Pair) {Pair pair = (Pair) o;if (key != null ? !key.equals(pair.key) : pair.key != null) return false;if (value != null ? !value.equals(pair.value) : pair.value != null) return false;return true;}return false;}
}
使用示例
/**
 * Demo of the concurrent scan. It reads rows whose key starts with "baidu.com" from column family
 * "D", splits the work into one scan per character range below, and prints the cell values of at
 * most {@code maxBatches} batches (of up to {@code batchSize} results each) per scan.
 */
public static void main(String[] args) {
    // Each pair gives the (first, last) characters that may follow the row key in one sub-range scan.
    List<Pair<Character, Character>> ranges = new ArrayList<>();
    ranges.add(new Pair<>('-', '3'));
    ranges.add(new Pair<>('4', '9'));
    ranges.add(new Pair<>('A', 'J'));
    ranges.add(new Pair<>('K', 'T'));
    ranges.add(new Pair<>('U', 'Z'));
    ranges.add(new Pair<>('a', 'j'));
    ranges.add(new Pair<>('k', 't'));
    ranges.add(new Pair<>('u', 'z'));

    // NOTE(review): the executor from TaskExecutors.getPool() is never shut down here; if it uses
    // non-daemon threads the JVM will not exit after main returns. Confirm against TaskExecutors.
    ConcurrentScanHBase concurrentScanHBase = new ConcurrentScanHBase(
            TaskExecutors.getPool(), "baidu.com", ranges, scan -> {
                scan.setCacheBlocks(false);
                scan.setBatch(6000);
                scan.addFamily(Bytes.toBytes("D"));
            });

    final int batchSize = 6000;
    final int maxBatches = 3; // demo only: stop after a few batches per sub-range
    concurrentScanHBase.exec(scan -> {
        try (Table table = HBaseHelper.getConnection().getTable(TableName.valueOf(HBaseHelper.TABLE_NAME));
             ResultScanner scanner = table.getScanner(scan)) {
            for (int batch = 0; batch < maxBatches; batch++) {
                Result[] results = scanner.next(batchSize);
                if (results.length == 0) {
                    break; // this sub-range is exhausted
                }
                for (Result result : results) {
                    for (Cell cell : result.rawCells()) {
                        // Decode the value bytes in place. Printing a byte[] directly would only
                        // show its identity hash ("[B@..."). Assumes values are UTF-8 text.
                        System.out.println(Bytes.toString(
                                cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    try {
        concurrentScanHBase.await();
    } catch (RuntimeException e) {
        e.printStackTrace();
    }
}