Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

背景

在Apache Hudi初探(一)(与flink的结合)中,我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory),具体分析一下写入的过程。

分析

对于transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)这个代码片段,我们主要看operatorFactory 这个对象(transform这个操作是Flink框架的操作):

public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {public StreamWriteOperator(Configuration conf) {super(new StreamWriteFunction<>(conf));}public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));}
}

最主要的hudi算子为StreamWriteOperator,其中最主要的操作是由StreamWriteFunction来完成的:

// StreamWriteFunction@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {this.taskID = getRuntimeContext().getIndexOfThisSubtask();this.metaClient = StreamerUtil.createMetaClient(this.config);this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());this.writeStatuses = new ArrayList<>();this.writeMetadataState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("write-metadata-state",TypeInformation.of(WriteMetadataEvent.class)));this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());this.currentInstant = lastPendingInstant();if (context.isRestored()) {restoreWriteMetadata();} else {sendBootstrapEvent();}// blocks flushing until the coordinator starts a new instantthis.confirming = true;}@Overridepublic void open(Configuration parameters) throws IOException {this.tracer = new TotalSizeTracer(this.config);initBuffer();initWriteFunction();}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {if (inputEnded) {return;}snapshotState();// Reload the snapshot state as the current state.reloadWriteMetaState();}@Overridepublic void snapshotState() {// Based on the fact that the coordinator starts the checkpoint first,// it would check the validity.// wait for the buffer data flush out and request a new instantflushRemaining(false);}@Overridepublic void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {bufferRecord((HoodieRecord<?>) value);}
  • initializeState操作,主要是做一些初始化的操作

    • this.taskID = getRuntimeContext().getIndexOfThisSubtask();
      获取当前的task的索引下标,用来向operator coordinator发送event给operator coordinator,之后 StreamWriteOperatorCoordinator(operator coordinator) 进行处理,后续会说到StreamWriteOperatorCoordinator

    • metaClient = StreamerUtil.createMetaClient(this.config)
      writeClient = FlinkWriteClients.createWriteClient
      初始化hudi的元数据客户端(这里是HoodieTableMetaClient)和写入客户端(这里是HoodieFlinkWriteClient)

    • writeStatuses = new ArrayList<>()
      记录后续的写入hudi文件的信息

    • writeMetadataState = context.getOperatorStateStore().getListState
      记录写入hudi的元数据事件,会在后续的操作中,会包装成event发送给operator coordinator(StreamWriteOperatorCoordinator)

    • ckpMetadata = CkpMetadata.getInstance
      Flink的checkpoint的元数据信息路径,默认的路径是/${hoodie.basePath}/.hoodie/.aux/ckp_meta

    • currentInstant = lastPendingInstant()
      获取上次还没有完成的commit

    • restoreWriteMetadata或者sendBootstrapEvent,根据是否是从checkpoint恢复过来的进行不同消息的发送,
      这里的operator coordinator(StreamWriteOperatorCoordinator)会进行统一的处理,并初始化一个commit

  • open操作
    写入hudi前的前置操作,比如说 初始化TotalSizeTracer记录maxBufferSize便于flush操作
    根据write.operation的值(默认是upsert)选择后续的操作是insert或upsert或overwrite,这里是upsert

  • processElement操作
    这里对传入的HoodieRecord进行缓存,主要是bufferRecord做的事情,

    • 首先会获取bucketID,之后再往对应的bucket中插入数据
    • 如果超出write.batch.size(默认是128MB),则会进行flushBucket操作,该操作主要是写入hudi操作 //TODO: 具体的写入hudi操作
      • 首先会获取新的需要提交的commit
      • 再进行写入的实际操作
      • 写入的文件元数据信息回传到operator coordinator进行统一处理
  • snapshotState 操作

    • 调用flushRemaining 写入剩下的数据到hudi存储中
    • 重新加载当前写入的hudi文件元数据信息到当前flink的state中

hudi StreamWriteOperatorCoordinator作用

