Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

目录

类的关系图

RatioBasedCompactionPolicy

selectCompaction 方法

getCurrentEligibleFiles方法

skipLargeFiles方法

createCompactionRequest方法

filterBulk方法

applyCompactionPolicy方法

removeExcessFiles方法

setIsMajor方法

 


其他相关文章

Hbase Compaction 源码分析 - CompactionChecker

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

之前介绍 CompactionChecker 执行时机,这回接着介绍具体的策略

类的关系图

RatioBasedCompactionPolicy

该类在

org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy

 

 

selectCompaction 方法

调用过程如下

我们看一下RatioBasedCompactionPolicy 的 selectCompaction 实现方法,实际是在父类  SortedCompactionPolicy 中

//candidateFiles 候选文件,并且按照seqId从最早到最新的排序//filesCompacting 正在Compcation的文件//mayUseOffPeak 是否为高峰期//forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true//返回 符合Compaction的候选列表public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,final List<StoreFile> filesCompacting, final boolean isUserCompaction,final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {// Preliminary compaction subject to filtersArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);// Stuck and not compacting enough (estimate). It is not guaranteed that we will be// able to compact more if stuck and compacting, because ratio policy excludes some// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).int futureFiles = filesCompacting.isEmpty() ? 0 : 1;//如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
//   blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)>= storeConfigInfo.getBlockingFileCount();//删除正在合并的文件candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +filesCompacting.size() + " compacting, " + candidateSelection.size() +" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");// If we can't have all files, we cannot do major anyway//判断是否包含全部文件,如果没有正在合并的文件则为trueboolean isAllFiles = candidateFiles.size() == candidateSelection.size();//如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤,过滤掉大于hbase.hstore.compaction.max.size值的文件if (!(forceMajor && isAllFiles)) {//排除大于hbase.hstore.compaction.max.size值的数据,默认Long.MAX_VALUE//hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);isAllFiles = candidateFiles.size() == candidateSelection.size();}// Try a major compaction if this is a user-requested major compaction,// or if we do not have too many files to compact and this was requested as a major compaction//isTryingMajor判断条件有两种// 1、Major合并为True,且包含所有问文件,且是一个用户合并// 2、Major合并为True,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));// Or, if there are any references among the candidates.//判断是否包含分裂后的文件boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);//如果不是isTryingMajor且不包含分裂后的文件,则 createCompactionRequest 方法中进行进一步文件过滤CompactionRequest result = createCompactionRequest(candidateSelection,isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());//过滤掉多余最大合并的文件数量removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);result.updateFiles(filesToCompact);isAllFiles = (candidateFiles.size() == filesToCompact.size());result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);return result;}

该方法主要流程:

 

1.传入参数与返回类型

candidateFiles 压缩候选文件,并且按照seqId从最早到最新的排序

filesCompacting 正在压缩的文件

mayUseOffPeak 是否为高峰期

forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true

isUserCompaction 是否为用户压缩

返回 CompactionRequest ,符合Compaction的候选列表

2.判断是否阻塞,等待合并的文件数量大于blockingStoreFiles,认为是阻塞

如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),

hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。

3.从候选列表中candidateSelection删除正在Compaction的文件

4.判断是否包含全部文件,如果没有正在合并的文件isAllFiles则为true

5.如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤;

文件过滤方法:skipLargeFiles,过滤掉大于hbase.hstore.compaction.max.size值的文件,该方法后面介绍

6.判断isTryingMajor(判断后续是否为Major使用),判断条件有两种,满足一个即为true:

a.Major(forceMajor)合并为true,且包含所有文件,且是一个用户合并

b.Major(forceMajor)合并为true,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目

7.判断candidateSelection是否包含分裂后的文件

8.如果不是isTryingMajor且不包含分裂后的文件,则执行 createCompactionRequest 方法中进行进一步文件过滤,createCompactionRequest方法后面介绍

9.执行removeExcessFiles方法,如果大于最大合并的文件数量,则过滤掉多余的数量,否则不处理;执行removeExcessFiles方法下一步介绍

10.在返回的result中设置本次Compcation的类型(Major或者Minor),调用方法 setIsMajor,下面介绍

getCurrentEligibleFiles方法

protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,final List<StoreFile> filesCompacting) {// candidates = all storefiles not already in compaction queueif (!filesCompacting.isEmpty()) {// exclude all files older than the newest file we're currently// compacting. this allows us to preserve contiguity (HBASE-2856)StoreFile last = filesCompacting.get(filesCompacting.size() - 1);int idx = candidateFiles.indexOf(last);Preconditions.checkArgument(idx != -1);candidateFiles.subList(0, idx + 1).clear();}return candidateFiles;}

该方法主要流程:

1.从候选文件列表中删除正在Compaction的文件

skipLargeFiles方法

 /*** @param candidates pre-filtrate* @return filtered subset exclude all files above maxCompactSize*   Also save all references. We MUST compact them*/protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,boolean mayUseOffpeak) {int pos = 0;//候选文件大于0 且文件不是分裂后的文件 且文件大小大于配置最大文件大小maxCompactSize时,该文件会被剔除while (pos < candidates.size() && !candidates.get(pos).isReference()&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {++pos;}if (pos > 0) {LOG.debug("Some files are too large. Excluding " + pos+ " files from compaction candidates");candidates.subList(0, pos).clear();}return candidates;}

该方法主要流程:

1.判断候选文件大于0 且文件不是分裂后的文件(如果是split后的文件,是需要进行Compaction,不会剔除) 且文件大小大于配置最大文件大小maxCompactSize时,执行++pos

2.pos大于0,清除大的数据

createCompactionRequest方法

 protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (!tryingMajor) {//进入这里则为minorCompaction//过滤掉BulkLoad到HBase的文件candidateSelection = filterBulk(candidateSelection);//过滤掉不应该Minor的文件candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);candidateSelection = checkMinFilesCriteria(candidateSelection,comConf.getMinFilesToCompact());}return new CompactionRequest(candidateSelection);}

该方法主要流程:

如果不是isTryingMajor且不包含分裂后的文件,则为MinorCompaction 进行进一步文件过滤,否则直接返回

filterBulk方法

 /*** @param candidates pre-filtrate* @return filtered subset exclude all bulk load files if configured*/protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {@Overridepublic boolean apply(StoreFile input) {//判断该文件是否需要执行MinorCompactionreturn input.excludeFromMinorCompaction();}}));return candidates;}

该方法主要作用:

判断StoreFIle是否设置excludeFromMinorCompaction,也就是过滤掉BulkLoad到HBase的文件

applyCompactionPolicy方法

 protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {if (candidates.isEmpty()) {return candidates;}
//前提:,选择待合并的文件按时间排序,最旧的文件排最前。// we're doing a minor compaction, let's see what files are applicableint start = 0;//hbase.hstore.compaction.ratio 1.2double ratio = comConf.getCompactionRatio();//判断是否为高峰期,高峰期 ratio 值为5,非高峰期为1.2if (mayUseOffPeak) {//获取hbase.hstore.compaction.ratio.offpeak值,默认是5ratio = comConf.getCompactionRatioOffPeak();LOG.info("Running an off-peak compaction, selection ratio = " + ratio);}// get store file sizes for incremental compacting selection.//https://blog.csdn.net/bryce123phy/article/details/56003628//获取待Compaction文件数量final int countOfFiles = candidates.size();long[] fileSizes = new long[countOfFiles];//每个file大小long[] sumSize = new long[countOfFiles];//前几个file大小总和for (int i = countOfFiles - 1; i >= 0; --i) {StoreFile file = candidates.get(i);fileSizes[i] = file.getReader().length();// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo//getMaxFilesToCompact 获取最大文件压缩数,默认为10int tooFar = i + comConf.getMaxFilesToCompact() - 1;sumSize[i] = fileSizes[i]+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);}//getMinFilesToCompact : hbase.hstore.compactionThreshold
//    如在任意一个 HStore 中有超过此数量的 HStoreFiles,
//    则将运行压缩以将所有 HStoreFiles 文件作为一个 HStoreFile 重新写入。
//    (每次 memstore 刷新写入一个 HStoreFile)您可通过指定更大数量延长压缩,
//    但压缩将运行更长时间。在压缩期间,更新无法刷新到磁盘。长时间压缩需要足够的内存,
//    以在压缩的持续时间内记录所有更新。如太大,压缩期间客户端会超时。//getMinCompactSize 最小合并大小//也就是说,当待合并文件数量大于最小合并数量  并且// 文件大小大于Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio)值// 该端代码意思是过滤比较大的文件,默认认为最早的StoreFile文件大小最大(之前合并过)
//这里高峰期满足条件的数量小于等于非高峰期数量while (countOfFiles - start >= comConf.getMinFilesToCompact() &&fileSizes[start] > Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio))) {++start;}if (start < countOfFiles) {//从 countOfFiles 个候选文件中选取 start 个文件进行CompactionLOG.info("Default compaction algorithm has selected " + (countOfFiles - start)+ " files from " + countOfFiles + " candidates");} else if (mayBeStuck) {//mayBeStuck判断规则://如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),//hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。// We may be stuck. Compact the latest files if we can.int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();if (filesToLeave >= 0) {start = filesToLeave;}}candidates.subList(0, start).clear();return candidates;}

该方法主要流程

1.判断是否为高峰期,并确认ratio的值
2.计算文件大小
3.增加start变量,过滤掉文件较大的文件
4.判断
    a.判断如果不是所有文件都被过滤,则从候选列表清空比较大的文件,我们认为越老的文件(在变量的最前面,所以可以通过++start方式可以过滤大文件),文件占用空间越大
    b.判断如果所有文件都被过滤,继续判断是否阻塞(如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7)则视为阻塞),如果阻塞则将start调整,保证Compaction压缩 

removeExcessFiles方法

 protected void removeExcessFiles(ArrayList<StoreFile> candidates,boolean isUserCompaction, boolean isMajorCompaction) {//如果待合并的文件大于配置的最大合并文件数量int excess = candidates.size() - comConf.getMaxFilesToCompact();if (excess > 0) {//如果isMajorCompaction为true并且是用户合并则不过滤if (isMajorCompaction && isUserCompaction) {LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()+ " files because of a user-requested major compaction");} else {//过滤掉多余最大合并文件数量的文件LOG.debug("Too many admissible files. Excluding " + excess+ " files from compaction candidates");candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();}}}

setIsMajor方法

 public void setIsMajor(boolean isMajor, boolean isAllFiles) {//如果不是全部文件,并且是major压缩,抛异常,也就是说如果有正在Compaction的文件,就不能执行MajorCompactionassert isAllFiles || !isMajor;//不是全部文件:则为Minor压缩//是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILESthis.isMajor = !isAllFiles ? DisplayCompactionType.MINOR: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);}

该方法主要流程:

1.如果不是全部文件,并且是major压缩,抛异常;也就是说如果有正在Compaction的文件,就不能执行MajorCompaction

2. 不是全部文件:则为Minor压缩
    是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES

DisplayCompactionType 枚举有三个值 MINOR, ALL_FILES, MAJOR

但是判断是否为Major的条件只有

LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : ""));
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}

我们从代码中可以看出当类型为ALL_FILES或者MINOR都是MinorCompcation

涉及到的配置参数

说明默认值
hbase.hregion.majorcompaction

在一个区域中所有 HStoreFiles Major 压缩之间的时间(以毫秒为单位)。要禁用自动的Major压缩,请将此值设置为 0。

7天
hbase.hregion.majorcompaction.jitter

抖动外边界以进行最大化压缩。在每个 RegionServer 上,hbase.region.majorcompaction 间隔与此最大边界内的随机分数相乘。在即将运行下一个最大化压缩时加入该 + 或 - 乘积。最大化压缩不应同时发生在各 RegionServer 上。该数越小,压缩越紧密。

所以 major compact的时间间隔 = [7-7*0.5,7+7.0.5]

0.5
hbase.server.thread.wakefrequency搜索工作时暂停的时间段(以毫秒为单位)。服务线程如 META 扫描仪、日志滚轮、Major Compcation 线程使用的睡眠间隔。10秒
hbase.server.compactchecker.interval.multiplier

hbase后台线程检查因子,hbase.server.compactchecker.interval.multiplier*

hbase.server.thread.wakefrequency 就是Compaction Major 检查的周期,比如1000*10秒≈2.77小时

1000
hbase.hstore.compaction.ratio这个ratio参数的作用是判断文件大小 > hbase.hstore.compaction.min.size的StoreFile是否也是适合进行minor compaction的,默认值1.2。更大的值将压缩产生更大的StoreFile,建议取值范围在1.0~1.4之间。大多数场景下也不建议调整该参数。1.2
hbase.hstore.compaction.ratio.offpeak此参数与compaction ratio参数含义相同,是在原有文件选择策略基础上增加了一个非高峰期的ratio控制,默认值5.0。这个参数受另外两个参数 hbase.offpeak.start.hour 与 hbase.offpeak.end.hour 控制,这两个参数值为[0, 23]的整数,用于定义非高峰期时间段,默认值均为-1表示禁用非高峰期ratio设置。5

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509780.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbase Compaction 源码分析 - CompactionChecker Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略 Hbase Compaction 源码分析 - CompactS…

Hbase Compaction 队列数量较大分析

目录 问题 问题原因分析 总结建议 问题 前几天朋友公司Hbase集群出现Compaction队列持续处于比较大的情况&#xff0c;并且mem flush队列也比较大&#xff0c;一起看了下问题&#xff0c;大概情况如下图 从图中可以看出来压缩队列总和持续在1000-2000&#xff0c;平对压缩队列…

Hbase 2.x Region in transition (永久RIT) 异常解决

环境 Hbase 版本&#xff1a;2.0 问题原因 hbase长时间出现RIT&#xff0c;并且发生RIT的Region是已经删除了的Hbase表&#xff0c;表未删除的情况下执行assgin可以消除该问题 Hbase Region in transition (RIT) 异常解决&#xff1a;https://datamining.blog.csdn.net/artic…

sigslot库源码分析

言归正传&#xff0c;sigslot是一个用标准C语法实现的信号与槽机制的函数库&#xff0c;类型和线程安全。提到信号与槽机制&#xff0c;恐怕最容易想到的就是大名鼎鼎的Qt所支持的对象之间通信的模式吧。不过这里的信号与槽虽然在概念上等价与Qt所实现的信号与槽&#xff0c;但…

Hue开发指南 - 提交 Spark 程序

目录 Hue开发指南 1.Spark文件打包成一个Jar包提交Hue运行 1.1 上传Spark Jar包至HDFS文件系统 1.2.Hue中创建Spark任务 2.多jar包导入执行&#xff08;依赖jar包与主程序jar包分开打包&#xff09; 2.1 修改worksapce 2.2 添加程序依赖jar包 Hue开发指南 Hue是面向 Had…

如何缩小码农和高手的差距

为什么同样的时间有的人可以漂亮的完成工作&#xff0c;而有些人废了很大的力气也没有完成&#xff1f;前者我们常常称之为“大牛”&#xff0c;后者我们常常叫他们“菜鸟”。当然“大牛”都是相对而言的&#xff0c;“大牛”也不可能方方面面都非常厉害&#xff0c;换句话说大…

OpenResty 安装,收集日志保存到文本文件

目录 安装 1.安装相关类库 2.安装编译openresty 3.编写配置启动openresty服务 4.通过 openresty 保存日志数据到系统 安装 1.安装相关类库 yum install -y readline-devel pcre-devel openssl-devel gcc 2.安装编译openresty wget https://openresty.org/download/open…

Hadoop Yarn REST API未授权漏洞利用挖矿分析

目录 一、背景情况 二、 漏洞说明 攻击步骤&#xff1a; 三、入侵分析 四、安全建议 清理病毒 安全加固 五、IOCs 一、背景情况 5月5日腾讯云安全曾针对攻击者利用Hadoop Yarn资源管理系统REST API未授权漏洞对服务器进行攻击&#xff0c;攻击者可以在未授权的情况…

Linux shell编程学习总结

主要内容&#xff1a; shell编程sed命令awk命令crontab定时器 什么是Shell&#xff1f; Shell是用户与内核进行交互操作的一种接口&#xff0c;目前最流行的Shell称为bash Shell Shell也是一门编程语言<解释型的编程语言>&#xff0c;即shell脚本 一个系统可以存在多…

Flink ProcessFunction 介绍使用

目录 实现功能 代码 测试 问题 官网描述&#xff1a;https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html The ProcessFunction is a low-level stream processing operation, giving access to the basic build…

Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在&#xff0c;大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计&a…

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具&#xff0c;它的功能十分强大&#xff0c;使用非常灵活&#xff0c;可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

Web Components入门不完全指北

目前流行的各类前端框架&#xff0c;不管是react, angular还是vue&#xff0c;都有一个共同点&#xff0c;那就是支持组件化开发&#xff0c;但事实上随着浏览器的发展&#xff0c;现在浏览器也原生支持组件式开发&#xff0c;本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动&#xff0c;版本升级到6.3&#xff0c;发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议&#xff0c;主要作用是为了保证消费者组&#xff08;Consumer Group&#xff09;下的所有消费者&#xff08;Consumer&#xff09;消费的主体分区达成均衡。 比如&#xff1a;我们有10个分区&#xff0c;当我们有一个消费者时&…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…