Flink 流式读取 Debezium CDC 数据写入 Hudi 表无法处理 -D / Delete 消息

问题场景是:使用 Kafka Connect 的 Debezium MySQL Source Connector 将 MySQL 的 CDC 数据 (Avro 格式)接入到 Kafka 之后,通过 Flink 读取并解析这些 CDC 数据,然后以流式方式写入到 Hudi 表中,测试中发现,INSERT 和 UPDATE 消息都能很好的处理,但是,-D 类型的 Delete 消息被忽略了,即使已经开启了 ‘changelog.enabled’ = ‘true’ ,既然无效。测试版本:Flink 1.17.1, Hudi 0.14.0, 具体测试工作和脚本在 <> 中已经完整记录,以下是问题表现:

数据库使用的是 Debezium 官方提供的 Docker 镜像,测试表为内置的 inventory 数据库中的 orders 表。操作记录:初始 4 条记录 10001 - 10004 => 添加 10005 => 更新 10001 => 删除 10004,以下是推送至 Kafka 中的全部 CDC 数据:

Struct{order_number=10001} | {"before":null,"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706686648000,"snapshot":{"string":"first_in_data_collection"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":0,"gtid":null,"file":"mysql-bin.000005","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1706686648863},"transaction":null}
Struct{order_number=10002} | {"before":null,"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10002,"order_date":16817,"purchaser":1002,"quantity":2,"product_id":105}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706686648000,"snapshot":{"string":"true"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":0,"gtid":null,"file":"mysql-bin.000005","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1706686648864},"transaction":null}
Struct{order_number=10003} | {"before":null,"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10003,"order_date":16850,"purchaser":1002,"quantity":2,"product_id":106}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706686648000,"snapshot":{"string":"true"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":0,"gtid":null,"file":"mysql-bin.000005","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1706686648897},"transaction":null}
Struct{order_number=10004} | {"before":null,"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10004,"order_date":16852,"purchaser":1003,"quantity":1,"product_id":107}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706686648000,"snapshot":{"string":"last_in_data_collection"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":0,"gtid":null,"file":"mysql-bin.000005","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1706686648898},"transaction":null}
Struct{order_number=10005} | {"before":null,"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10005,"order_date":19753,"purchaser":1003,"quantity":3,"product_id":105}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706687538000,"snapshot":{"string":"false"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":223344,"gtid":null,"file":"mysql-bin.000005","pos":354,"row":0,"thread":{"long":6},"query":null},"op":"c","ts_ms":{"long":1706687539115},"transaction":null}
Struct{order_number=10001} | {"before":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102}},"after":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10001,"order_date":16816,"purchaser":1002,"quantity":5,"product_id":104}},"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706687601000,"snapshot":{"string":"false"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":223344,"gtid":null,"file":"mysql-bin.000005","pos":640,"row":0,"thread":{"long":6},"query":null},"op":"u","ts_ms":{"long":1706687601997},"transaction":null}
Struct{order_number=10004} | {"before":{"osci.mysql-server-3.inventory.orders.Value":{"order_number":10004,"order_date":16852,"purchaser":1003,"quantity":1,"product_id":107}},"after":null,"source":{"version":"2.2.0.Final","connector":"mysql","name":"osci.mysql-server-3","ts_ms":1706687635000,"snapshot":{"string":"false"},"db":"inventory","sequence":null,"table":{"string":"orders"},"server_id":223344,"gtid":null,"file":"mysql-bin.000005","pos":947,"row":0,"thread":{"long":6},"query":null},"op":"d","ts_ms":{"long":1706687636121},"transaction":null}
Struct{order_number=10004} | null

读取 Kafka Debezium 源表结果如下:

image-20240131160329561

使用 Tableau / Changelog 模式读取 Hudi Sink 表,无 -D 数据

image-20240131163241177

上述行为和 《Flink Hudi 构建流式数据湖》 这篇文章对应不起来,正常是应该有 -D 记录的! 目前尚未找到原因,欢迎了解情况的朋友留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/662411.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux mount

