Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

目录

CompactSplitThread

requestCompactionInternal方法

selectCompaction方法

requestCompaction方法


 

其他相关文章

Hbase Compaction 源码分析 - CompactionChecker

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

 

CompactSplitThread

从名称我们可以看出来这是个处理Compcation和Split的线程

我们从下面的方法调用关系来看可发现CompactionChecker会调用requestCompactionInternal方法

requestCompactionInternal方法

/*** @param r region store belongs to* @param s Store to request compaction on* @param why Why compaction requested -- used in debug messages* @param priority override the default priority (NO_PRIORITY == decide)* @param request custom compaction request. Can be <tt>null</tt> in which case a simple*          compaction will be used.*///selectNow:系统自动触发的system compaction,selectNow参数为false,如果周期性或者人工触发的major compaction的合并,则selectNow为trueprivate synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,final String why, int priority, CompactionRequest request, boolean selectNow, User user)throws IOException {//判断Hregionserver服务是否停止if (this.server.isStopped()|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {return null;}CompactionContext compaction = null;if (selectNow) {//周期执行执行MajorCompaction,或人工触发的major compaction,selectNow为true.compaction = selectCompaction(r, s, priority, request, user);if (compaction == null) return null; // message logged inside}// We assume that most compactions are small. So, put system compactions into small// pool; we will do selection there, and move to large pool if necessary.// throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",//          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());//如果selectNow为ture,需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程//longCompactions和shortCompactions默认大小都是1,生产环境可以调整大一些ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))? longCompactions : shortCompactions;pool.execute(new CompactionRunner(s, r, compaction, pool, user));((HRegion)r).incrementCompactionsQueuedCount();if (LOG.isDebugEnabled()) {String type = (pool == shortCompactions) ? "Small " : "Large ";LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);}return selectNow ? compaction.getRequest() : null;}

 该方法主要内容如下

1.如果HRegionServer停止则跳过
2.判断是否selectNow,如果为true,则执行selectCompaction方法,否则跳过
electNow:系统自动触发的system compaction,selectNow参数为false,如果周期性执行MajorCompaction或者人工触发的major compaction的合并,则selectNow为true
3.选择线程池,如果selectNow为ture且需要compaction的文件大小大于throttlePoint值,则使用longCompactions线程,否则使用shortCompactions线程
longCompactions和shortCompactions线程池默认大小都是1,生产环境可以调整大一些
4.执行线程

