Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

目录

CompactSplitThread

requestCompactionInternal方法

selectCompaction方法

requestCompaction方法


 

其他相关文章

Hbase Compaction 源码分析 - CompactionChecker

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

 

CompactSplitThread

从名称我们可以看出来这是个处理Compcation和Split的线程

我们从下面的方法调用关系来看可发现CompactionChecker会调用requestCompactionInternal方法

requestCompactionInternal方法

/*** @param r region store belongs to* @param s Store to request compaction on* @param why Why compaction requested -- used in debug messages* @param priority override the default priority (NO_PRIORITY == decide)* @param request custom compaction request. Can be <tt>null</tt> in which case a simple*          compaction will be used.*///selectNow:系统自动触发的system compaction,selectNow参数为false,如果周期性或者人工触发的major compaction的合并,则selectNow为trueprivate synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,final String why, int priority, CompactionRequest request, boolean selectNow, User user)throws IOException {//判断Hregionserver服务是否停止if (this.server.isStopped()|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {return null;}CompactionContext compaction = null;if (selectNow) {//周期执行执行MajorCompaction,或人工触发的major compaction,selectNow为true.compaction = selectCompaction(r, s, priority, request, user);if (compaction == null) return null; // message logged inside}// We assume that most compactions are small. So, put system compactions into small// pool; we will do selection there, and move to large pool if necessary.// throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",//          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());//如果selectNow为ture,需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程//longCompactions和shortCompactions默认大小都是1,生产环境可以调整大一些ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))? longCompactions : shortCompactions;pool.execute(new CompactionRunner(s, r, compaction, pool, user));((HRegion)r).incrementCompactionsQueuedCount();if (LOG.isDebugEnabled()) {String type = (pool == shortCompactions) ? "Small " : "Large ";LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);}return selectNow ? compaction.getRequest() : null;}

 该方法主要内容如下

1.如果HRegionServer停止则跳过
2.判断是否selectNow,如果为true,则执行selectCompaction方法,否则跳过
electNow:系统自动触发的system compaction,selectNow参数为false,如果周期性执行MajorCompaction或者人工触发的major compaction的合并,则selectNow为true
3.选择线程池,如果selectNow为ture且需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程
longCompactions和shortCompactions线程池默认大小都是1,生产环境可以调整大一些
4.执行线程

