记录几个Hudi Flink使用问题及解决方法

前言

如题,记录几个Hudi Flink使用问题,学习和使用Hudi Flink有一段时间,虽然目前用的还不够深入,但是目前也遇到了几个问题,现在将遇到的这几个问题以及解决方式记录一下

版本

  • Flink 1.15.4
  • Hudi 0.13.0

流写

流写Hudi,必须要开启Checkpoint,这个我在之前的文章:Flink SQL Checkpoint 学习总结提到过。

如果不设置Checkpoint,不会生成commit,感觉像是卡住一样,具体表现为只生成.commit.requested和.inflight,然后不写文件、不生成.commit也不报错,对于新手来说很费劲,很难找到解决方法。

索引

hudi-flink 仅支持两种索引:FLINK_STATEBUCKET,默认FLINK_STATE

最开始使用hudi是用的spark,hudi-spark支持BLOOM索引,hudi java client也支持BLOOM索引,所以认为hudi-flink也支持BLOOM索引,但其实不支持,而且官网并没有相关的文档说明,可以从下面这段代码中看出来

Pipelines.hoodieStreamWrite

public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {// 如果是`BUCKET`索引if (OptionsResolver.isBucketIndexType(conf)) {WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);return dataStream.partitionCustom(partitioner, HoodieRecord::getKey).transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory).uid(opUID("bucket_write", conf)).setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));} else {// 否则按`FLINK_STATE`索引的逻辑WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);return dataStream// Key-by record key, to avoid multiple subtasks write to a bucket at the same time.keyBy(HoodieRecord::getRecordKey).transform("bucket_assigner",TypeInformation.of(HoodieRecord.class),new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))).uid(opUID("bucket_assigner", conf)).setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))// shuffle by fileId(bucket id).keyBy(record -> record.getCurrentLocation().getFileId()).transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory).uid(opUID("stream_write", conf)).setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));}}

FLINK_STATE 重复问题

如果使用默认的FLINK_STATE索引,在upsert时可能会有重复问题。(之前使用BLOOM索引时不会有这个问题)

问题复现

先写一部分数据作为历史数据到Hudi表,然后再写相同的数据到这个表,最后count表发现数据量变多,也就是有重复数据。
主要参数:

set parallelism.default=12;
set taskmanager.numberOfTaskSlots=2;'write.operation'='upsert',
'write.tasks'='11', 
'table.type'='COPY_ON_WRITE', 

场景为kafka2hudi,kafka数据量200w,没有重复,设置并发的主要目的是为了将数据打散分布在不同的文件里,这样更容易复现问题。(因为如果只有一个历史文件时,很难复现)

第一次任务跑完表数据量为200w,第二次跑完表数据量大于200w,证明数据重复。

重复原因

index state:保存在state中的主键和文件ID的对应关系
重复的原因为FLINK_STATE将主键和文件ID的对应关系保存在state中,当新启动一个任务时,index state需要重新建立,而默认情况下不会包含历史文件的index state,只会建立新数据的index state,所以对于没有历史文件的新表是不会有重复问题的。(对于有历史文件的表,如果从checkpoint恢复也不会有重复问题,因为从checkpoint恢复时,也恢复了之前历史文件的index state

解决方法

通过参数index.bootstrap.enabled解决,默认为false,当为true时,写hudi任务启动时会先引导(加载)历史文件的index state

'index.bootstrap.enabled'='true'

除了重复问题,FLINK_STATE因为将index保存在state中,所以随着数据量的增加,state越来越大。这样对于数据量特别大的表,对内存的要求也会很高,所以会遇到内存不足OOM的问题。 所以建议对于大表,还是选择使用BUCKET索引。

增量数据,‘index.bootstrap.enabled’='false’时的checkpoint记录,checkpoint大小开始很小,然后逐渐增加

增量数据,‘index.bootstrap.enabled’='true’时的checkpoint记录,checkpoint大小开始和结束差不多大

BUCKET INDEX

BUCKET索引需要根据表数据量大小设定好桶数(hoodie.bucket.index.num.buckets),但是默认情况下不能动态调整bucket数量。

另外可以通过参数hoodie.index.bucket.engine将其值设为CONSISTENT_HASHING,通过一致性哈希实现动态调整bucket数量,但是仅支持MOR表,我还没有试过这个功能,大家可以通过官网:https://hudi.apache.org/docs/configurations/了解相关参数自行测试。

hoodie.index.bucket.engine | SIMPLE (Optional) | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when hoodie.index.type is set to BUCKET. SIMPLE(default): Uses a fixed number of buckets for file groups which cannot shrink or expand. This works for both COW and MOR tables. CONSISTENT_HASHING: Supports dynamic number of buckets with bucket resizing to properly size each bucket. This solves potential data skew problem where one bucket can be significantly larger than others in SIMPLE engine type. This only works with MOR tables.

Config Param: BUCKET_INDEX_ENGINE_TYPE
Since Version: 0.11.0

BUCKET索引主要参数:

'index.type' =  'BUCKET', -- flink只支持两种index,默认FLINK_STATE index,FLINK_STATE index对于数据量比较大的情况会因为tm内存不足导致GC OOM
'hoodie.bucket.index.num.buckets' = '16', -- 桶数

注意,index.type是flink客户端独有的,和公共的不一样(使用公共参数不生效),没有前缀hoodie.,而桶数配置项是hudi公共参数,对于flink客户端哪些用公共参数哪些用flink独有的参数,官方文档并没有提供,需要自己在类org.apache.hudi.configuration.FlinkOptions查看,该类中的参数为flink重写的独有参数,没有的话则需要使用公共参数

insert转upsert问题

对于BUCKET如果先insert一部分历史数据,再upsert增量数据时,默认参数配置会抛出如下异常:
(复现此问题只需要批写一条数据即可)

Caused by: java.lang.NumberFormatException: For input string: "4ff32a41"at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)at java.lang.Integer.parseInt(Integer.java:580)at java.lang.Integer.parseInt(Integer.java:615)at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112)at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)

