Apache Hudi初探(五)(与flink的结合)--Flink 中hudi clean操作

背景

本文主要是具体说说Flink中的clean操作的实现

杂说闲谈

在flink中主要是CleanFunction函数:

  @Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();String instantTime = HoodieActiveTimeline.createNewInstantTime();LOG.info(String.format("exec clean with instant time %s...", instantTime));executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");}@Overridepublic void notifyCheckpointComplete(long l) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {executor.execute(() -> {try {this.writeClient.waitForCleaningFinish();} finally {// ensure to switch the isCleaning flagthis.isCleaning = false;}}, "wait for cleaning finish");}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {try {this.writeClient.startAsyncCleaning();this.isCleaning = true;} catch (Throwable throwable) {// catch the exception to not affect the normal checkpointingLOG.warn("Error while start async cleaning", throwable);}}}
  • open函数

    • writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
      创建FlinkWriteClient,用于写hudi数据

    • this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
      创建一个只有一个线程的线程池,改线程池的主要作用来异步执行hudi写操作

    • executor.execute(() -> writeClient.clean(instantTime)
      异步执行hudi的清理操作,该clean函数的主要代码如下:

         if (!tableServicesEnabled(config)) {return null;}final Timer.Context timerContext = metrics.getCleanCtx();CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));HoodieTable table = createTable(config, hadoopConf);if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {LOG.info("Cleaner started");// proceed only if multiple clean schedules are enabled or if there are no pending cleans.if (scheduleInline) {scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);table.getMetaClient().reloadActiveTimeline();}}// Proceeds to execute any requested or inflight clean instances in the timelineHoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);if (timerContext != null && metadata != null) {long durationMs = metrics.getDurationInMs(timerContext.stop());metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()+ " cleanerElapsedMs" + durationMs);}return metadata;
      
      • CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION,() -> rollbackFailedWrites *
        根据配置
        hoodie.cleaner.policy.failed.writes* 默认是EAGER,也就是在写数据失败的时候,会立即进行这次写失败的数据的清理,在这种情况下,
        就不会执行rollbackFailedWrites操作,也就是回滚写失败文件的操作

      • HoodieTable table = createTable *
        创建
        HoodieFlinkMergeOnReadTable*类型的hudi表,用来做clean等操作

      • scheduleTableServiceInternal
        如果hoodie.clean.allow.multiple为true(默认为true)或者没有正在运行中clean操作,则会生成Clean计划
        这里最终调用的是FlinkWriteClient.scheduleCleaning方法,即CleanPlanActionExecutor.execute方法

        这里最重要的就是requestClean方法:

        CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
        Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
        List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant)    
        int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
        Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue))    
        Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())))    
        List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey).collect(Collectors.toList())    
        return new HoodieCleanerPlan(earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),planner.getLastCompletedCommitTimestamp(),config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete)    
        
        • planner.getEarliestCommitToRetain();
          根据保留策略,获取到最早需要保留的commit的HoodieInstant,在这里会兼顾考虑到hoodie.cleaner.commits.retained(默认是10)以及hoodie.cleaner.hours.retained默认是24小时以及hoodie.cleaner.policy策略(默认是KEEP_LATEST_COMMITS)
        • planner.getPartitionPathsToClean(earliestInstant);
          根据保留的最新commit的HoodieInstant,得到要删除的分区,这里会根据配置hoodie.cleaner.incremental.mode(默认是true)来进行增量清理,
          这个时候就会根据上一次已经clean的信息,只需要删除差量的分区数据就行
        • cleanOpsWithPartitionMeta = context
          根据上面得到的需要删除的分区信息,获取需要删除的文件信息,具体的实现可以参考CleanPlanner.getFilesToCleanKeepingLatestCommits
          这里的操作主要是先通过fileSystemView获取分区下所有的FileGroup,之后再获取每个FileGroup下的所有的FileSlice(这里的FileSlice就有版本的概念,也就是commit的版本),之后再与最新保留的commit的时间戳进行比较得到需要删除的文件信息
        • new HoodieCleanerPlan
          最后组装成HoodieCleanPlan的计划,并且在外层调用table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); 方法把clean request的状态存储到对应的.hoodie目录下,并建立一个xxxx.clean.requested的元数据文件
      • table.getMetaClient().reloadActiveTimeline()
        重新加载timeline,便于过滤出来刚才scheduleTableServiceInternal操作生成的xxxxxxxxxxxxxx.clean.requested的元数据文件

      • table.clean(context, cleanInstantTime, skipLocking)
        真正执行clean的部分,主要是调用CleanActionExecutor.execute的方法,最终调用的是*runPendingClean(table, hoodieInstant)*方法:

         HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);return runClean(table, cleanInstant, cleanerPlan);
        

        首先是反序列化CleanPlan,然后在进行清理,主要是删除1. 如果没有满足的分区,直接删除该分区,2. 否则删除该分区下的满足条件的文件,最后返回HoodieCleanStat包含删除的文件信息等。

  • snapshotState方法

    • 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
      this.writeClient.startAsyncCleaning(); 这里最终也是调用的writeClient.clean方法。
    • this.isCleaning = true;
      设置标志位,用来保证clean操作的有序性
  • notifyCheckpointComplete方法

    • 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,
      并且设置清理标识位,用来和snapshotState方法进行呼应以保证clean操作的有序性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92670.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python实用小代码

