目录
CompactSplitThread
requestCompactionInternal方法
selectCompaction方法
requestCompaction方法
其他相关文章
Hbase Compaction 源码分析 - CompactionChecker
Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略
Hbase Compaction 源码分析 - CompactSplitThread 线程池选择
CompactSplitThread
从名称我们可以看出来这是个处理Compcation和Split的线程
我们从下面的方法调用关系来看可发现CompactionChecker会调用requestCompactionInternal方法
requestCompactionInternal方法
/*** @param r region store belongs to* @param s Store to request compaction on* @param why Why compaction requested -- used in debug messages* @param priority override the default priority (NO_PRIORITY == decide)* @param request custom compaction request. Can be <tt>null</tt> in which case a simple* compaction will be used.*///selectNow:系统自动触发的system compaction,selectNow参数为false,如果周期性或者人工触发的major compaction的合并,则selectNow为trueprivate synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,final String why, int priority, CompactionRequest request, boolean selectNow, User user)throws IOException {//判断Hregionserver服务是否停止if (this.server.isStopped()|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {return null;}CompactionContext compaction = null;if (selectNow) {//周期执行执行MajorCompaction,或人工触发的major compaction,selectNow为true.compaction = selectCompaction(r, s, priority, request, user);if (compaction == null) return null; // message logged inside}// We assume that most compactions are small. So, put system compactions into small// pool; we will do selection there, and move to large pool if necessary.// throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",// 2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());//如果selectNow为ture,需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程//longCompactions和shortCompactions默认大小都是1,生产环境可以调整大一些ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))? longCompactions : shortCompactions;pool.execute(new CompactionRunner(s, r, compaction, pool, user));((HRegion)r).incrementCompactionsQueuedCount();if (LOG.isDebugEnabled()) {String type = (pool == shortCompactions) ? "Small " : "Large ";LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);}return selectNow ? compaction.getRequest() : null;}
该方法主要内容如下
1.如果HRegionServer停止则跳过
2.判断是否selectNow,如果为true,则执行selectCompaction方法,否则跳过
electNow:系统自动触发的system compaction,selectNow参数为false,如果周期性执行MajorCompaction或者人工触发的major compaction的合并,则selectNow为true
3.选择线程池,如果selectNow为ture且需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程
longCompactions和shortCompactions线程池默认大小都是1,生产环境可以调整大一些
4.执行线程
selectCompaction方法
private CompactionContext selectCompaction(final Region r, final Store s,int priority, CompactionRequest request, User user) throws IOException {//调用HStore.requestCompaction方法,获取CompactionContext数据CompactionContext compaction = s.requestCompaction(priority, request, user);if (compaction == null) {if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +" because compaction request was cancelled");}return null;}//确认compcation不为空assert compaction.hasSelection();if (priority != Store.NO_PRIORITY) {compaction.getRequest().setPriority(priority);}return compaction;}
该方法主要作用 :获取CompactionContext数据
requestCompaction方法
@Overridepublic CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,User user) throws IOException {// don't even select for compaction if writes are disabled//如果对应的region不可以写则返回nullif (!this.areWritesEnabled()) {return null;}// Before we do compaction, try to get rid of unneeded files to simplify things.removeUnneededFiles();//通过StoreEngine获取CompactionContext,这里介绍使用的是DefaultStoreEnginefinal CompactionContext compaction = storeEngine.createCompaction();CompactionRequest request = null;//设置读锁this.lock.readLock().lock();try {//设置同步锁synchronized (filesCompacting) {final Store thisStore = this;// First, see if coprocessor would want to override selection.//如果存在协处理器if (this.getCoprocessorHost() != null) {final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);boolean override = false;if (user == null) {override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,baseRequest);} else {try {override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws Exception {return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,baseRequest);}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}if (override) {// Coprocessor is overriding normal file selection.compaction.forceSelect(new CompactionRequest(candidatesForCoproc));}}// Normal case - coprocessor is not overriding file selection.//正常情况if (!compaction.hasSelection()) {//是否为用户Compactionboolean isUserCompaction = priority == Store.PRIORITY_USER;//判断是否为高峰期boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&offPeakCompactionTracker.compareAndSet(false, true);try {//调用DefaultCompactionContext的select方法compaction.select(this.filesCompacting, isUserCompaction,mayUseOffPeak, forceMajor && filesCompacting.isEmpty());} catch (IOException e) {if (mayUseOffPeak) {offPeakCompactionTracker.set(false);}throw e;}assert compaction.hasSelection();//isOffPeak为true,这种压实被提升为非高峰if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {// Compaction policy doesn't want to take advantage of off-peak.offPeakCompactionTracker.set(false);}}if (this.getCoprocessorHost() != null) {if (user == null) {this.getCoprocessorHost().postCompactSelection(this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);} else {try {user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {getCoprocessorHost().postCompactSelection(thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);return null;}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}}// Selected files; see if we have a compaction with some custom base request.if (baseRequest != null) {// Update the request with what the system thinks the request should be;// its up to the request if it wants to listen.compaction.forceSelect(baseRequest.combineWith(compaction.getRequest()));}// Finally, we have the resulting files list. Check if we have any files at all.request = compaction.getRequest();final Collection<StoreFile> selectedFiles = request.getFiles();if (selectedFiles.isEmpty()) {return null;}addToCompactingFiles(selectedFiles);// If we're enqueuing a major, clear the force flag.this.forceMajor = this.forceMajor && !request.isMajor();// Set common request properties.// Set priority, either override value supplied by caller or from store.request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());}} finally {this.lock.readLock().unlock();}LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"+ (request.isAllFiles() ? " (all files)" : ""));this.region.reportCompactionRequestStart(request.isMajor());return compaction;}
该方法主要作用:
1.先判断region是否可以写
2.提出不必要的文件
3.处理存在协处理器的数据
4.调用DefaultCompactionContext.select方法
DefaultCompactionContext.select方法最终调用SortedCompactionPolicy.selectCompaction 方法
参数说明:
参数 | 说明 | 默认值 |
hbase.regionserver.thread.compaction.small | RegionServer 小型压缩线程计数 | 1 |
hbase.regionserver.thread.compaction.large | RegionServer大型压缩线程计数 | 1 |
下一步就是在具体合并策略选取文件
查看:Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略