Hbase Compaction 源码分析 - CompactionChecker

其他相关文章

Hbase Compaction 源码分析 - CompactionChecker

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

CompactionChecker

介绍:

RegionServer会在后台启动一个线程CompactionChecker,定期触发检查对应Store是否需要执行Compaction,检查周期为hbase.server.thread.wakefrequency*hbase.server.compactchecker.interval.multiplier。和flush不同的是,该线程优先检查Store中总文件数是否大于配置Compaction阈值hbase.hstore.compactionThreshold,一旦大于就会触发Compaction;如果不满足,接着检查是否满足Major Compaction条件。简单来说,如果当前Store中HFile的最早更新时间早于某个值mcTime,就会触发Major Compaction。mcTime是一个浮动值,浮动区间默认为[7-7 0.5,7+7*0.5],其中7为hbase.hregion.majorcompaction,0.5为hbase.hregion.majorcompaction.jitter,可见默认在7天左右就会执行一次Major Compaction。用户如果想禁用Major Compaction,需要将参数hbase.hregion.majorcompaction设为0

源码分析:

在 org.apache.hadoop.hbase.regionserver.HRegionServer 类中,有个 compactionChecker 变量,该变量类型实现 Runnable 接口,用做后台独立线程监测是否需要执行Compaction操作

 /** Check for compactions requests.*/ScheduledChore compactionChecker;

 CompactionChecker 类是HRegionServer内部类,CompactionChecker构造方法如下

CompactionChecker(final HRegionServer h, final int sleepTime,final Stoppable stopper) {super("CompactionChecker", stopper, sleepTime);this.instance = h;LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));/* MajorCompactPriority is configurable.* If not set, the compaction will use default priority.*///设置major合并优先级,取参数hbase.regionserver.compactionChecker.majorCompactPriority,// 默认为Integer.MAX_VALUEthis.majorCompactPriority = this.instance.conf.getInt("hbase.regionserver.compactionChecker.majorCompactPriority",DEFAULT_PRIORITY);}

传入三个参数, 第一个是HRegionServer,第二个是休眠时间,第三个是是否停止(如果RegionServer停止运行,CompactionChecker会监控到,并停止Compaction)

同时调用父类方法

super("CompactionChecker", stopper, sleepTime);

我们看下父类方法 实现了Runnable接口,这里我们直接看run方法具体运行方法

我们发现第一次运行会初始化执行initialChore()方法,该方法值有retrun true 不做任何处理,之后每次都会运行chore()方法,该方法在CompactionChecker类中实现

public abstract class ScheduledChore implements Runnable @Overridepublic void run() {updateTimeTrackingBeforeRun();if (missedStartTime() && isScheduled()) {onChoreMissedStartTime();if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");} else if (stopper.isStopped() || !isScheduled()) {cancel(false);cleanup();if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");} else {try {if (!initialChoreComplete) {initialChoreComplete = initialChore();} else {chore();}} catch (Throwable t) {if (LOG.isErrorEnabled()) LOG.error("Caught error", t);if (this.stopper.isStopped()) {cancel(false);cleanup();}}}}

CompactionChecker类chore 

查看CompactionChecker类chore方法

这里主要就是调用相关策略的方法,判断是否需要Compaction,具体策略在下面介绍