一、python实现31省市名称替换 首先定义了一个字典province_dict&#xff0c;其中包含每个省市的无缩写名称与其标准名称之间的映射。然后&#xff0c;我们使用map()函数将每个省市名称从无缩写名称转换为标准名称&#xff0c;并将结果存储在新列省市标准名称中。 province_di…

安卓玩机-----反编译apk 修改apk 去广告 去弹窗等操作中的一些常识

安卓机型app的编译与反编译 apk文件的简单说明与解析 -安卓修改apk apk的组成和编译 一 电脑端几种反编译apk工具操作步骤解析 前面几个博文有说明关于反编译apk和apk架构等有些常识.今天对以上做个补充。初学者记住一点。对于一个apk文件使用压缩软件7zip打开可以查看到文件…

LeetCode 2582. 递枕头

原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 题目描述 n 个人站成一排&#xff0c;按从 1 到 n 编号。 最初&#xff0c;排在队首的第一个人拿着一个枕头。每秒钟&#xff0c;拿着枕头的人会将枕头传递给队伍中的下一个人。一…

【JavaEE基础学习打卡08】JSP之初次认识say hello!

目录 前言一、JSP技术初识1.动态页面2.JSP是什么3.JSP特点有哪些 二、JSP运行环境配置1.JDK安装2.Tomcat安装 三、编写JSP1.我的第一个JSP2.JSP执行过程3.在IDEA中开发JSP 总结 前言 &#x1f4dc; 本系列教程适用于JavaWeb初学者、爱好者&#xff0c;小白白。我们的天赋并不高…

力扣-338.比特位计数

Idea 直接暴力做法&#xff1a;计算从0到n&#xff0c;每一位数的二进制中1的个数&#xff0c;遍历其二进制的每一位即可得到1的个数 AC Code class Solution { public:vector<int> countBits(int n) {vector<int> ans;ans.emplace_back(0);for(int i 1; i < …

【Docker】docker拉取镜像错误 missing signature key

问题 当我使用docker拉取一个特定的镜像时&#xff0c;提示错误&#xff1a; 错误 missing signature key 但是拉取其他镜像又可以访问&#xff0c;&#xff0c;&#xff0c;&#xff0c;于是&#xff0c;我怀疑是否是docker版本问题。 docker --version结果确实&#xff0…

ESP32设备通信-两个ESP32间UART通信

两个ESP32间UART通信 文章目录 两个ESP32间UART通信1、UART介绍2、软件准备3、硬件准备4、代码实现在本文中,我们将使用 Arduino IDE 的 UART 硬件库在两个 ESP32 板之间执行 UART 或串行通信。 要使用 USB 端口调试和编程 ESP32,需要使用称为通用异步接收器/发送器 (UART) 通…

Java发送httpPost,httpGet,httpDelete请求

业务场景&#xff1a;最近开发的项目使用的将文件图片和视频存储在Minio当中&#xff0c;但是准备上线申请资源的时候申请不到资源&#xff0c;就临时决定使用厂商提供的Api接口进行文件图片和视频上传&#xff0c;然后就需要在后端进行登录&#xff0c;文件校验和文件存储和文…