总的来说,StreamWriteOperatorCoordinator扮演的角色和在Spark中driver的角色一样,都是来最后来提交 元数据信息到huid中。
具体的作用还是得从具体的方法来看:

  @Overridepublic void handleEventFromOperator(int i, OperatorEvent operatorEvent) {ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,"The coordinator can only handle WriteMetaEvent");WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;if (event.isEndInput()) {// handle end input event synchronously// wrap handleEndInputEvent in executeSync to preserve the order of eventsexecutor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);} else {executor.execute(() -> {if (event.isBootstrap()) {handleBootstrapEvent(event);} else {handleWriteMetaEvent(event);}}, "handle write metadata event for instant %s", this.instant);}}...@Overridepublic void notifyCheckpointComplete(long checkpointId) {executor.execute(() -> {// The executor thread inherits the classloader of the #notifyCheckpointComplete// caller, which is a AppClassLoader.Thread.currentThread().setContextClassLoader(getClass().getClassLoader());// for streaming mode, commits the ever received events anyway,// the stream write task snapshot and flush the data buffer synchronously in sequence,// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)final boolean committed = commitInstant(this.instant, checkpointId);if (tableState.scheduleCompaction) {// if async compaction is on, schedule the compactionCompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);}if (tableState.scheduleClustering) {// if async clustering is on, schedule the clusteringClusteringUtil.scheduleClustering(conf, writeClient, committed);}if (committed) {// start new instant.startInstant();// sync Hive if is enabledsyncHiveAsync();}}, "commits the instant %s", this.instant);}
  • handleEventFromOperator方法用来接受task发送的消息

    • 对于BootStrap类型的WriteMetadataEvent(在StreamWriteFunction方法initializeState中),相当于函数初始化也就会触发
      该类型的消息由handleBootstrapEvent来处理(我们这里假设每个任务operator都完成了初始化的操作),对应的数据流如下:

      initInstant||\/
      reset => startInstant
      

      startInstant 这里就会初始化一个hudi写操作的commit信息

    • 对于一般的write的信息的event,(比如说在processElement的flushBucket函数中),由handleWriteMetaEvent来处理:

       if (this.eventBuffer[event.getTaskID()] != null) {this.eventBuffer[event.getTaskID()].mergeWith(event);} else {this.eventBuffer[event.getTaskID()] = event;}
      

      这里只是加到变量名为eventBuffer 的WriteMetadataEvent类型的数组中,后续中会进行处理

    • 对于isEndInputtrue的event,这种一般source是基于文件的这种,这里先不讨论

  • notifyCheckpointComplete 当对应的checkpointId完成以后,该方法会被调用

    • commitInstant 提交hudi元数据,如果如果有发生异常,则回滚当前hudi对应的commit
    • scheduleCompaction && scheduleClustering 进行hui的CompcationClustering
    • 如果成功的提交了,则会开启一个新的commit,如果开了hive同步(hive_sync.enabled默认为false),则会同步元数据信息到hive

总结

用一张图总结一下交互方式,如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/52549.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI加持,创意设计效率百倍提升,探秘背后的数字化魔法

在当今创新潮流不断涌现的时代&#xff0c;人工智能正以惊人的速度和深度赋能各行各业&#xff0c;食品包装设计界也已来到了一个“拼创意、拼二创和拼审美”的时代。有了AI的加入&#xff0c;设计界正迎来一股AI创意风暴&#xff0c;不仅颠覆了设计流程&#xff0c;更为食品包…

go MongoDB

安装 go get go.mongodb.org/mongo-driver/mongo package mongodbexampleimport ("context""fmt""ginapi/structs""time""go.mongodb.org/mongo-driver/bson""go.mongodb.org/mongo-driver/bson/primitive""…

全流程R语言Meta分析核心技术高阶应用

查看原文>>>全流程R语言Meta分析核心技术高阶应用 目录 专题一、Meta分析的选题与检索 专题二、Meta分析与R语言数据清洗及统计方法 专题三、R语言Meta分析与作图 专题四、R语言Meta回归分析 专题五、R语言Meta诊断分析 专题六、R语言Meta分析的不确定性 专题…

Linux centos7 bash编程小训练

训练要求&#xff1a; 求比一个数小的最大回文数 知识点&#xff1a; 一个数字正读反读都一样&#xff0c;我们称为回文数&#xff0c;如5、11、55、121、222等。 我们训练用bash编写一个小程序&#xff0c;由我们标准输入一个整数&#xff0c;计算机将显示出一个比这个数小…

最新ai系统ChatGPT程序源码+详细搭建教程+mj以图生图+Dall-E2绘画+支持GPT4+AI绘画+H5端+Prompt知识库

目录 一、前言 二、系统演示 三、功能模块 3.1 GPT模型提问 3.2 应用工作台 3.3 Midjourney专业绘画 3.4 mind思维导图 四、源码系统 4.1 前台演示站点 4.2 SparkAi源码下载 4.3 SparkAi系统文档 五、详细搭建教程 5.1 基础env环境配置 5.2 env.env文件配置 六、环境…

Java设计模式之建造者模式

建造者模式&#xff0c;又称生成器模式&#xff1a;将一个复杂的构建与其表示相分离&#xff0c;使得同样的构建过程可以创建不同的表示。 三个角色&#xff1a;建造者、具体的建造者、监工、使用者 建造者角色&#xff1a;定义生成实例所需要的所有方法&#xff1b; 具体的建…

力扣--数组类题目27. 移除元素

给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,2,3], val 3 输出&#xff1a;2, nums [2,2] 解释&#xff1a;函数应该返回新的长度 2, 并且 n…

曲面(弧面、柱面)展平(拉直)瓶子标签识别ocr

瓶子或者柱面在做字符识别的时候由于变形&#xff0c;识别效果是很不好的 或者是检测瓶子表面缺陷的时候效果也没有展平的好 下面介绍两个项目&#xff0c;关于曲面&#xff08;弧面、柱面&#xff09;展平&#xff08;拉直&#xff09; 项目一&#xff1a;通过识别曲面的6个点…

《Go 语言第一课》课程学习笔记(十)

复合数据类型 同构复合类型&#xff1a;从定长数组到变长切片 由多个同构类型&#xff08;相同类型&#xff09;或异构类型&#xff08;不同类型&#xff09;的元素的值组合而成&#xff0c;这类数据类型在 Go 语言中被称为复合类型。 数组有哪些基本特性&#xff1f; Go 语…

c语言 - inline关键字(内联函数)

概念 在编程中&#xff0c;inline是一个关键字&#xff0c;用于修饰函数。inline函数是一种对编译器的提示&#xff0c;表示这个函数在编译时应该进行内联展开。 内联展开是指将函数的代码插入到调用该函数的地方&#xff0c;而不是通过函数调用的方式执行。这样可以减少函数调…

用手势操控现实:OpenCV 音量控制与 AI 换脸技术解析

基于opencv的手势控制音量和ai换脸 HandTrackingModule.py import cv2 import mediapipe as mp import timeclass handDetector():def __init__(self, mode False, maxHands 2, model_complexity 1, detectionCon 0.5, trackCon 0.5):self.mode modeself.maxHands max…

MySQL三大日志(binlog、redo log和undo log)详解

1.redo log redo log是InnoDB存储引擎层的日志&#xff0c;又称重做日志文件。 用于记录事务操作的变化&#xff0c;记录的是数据修改之后的值&#xff0c;不管事务是否提交都会记录下来 redo log包括两部分&#xff1a;一个是内存中的日志缓冲(redo log buffer)&#xff0c;另…

PythonJS逆向解密——实现翻译软件+语音播报

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 环境使用: python 3.8 pycharm 模块使用: requests --> pip install requests execjs --> pip install PyExecJS ttkbootstrap --> pip install ttkbootstrap pyttsx3 --> pip install pyttsx3 第三…

数据分享|R语言PCA主成分、lasso、岭回归降维分析近年来各国土地面积变化影响...

全文链接&#xff1a;http://tecdat.cn/?p31445 机器学习在环境监测领域的应用&#xff0c;着眼于探索全球范围内的环境演化规律&#xff0c;人类与自然生态之间的关系以及环境变化对人类生存的影响&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 课题着眼于…

Yolov8小目标检测(10):DCNv3可形变卷积助力涨点,COCO新纪录65.4mAP | CVPR2023 InternImage

💡💡💡本文改进:DCNv3,基于DCNv2算子引入共享投射权重、多组机制和采样点调制 DCNv3 | 亲测在红外弱小目标检测涨点,map@0.5 从0.755提升至0.765 💡💡💡Yolo小目标检测,独家首发创新(原创),适用于Yolov5、Yolov7、Yolov8等各个Yolo系列,专栏文章提供每一…

Rocky linux 8.8 基础环境一键配置

1、有线 dnf install -y net-tools dnf install -y vim dnf install -y wget sudo dnf install epel-release -y sudo dnf install dkms -ysudo dnf install -y dnf-utils device-mapper-persistent-data lvm2 sudo dnf config-manager --add-repohttps://download.docker.com…

事件捕获和事件冒泡

事件捕获和事件冒泡与事件流有关系。 以下代码&#xff0c;点击 aa &#xff0c;控制台会打印什么呢&#xff1f; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content&q…

ChatGPT在智能娱乐和游戏互动中的应用如何?

在智能娱乐和游戏互动领域&#xff0c;ChatGPT具有广泛的应用潜力&#xff0c;可以为用户带来更丰富、个性化和有趣的体验。从虚拟角色和游戏情节到实时互动和玩家支持&#xff0c;ChatGPT可以在多个方面为游戏产业带来创新和改变。 **1. **虚拟角色和NPC互动**&#xff1a;Ch…

Delphi 开发手持机(android)打印机通用开发流程(举一反三)

目录 一、场景说明 二、厂家应提供的SDK文件 三、操作步骤&#xff1a; 1. 导出Delphi需要且能使用的接口文件&#xff1a; 2. 创建FMX Delphi项目&#xff0c;将上一步生成的接口文件&#xff08;V510.Interfaces.pas&#xff09;引入: 3. 将jarsdk.jar 包加入到 libs中…

回归预测 | MATLAB实现GA-APSO-IBP改进遗传-粒子群算法优化双层BP神经网络多输入单输出回归预测

回归预测 | MATLAB实现GA-APSO-IBP改进遗传-粒子群算法优化双层BP神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现GA-APSO-IBP改进遗传-粒子群算法优化双层BP神经网络多输入单输出回归预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 MATLAB实现GA-…