目录
类的关系图
RatioBasedCompactionPolicy
selectCompaction 方法
getCurrentEligibleFiles方法
skipLargeFiles方法
createCompactionRequest方法
filterBulk方法
applyCompactionPolicy方法
removeExcessFiles方法
setIsMajor方法
其他相关文章
Hbase Compaction 源码分析 - CompactionChecker
Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略
Hbase Compaction 源码分析 - CompactSplitThread 线程池选择
之前介绍 CompactionChecker 执行时机,这回接着介绍具体的策略
类的关系图
RatioBasedCompactionPolicy
该类在
org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy
selectCompaction 方法
调用过程如下
我们看一下RatioBasedCompactionPolicy 的 selectCompaction 实现方法,实际是在父类 SortedCompactionPolicy 中
//candidateFiles 候选文件,并且按照seqId从最早到最新的排序//filesCompacting 正在Compcation的文件//mayUseOffPeak 是否为高峰期//forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true//返回 符合Compaction的候选列表public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,final List<StoreFile> filesCompacting, final boolean isUserCompaction,final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {// Preliminary compaction subject to filtersArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);// Stuck and not compacting enough (estimate). It is not guaranteed that we will be// able to compact more if stuck and compacting, because ratio policy excludes some// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).int futureFiles = filesCompacting.isEmpty() ? 0 : 1;//如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
// blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)>= storeConfigInfo.getBlockingFileCount();//删除正在合并的文件candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +filesCompacting.size() + " compacting, " + candidateSelection.size() +" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");// If we can't have all files, we cannot do major anyway//判断是否包含全部文件,如果没有正在合并的文件则为trueboolean isAllFiles = candidateFiles.size() == candidateSelection.size();//如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤,过滤掉大于hbase.hstore.compaction.max.size值的文件if (!(forceMajor && isAllFiles)) {//排除大于hbase.hstore.compaction.max.size值的数据,默认Long.MAX_VALUE//hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);isAllFiles = candidateFiles.size() == candidateSelection.size();}// Try a major compaction if this is a user-requested major compaction,// or if we do not have too many files to compact and this was requested as a major compaction//isTryingMajor判断条件有两种// 1、Major合并为True,且包含所有问文件,且是一个用户合并// 2、Major合并为True,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));// Or, if there are any references among the candidates.//判断是否包含分裂后的文件boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);//如果不是isTryingMajor且不包含分裂后的文件,则 createCompactionRequest 方法中进行进一步文件过滤CompactionRequest result = createCompactionRequest(candidateSelection,isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());//过滤掉多余最大合并的文件数量removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);result.updateFiles(filesToCompact);isAllFiles = (candidateFiles.size() == filesToCompact.size());result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);return result;}
该方法主要流程:
1.传入参数与返回类型
candidateFiles 压缩候选文件,并且按照seqId从最早到最新的排序
filesCompacting 正在压缩的文件
mayUseOffPeak 是否为高峰期
forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true
isUserCompaction 是否为用户压缩
返回 CompactionRequest ,符合Compaction的候选列表
2.判断是否阻塞,等待合并的文件数量大于blockingStoreFiles,认为是阻塞
如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
3.从候选列表中candidateSelection删除正在Compaction的文件
4.判断是否包含全部文件,如果没有正在合并的文件isAllFiles则为true
5.如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤;
文件过滤方法:skipLargeFiles,过滤掉大于hbase.hstore.compaction.max.size值的文件,该方法后面介绍
6.判断isTryingMajor(判断后续是否为Major使用),判断条件有两种,满足一个即为true:
a.Major(forceMajor)合并为true,且包含所有文件,且是一个用户合并
b.Major(forceMajor)合并为true,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目
7.判断candidateSelection是否包含分裂后的文件
8.如果不是isTryingMajor且不包含分裂后的文件,则执行 createCompactionRequest 方法中进行进一步文件过滤,createCompactionRequest方法后面介绍
9.执行removeExcessFiles方法,如果大于最大合并的文件数量,则过滤掉多余的数量,否则不处理;执行removeExcessFiles方法下一步介绍
10.在返回的result中设置本次Compcation的类型(Major或者Minor),调用方法 setIsMajor,下面介绍
getCurrentEligibleFiles方法
protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,final List<StoreFile> filesCompacting) {// candidates = all storefiles not already in compaction queueif (!filesCompacting.isEmpty()) {// exclude all files older than the newest file we're currently// compacting. this allows us to preserve contiguity (HBASE-2856)StoreFile last = filesCompacting.get(filesCompacting.size() - 1);int idx = candidateFiles.indexOf(last);Preconditions.checkArgument(idx != -1);candidateFiles.subList(0, idx + 1).clear();}return candidateFiles;}
该方法主要流程:
1.从候选文件列表中删除正在Compaction的文件
skipLargeFiles方法
/*** @param candidates pre-filtrate* @return filtered subset exclude all files above maxCompactSize* Also save all references. We MUST compact them*/protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,boolean mayUseOffpeak) {int pos = 0;//候选文件大于0 且文件不是分裂后的文件 且文件大小大于配置最大文件大小maxCompactSize时,该文件会被剔除while (pos < candidates.size() && !candidates.get(pos).isReference()&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {++pos;}if (pos > 0) {LOG.debug("Some files are too large. Excluding " + pos+ " files from compaction candidates");candidates.subList(0, pos).clear();}return candidates;}
该方法主要流程:
1.判断候选文件大于0 且文件不是分裂后的文件(如果是split后的文件,是需要进行Compaction,不会剔除) 且文件大小大于配置最大文件大小maxCompactSize时,执行++pos
2.pos大于0,清除大的数据
createCompactionRequest方法
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (!tryingMajor) {//进入这里则为minorCompaction//过滤掉BulkLoad到HBase的文件candidateSelection = filterBulk(candidateSelection);//过滤掉不应该Minor的文件candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);candidateSelection = checkMinFilesCriteria(candidateSelection,comConf.getMinFilesToCompact());}return new CompactionRequest(candidateSelection);}
该方法主要流程:
如果不是isTryingMajor且不包含分裂后的文件,则为MinorCompaction 进行进一步文件过滤,否则直接返回
filterBulk方法
/*** @param candidates pre-filtrate* @return filtered subset exclude all bulk load files if configured*/protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {@Overridepublic boolean apply(StoreFile input) {//判断该文件是否需要执行MinorCompactionreturn input.excludeFromMinorCompaction();}}));return candidates;}
该方法主要作用:
判断StoreFIle是否设置excludeFromMinorCompaction,也就是过滤掉BulkLoad到HBase的文件
applyCompactionPolicy方法
protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (candidates.isEmpty()) {return candidates;}
//前提:,选择待合并的文件按时间排序,最旧的文件排最前。// we're doing a minor compaction, let's see what files are applicableint start = 0;//hbase.hstore.compaction.ratio 1.2double ratio = comConf.getCompactionRatio();//判断是否为高峰期,高峰期 ratio 值为5,非高峰期为1.2if (mayUseOffPeak) {//获取hbase.hstore.compaction.ratio.offpeak值,默认是5ratio = comConf.getCompactionRatioOffPeak();LOG.info("Running an off-peak compaction, selection ratio = " + ratio);}// get store file sizes for incremental compacting selection.//https://blog.csdn.net/bryce123phy/article/details/56003628//获取待Compaction文件数量final int countOfFiles = candidates.size();long[] fileSizes = new long[countOfFiles];//每个file大小long[] sumSize = new long[countOfFiles];//前几个file大小总和for (int i = countOfFiles - 1; i >= 0; --i) {StoreFile file = candidates.get(i);fileSizes[i] = file.getReader().length();// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo//getMaxFilesToCompact 获取最大文件压缩数,默认为10int tooFar = i + comConf.getMaxFilesToCompact() - 1;sumSize[i] = fileSizes[i]+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);}//getMinFilesToCompact : hbase.hstore.compactionThreshold
// 如在任意一个 HStore 中有超过此数量的 HStoreFiles,
// 则将运行压缩以将所有 HStoreFiles 文件作为一个 HStoreFile 重新写入。
// (每次 memstore 刷新写入一个 HStoreFile)您可通过指定更大数量延长压缩,
// 但压缩将运行更长时间。在压缩期间,更新无法刷新到磁盘。长时间压缩需要足够的内存,
// 以在压缩的持续时间内记录所有更新。如太大,压缩期间客户端会超时。//getMinCompactSize 最小合并大小//也就是说,当待合并文件数量大于最小合并数量 并且// 文件大小大于Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio)值// 该端代码意思是过滤比较大的文件,默认认为最早的StoreFile文件大小最大(之前合并过)
//这里高峰期满足条件的数量小于等于非高峰期数量while (countOfFiles - start >= comConf.getMinFilesToCompact() &&fileSizes[start] > Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio))) {++start;}if (start < countOfFiles) {//从 countOfFiles 个候选文件中选取 start 个文件进行CompactionLOG.info("Default compaction algorithm has selected " + (countOfFiles - start)+ " files from " + countOfFiles + " candidates");} else if (mayBeStuck) {//mayBeStuck判断规则://如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),//hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。// We may be stuck. Compact the latest files if we can.int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();if (filesToLeave >= 0) {start = filesToLeave;}}candidates.subList(0, start).clear();return candidates;}
该方法主要流程
1.判断是否为高峰期,并确认ratio的值
2.计算文件大小
3.增加start变量,过滤掉文件较大的文件
4.判断
a.判断如果不是所有文件都被过滤,则从候选列表清空比较大的文件,我们认为越老的文件(在变量的最前面,所以可以通过++start方式可以过滤大文件),文件占用空间越大
b.判断如果所有文件都被过滤,继续判断是否阻塞(如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7)则视为阻塞),如果阻塞则将start调整,保证Compaction压缩
removeExcessFiles方法
protected void removeExcessFiles(ArrayList<StoreFile> candidates,boolean isUserCompaction, boolean isMajorCompaction) {//如果待合并的文件大于配置的最大合并文件数量int excess = candidates.size() - comConf.getMaxFilesToCompact();if (excess > 0) {//如果isMajorCompaction为true并且是用户合并则不过滤if (isMajorCompaction && isUserCompaction) {LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()+ " files because of a user-requested major compaction");} else {//过滤掉多余最大合并文件数量的文件LOG.debug("Too many admissible files. Excluding " + excess+ " files from compaction candidates");candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();}}}
setIsMajor方法
public void setIsMajor(boolean isMajor, boolean isAllFiles) {//如果不是全部文件,并且是major压缩,抛异常,也就是说如果有正在Compaction的文件,就不能执行MajorCompactionassert isAllFiles || !isMajor;//不是全部文件:则为Minor压缩//是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILESthis.isMajor = !isAllFiles ? DisplayCompactionType.MINOR: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);}
该方法主要流程:
1.如果不是全部文件,并且是major压缩,抛异常;也就是说如果有正在Compaction的文件,就不能执行MajorCompaction
2. 不是全部文件:则为Minor压缩
是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES
DisplayCompactionType 枚举有三个值 MINOR, ALL_FILES, MAJOR
但是判断是否为Major的条件只有
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); public boolean isMajor() { return this.isMajor == DisplayCompactionType.MAJOR; }
我们从代码中可以看出当类型为ALL_FILES或者MINOR都是MinorCompcation
涉及到的配置参数
值 | 说明 | 默认值 |
---|---|---|
hbase.hregion.majorcompaction | 在一个区域中所有 HStoreFiles Major 压缩之间的时间(以毫秒为单位)。要禁用自动的Major压缩,请将此值设置为 0。 | 7天 |
hbase.hregion.majorcompaction.jitter | 抖动外边界以进行最大化压缩。在每个 RegionServer 上,hbase.region.majorcompaction 间隔与此最大边界内的随机分数相乘。在即将运行下一个最大化压缩时加入该 + 或 - 乘积。最大化压缩不应同时发生在各 RegionServer 上。该数越小,压缩越紧密。 所以 major compact的时间间隔 = [7-7*0.5,7+7.0.5] | 0.5 |
hbase.server.thread.wakefrequency | 搜索工作时暂停的时间段(以毫秒为单位)。服务线程如 META 扫描仪、日志滚轮、Major Compcation 线程使用的睡眠间隔。 | 10秒 |
hbase.server.compactchecker.interval.multiplier | hbase后台线程检查因子,hbase.server.compactchecker.interval.multiplier* hbase.server.thread.wakefrequency 就是Compaction Major 检查的周期,比如1000*10秒≈2.77小时 | 1000 |
hbase.hstore.compaction.ratio | 这个ratio参数的作用是判断文件大小 > hbase.hstore.compaction.min.size的StoreFile是否也是适合进行minor compaction的,默认值1.2。更大的值将压缩产生更大的StoreFile,建议取值范围在1.0~1.4之间。大多数场景下也不建议调整该参数。 | 1.2 |
hbase.hstore.compaction.ratio.offpeak | 此参数与compaction ratio参数含义相同,是在原有文件选择策略基础上增加了一个非高峰期的ratio控制,默认值5.0。这个参数受另外两个参数 hbase.offpeak.start.hour 与 hbase.offpeak.end.hour 控制,这两个参数值为[0, 23]的整数,用于定义非高峰期时间段,默认值均为-1表示禁用非高峰期ratio设置。 | 5 |