源码编译postgresql

没什么好研究的了&#xff0c;就试试编译Postgresql源码&#xff0c;按照网站查的资料一步步测试的&#xff0c;方便后期定制数据库时候用&#xff0c;也算是开源的大优势了&#xff0c;只要你愿意折腾&#xff0c;可以自己定制或改进一个数据库来满足特殊业务。后面研究一下他…

搭建智能桥梁,Amazon CodeWhisperer助您轻松编程

零&#xff1a;前言 随着时间的推移&#xff0c;人工智能技术以惊人的速度向前发展&#xff0c;正掀起着全新的编程范式革命。不仅仅局限于代码生成&#xff0c;智能编程助手等创新应用也进一步提升了开发效率和代码质量&#xff0c;极大地推动着软件开发领域的快速繁荣。 当前…

Guitar Pro 8 .1全新功能介绍及2023官方特惠优惠券

《中国好声音》节目诞生10年多热度不减&#xff0c;每一季都有籍籍无名的学员成为万众瞩目的新星。怎么像他们一样把爱好变成事业&#xff1f;带着这个问题在不断的探寻中找到了答案&#xff0c;那就是要在有限的时间里比别人做效率更高的事。所谓“工欲善其事&#xff0c;必先…

制作原创音乐app软件FL Studio21.2中文版

如果你正在录制、编辑或创作新歌曲&#xff0c;你会需要使用 FL Studio 快捷键。FL Studio 可用于录制、编辑和制作&#xff0c;以及专业人士录制和创作歌曲。在 FL Studio 中创建音乐专辑也是一个漫长的过程&#xff0c;可能会变得复杂且需要较长时间。很好的是&#xff0c;学…

电池集成充电解决方案提供商【XCharge Group】获得壳牌风险投资

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于德国汉堡的电池集成充电解决方案提供商XCharge Group今日宣布已获得了壳牌风险投资公司的投资&#xff0c;这笔交易的金额没有披露。 XCharge Group计划将这笔资金用在三方面&#xff1a;…

UE5 虚幻引擎 详解蓝图通信 必备的知识技能之一!!!

目录 0 引言1 直接蓝图通信1.1 在关卡蓝图中直接拖拽Actor1.2 Get Actor of Class/Get All Actors of Class 2 事件分发器2.1 创建事件分发器2.2 绑定事件分发器2.3 调用事件分发器 3 蓝图接口3.1 使用步骤3.2 为什么要使用蓝图接口 4 蓝图转换 0 引言 问题&#xff1a;为什么需…

【C++】map和set的使用

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a; C学习 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对我最大…

斯坦福数据挖掘教程·第三版》读书笔记(英文版)Chapter 10 Mining Social-Network Graphs

来源&#xff1a;《斯坦福数据挖掘教程第三版》对应的公开英文书和PPT。 Chapter 10 Mining Social-Network Graphs The essential characteristics of a social network are: There is a collection of entities that participate in the network. Typically, these entiti…

JS defineProperty详解

defineProperty Object.defineProperty():方法会在对象上直接定义个新的属性&#xff0c;或者修改现有的属性&#xff0c;并返回此对象 let obj {} //与我们使用 obj.name zhangsna 效果一样 但是用defineProperty定义的属性无法改变 或者删除 Object.defineProperty(obj,n…

MEIS —— 前端部分基本配置

项目基本配置 这篇文章我们随着上一篇文章继续往下叙述&#xff0c;主要是将element和windicss等开发配置进项目中&#xff0c;以及基本的一些页面和组件给他完成。 1. 安装element plus 运行&#xff1a; npm install element-plus --save 这里我们是按需引入(自动)&#x…

flink处理函数--副输出功能

背景 在flink中&#xff0c;如果你想要访问记录的处理时间或者事件时间&#xff0c;注册定时器&#xff0c;或者是将记录输出到多个输出流中&#xff0c;你都需要处理函数的帮助&#xff0c;本文就来通过一个例子来讲解下副输出 副输出 本文还是基于streaming-with-flink这本…

基于YOLOv8的安全帽检测系统

1.Yolov8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&#xff08;SOTA&#xff09;模型&#xff0c;它建立在先前YOLO成功基础上&#xff0c;并引入了新功能和改进&#xff0c;以进一步提升性能和灵活…