HBase Write-Ahead Log (WAL) Source Code Analysis: LogCleaner

Contents

HLog: WALs and oldWALs

Overall Flow

HMaster Initialization

Scheduled Execution

The LogCleaner Class

The ReplicationLogCleaner Class

Summary

HLog: WALs and oldWALs

Let's first go over the rules for HLog expiration and HLog deletion.

HLog expiration: once the data written to an HLog has been flushed from the MemStore to disk, the log file (stored under /hbase/WALs by default) is automatically moved to the /hbase/oldWALs directory. It is not deleted at this point.

HLog deletion: when the Master starts, it launches a cleanup thread that periodically scans the oldWALs directory and deletes the files that are eligible. The scan interval is hbase.master.cleaner.interval, which defaults to 1 minute. A file is deleted only when both of the following conditions hold:

        1. The HLog file is no longer participating in master-slave replication: if it is still referenced by a replication queue it is kept; otherwise it may be deleted.

        2. The HLog file has been sitting in the directory for longer than hbase.master.logcleaner.ttl (see the sketch after this list).
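The TTL rule is enforced by one of the built-in cleaner delegates (TimeToLiveLogCleaner). The following is only a simplified sketch of that check, not the actual HBase source: it compares the file's age against the configured TTL (commonly 600000 ms, i.e. 10 minutes, by default).

import org.apache.hadoop.fs.FileStatus;

// Simplified sketch of deletion rule 2 (not the real TimeToLiveLogCleaner):
// a WAL under oldWALs is only considered deletable once it has been there
// longer than hbase.master.logcleaner.ttl.
public class TtlCheckSketch {

  private final long ttlMs;

  public TtlCheckSketch(long ttlMs) {
    this.ttlMs = ttlMs; // value of hbase.master.logcleaner.ttl, in milliseconds
  }

  public boolean isFileDeletable(FileStatus fStat) {
    long life = System.currentTimeMillis() - fStat.getModificationTime();
    if (life < 0) {
      // a modification time in the future usually means clock skew; keep the file to be safe
      return false;
    }
    return life > ttlMs;
  }
}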

Overall Flow

The flowchart (pos format) can be downloaded here:

Link: https://pan.baidu.com/s/1szhpVn7RyegE0yqQedACIA
Extraction code: ig9x

Only the WAL-related flow is covered here. The code discussed below is annotated in the flowchart with class names, method names, and explanations, so you can follow along directly in the source.

HMaster Initialization

During HMaster startup, the HMaster constructor calls the startActiveMasterManager method.

startActiveMasterManager in turn calls finishActiveMasterInitialization(status).

finishActiveMasterInitialization starts all of the master's service threads; the relevant snippet is:

// start up all service threads.
status.setStatus("Initializing master service threads");
startServiceThreads();

The startServiceThreads method is shown below:

/**
 * Start up all services. If any of these threads gets an unhandled exception
 * then they just die with a logged message.  This should be fine because
 * in general, we do not expect the master to get such unhandled exceptions
 * as OOMEs; it should be lightly loaded. See what HRegionServer does if
 * need to install an unexpected exception handler.
 */
private void startServiceThreads() throws IOException {
  // Start the executor service pools
  this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));

  // We depend on there being only one instance of this executor running
  // at a time.  To do concurrency, would need fencing of enable/disable of
  // tables.
  // Any time changing this maxThreads to > 1, pls see the comment at
  // AccessController#postCreateTableHandler
  this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
  startProcedureExecutor();

  // Initial cleaner chore
  CleanerChore.initChorePool(conf);
  // Start log cleaner thread
  // Read the log cleaner interval from the configuration; the default of 60 * 1000 ms is 1 minute
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  this.logCleaner =
      new LogCleaner(cleanerInterval,
          this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
          getMasterFileSystem().getOldLogDir());
  // Schedule the chore; cleanerInterval was passed to LogCleaner as its execution period
  getChoreService().scheduleChore(logCleaner);

  // start the hfile archive cleaner thread
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  Map<String, Object> params = new HashMap<String, Object>();
  params.put(MASTER, this);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
      getMasterFileSystem().getFileSystem(), archiveDir, params);
  getChoreService().scheduleChore(hfileCleaner);

  serviceStarted = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
  if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
    try {
      replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
          cleanerInterval, this.getZooKeeper(), this.conf);
      getChoreService().scheduleChore(replicationZKLockCleanerChore);
    } catch (Exception e) {
      LOG.error("start replicationZKLockCleanerChore failed", e);
    }
  }
  try {
    replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
        new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
    getChoreService().scheduleChore(replicationZKNodeCleanerChore);
  } catch (Exception e) {
    LOG.error("start replicationZKNodeCleanerChore failed", e);
  }
}

Scheduled Execution

The following part of that method is what handles our HLogs: it creates the LogCleaner and adds it to the chore scheduler.

// Initial cleaner chore
CleanerChore.initChorePool(conf);
// Start log cleaner thread
// Read the log cleaner interval from the configuration; the default of 60 * 1000 ms is 1 minute
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
    new LogCleaner(cleanerInterval,
        this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
        getMasterFileSystem().getOldLogDir());
// Schedule the chore; cleanerInterval was passed to LogCleaner as its execution period
getChoreService().scheduleChore(logCleaner);
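How does scheduleChore turn this interval into periodic execution? LogCleaner ultimately extends ScheduledChore (via CleanerChore), and ChoreService invokes a chore's chore() method every period milliseconds. Below is a minimal, standalone sketch of that mechanism, assuming the public ScheduledChore/ChoreService API of the HBase version analyzed here; it is not master code, just an illustration.

import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;

// Minimal sketch: a ScheduledChore whose chore() body is invoked by ChoreService
// every `period` milliseconds -- the same mechanism LogCleaner relies on.
public class TinyChoreDemo {

  // trivial Stoppable; the real LogCleaner uses the HMaster itself as its stopper
  static class TinyStopper implements Stoppable {
    private volatile boolean stopped = false;
    @Override public void stop(String why) { stopped = true; }
    @Override public boolean isStopped() { return stopped; }
  }

  public static void main(String[] args) throws InterruptedException {
    int period = 1000; // LogCleaner receives hbase.master.cleaner.interval here
    ScheduledChore chore = new ScheduledChore("tiny-chore", new TinyStopper(), period) {
      @Override
      protected void chore() {
        // CleanerChore.chore() submits a CleanerTask at this point instead
        System.out.println("chore tick at " + System.currentTimeMillis());
      }
    };
    ChoreService service = new ChoreService("demo");
    service.scheduleChore(chore);
    Thread.sleep(5000); // let the chore fire a few times
    service.shutdown();
  }
}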

Once scheduled, the LogCleaner.chore() method (defined in its parent class CleanerChore) is executed periodically:

@Override
protected void chore() {
  if (getEnabled()) {
    try {
      POOL.latchCountUp();
      if (runCleaner()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Cleaned all WALs under " + oldFileDir);
        }
      } else {
        if (LOG.isTraceEnabled()) {
          LOG.trace("WALs outstanding under " + oldFileDir);
        }
      }
    } finally {
      POOL.latchCountDown();
    }
    // After each cleaner chore, checks if received reconfigure notification while cleaning.
    // First in cleaner turns off notification, to avoid another cleaner updating pool again.
    if (POOL.reconfigNotification.compareAndSet(true, false)) {
      // This cleaner is waiting for other cleaners finishing their jobs.
      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
      POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    }
  } else {
    LOG.trace("Cleaner chore disabled! Not cleaning.");
  }
}

The runCleaner() call above submits a CleanerTask to the fork/join pool's work queue:

public Boolean runCleaner() {
  CleanerTask task = new CleanerTask(this.oldFileDir, true);
  POOL.submit(task);
  return task.join();
}

The LogCleaner Class

LogCleaner is the class that actually cleans up the logs. Its parent class CleanerChore contains a private inner class, CleanerTask, which extends RecursiveTask (part of the JDK fork/join framework; a standalone sketch follows the compute() listing below if ForkJoinTask is unfamiliar). CleanerTask's compute() method is the core of the periodic cleanup: it lists everything under the oldWALs directory and deletes files selectively.

@Override
protected Boolean compute() {
  LOG.trace("Cleaning under " + dir);
  List<FileStatus> subDirs;
  List<FileStatus> tmpFiles;
  final List<FileStatus> files;
  try {
    // if dir doesn't exist, we'll get null back for both of these
    // which will fall through to succeeding.
    subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
      @Override
      public boolean accept(FileStatus f) {
        return f.isDirectory();
      }
    });
    if (subDirs == null) {
      subDirs = Collections.emptyList();
    }
    // list the files under the oldWALs directory
    tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
      @Override
      public boolean accept(FileStatus f) {
        return f.isFile();
      }
    });
    files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
  } catch (IOException ioe) {
    LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
    return false;
  }

  boolean allFilesDeleted = true;
  if (!files.isEmpty()) {
    allFilesDeleted = deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        // files holds everything under the oldWALs directory
        return checkAndDeleteFiles(files);
      }
    }, "files");
  }

  boolean allSubdirsDeleted = true;
  if (!subDirs.isEmpty()) {
    final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
    for (FileStatus subdir : subDirs) {
      CleanerTask task = new CleanerTask(subdir, false);
      tasks.add(task);
      // fork a subtask for each sub-directory
      task.fork();
    }
    allSubdirsDeleted = deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        return getCleanResult(tasks);
      }
    }, "subdirs");
  }

  boolean result = allFilesDeleted && allSubdirsDeleted;
  // if and only if files and subdirs under current dir are deleted successfully, and
  // it is not the root dir, then task will try to delete it.
  if (result && !root) {
    result &= deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        return fs.delete(dir, false);
      }
    }, "dir");
  }
  return result;
}
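If the fork/join pattern used above is unfamiliar, here is a minimal, self-contained sketch (plain JDK code, not HBase source) that mirrors what CleanerTask does: fork one subtask per sub-directory, join their results, and combine them with the current level's own result.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative RecursiveTask: each level "cleans" itself, forks two subtasks
// (standing in for sub-directories), and only reports success if every level did.
public class ForkJoinSketch extends RecursiveTask<Boolean> {

  private final int depth;

  ForkJoinSketch(int depth) {
    this.depth = depth;
  }

  @Override
  protected Boolean compute() {
    boolean selfDone = true;             // stands in for "all files in this dir deleted"
    if (depth == 0) {
      return selfDone;
    }
    List<ForkJoinSketch> subTasks = new ArrayList<>();
    for (int i = 0; i < 2; i++) {        // stands in for each sub-directory
      ForkJoinSketch sub = new ForkJoinSketch(depth - 1);
      subTasks.add(sub);
      sub.fork();                        // schedule asynchronously, like CleanerTask does
    }
    boolean subsDone = true;
    for (ForkJoinSketch sub : subTasks) {
      subsDone &= sub.join();            // wait for each subtask's result
    }
    return selfDone && subsDone;
  }

  public static void main(String[] args) {
    Boolean result = new ForkJoinPool().invoke(new ForkJoinSketch(3));
    System.out.println("all levels cleaned: " + result);
  }
}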

 

The compute() method above calls checkAndDeleteFiles(files). Its job is to run every given file through each registered cleaner to decide whether it should be deleted, and to delete it when appropriate. The input is the full list of files under the oldWALs directory.

/**
 * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
 * necessary.
 * @param files List of FileStatus for the files to check (and possibly delete)
 * @return true iff successfully deleted all files
 */
private boolean checkAndDeleteFiles(List<FileStatus> files) {
  if (files == null) {
    return true;
  }

  // first check to see if the path is valid
  List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
  List<FileStatus> invalidFiles = Lists.newArrayList();
  for (FileStatus file : files) {
    if (validate(file.getPath())) {
      validFiles.add(file);
    } else {
      LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
      invalidFiles.add(file);
    }
  }

  Iterable<FileStatus> deletableValidFiles = validFiles;
  // check each of the cleaners for the valid files
  for (T cleaner : cleanersChain) {
    if (cleaner.isStopped() || getStopper().isStopped()) {
      LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
          + this.oldFileDir);
      return false;
    }

    Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

    // trace which cleaner is holding on to each file
    if (LOG.isTraceEnabled()) {
      ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
      for (FileStatus file : deletableValidFiles) {
        if (!filteredFileSet.contains(file)) {
          LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
        }
      }
    }

    deletableValidFiles = filteredFiles;
  }

  Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
  return deleteFiles(filesToDelete) == files.size();
}
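The cleanersChain iterated above is built from the classes listed in the hbase.master.logcleaner.plugins configuration; each entry is a delegate that can veto deletion, and a WAL is deleted only if every delegate agrees. As a hypothetical example (not part of HBase, and assuming the BaseLogCleanerDelegate/isFileDeletable API of the version analyzed here), a custom delegate could be added to the chain like this:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;

// Hypothetical custom cleaner delegate: veto deletion of any WAL modified within
// the last hour. It would be activated by appending the class name to the
// hbase.master.logcleaner.plugins property.
public class KeepRecentWalsCleaner extends BaseLogCleanerDelegate {

  private static final long ONE_HOUR_MS = 60L * 60L * 1000L;

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    long age = System.currentTimeMillis() - fStat.getModificationTime();
    return age > ONE_HOUR_MS; // illustrative rule only
  }
}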

The ReplicationLogCleaner Class

checkAndDeleteFiles in turn calls cleaner.getDeletableFiles(deletableValidFiles). For WALs this is implemented in the ReplicationLogCleaner class, which decides which files may be deleted and which must be kept. The criterion is exactly the one stated at the beginning of this article: a WAL that is still being used for replication is kept, and one that is not may be deleted.

Note: the WALs queued for every replication peer are recorded in ZooKeeper under the /hbase/replication/rs path.

For example, suppose ZooKeeper contains this node:

 /hbase/replication/rs/jast.zh,16020,1576397142865/Indexer_account_indexer_prd/jast.zh%2C16020%2C1576397142865.jast.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645

Then the corresponding file under the oldWALs directory will not be deleted:

[jast@jast002 ~]$ hdfs dfs -du -h /hbase/oldWALs/jast015.zh%2C16020%2C1576397142865.jast015.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645
256.0 M  512.0 M  /hbase/oldWALs/jast015.zh%2C16020%2C1576397142865.jast015.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645
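To see which WALs are still queued for replication, you can walk the same znode layout yourself with a plain ZooKeeper client. This is just an illustrative sketch: the quorum address is a placeholder, and it assumes the /hbase/replication/rs/<regionserver>/<queueId>/<walName> layout shown above.

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Walk /hbase/replication/rs and print every WAL that is still enqueued for replication.
public class ListReplicationQueues {
  public static void main(String[] args) throws Exception {
    String zkQuorum = "localhost:2181";   // placeholder; use your ZooKeeper quorum
    ZooKeeper zk = new ZooKeeper(zkQuorum, 30000, event -> { });
    String root = "/hbase/replication/rs";
    for (String rs : zk.getChildren(root, false)) {
      for (String queueId : zk.getChildren(root + "/" + rs, false)) {
        List<String> wals = zk.getChildren(root + "/" + rs + "/" + queueId, false);
        System.out.println(rs + " / " + queueId + " -> " + wals);
      }
    }
    zk.close();
  }
}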
The implementation of getDeletableFiles:

@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
  // all members of this class are null if replication is disabled,
  // so we cannot filter the files
  if (this.getConf() == null) {
    return files;
  }

  final Set<String> wals;
  try {
    // The concurrently created new WALs may not be included in the return list,
    // but they won't be deleted because they're not in the checking set.
    wals = loadWALsFromQueues();
  } catch (KeeperException e) {
    LOG.warn("Failed to read zookeeper, skipping checking deletable files");
    return Collections.emptyList();
  }
  return Iterables.filter(files, new Predicate<FileStatus>() {
    @Override
    public boolean apply(FileStatus file) {
      String wal = file.getPath().getName();
      // if the WAL is still in a replication queue it is kept, otherwise it may be deleted
      boolean logInReplicationQueue = wals.contains(wal);
      if (LOG.isDebugEnabled()) {
        if (logInReplicationQueue) {
          // still referenced by replication: keep it
          LOG.debug("Found log in ZK, keeping: " + wal);
        } else {
          // not referenced: deletable
          LOG.debug("Didn't find this log in ZK, deleting: " + wal);
        }
      }
      return !logInReplicationQueue;
    }
  });
}

getDeletableFiles calls loadWALsFromQueues, which collects and returns the names of all WALs currently present in any replication queue:

/**
 * Load all wals in all replication queues from ZK. This method guarantees to return a
 * snapshot which contains all WALs in the zookeeper at the start of this call even there
 * is concurrent queue failover. However, some newly created WALs during the call may
 * not be included.
 */
private Set<String> loadWALsFromQueues() throws KeeperException {
  for (int retry = 0; ; retry++) {
    int v0 = replicationQueues.getQueuesZNodeCversion();
    List<String> rss = replicationQueues.getListOfReplicators();
    if (rss == null || rss.isEmpty()) {
      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
      return ImmutableSet.of();
    }
    Set<String> wals = Sets.newHashSet();
    for (String rs : rss) {
      // list all queues under /hbase/replication/rs/<rs> in ZooKeeper
      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
      // if rs just died, this will be null
      if (listOfPeers == null) {
        continue;
      }
      // collect the WAL names from every queue
      for (String id : listOfPeers) {
        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
        if (peersWals != null) {
          wals.addAll(peersWals);
        }
      }
    }
    int v1 = replicationQueues.getQueuesZNodeCversion();
    if (v0 == v1) {
      return wals;
    }
    LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
        v0, v1, retry));
  }
}
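Note the retry loop around the cversion reads: loadWALsFromQueues reads the queues znode's cversion, gathers all queue contents, then re-reads the cversion and retries if it changed, so the returned set is a consistent snapshot even under concurrent queue failover. The same pattern, stripped of HBase specifics, looks like this (VersionedStore is a hypothetical stand-in for replicationQueues):

import java.util.HashSet;
import java.util.Set;

// Illustrative snapshot-and-retry pattern: read a version counter, read the data,
// re-read the counter, and retry if anything changed in between.
public class SnapshotRetrySketch {

  interface VersionedStore {
    int version();           // e.g. the cversion of the queues znode
    Set<String> readAll();   // e.g. all WAL names currently queued
  }

  static Set<String> consistentSnapshot(VersionedStore store) {
    for (int retry = 0; ; retry++) {
      int before = store.version();
      Set<String> snapshot = new HashSet<>(store.readAll());
      int after = store.version();
      if (before == after) {
        return snapshot;     // nothing changed concurrently, the snapshot is safe to use
      }
      System.out.printf("version changed from %d to %d, retry=%d%n", before, after, retry);
    }
  }
}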

Summary

To sum up: deletion is driven by a periodically scheduled cleaner chore. It lists every file under oldWALs, keeps any file that is still referenced by a peer's replication queue (or that has not yet exceeded the configured TTL), and deletes the rest.
