Other related articles
HBase Compaction Source Code Analysis - CompactionChecker
HBase Compaction Source Code Analysis - RatioBasedCompactionPolicy
HBase Compaction Source Code Analysis - CompactSplitThread Thread Pool Selection
CompactionChecker
Introduction:
The RegionServer starts a background thread, CompactionChecker, which periodically checks whether each Store needs a compaction. The check period is hbase.server.thread.wakefrequency * hbase.server.compactchecker.interval.multiplier. Unlike flush, this thread first checks whether the total number of files in a Store exceeds the compaction threshold hbase.hstore.compactionThreshold; if it does, a compaction is triggered. If not, it goes on to check whether the conditions for a Major Compaction are met. In short, if the earliest update time of the HFiles in the current Store is older than a cutoff mcTime, a Major Compaction is triggered. mcTime is a jittered value whose default range is [7 - 7 * 0.5, 7 + 7 * 0.5] days, where 7 is hbase.hregion.majorcompaction and 0.5 is hbase.hregion.majorcompaction.jitter, so by default a Major Compaction runs roughly every 7 days. To disable Major Compaction, set hbase.hregion.majorcompaction to 0.
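To make those two formulas concrete, here is a minimal standalone sketch (not HBase source; the default values are hard-coded for illustration):

public class CompactionTimingDemo {
  public static void main(String[] args) {
    // Check period = wakefrequency * multiplier (defaults: 10s and 1000)
    long wakeFrequencyMs = 10 * 1000L;   // hbase.server.thread.wakefrequency
    long intervalMultiplier = 1000L;     // hbase.server.compactchecker.interval.multiplier
    long checkPeriodMs = wakeFrequencyMs * intervalMultiplier;
    System.out.printf("per-store check every %.2f hours%n",
        checkPeriodMs / 1000.0 / 3600.0);        // ~2.78 hours

    // mcTime window = [period * (1 - jitter), period * (1 + jitter)]
    double majorCompactionDays = 7.0;    // hbase.hregion.majorcompaction
    double jitter = 0.5;                 // hbase.hregion.majorcompaction.jitter
    System.out.printf("mcTime drawn from [%.1f, %.1f] days%n",
        majorCompactionDays * (1 - jitter),      // 3.5 days
        majorCompactionDays * (1 + jitter));     // 10.5 days
  }
}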
Source Code Analysis:
The org.apache.hadoop.hbase.regionserver.HRegionServer class has a compactionChecker field. Its type implements the Runnable interface and acts as a standalone background thread that monitors whether a compaction needs to run.
/** Check for compactions requests.*/
ScheduledChore compactionChecker;
CompactionChecker is an inner class of HRegionServer; its constructor looks like this:
CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
  super("CompactionChecker", stopper, sleepTime);
  this.instance = h;
  LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

  /* MajorCompactPriority is configurable.
   * If not set, the compaction will use default priority.
   */
  // Major compaction priority, read from
  // hbase.regionserver.compactionChecker.majorCompactPriority; defaults to Integer.MAX_VALUE
  this.majorCompactPriority = this.instance.conf.getInt(
      "hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
}
Three parameters are passed in: the HRegionServer, the sleep interval, and a Stoppable (if the RegionServer shuts down, CompactionChecker detects it and stops the compaction checks).
It also invokes the parent class constructor:
super("CompactionChecker", stopper, sleepTime);
Let's look at the parent class. It implements the Runnable interface, so we go straight to its run method.
On the first run it calls initialChore(), which simply returns true and does nothing else; every subsequent run invokes chore(), which is implemented in the CompactionChecker class.
public abstract class ScheduledChore implements Runnable {
  @Override
  public void run() {
    updateTimeTrackingBeforeRun();
    if (missedStartTime() && isScheduled()) {
      onChoreMissedStartTime();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
    } else if (stopper.isStopped() || !isScheduled()) {
      cancel(false);
      cleanup();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
    } else {
      try {
        if (!initialChoreComplete) {
          initialChoreComplete = initialChore();
        } else {
          chore();
        }
      } catch (Throwable t) {
        if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
        if (this.stopper.isStopped()) {
          cancel(false);
          cleanup();
        }
      }
    }
  }
}
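Outside of HBase's own wiring, the same scheduling machinery can be exercised directly. Below is a minimal sketch using the HBase 1.x ChoreService API with a throwaway Stoppable; the names DemoChecker and "demo" are made up for illustration:

import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;

public class ChoreDemo {
  public static void main(String[] args) throws InterruptedException {
    // Throwaway Stoppable; HBase passes the RegionServer itself here
    Stoppable stopper = new Stoppable() {
      private volatile boolean stopped = false;
      @Override public void stop(String why) { stopped = true; }
      @Override public boolean isStopped() { return stopped; }
    };
    ChoreService service = new ChoreService("demo");
    // run() calls initialChore() once, then chore() every 10 seconds
    service.scheduleChore(new ScheduledChore("DemoChecker", stopper, 10 * 1000) {
      @Override protected void chore() {
        System.out.println("periodic check fired");
      }
    });
    Thread.sleep(35 * 1000);  // let it fire a few times
    service.shutdown();
  }
}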
CompactionChecker's chore method
Let's look at the chore method of the CompactionChecker class.
It mainly calls into the configured policy's methods to decide whether a compaction is needed; the concrete policies are covered below.
private static class CompactionChecker extends ScheduledChore {
  private final HRegionServer instance;
  private final int majorCompactPriority;
  private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
  // Iteration is 1-based rather than 0-based so we don't check for compaction
  // immediately upon region server startup
  private long iteration = 1;

  // sleepTime passed in above is hbase.server.thread.wakefrequency = 10 * 1000
  CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
    super("CompactionChecker", stopper, sleepTime);
    this.instance = h;
    LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

    /* MajorCompactPriority is configurable.
     * If not set, the compaction will use default priority.
     */
    // Major compaction priority, read from
    // hbase.regionserver.compactionChecker.majorCompactPriority; defaults to Integer.MAX_VALUE
    this.majorCompactPriority = this.instance.conf.getInt(
        "hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
  }

  @Override
  protected void chore() {
    // onlineRegions.values() holds all regions currently online on this RegionServer
    for (Region r : this.instance.onlineRegions.values()) {
      if (r == null)
        continue;
      // r.getStores() returns all Stores of the region (one Store per column family)
      for (Store s : r.getStores()) {
        try {
          // multiplier = hbase.server.compactchecker.interval.multiplier,
          // initialized in the HStore constructor; defaults to 1000
          long multiplier = s.getCompactionCheckMultiplier();
          // Sanity check
          assert multiplier > 0;
          // iteration starts at 1 and is incremented on every scheduled run; we only
          // proceed when it is a multiple of multiplier. With the chore firing every
          // hbase.server.thread.wakefrequency = 10s and multiplier = 1000, the check
          // below runs every 10s * 1000 = 10000s, i.e. roughly every 2.77 hours.
          if (iteration % multiplier != 0) continue;
          // If a compaction is needed, submit a system compaction request. Ultimately
          // this compares whether the current storefile count minus the files already
          // compacting reaches the configured compaction min (see the needsCompaction
          // method of the RatioBasedCompactionPolicy below).
          if (s.needsCompaction()) {
            // Queue a compaction. Will recognize if major is needed.
            this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                + " requests compaction");
          } else if (s.isMajorCompaction()) {
            // Check whether a Major Compaction should run
            s.triggerMajorCompaction();
            if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
              this.instance.compactSplitThread.requestCompaction(r, s, getName()
                  + " requests major compaction; use default priority", null);
            } else {
              this.instance.compactSplitThread.requestCompaction(r, s, getName()
                  + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null, null);
            }
          }
        } catch (IOException e) {
          LOG.warn("Failed major compaction check on " + r, e);
        }
      }
    }
    iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
  }
}
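Since the effective cadence is the product of two settings, it can be tuned server-side in hbase-site.xml. The value below is purely illustrative, not a recommendation:

<!-- Check stores every 10s * 360 = 1 hour instead of the default ~2.77 hours -->
<property>
  <name>hbase.server.compactchecker.interval.multiplier</name>
  <value>360</value>
</property>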
The needsCompaction method
@Override
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
    final List<StoreFile> filesCompacting) {
  // Is (current storefile count - files already compacting) >= minFilesToCompact?
  // minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,
  //     /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
  // If the number of candidate files reaches the configured minimum, return true
  // and a compaction is performed.
  int numCandidates = storeFiles.size() - filesCompacting.size();
  return numCandidates >= comConf.getMinFilesToCompact();
}
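To make the arithmetic concrete, here is a tiny sketch with made-up numbers (not HBase source):

public class NeedsCompactionDemo {
  public static void main(String[] args) {
    int storeFiles = 5;        // HFiles currently in the Store (made-up)
    int filesCompacting = 1;   // files already being compacted (made-up)
    // max(2, hbase.hstore.compaction.min / legacy hbase.hstore.compactionThreshold = 3)
    int minFilesToCompact = 3;
    int numCandidates = storeFiles - filesCompacting;        // 5 - 1 = 4
    System.out.println(numCandidates >= minFilesToCompact);  // 4 >= 3 -> true
  }
}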
The isMajorCompaction method
As you can see, it calls shouldPerformMajorCompaction on storeEngine.getCompactionPolicy(): getCompactionPolicy() returns the compaction policy in effect, and that policy's shouldPerformMajorCompaction method is then invoked. The policy analyzed here is RatioBasedCompactionPolicy.
@Override
public boolean isMajorCompaction() throws IOException {
  for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
    // TODO: what are these reader checks all over the place?
    if (sf.getReader() == null) {
      LOG.debug("StoreFile " + sf + " has null Reader");
      return false;
    }
  }
  return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
      this.storeEngine.getStoreFileManager().getStorefiles());
}
The shouldPerformMajorCompaction method
This method returns whether a Major Compaction is needed.
The source examined here is from HBase 1.4.10, where this method has a bug: the result = true; on the second-to-last line invalidates one of the decision branches. Checking the source of 1.4.13 and later shows the problem has since been fixed.
@Override
public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
    throws IOException {
  boolean result = false;
  // Time until the next major compaction; a jittered value in [7 - 7*0.5, 7 + 7*0.5] days
  // hbase.hregion.majorcompaction = 7 days
  // hbase.hregion.majorcompaction.jitter = 0.5
  long mcTime = getNextMajorCompactTime(filesToCompact);
  if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
    return result;
  }
  // TODO: Use better method for determining stamp of last major (HBASE-2990)
  // Earliest modification time among the candidate files
  long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
  // Current time
  long now = EnvironmentEdgeManager.currentTime();
  // If the oldest file's timestamp is older than mcTime ago, no major compaction has
  // run within the mcTime window, so proceed; if one ran within the window, skip.
  if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
    // Once we get here a Major Compaction will (due to the bug below) definitely run;
    // the checks that follow are basically only useful for debugging.
    String regionInfo;
    if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {
      regionInfo = ((HStore)this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
    } else {
      regionInfo = this.toString();
    }
    // Major compaction time has elapsed.
    long cfTTL = HConstants.FOREVER;
    if (this.storeConfigInfo != null) {
      // TTL for store files
      cfTTL = this.storeConfigInfo.getStoreFileTtl();
    }
    if (filesToCompact.size() == 1) {
      // Single file
      StoreFile sf = filesToCompact.iterator().next();
      // Minimum timestamp in the file
      Long minTimestamp = sf.getMinimumTimestamp();
      // Age of the file
      long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
      if (sf.isMajorCompaction() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
        // File has not expired
        float blockLocalityIndex =
            sf.getHDFSBlockDistribution().getBlockLocalityIndex(
                RSRpcServices.getHostname(comConf.conf, false));
        if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
          // Data is not local: compact to make the HDFS blocks local
          LOG.debug("Major compaction triggered on only store " + regionInfo
              + "; to make hdfs blocks local, current blockLocalityIndex is "
              + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          result = true;
        } else {
          // Skip the compaction
          LOG.debug("Skipping major compaction of " + regionInfo
              + " because one (major) compacted file only, oldestTime " + oldest
              + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
        }
      } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
        // Store file has expired: trigger a Major Compaction
        LOG.debug("Major compaction triggered on store " + regionInfo
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
        result = true;
      }
    } else {
      // Multiple candidate files: return true
      LOG.debug("Major compaction triggered on store " + regionInfo
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
    }
    result = true; // BUG: unconditionally overrides the single-file "skip" decision above
  }
  return result;
}
The result returned here is exactly what s.isMajorCompaction() returns.
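For reference, the fix in 1.4.13 and later amounts to moving the assignment into the multi-file branch, so that the single-file skip path can actually return false. A sketch of the corrected tail of the method (paraphrased, not verbatim source):

} else {
  // Multiple candidate files: a major compaction is always warranted
  LOG.debug("Major compaction triggered on store " + regionInfo
      + "; time since last major compaction " + (now - lowTimestamp) + "ms");
  result = true;  // now set only here; the trailing unconditional assignment is removed
}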