private static class CompactionChecker extends ScheduledChore {private final HRegionServer instance;private final int majorCompactPriority;private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;//Iteration is 1-based rather than 0-based so we don't check for compaction// immediately upon region server startupprivate long iteration = 1;//sleepTime上面调用传入的是:hbase.server.thread.wakefrequency=10 * 1000CompactionChecker(final HRegionServer h, final int sleepTime,final Stoppable stopper) {super("CompactionChecker", stopper, sleepTime);this.instance = h;LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));/* MajorCompactPriority is configurable.* If not set, the compaction will use default priority.*///设置major合并优先级,取参数hbase.regionserver.compactionChecker.majorCompactPriority,// 默认为Integer.MAX_VALUEthis.majorCompactPriority = this.instance.conf.getInt("hbase.regionserver.compactionChecker.majorCompactPriority",DEFAULT_PRIORITY);}@Overrideprotected void chore() {//onlineRegions.values() 是所有RegionServer中活跃的Region集合for (Region r : this.instance.onlineRegions.values()) {if (r == null)continue;//r.getStores 获取region中所有Store(一个Region有几个列簇就有几个Store)for (Store s : r.getStores()) {try {//multiplier = hbase.server.compactchecker.interval.multiplier的值//该值获取方法在 HStore 的构造函数中初始化 默认1000long multiplier = s.getCompactionCheckMultiplier();//断言是否为正常值assert multiplier > 0;// iteration该值初始化为1,每次定时执行该值会+1,当为multiplier的整数倍时会往下执行//我们上面获取到的multiplier=1000,chore定期执行,每隔 hbase.server.thread.wakefrequency=10秒 默认 10 * 1000//也就是每隔10s*1000=10000s=2.77小时,会往下执行一次if (iteration % multiplier != 0) continue;//需要合并的话,发起SystemCompaction请求,// 此处最终比较的是是否当前storefile数量减去正在compacting的文件数大于设置的compact min值(这里看的是RatioBasedCompactionPolicy策略的needsCompaction方法)// 若满足则执行systemcompactif (s.needsCompaction()) {// Queue a compaction. Will recognize if major is needed.this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()+ " requests compaction");} else if (s.isMajorCompaction()) {//判断是否需要执行Major Compactions.triggerMajorCompaction();if (majorCompactPriority == DEFAULT_PRIORITY|| majorCompactPriority > ((HRegion)r).getCompactPriority()) {this.instance.compactSplitThread.requestCompaction(r, s, getName()+ " requests major compaction; use default priority", null);} else {this.instance.compactSplitThread.requestCompaction(r, s, getName()+ " requests major compaction; use configured priority",this.majorCompactPriority, null, null);}}} catch (IOException e) {LOG.warn("Failed major compaction check on " + r, e);}}}iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);}}

needsCompaction 方法 

 

@Overridepublic boolean needsCompaction(final Collection<StoreFile> storeFiles,final List<StoreFile> filesCompacting) {//当前storeFiles数量-正在compact的文件数量,是否大于minFilesToCompact //minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,//          /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));//如果待compaction文件数量大于配置,则返回true,进行compactionint numCandidates = storeFiles.size() - filesCompacting.size();return numCandidates >= comConf.getMinFilesToCompact();}

isMajorCompaction 方法

可以看到调用的是 storeEngine.getCompactionPolicy() 的 shouldPerformMajorCompaction方法storeEngine.getCompactionPolicy() 获取到执行的策略,然后调用该策略的 shouldPerformMajorCompaction 方法,这里分析的是 RatioBasedCompactionPolicy 策略,

 @Overridepublic boolean isMajorCompaction() throws IOException {for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {// TODO: what are these reader checks all over the place?if (sf.getReader() == null) {LOG.debug("StoreFile " + sf + " has null Reader");return false;}}return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles());}

shouldPerformMajorCompaction 方法

该方法返回是否需要Compaction

这里查看的是hbase 1.4.10 版本源码,该方法存在一个bug,倒数第二行的 result = true;会导致其中的一种判断失效,后来去查看了下 1.4.13 版本以后源码,已经修复该问题。

 @Overridepublic boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)throws IOException {boolean result = false;//获取下一次major compact的时间 ,该值是一个浮动值 [7-7*0.5,7+7.0.5]// hbase.hregion.majorcompaction = 7天// hbase.hregion.majorcompaction.jitter = 0.5long mcTime = getNextMajorCompactTime(filesToCompact);if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {return result;}// TODO: Use better method for determining stamp of last major (HBASE-2990)//获取待合并文件中的修改时间最小的那个long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);//获取当前时间long now = EnvironmentEdgeManager.currentTime();//判断上次修改时间,是否在本次修改时间范围内,如果最早caption时间大于mcTime天前,// 即在mcTime时间内执行过,则不运行Majorif (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {//到了这一步就肯定会执行Major Compaction,后面判断,基本就是Debug时候使用String regionInfo;if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {regionInfo = ((HStore)this.storeConfigInfo).getRegionInfo().getRegionNameAsString();} else {regionInfo = this.toString();}// Major compaction time has elapsed.long cfTTL = HConstants.FOREVER;if (this.storeConfigInfo != null) {//获取文件保存时间ttlcfTTL = this.storeConfigInfo.getStoreFileTtl();}if (filesToCompact.size() == 1) {//合并文件为1个// Single fileStoreFile sf = filesToCompact.iterator().next();//文件最小时间戳Long minTimestamp = sf.getMinimumTimestamp();//文件存在时间long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();if (sf.isMajorCompaction() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {//文件未过期float blockLocalityIndex =sf.getHDFSBlockDistribution().getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {//判断文件是否本地化,如果未本地化则进行CompactionLOG.debug("Major compaction triggered on only store " + regionInfo+ "; to make hdfs blocks local, current blockLocalityIndex is "+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");result = true;} else {//跳过压缩LOG.debug("Skipping major compaction of " + regionInfo+ " because one (major) compacted file only, oldestTime " + oldest+ "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex+ " (min " + comConf.getMinLocalityToForceCompact() + ")");}} else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {//storeFile过期触发Major CompactionLOG.debug("Major compaction triggered on store " + regionInfo+ ", because keyvalues outdated; time since last major compaction "+ (now - lowTimestamp) + "ms");result = true;}} else {//如果合并文件为多个则返回trueLOG.debug("Major compaction triggered on store " + regionInfo+ "; time since last major compaction " + (now - lowTimestamp) + "ms");}result = true;//该处存在bug}return result;}

 这里返回的result的就是  s.isMajorCompaction() 返回的结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509783.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++ 之类的前置声明

转自&#xff1a;http://blog.csdn.net/fjb2080/archive/2010/04/27/5533514.aspx 作者&#xff1a;清林&#xff0c;博客名&#xff1a;飞空静渡 刚开始学习c的人都会遇到这样的问题&#xff1a; 定义一个类 class A&#xff0c;这个类里面使用了类B的对象b&#xff0c;然后定…

java中随机数Random和ThreadLocalRandom()用法与区别

package com.test;import java.util.Random; import java.util.concurrent.ThreadLocalRandom;public class M1001{public static void main(String[] args) {Random random new Random();System.out.println("-----------产生1到10之间的随机数----------------");…

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

目录 类的关系图 ​ RatioBasedCompactionPolicy selectCompaction 方法 getCurrentEligibleFiles方法 skipLargeFiles方法 createCompactionRequest方法 filterBulk方法 applyCompactionPolicy方法 removeExcessFiles方法 setIsMajor方法 其他相关文章 Hbase Compa…

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbase Compaction 源码分析 - CompactionChecker Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略 Hbase Compaction 源码分析 - CompactS…

java如何生成验证码

package com.test;import java.util.Random; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom;public class M1001{public static void main(String[] args) {System.out.println("-----------产生5个随机数的验证码----------------");Strin…

m3u8下载ts 合并成一个视频

我们在用网页看视频时&#xff0c;很多时候视频是下载不下来的&#xff0c;当然这里面有很多技术来防止我们下载视频&#xff0c;接下来我将破解使用m3u8格式来下载视频。一般情况下&#xff0c;我们使用浏览器中Network来查看服务器和本机的数据传输&#xff0c;而视频的原地址…

用行为树的方式思考问题

这段时间做了很多和AI无关的事情&#xff0c;做了个Flash的3D引擎&#xff0c;用汇编写了些shader&#xff0c;做了很多引擎的工具&#xff0c;脚本&#xff0c;插件&#xff0c;游戏也发布了首个预告片&#xff0c;一年多的工作收获满满&#xff0c;职位从AI Engineer变成了En…

Linux常用指令2

linux的命令操作 1、日常操作命令 **查看当前所在的工作目录 pwd **查看当前系统的时间 date **查看有谁在线&#xff08;哪些人登陆到了服务器&#xff09; who 查看当前在线 last 查看最近的登陆历史记录 2、文件系统操作 ** ls / 查看根目录下的子节点&#xff…

qt, connect参数,Qt::DirectConnection,Qt::QueuedConnection

connect用于连接qt的信号和槽&#xff0c;在qt编程过程中不可或缺。它其实有第五个参数&#xff0c;只是一般使用默认值&#xff0c;在满足某些特殊需求的时候可能需要手动设置。 Qt::AutoConnection&#xff1a; 默认值&#xff0c;使用这个值则连接类型会在信号发送时决定。如…

Hbase Compaction 队列数量较大分析

目录 问题 问题原因分析 总结建议 问题 前几天朋友公司Hbase集群出现Compaction队列持续处于比较大的情况&#xff0c;并且mem flush队列也比较大&#xff0c;一起看了下问题&#xff0c;大概情况如下图 从图中可以看出来压缩队列总和持续在1000-2000&#xff0c;平对压缩队列…

java中Date和DateFormat、SimpleDateFormat类

package com.test;import java.text.DateFormat; import java.util.Date; import java.util.Random; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom;public class M1001{public static void main(String[] args) {//Date表示特定的瞬间,精确到毫秒Dat…

Hbase 2.x Region in transition (永久RIT) 异常解决

环境 Hbase 版本&#xff1a;2.0 问题原因 hbase长时间出现RIT&#xff0c;并且发生RIT的Region是已经删除了的Hbase表&#xff0c;表未删除的情况下执行assgin可以消除该问题 Hbase Region in transition (RIT) 异常解决&#xff1a;https://datamining.blog.csdn.net/artic…

sigslot库源码分析

言归正传&#xff0c;sigslot是一个用标准C语法实现的信号与槽机制的函数库&#xff0c;类型和线程安全。提到信号与槽机制&#xff0c;恐怕最容易想到的就是大名鼎鼎的Qt所支持的对象之间通信的模式吧。不过这里的信号与槽虽然在概念上等价与Qt所实现的信号与槽&#xff0c;但…

Mysql for linux 安装

Mysql for linux 安装&#xff1a; 第一步&#xff0c;参考这个链接至第四步为止 https://jingyan.baidu.com/article/cd4c2979209c32756e6e60e1.html 第二步&#xff1a;使用rpm -qa | grep mysql 查询安装状态 第三步&#xff1a;使用/etc/rc.d/init.d/mysqld status 检…

Hue开发指南 - 提交 Spark 程序

目录 Hue开发指南 1.Spark文件打包成一个Jar包提交Hue运行 1.1 上传Spark Jar包至HDFS文件系统 1.2.Hue中创建Spark任务 2.多jar包导入执行&#xff08;依赖jar包与主程序jar包分开打包&#xff09; 2.1 修改worksapce 2.2 添加程序依赖jar包 Hue开发指南 Hue是面向 Had…

如何缩小码农和高手的差距

为什么同样的时间有的人可以漂亮的完成工作&#xff0c;而有些人废了很大的力气也没有完成&#xff1f;前者我们常常称之为“大牛”&#xff0c;后者我们常常叫他们“菜鸟”。当然“大牛”都是相对而言的&#xff0c;“大牛”也不可能方方面面都非常厉害&#xff0c;换句话说大…

OpenResty 安装,收集日志保存到文本文件

目录 安装 1.安装相关类库 2.安装编译openresty 3.编写配置启动openresty服务 4.通过 openresty 保存日志数据到系统 安装 1.安装相关类库 yum install -y readline-devel pcre-devel openssl-devel gcc 2.安装编译openresty wget https://openresty.org/download/open…

暗时间-领悟

作者&#xff1a;排长链接&#xff1a;https://www.zhihu.com/question/20689852/answer/23227406来源&#xff1a;知乎著作权归作者所有&#xff0c;转载请联系作者获得授权。第一次看到“暗时间”这个词&#xff0c;我的第一反应是有点不屑&#xff0c;又是一个概念噱头吧。直…

Openresty Nginx 负载均衡

目录 OpenResty Openresty 服务配置文件 启动Openresty服务 测试调用接口 Nginx 负载均衡服务 nginx 配置文件 启动服务 实现功能 测试结果 这里实现个简单的负载均衡&#xff0c;只做功能展示&#xff08;实际业务比这复杂高&#xff0c;单台服务器无法满足需求的情况…

总结--美丽的敷衍

我看新年 转眼间&#xff0c;2015已经悄然离去&#xff0c;正像2015年刚来时候感叹2014的白驹过隙。年年岁岁花相似&#xff0c;岁岁年年人不同。时间过去了&#xff0c;自己又有如何的不同呢&#xff1f;客观的说&#xff0c;这一年有了一定的长进&#xff0c;但总感觉失去/错…