selectCompaction方法

 private CompactionContext selectCompaction(final Region r, final Store s,int priority, CompactionRequest request, User user) throws IOException {//调用HStore.requestCompaction方法,获取CompactionContext数据CompactionContext compaction = s.requestCompaction(priority, request, user);if (compaction == null) {if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +" because compaction request was cancelled");}return null;}//确认compcation不为空assert compaction.hasSelection();if (priority != Store.NO_PRIORITY) {compaction.getRequest().setPriority(priority);}return compaction;}

该方法主要作用 :获取CompactionContext数据

requestCompaction方法

  @Overridepublic CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,User user) throws IOException {// don't even select for compaction if writes are disabled//如果对应的region不可以写则返回nullif (!this.areWritesEnabled()) {return null;}// Before we do compaction, try to get rid of unneeded files to simplify things.removeUnneededFiles();//通过StoreEngine获取CompactionContext,这里介绍使用的是DefaultStoreEnginefinal CompactionContext compaction = storeEngine.createCompaction();CompactionRequest request = null;//设置读锁this.lock.readLock().lock();try {//设置同步锁synchronized (filesCompacting) {final Store thisStore = this;// First, see if coprocessor would want to override selection.//如果存在协处理器if (this.getCoprocessorHost() != null) {final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);boolean override = false;if (user == null) {override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,baseRequest);} else {try {override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws Exception {return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,baseRequest);}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}if (override) {// Coprocessor is overriding normal file selection.compaction.forceSelect(new CompactionRequest(candidatesForCoproc));}}// Normal case - coprocessor is not overriding file selection.//正常情况if (!compaction.hasSelection()) {//是否为用户Compactionboolean isUserCompaction = priority == Store.PRIORITY_USER;//判断是否为高峰期boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&offPeakCompactionTracker.compareAndSet(false, true);try {//调用DefaultCompactionContext的select方法compaction.select(this.filesCompacting, isUserCompaction,mayUseOffPeak, forceMajor && filesCompacting.isEmpty());} catch (IOException e) {if (mayUseOffPeak) {offPeakCompactionTracker.set(false);}throw e;}assert compaction.hasSelection();//isOffPeak为true,这种压实被提升为非高峰if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {// Compaction policy doesn't want to take advantage of off-peak.offPeakCompactionTracker.set(false);}}if (this.getCoprocessorHost() != null) {if (user == null) {this.getCoprocessorHost().postCompactSelection(this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);} else {try {user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {getCoprocessorHost().postCompactSelection(thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);return null;}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}}// Selected files; see if we have a compaction with some custom base request.if (baseRequest != null) {// Update the request with what the system thinks the request should be;// its up to the request if it wants to listen.compaction.forceSelect(baseRequest.combineWith(compaction.getRequest()));}// Finally, we have the resulting files list. Check if we have any files at all.request = compaction.getRequest();final Collection<StoreFile> selectedFiles = request.getFiles();if (selectedFiles.isEmpty()) {return null;}addToCompactingFiles(selectedFiles);// If we're enqueuing a major, clear the force flag.this.forceMajor = this.forceMajor && !request.isMajor();// Set common request properties.// Set priority, either override value supplied by caller or from store.request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());}} finally {this.lock.readLock().unlock();}LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"+ (request.isAllFiles() ? " (all files)" : ""));this.region.reportCompactionRequestStart(request.isMajor());return compaction;}

该方法主要作用:

1.先判断region是否可以写
2.提出不必要的文件
3.处理存在协处理器的数据
4.调用DefaultCompactionContext.select方法


DefaultCompactionContext.select方法最终调用SortedCompactionPolicy.selectCompaction 方法

参数说明:

参数说明默认值

hbase.regionserver.thread.compaction.small

RegionServer 小型压缩线程计数1
hbase.regionserver.thread.compaction.largeRegionServer大型压缩线程计数1

 

 

 

 

下一步就是在具体合并策略选取文件

查看:Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hbase Compaction 队列数量较大分析

目录 问题 问题原因分析 总结建议 问题 前几天朋友公司Hbase集群出现Compaction队列持续处于比较大的情况&#xff0c;并且mem flush队列也比较大&#xff0c;一起看了下问题&#xff0c;大概情况如下图 从图中可以看出来压缩队列总和持续在1000-2000&#xff0c;平对压缩队列…

Hbase 2.x Region in transition (永久RIT) 异常解决

环境 Hbase 版本&#xff1a;2.0 问题原因 hbase长时间出现RIT&#xff0c;并且发生RIT的Region是已经删除了的Hbase表&#xff0c;表未删除的情况下执行assgin可以消除该问题 Hbase Region in transition (RIT) 异常解决&#xff1a;https://datamining.blog.csdn.net/artic…

sigslot库源码分析

言归正传&#xff0c;sigslot是一个用标准C语法实现的信号与槽机制的函数库&#xff0c;类型和线程安全。提到信号与槽机制&#xff0c;恐怕最容易想到的就是大名鼎鼎的Qt所支持的对象之间通信的模式吧。不过这里的信号与槽虽然在概念上等价与Qt所实现的信号与槽&#xff0c;但…

Hue开发指南 - 提交 Spark 程序

目录 Hue开发指南 1.Spark文件打包成一个Jar包提交Hue运行 1.1 上传Spark Jar包至HDFS文件系统 1.2.Hue中创建Spark任务 2.多jar包导入执行&#xff08;依赖jar包与主程序jar包分开打包&#xff09; 2.1 修改worksapce 2.2 添加程序依赖jar包 Hue开发指南 Hue是面向 Had…

如何缩小码农和高手的差距

为什么同样的时间有的人可以漂亮的完成工作&#xff0c;而有些人废了很大的力气也没有完成&#xff1f;前者我们常常称之为“大牛”&#xff0c;后者我们常常叫他们“菜鸟”。当然“大牛”都是相对而言的&#xff0c;“大牛”也不可能方方面面都非常厉害&#xff0c;换句话说大…

OpenResty 安装,收集日志保存到文本文件

目录 安装 1.安装相关类库 2.安装编译openresty 3.编写配置启动openresty服务 4.通过 openresty 保存日志数据到系统 安装 1.安装相关类库 yum install -y readline-devel pcre-devel openssl-devel gcc 2.安装编译openresty wget https://openresty.org/download/open…

Hadoop Yarn REST API未授权漏洞利用挖矿分析

目录 一、背景情况 二、 漏洞说明 攻击步骤&#xff1a; 三、入侵分析 四、安全建议 清理病毒 安全加固 五、IOCs 一、背景情况 5月5日腾讯云安全曾针对攻击者利用Hadoop Yarn资源管理系统REST API未授权漏洞对服务器进行攻击&#xff0c;攻击者可以在未授权的情况…

Linux shell编程学习总结

主要内容&#xff1a; shell编程sed命令awk命令crontab定时器 什么是Shell&#xff1f; Shell是用户与内核进行交互操作的一种接口&#xff0c;目前最流行的Shell称为bash Shell Shell也是一门编程语言<解释型的编程语言>&#xff0c;即shell脚本 一个系统可以存在多…

Flink ProcessFunction 介绍使用

目录 实现功能 代码 测试 问题 官网描述&#xff1a;https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html The ProcessFunction is a low-level stream processing operation, giving access to the basic build…

Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在&#xff0c;大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计&a…

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具&#xff0c;它的功能十分强大&#xff0c;使用非常灵活&#xff0c;可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

Web Components入门不完全指北

目前流行的各类前端框架&#xff0c;不管是react, angular还是vue&#xff0c;都有一个共同点&#xff0c;那就是支持组件化开发&#xff0c;但事实上随着浏览器的发展&#xff0c;现在浏览器也原生支持组件式开发&#xff0c;本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动&#xff0c;版本升级到6.3&#xff0c;发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议&#xff0c;主要作用是为了保证消费者组&#xff08;Consumer Group&#xff09;下的所有消费者&#xff08;Consumer&#xff09;消费的主体分区达成均衡。 比如&#xff1a;我们有10个分区&#xff0c;当我们有一个消费者时&…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念 我们通俗一点讲&#xff1a; Level_triggered(水平触发)&#xff1a;当被监控的文件描述符上有可读写事件发生时&#xff0c;epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如…