挂载移动硬盘 1、通过 命令 fdisk -l 查看移动硬盘 2、创建 挂载点及文件 mkdir zen 3、mount -t ntfs /dev/sdb1 zen 报错&#xff1a;mount: unknown filesystem type ‘ntfs’ 需要安装ntfs-3g 如下才用编译安装方法&#xff1a; wget https://tuxera.com/opensource/ntf…

基于Java SSM框架实现智能快递分拣系统项目【项目源码】计算机毕业设计

基于java的SSM框架实现智能快递分拣系统演示 JAVA简介 Java主要采用CORBA技术和安全模型&#xff0c;可以在互联网应用的数据保护。它还提供了对EJB&#xff08;Enterprise JavaBeans&#xff09;的全面支持&#xff0c;java servlet API&#xff0c;JSP&#xff08;java serv…

解读命令docker-compose up -d

docker-compose up -d 命令是用来启动Docker Compose项目中定义的服务的&#xff0c;并且让这些服务在后台以守护进程&#xff08;daemon&#xff09;模式运行。 详细解读如下&#xff1a; docker-compose: 这是Docker官方提供的用于定义和管理多容器应用的工具&#xff0c;它…

【基础算法练习】并查集模板

文章目录 算法思想代码模板题目描述&#xff1a;代码并查集模板模板题二&#xff08;求并查集内集合的数量&#xff09; 算法思想 并查集的核心操作&#xff1a; 将两个集合合并询问两个元素是否在一个集合中 基本原理&#xff1a;每个集合我们将他维护成一颗树&#xff0c;…

基于Transformer结构的扩散模型综述

&#x1f380;个人主页&#xff1a; https://zhangxiaoshu.blog.csdn.net &#x1f4e2;欢迎大家&#xff1a;关注&#x1f50d;点赞&#x1f44d;评论&#x1f4dd;收藏⭐️&#xff0c;如有错误敬请指正! &#x1f495;未来很长&#xff0c;值得我们全力奔赴更美好的生活&…

npm淘宝镜像过期解决办法

npm淘宝镜像过期解决办法 因为npm 官方镜像&#xff08;registry.npmjs.org&#xff09;在国内访问很慢&#xff0c;我们基本上都会选择切换到国内的一些 npm 镜像&#xff08;淘宝镜像、腾讯云镜像等&#xff09;。由于淘宝原来的镜像&#xff08;registry.npm.taobao.org&am…

【习题】三方库

判断题 1. 三方组件是开发者在系统能力的基础上进行了一层具体功能的封装&#xff0c;对其能力进行拓展的工具 正确(True) 回答正确 2. 可以通过ohpm uninstall 指令下载指定的三方库 错误(False) 回答正确 3. lottie使用loadAnimation方法加载动画。 正确(True) 回答正…

react中使用useEffcet抛出错误“超出最大更新深度”

目录 【项目中部分代码】&#xff1a; 【说明】&#xff1a; 【抛出错误】&#xff1a;“超出最大更新深度” 【造成原因】&#xff1a; 【例如&#xff1a;】 【解决】&#xff1a; 【项目中部分代码】&#xff1a; // 类组件中&#xff1a;一进页面就拿到要notiveType的…

C语言:文件操作详解

创作不易&#xff0c;友友们给个三连吧&#xff01;&#xff01; 一、为什么我们需要使用文件 我们在写程序的时候&#xff0c;输入的数据是存储在电脑内存中的&#xff0c;如果程序退出内存回收&#xff0c;相应数据也就丢失了&#xff0c;等再次运行程序&#xff0c;就看不到…

VR全景技术如何运用在文旅展示,VR全景技术对景区有哪些好处

引言&#xff1a; 随着科技的不断进步和社会的不断发展&#xff0c;VR全景技术越来越受到人们的关注。在文化旅游行业中&#xff0c;VR全景技术的应用为景区提供了全新的展示方式和体验内容&#xff0c;极大地丰富了游客的文化旅游体验。那么VR全景技术能给文旅展示带来哪些好…