原因是:默认参数下,insert时没有按照bucket索引的逻辑写文件,而upsert是按照bucket逻辑写文件的,bucket索引写的文件名前缀都带有桶号,不是bucket索引写的文件名没有桶号,所以upsert时会尝试解析insert写的历史文件的桶号,导致解析失败。

bucket索引逻辑写的文件

/tmp/cdc/hudi_sink_insert/4ff32a41-4232-4f47-855a-6364eb1d6ce8-0_0-1-0_20230820210751280.parquet

bucket索引逻辑写的文件

/tmp/cdc/hudi_sink_insert/00000000-82f4-48a5-85e9-2c4bb9679360_0-1-0_20230820211542006.parquet

解决方法

对于实际应用场景是有这种先insert在upsert的需求的,解决方法就是尝试通过配置参数使insert也按照bucket索引的逻辑写数据
主要参数:'write.insert.cluster'='true'
相关参数:

'write.operation'='insert', 
'table.type'='COPY_ON_WRITE',
'write.insert.cluster'='true',
'index.type' =  'BUCKET',

我是通过阅读源码发现这个参数可以使insert按照bucket逻辑写数据的
对应的源码在HoodieTableSink.getSinkRuntimeProvider,我在上篇文章Hudi Flink SQL源码调试学习(一)中分析了写hudi时是如何调用到这个方法的,感兴趣得可以看一下。

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {return (DataStreamSinkProviderAdapter) dataStream -> {// setup configurationlong ckpTimeout = dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout();conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);// set up default parallelismOptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();// bulk_insert modefinal String writeOperation = this.conf.get(FlinkOptions.OPERATION);if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {return Pipelines.bulkInsert(conf, rowType, dataStream);}// Append modeif (OptionsResolver.isAppendMode(conf)) {DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());if (OptionsResolver.needsAsyncClustering(conf)) {return Pipelines.cluster(conf, rowType, pipeline);} else {return Pipelines.dummySink(pipeline);}}DataStream<Object> pipeline;// bootstrapfinal DataStream<HoodieRecord> hoodieRecordDataStream =Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);// write pipelinepipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);// compactionif (OptionsResolver.needsAsyncCompaction(conf)) {// use synchronous compaction for bounded source.if (context.isBounded()) {conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);}return Pipelines.compact(conf, pipeline);} else {return Pipelines.clean(conf, pipeline);}};}

我们在上面的代码中可以发现,当是append模式时会走单独的写逻辑,不是append模式时,才会走下面的Pipelines.hoodieStreamWrite,那么就需要看一下append模式的判断逻辑

OptionsResolver.isAppendMode(conf)

  public static boolean isAppendMode(Configuration conf) {// 1. inline clustering is supported for COW table;// 2. async clustering is supported for both COW and MOR tablereturn isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)|| needsScheduleClustering(conf);}

对于cow表insert时,默认参数的情况needsScheduleClustering(conf)返回false,而!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回true,所以只需要让!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回false就可以跳过append模式的逻辑了,也就是上面的 'write.insert.cluster'='true'。(每个版本的源码不太一样,所以对于其他版本,可能这个参数并不能解决该问题)

Hive查询异常

记录一个Hive SQL查询Hudi表的异常

异常信息

Caused by: java.lang.ClassCastException: org.apache.hadoop.io.ArrayWritable cannot be cast to org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchat org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.deliverVectorizedRowBatch(VectorMapOperator.java:803)at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:845)... 20 morejava.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritablejava.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable    

异常复现

找一个hudi mor表的rt表,执行count语句(有人反馈聚合函数也会出现此异常)

解决方法

set hive.vectorized.execution.enabled=false; (我验证的这一个参数就可以了)
set hive.vectorized.execution.reduce.enabled=false;(不确定此参数是否必须)

相关阅读

  • Flink SQL Checkpoint 学习总结
  • Hudi Flink SQL源码调试学习(一)
  • Flink SQL操作Hudi并同步Hive使用总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/46683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink之时间语义

Flink之时间语义 简介 Flink中时间语义可以说是最重要的一个概念了,这里就说一下关于时间语义的机制,我们下看一下下面的表格,简单了解一下 时间定义processing time处理时间,也就是现实世界的时间,或者说代码执行时,服务器的时间event time事件时间,就是事件数据中所带的时…

nginx代理webSocket链接响应403

一、场景 使用nginx代理webSocket链接&#xff0c;nginx响应403 1、nginx访问日志响应403 [18/Aug/2023:09:56:36 0800] "GET /FS_WEB_ASS/webim_api/socket/message HTTP/1.1" 403 5 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit…

【数据结构】循环队列

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

浏览器 - 事件循环机制详解

目录 1&#xff0c;浏览器进程模型进程线程浏览器的进程和线程1&#xff0c;浏览器进程2&#xff0c;网络进程3&#xff0c;渲染进程 2&#xff0c;渲染主线程事件循环异步同步 JS 为什么会阻塞渲染任务优先级 3&#xff0c;常见面试题1&#xff0c;如何理解 js 的异步2&#x…

❤ Vue工作常用的一些动态数据和方法处理

❤ Vue工作常用的一些动态数据和方法处理 &#xff08;1&#xff09;动态拼接相对路径结尾的svg 错误写法一 ❌ 正确写法 &#x1f646; <img :src"require(/assets//amazon/svg/homemenu${index}.svg)" style"height: 20px;display: block;margin: 0 au…

关于视频监控平台EasyCVR视频汇聚平台建设“明厨亮灶”具体实施方案以及应用

一、方案背景 近几年来&#xff0c;餐饮行业的食品安全、食品卫生等新闻频频发生&#xff0c;比如某火锅店、某网红奶茶&#xff0c;食材以次充好、后厨卫生被爆堪忧&#xff0c;种种问题引起大众关注和热议。这些负面新闻不仅让餐饮门店的品牌口碑暴跌&#xff0c;附带的连锁…

[JavaWeb]【二】Vue Ajax Elemnet Vue路由打包部署

目录 一 什么是Vue 1.1 Vue快速入门 1.2 常用指令 1.2.1 v-bind && v-model 1.2.2 v-on 1.2.3 v-if && v-show 1.2.4 v-for 1.2.5 案例 1.3 生命周期 二 Ajax 2.1 Ajax介绍 2.2 同步与异步 2.3 原生Ajax&#xff08;繁琐&#xff0c;过时了&#xff09…

手机技巧:分享五个非常实用的生活类APP

目录 1、我的桌面iScreen-桌面美化神器 2.Just Rain-创意听雨声APP 3.得言-美文句子神器 4、微手帐 5、暗盒-隐私保护神器 今天给大家整理5个非常实用的实用APP软件&#xff0c;感兴趣的朋友可以下载试试&#xff01; 1、我的桌面iScreen-桌面美化神器 我的桌面iScreen是一…

[uni-app] uview封装Popup组件,处理props及v-model的传值问题

文章目录 需求及效果遇到的问题解决的办法偷懒的写法 需求及效果 uView(1.x版本)中, 有Pop弹出层的组件, 现在有个需求是,进行简单封装,有些通用的设置不想每次都写(比如 :mask-custom-style"{background: rgba(0, 0, 0, 0.7)}"这种) 然后内部内容交给插槽去自己随…

系统架构设计专业技能 · 系统工程与系统性能

系列文章目录 系统架构设计专业技能 网络技术&#xff08;三&#xff09; 系统架构设计专业技能 系统安全分析与设计&#xff08;四&#xff09;【系统架构设计师】 系统架构设计高级技能 软件架构设计&#xff08;一&#xff09;【系统架构设计师】 系统架构设计高级技能 …

2023年上半年软件设计师下午真题及答案解析

试题一(15分) 随着农业领域科学种植的发展&#xff0c;需要对农业基地及农事进行信息化管理&#xff0c;为租户和农户等人员提供种植相关服务&#xff0c;现欲开发农事管理服务平台&#xff0c;其主要功能是&#xff1a; (1)人员管理&#xff1a;平台管理员管理租户&#xff…

​Redis概述

目录 Redis - 概述 使用场景 如何安装 Window 下安装 Linux 下安装 docker直接进行安装 下载Redis镜像 Redis启动检查常用命令 Redis - 概述 redis是一款高性能的开源NOSQL系列的非关系型数据库,Redis是用C语言开发的一个开源的高键值对(key value)数据库,官方提供测试…

python Requests

Requests概述 官方文档&#xff1a;http://cn.python-requests.org/zh_CN/latest/,Requests是python的HTTP的库&#xff0c;我们可以安全的使用 Requests安装 pip install Requests -i https://pypi.tuna.tsinghua.edu.cn/simple Requests的使用 Respose的属性 属性说明url响…

http学习笔记3

第 11 章 Web 的攻击技术 11.1 针对 Web 的攻击技术 简单的 HTTP 协议本身并不存在安全性问题&#xff0c;因此协议本身几乎不会成为攻击的对象。应用 HTTP 协议的服务器和客户端&#xff0c;以及运行在服务器上的 Web 应用等资源才是攻击目标。目前&#xff0c;来自互联网的攻…

【汇编语言】CS、IP寄存器

文章目录 修改CS、IP的指令转移指令jmp问题分析 修改CS、IP的指令 理论&#xff1a;CPU执行何处的指令&#xff0c;取决于CS:IP应用&#xff1a;程序员可以通过改变CS、IP中的内容&#xff0c;进行控制CPU即将要执行的目标指令&#xff1b;问题&#xff1a;如何改变CS、IP中的…

Golang 基础语法问答

使用值为 nil 的 slice、map 会发生什么&#xff1f; 允许对值为 nil 的 slice 添加元素&#xff0c;但是对值为 nil 的 map 添加元素时会造成运行时 panic。 // map错误示例 func main() {var m map[string]intm["one"] 1 // error: panic: assignment to entry …

Python Opencv实践 - 图像均值滤波

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape) pixel_count img.shape[0] * img.shape[1] print(pixel_count)#为图像添加椒盐噪声 #参考资料&#xf…

基于YOLOX的输电线路异物检测算法研究及软件设计_有系统有文献,整体认知蛮好的

我国自改革开放以来&#xff0c;大力发展工业和经济&#xff0c;对电能同样有着巨大的需求&#xff0c;所需求的电能不仅需要保证其数量&#xff0c;还要保障其质量&#xff0c;因此对整个电力系统安全稳定的运行也提出了更高的要求&#xff0c;电力系统发生故障要实时检测并及…

Kafka—工作流程、如何保证消息可靠性

什么是kafka&#xff1f; 分布式事件流平台。希望不仅仅是存储数据&#xff0c;还能够数据存储、数据分析、数据集成等功能。消息队列&#xff08;把数据从一方发给另一方&#xff09;&#xff0c;消息生产好了但是消费方不一定准备好了&#xff08;读写不一致&#xff09;&am…

Vscode详细安装教程

Vscode官网下载 官网地址&#xff1a;Download Visual Studio Code - Mac, Linux, Windows 通过链接可以直接跳转到下面的页面当中&#xff0c;支持的版本有Windows、Linux、Mac&#xff0c;可以选择适配自己电脑的版本&#xff0c;一般来说应该是Windows x64的。不要直接点W…