selectCompaction方法

 private CompactionContext selectCompaction(final Region r, final Store s,int priority, CompactionRequest request, User user) throws IOException {//调用HStore.requestCompaction方法,获取CompactionContext数据CompactionContext compaction = s.requestCompaction(priority, request, user);if (compaction == null) {if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +" because compaction request was cancelled");}return null;}//确认compcation不为空assert compaction.hasSelection();if (priority != Store.NO_PRIORITY) {compaction.getRequest().setPriority(priority);}return compaction;}

该方法主要作用 :获取CompactionContext数据

requestCompaction方法

  @Overridepublic CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,User user) throws IOException {// don't even select for compaction if writes are disabled//如果对应的region不可以写则返回nullif (!this.areWritesEnabled()) {return null;}// Before we do compaction, try to get rid of unneeded files to simplify things.removeUnneededFiles();//通过StoreEngine获取CompactionContext,这里介绍使用的是DefaultStoreEnginefinal CompactionContext compaction = storeEngine.createCompaction();CompactionRequest request = null;//设置读锁this.lock.readLock().lock();try {//设置同步锁synchronized (filesCompacting) {final Store thisStore = this;// First, see if coprocessor would want to override selection.//如果存在协处理器if (this.getCoprocessorHost() != null) {final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);boolean override = false;if (user == null) {override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,baseRequest);} else {try {override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws Exception {return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,baseRequest);}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}if (override) {// Coprocessor is overriding normal file selection.compaction.forceSelect(new CompactionRequest(candidatesForCoproc));}}// Normal case - coprocessor is not overriding file selection.//正常情况if (!compaction.hasSelection()) {//是否为用户Compactionboolean isUserCompaction = priority == Store.PRIORITY_USER;//判断是否为高峰期boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&offPeakCompactionTracker.compareAndSet(false, true);try {//调用DefaultCompactionContext的select方法compaction.select(this.filesCompacting, isUserCompaction,mayUseOffPeak, forceMajor && filesCompacting.isEmpty());} catch (IOException e) {if (mayUseOffPeak) {offPeakCompactionTracker.set(false);}throw e;}assert compaction.hasSelection();//isOffPeak为true,这种压实被提升为非高峰if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {// Compaction policy doesn't want to take advantage of off-peak.offPeakCompactionTracker.set(false);}}if (this.getCoprocessorHost() != null) {if (user == null) {this.getCoprocessorHost().postCompactSelection(this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);} else {try {user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {getCoprocessorHost().postCompactSelection(thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);return null;}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}}// Selected files; see if we have a compaction with some custom base request.if (baseRequest != null) {// Update the request with what the system thinks the request should be;// its up to the request if it wants to listen.compaction.forceSelect(baseRequest.combineWith(compaction.getRequest()));}// Finally, we have the resulting files list. Check if we have any files at all.request = compaction.getRequest();final Collection<StoreFile> selectedFiles = request.getFiles();if (selectedFiles.isEmpty()) {return null;}addToCompactingFiles(selectedFiles);// If we're enqueuing a major, clear the force flag.this.forceMajor = this.forceMajor && !request.isMajor();// Set common request properties.// Set priority, either override value supplied by caller or from store.request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());}} finally {this.lock.readLock().unlock();}LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"+ (request.isAllFiles() ? " (all files)" : ""));this.region.reportCompactionRequestStart(request.isMajor());return compaction;}

该方法主要作用:

1.先判断region是否可以写
2.提出不必要的文件
3.处理存在协处理器的数据
4.调用DefaultCompactionContext.select方法


DefaultCompactionContext.select方法最终调用SortedCompactionPolicy.selectCompaction 方法

参数说明:

参数说明默认值

hbase.regionserver.thread.compaction.small

RegionServer 小型压缩线程计数1
hbase.regionserver.thread.compaction.largeRegionServer大型压缩线程计数1

 

 

 

 

下一步就是在具体合并策略选取文件

查看:Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java如何生成验证码

package com.test;import java.util.Random; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom;public class M1001{public static void main(String[] args) {System.out.println("-----------产生5个随机数的验证码----------------");Strin…

m3u8下载ts 合并成一个视频

我们在用网页看视频时&#xff0c;很多时候视频是下载不下来的&#xff0c;当然这里面有很多技术来防止我们下载视频&#xff0c;接下来我将破解使用m3u8格式来下载视频。一般情况下&#xff0c;我们使用浏览器中Network来查看服务器和本机的数据传输&#xff0c;而视频的原地址…

用行为树的方式思考问题

这段时间做了很多和AI无关的事情&#xff0c;做了个Flash的3D引擎&#xff0c;用汇编写了些shader&#xff0c;做了很多引擎的工具&#xff0c;脚本&#xff0c;插件&#xff0c;游戏也发布了首个预告片&#xff0c;一年多的工作收获满满&#xff0c;职位从AI Engineer变成了En…

Linux常用指令2

linux的命令操作 1、日常操作命令 **查看当前所在的工作目录 pwd **查看当前系统的时间 date **查看有谁在线&#xff08;哪些人登陆到了服务器&#xff09; who 查看当前在线 last 查看最近的登陆历史记录 2、文件系统操作 ** ls / 查看根目录下的子节点&#xff…

qt, connect参数,Qt::DirectConnection,Qt::QueuedConnection

connect用于连接qt的信号和槽&#xff0c;在qt编程过程中不可或缺。它其实有第五个参数&#xff0c;只是一般使用默认值&#xff0c;在满足某些特殊需求的时候可能需要手动设置。 Qt::AutoConnection&#xff1a; 默认值&#xff0c;使用这个值则连接类型会在信号发送时决定。如…

Hbase Compaction 队列数量较大分析

目录 问题 问题原因分析 总结建议 问题 前几天朋友公司Hbase集群出现Compaction队列持续处于比较大的情况&#xff0c;并且mem flush队列也比较大&#xff0c;一起看了下问题&#xff0c;大概情况如下图 从图中可以看出来压缩队列总和持续在1000-2000&#xff0c;平对压缩队列…

java中Date和DateFormat、SimpleDateFormat类

package com.test;import java.text.DateFormat; import java.util.Date; import java.util.Random; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom;public class M1001{public static void main(String[] args) {//Date表示特定的瞬间,精确到毫秒Dat…

Hbase 2.x Region in transition (永久RIT) 异常解决

环境 Hbase 版本&#xff1a;2.0 问题原因 hbase长时间出现RIT&#xff0c;并且发生RIT的Region是已经删除了的Hbase表&#xff0c;表未删除的情况下执行assgin可以消除该问题 Hbase Region in transition (RIT) 异常解决&#xff1a;https://datamining.blog.csdn.net/artic…

sigslot库源码分析

言归正传&#xff0c;sigslot是一个用标准C语法实现的信号与槽机制的函数库&#xff0c;类型和线程安全。提到信号与槽机制&#xff0c;恐怕最容易想到的就是大名鼎鼎的Qt所支持的对象之间通信的模式吧。不过这里的信号与槽虽然在概念上等价与Qt所实现的信号与槽&#xff0c;但…

Mysql for linux 安装

Mysql for linux 安装&#xff1a; 第一步&#xff0c;参考这个链接至第四步为止 https://jingyan.baidu.com/article/cd4c2979209c32756e6e60e1.html 第二步&#xff1a;使用rpm -qa | grep mysql 查询安装状态 第三步&#xff1a;使用/etc/rc.d/init.d/mysqld status 检…

Hue开发指南 - 提交 Spark 程序

目录 Hue开发指南 1.Spark文件打包成一个Jar包提交Hue运行 1.1 上传Spark Jar包至HDFS文件系统 1.2.Hue中创建Spark任务 2.多jar包导入执行&#xff08;依赖jar包与主程序jar包分开打包&#xff09; 2.1 修改worksapce 2.2 添加程序依赖jar包 Hue开发指南 Hue是面向 Had…

如何缩小码农和高手的差距

为什么同样的时间有的人可以漂亮的完成工作&#xff0c;而有些人废了很大的力气也没有完成&#xff1f;前者我们常常称之为“大牛”&#xff0c;后者我们常常叫他们“菜鸟”。当然“大牛”都是相对而言的&#xff0c;“大牛”也不可能方方面面都非常厉害&#xff0c;换句话说大…

OpenResty 安装,收集日志保存到文本文件

目录 安装 1.安装相关类库 2.安装编译openresty 3.编写配置启动openresty服务 4.通过 openresty 保存日志数据到系统 安装 1.安装相关类库 yum install -y readline-devel pcre-devel openssl-devel gcc 2.安装编译openresty wget https://openresty.org/download/open…

暗时间-领悟

作者&#xff1a;排长链接&#xff1a;https://www.zhihu.com/question/20689852/answer/23227406来源&#xff1a;知乎著作权归作者所有&#xff0c;转载请联系作者获得授权。第一次看到“暗时间”这个词&#xff0c;我的第一反应是有点不屑&#xff0c;又是一个概念噱头吧。直…

Openresty Nginx 负载均衡

目录 OpenResty Openresty 服务配置文件 启动Openresty服务 测试调用接口 Nginx 负载均衡服务 nginx 配置文件 启动服务 实现功能 测试结果 这里实现个简单的负载均衡&#xff0c;只做功能展示&#xff08;实际业务比这复杂高&#xff0c;单台服务器无法满足需求的情况…

总结--美丽的敷衍

我看新年 转眼间&#xff0c;2015已经悄然离去&#xff0c;正像2015年刚来时候感叹2014的白驹过隙。年年岁岁花相似&#xff0c;岁岁年年人不同。时间过去了&#xff0c;自己又有如何的不同呢&#xff1f;客观的说&#xff0c;这一年有了一定的长进&#xff0c;但总感觉失去/错…

Hadoop Yarn REST API未授权漏洞利用挖矿分析

目录 一、背景情况 二、 漏洞说明 攻击步骤&#xff1a; 三、入侵分析 四、安全建议 清理病毒 安全加固 五、IOCs 一、背景情况 5月5日腾讯云安全曾针对攻击者利用Hadoop Yarn资源管理系统REST API未授权漏洞对服务器进行攻击&#xff0c;攻击者可以在未授权的情况…

读书-悟

作者&#xff1a;梅芳growing链接&#xff1a;https://www.zhihu.com/question/20689852/answer/95018631来源&#xff1a;知乎著作权归作者所有&#xff0c;转载请联系作者获得授权。《暗时间》这本书&#xff0c;我读过3遍&#xff0c;第一便是在大三时&#xff0c;第二遍是用…

Linux shell编程学习总结

主要内容&#xff1a; shell编程sed命令awk命令crontab定时器 什么是Shell&#xff1f; Shell是用户与内核进行交互操作的一种接口&#xff0c;目前最流行的Shell称为bash Shell Shell也是一门编程语言<解释型的编程语言>&#xff0c;即shell脚本 一个系统可以存在多…

Flink ProcessFunction 介绍使用

目录 实现功能 代码 测试 问题 官网描述&#xff1a;https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html The ProcessFunction is a low-level stream processing operation, giving access to the basic build…