SpringBoot集成H2数据库

1&#xff09;添加H2的依赖 <dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>compile</scope> </dependency>2&#xff09;添加连接配置&#xff0c;启用web控制台 spring:datasource:url…

UE4 C++ UGameInstance实例化

1.创建GameInstance C类 2.在.h添加变量 class 工程名称_API UMyGameInstance : public UGameInstance {GENERATED_BODY()public: //定义了三个公开的变量UMyGameInstance();UPROPERTY(EditAnywhere, BlueprintReadWrite, Category "MyGameInstance")FString Name…

gitlab ci cd 不完全指南

gitlab 可能大家很常用&#xff0c;CI、CD 也应该早有耳闻&#xff0c;但是可能还没有去真正地了解过&#xff0c;这篇文章就是我对 gitlab CI、CD 的一些理解&#xff0c;以及踩过的一些坑&#xff0c;希望能帮助到大家。 什么是 CI、CD CI&#xff08;Continuous Integrati…

云计算底层技术、磁盘技术揭秘虚拟化管理、公有云概述

查看本机是否具备虚拟化支持 硬件辅助虚拟化 处理器里打开 虚拟化Inter VT-x/EPT 或AMD-V 构建虚拟化平台工具软件包 yum 与 dnf Yum和DNF都是用于管理Linux系统中的软件包的工具&#xff0c;但它们在许多方面存在一些差异。以下是一些可能的区别&#xff1a; 依赖解…

Python爬虫某云免费音乐——多线程批量下载

重点一&#xff1a;每首音乐的下载地址 重点二&#xff1a;如何判断是免费音乐 重点三&#xff1a;如何用线程下载并保存 重点四&#xff1a;如何规避运行错误导致子线程死掉 重点五&#xff1a;如何管理子线程合理运行 需要全部代码的私信或者VX:Kmwcx1109 运行效果&…

通过阿里云仓库来下载docker镜像

本文的前提是你已经在你的centos中安装了docker。 Step1&#xff1a; 注册登录阿里云登录 - 欢迎登录阿里云&#xff0c;安全稳定的云计算服务平台注册一个阿里云&#xff08;直接用支付宝扫码就行&#xff09;。 Step2&#xff1a; 创建个人实例 ​ Step3&#xff1a; 进…

关于bypassuac的探究——bypass的实现

经过前面的探究过后&#xff0c;我们整理下思路&#xff0c;首先要创建注册表&#xff0c;并添加DelegateExecute这个键值对&#xff0c;并修改command的指向exe路径即可bypassuac&#xff0c;那么这里用到一下几个函数 RegCreateKeyExA 首先是创建注册表项&#xff0c;对应的…

书客、米家、柏曼大路灯哪款好?多维度实测对比推荐!

每到寒暑假&#xff0c;各个论坛上出现“大路灯怎么选”的类似话题非常频繁&#xff0c;因为现在的孩子出来上学期间需要读写之外&#xff0c;在寒暑假时也在不断的学习&#xff0c;许多家长关注到孩子学习时的光线问题&#xff0c;担心影响到孩子的视力状况&#xff0c;都纷纷…

HTTP(Java web方向补充篇)

HTTP&#xff08;Java web方向补充篇&#xff09; HTTP简介 概念&#xff1a;Hyper Text Transfer Protocol,超文本传输协议&#xff0c;规定了浏览器和服务器之间数据传输的规则 HTTP协议特点&#xff1a; 基于TCP协议&#xff1a;面向连接&#xff0c;安全基于请求-响应模…

如何有效获取 Go 变量类型?探索多种方法

嗨&#xff0c;大家好&#xff01;本文是系列文章 Go 小技巧第九篇&#xff0c;系列文章查看&#xff1a;Go 语言小技巧。 文章目录 Go 的类型系统类型获取使用 fmt.Printf类型选择类型选择反射 reflect.TypeOf 其他注意点错误处理性能考量 总结 在 Python 中&#xff0c;可以使…