Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据

Flink 1.11 最重要的 Feature —— Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 —— CDC。

CDC概述

何为CDC?Change Data Capture,将数据库中的’增’、’改’、’删’操作记录下来。在很早之前是通过触发器来完成记录,现在通过 binlog+同步中间件来实现。常用的 binlog 同步中间件有很多,比如 Alibaba 开源的 canal[1],Red Hat 开源的debezium[2],Zendesk 开源的 Maxwell[3] 等等。

这些中间件会负责 binlog 的解析,并同步到消息中间件中,我们只需要消费对应的 Topic 即可。

回到 Flink 上,CDC 似乎和我们没有太大的关联?其实不然,让我们更加抽象地来看这个世界。

当我们用 Flink 去消费数据比如 Kafka 时,我们就仿佛在读一张表,什么表?一张不断有记录被插入的表,我们将每一条被插入的数据取出来,完成我们的逻辑。

6401  .png

当插入的每条数据都没有问题时,一切都很美好。关联、聚合、输出。

但当我们发现,某条已经被计算过的数据有问题时,麻烦大了。我们直接改最后的输出值其实是没有用的,这次改了,当再来数据触发计算时,结果还是会被错误的数据覆盖,因为中间计算结果没有被修改,它仍然是一个错误的值。怎么办?撤回流似乎能解决这个问题,这也确实是解决这个问题的手段,但是问题来了,撤回流怎么确定读取的数据是要被撤回的?另外,怎么去触发一次撤回?

CDC 解决了这些:将消息中间件的数据反序列化后,根据 Type 来识别数据是 Insert 还是 Delete;另外,如果大家看过 Flink 源码,会发现反序列化后的数据类型变了,从 Row 升级为 RowData,RowData 能够将数据标记为撤回还是插入,这就意味着每个算子能够判断出数据到底是需要下发还是撤回。

CDC 的重要性就先说这么多,之后有机会的话,出一篇实时 DQC 的视频,告诉大家 CDC 的出现,对于实时 DQC 的帮助有多大。下面让我们回到正题。

既然有那么多 CDC 同步中间件,那么一定会有各种各样的格式存放在消息中间件中,我们必然需要去解析它们。于是 Flink 1.11 提供了 canal-json 和 debezium-json,但我们用的是 Maxwell 怎么办?只能等官方出或者说是等有人向社区贡献吗?那如果我们用的是自研的同步中间件怎么办?

所以就有了今天的分享:如何去自定义实现一个 Maxwell format。大家也可以基于此文的思路去实现其他 CDC format,比如 OGG, 或是自研 CDC 工具产生的数据格式。

如何实现

当我们提交任务之后,Flink 会通过 SPI 机制将 classpath 下注册的所有工厂类加载进来,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而对于 Format 来说,到底使用哪个 DeserializationFormatFactory,是根据 DDL 语句中的 Format 来决定的。通过将 Format 的值与工厂类的 factoryIdentifier() 方法的返回值进行匹配 来确定。

再通过 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,将反序列化对象提供给 DynamicTableSource。

通过图来了解整个过程(仅从反序列化数据并消费的角度来看):

6402.png

想要实现 CDC Format 去解析某种 CDC 工具产生的数据其实很简单,核心组件其实就三个:

  • 工厂类(DeserializationFormatFactory):负责编译时根据 ‘format’ = ‘maxwell-json’创建对应的反序列化器。即 MaxwellJsonFormatFactory。
  • 反序列化类(DeserializationSchema):负责运行时的解析,根据固定格式将 CDC 数据转换成 Flink 系统能认识的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。
  • Service 注册文件:需要添加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory ,并在其中增加一行我们实现的 MaxwellJsonFormatFactory 类路径。

再通过代码,来看看反序列化中的细节:

public void deserialize(byte[] message, Collectorout) throws IOException {try {RowData row = jsonDeserializer.deserialize(message);String type = row.getString(2).toString(); // "type" fieldif (OP_INSERT.equals(type)) {RowData insert = row.getRow(0, fieldCount);insert.setRowKind(RowKind.INSERT);out.collect(insert);} else if (OP_UPDATE.equals(type)) {GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" fieldGenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" fieldfor (int f = 0; f < fieldCount; f++) {if (before.isNullAt(f)) {before.setField(f, after.getField(f));}}before.setRowKind(RowKind.UPDATE_BEFORE);after.setRowKind(RowKind.UPDATE_AFTER);out.collect(before);out.collect(after);} else if (OP_DELETE.equals(type)) {RowData delete = row.getRow(0, fieldCount);delete.setRowKind(RowKind.DELETE);out.collect(delete);} else {if (!ignoreParseErrors) {throw new IOException(format("Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message)));}}} catch (Throwable t) {if (!ignoreParseErrors) {throw new IOException(format("Corrupt Maxwell JSON message '%s'.", new String(message)), t);}}}

其实并不复杂:先通过 jsonDeserializer 将字节数组根据 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,然后根据 “type” 列的值来判断数据是什么类型:增、改、删;再根据数据类型取出 “data” 或者 “old” 区的数据,来组装成 Flink 认识的 INSERT/DELETE/UPDATE 数据并下发。

对象 jsonDeserializer 即 JSON 格式的反序列化器,它可以通过指定的 RowType 类型,读取 JSON 的字节数组中指定的字段并反序列化成 RowData。在我们的场景中,我们需要去读取如下 Maxwell 数据的 “data”, “old” 和 “type” 部分的数据。

{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}

因此 MaxwellJsonDeserializationSchema 中定义的 JSON 的 RowType 如下所示。

private RowType createJsonRowType(DataType databaseSchema) {// Maxwell JSON contains other information, e.g. "database", "ts"// but we don't need themreturn (RowType) DataTypes.ROW(DataTypes.FIELD("data", databaseSchema),DataTypes.FIELD("old", databaseSchema),DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType();}

databaseSchema 是用户通过 DDL 定义的 schema 信息,也对应着数据库中表的 schema。结合上面的 JSON 和代码,我们能够得知 jsonDeserializer 只会取走 byte[] 中 data、old、type 这三个字段对应的值,其中 data 和old 还是个嵌套JSON,它们的 schema 信息和 databaseSchema 一致。由于 Maxwell 在同步数据时,“old”区不包含未被更新的字段,所以 jsonDeserializer 返回后,我们会通过 “data” 区的 RowData 将 old 区的缺失字段补齐。

得到 RowData 之后,会取出 type 字段,然后根据对应的值,会有三种分支:

  • insert:取出 data 中的值,也就是我们通过DDL定义的字段对应的值,再将其标记为 RowKind.INSERT 类型数据,最后下发。
  • update:分别取出 data 和 old 的值,然后循环 old 中每个字段,字段值如果为空说明是未修改的字段,那就用 data 中对应位置字段的值替代;之后将 old 标记为 RowKind.UPDATE_BEFORE 也就意味着 Flink 引擎需要将之前对应的值撤回,data 标记为 RowKind.UPDATE_AFTER 正常下发。
  • delete:取出 data 中的值,标记为 RowKind.DELETE,代表需要撤回。

处理的过程中,如果抛出异常,会根据 DDL 中maxwell-json.ignore-parse-errors的值来确定是忽视这条数据继续处理下一条数据,还是让任务报错。

笔者在 maxwell-json 反序列化功能的基础之上,还实现了序列化的功能,即能将 Flink 产生的 changelog 以 Maxwell 的 JSON 格式输出到外部系统中。其实现思路与反序列化器的思路正好相反,更多细节可以参考 Pull Request 中的实现。

PR 实现详情链接: 
https://github.com/apache/flink/pull/13090

功能演示

给大家演示一下从 Kafka 中读取 Maxwell 推送来的 maxwell json 格式数据,并将聚合后的数据再次写入 Kafka 后,重新读出来验证数据是否正确。

Kafka 数据源表

CREATE TABLE topic_products (-- schema is totally the same to the MySQL "products" tableid BIGINT,name STRING,description STRING,weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json');

Kafka 数据结果表&数据源表

CREATE TABLE topic_sink (name STRING,sum_weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell-sink',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
);

MySQL 表

-- 注意,这部分 SQL 在 MySQL 中执行,不是 Flink 中的表
CREATE TABLE product (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
description VARCHAR(512),
weight FLOAT
);
truncate product ;
ALTER TABLE product AUTO_INCREMENT = 101;
INSERT INTO product
VALUES (default,"scooter","Small 2-wheel scooter",3.14),(default,"car battery","12V car battery",8.1),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),(default,"hammer","12oz carpenter's hammer",0.75),(default,"hammer","14oz carpenter's hammer",0.875),(default,"hammer","16oz carpenter's hammer",1.0),(default,"rocks","box of assorted rocks",5.3),(default,"jacket","water resistent black wind breaker",0.1),(default,"spare tire","24 inch spare tire",22.2);
UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
UPDATE product SET weight='5.1' WHERE id=107;
INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
UPDATE product SET weight='5.17' WHERE id=111;
DELETE FROM product WHERE id=111;
UPDATE product SET weight='5.17' WHERE id=102 or id = 101;
DELETE FROM product WHERE id=102 or id = 103;

先看看能不能正常读取 Kafka 中的 maxwell json 数据。

select * from topic_products;

6403.png

可以看到,所有字段值都变成了 Update 之后的值,同时,被 Delete 的数据也没有出现。

接着让我们再将聚合数据写入 Kafka。

insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;

在 Flink 集群的 Web 页面也能够看到任务正确提交,接下来再让我们把聚合数据查出来。

select * from topic_sink

6404.png

最后,让我们查询一下 MySQL 中的表,来验证数据是否一致;因为在 Flink 中,我们将 weight 字段定义成 Decimal(10,2),所以我们在查询 MySQL 的时候,需要将 weight 字段进行类型转换。

6405.png

没有问题,我们的 maxwell json 解析很成功。

写在最后

根据笔者实现 maxwell-json format 的经验,Flink 对于接口的定义、对于模块职责的划分还是很清晰的,所以实现一个自定义 CDC format 非常简单(核心代码只有200多行)。因此,如果你是用的 OGG,或是自研的同步中间件,可以通过本文的思路快速实现一个 CDC format,一起解放你的 CDC 数据!

 

 

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515125.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里巴巴大数据实践:大数据建设方法论OneData

来源&#xff1a;数智化转型俱乐部 面对爆炸式增长的数据&#xff0c;如何建设高效的数据模型和体系&#xff0c;对这些数据进行有序和有结构地分类组织和存储&#xff0c;避免重复建设和数据不一致性&#xff0c;保证数据的规范性&#xff0c;一直是大数据系统建设不断追求的…

干货!一文搞懂无状态服务

来源 | 机智的程序员小熊责编 | 寇雪芹头图 | 下载于视觉中国事故的发生是量的积累的结果&#xff0c;任何事情都没有表面看起来那么简单&#xff0c;在软件运行的过程中&#xff0c;随着用户量的增加&#xff0c;不考虑高可用&#xff0c;迟早有一天会发生故障&#xff0c;不得…

后疫情时代,这家在线教育机构如何乘“云”而上

简介&#xff1a; 阿里云依托于云计算的基础设施特性&#xff0c;能够帮助教育机构避免业务侧重复投入、提高资源利用率、降低开发和运维成本&#xff0c;使洋葱学院激发出更大的活力&#xff0c;在后疫情时代得到更多用户的青睐 新冠疫情让现代人类和国际社会经历了大规模的隔…

2021全球权威AI性能竞赛MLPerf最新榜单: 浪潮获18项冠军几近半壁江山

4月22日&#xff0c;全球权威AI基准评测MLPerf公布2021年最新榜单&#xff0c;在全部有效41个项目中&#xff0c;浪潮获得18项性能第一&#xff0c;斩获几近半数冠军。 MLPerf™由图灵奖得主大卫•帕特森 &#xff08;David Patterson&#xff09;联合谷歌、斯坦福、哈佛大学…

NFS文件锁一致性设计原理解析

简介&#xff1a; 在存储系统中&#xff0c; NFS&#xff08;Network File System&#xff0c;即网络文件系统&#xff09;是一个重要的概念&#xff0c;已成为兼容POSIX语义的分布式文件系统的基础。它允许在多个主机之间共享公共文件系统&#xff0c;并提供数据共享的优势&am…

作为工程师,你真的了解无服务器?

译者 | 王欢来源 | 分布式实验室头图 | 下载于ICphoto最近&#xff0c;我在YouTube上看了一个非常出色的开发人员的视频。它的标题是“无服务器毫无意义”。虽然我非常喜欢该视频&#xff0c;但也不敢确定作者关于无服务器的观点是否完全正确&#xff0c;因此我想在本文中进行讨…

recaf反编译 java jar包

文章目录1. 获取方式2. 软件运行3. 导入jar4. 模式切换5. 字符串混淆解析1. 获取方式 添加QQ群获取197453088 2. 软件运行 java -jar recaf-2.21.13.jar3. 导入jar 4. 模式切换 5. 字符串混淆解析 如何解密Allatori 混淆的字符串 Java ALLATORIxDEMO

分布式锁在存储系统中的技术实践

简介&#xff1a; 阿里云存储提供了完整的分布式锁解决方案&#xff0c;经过了阿里云众多云产品宝贵的业务场景中长期锤炼&#xff0c;稳定高可靠&#xff0c;且提供了多种语言的SDK选择&#xff0c;甚至是RESTful集成方案。 1 背景 针对共享资源的互斥访问历来是很多业务系统…

Spring Cloud 应用在 Kubernetes 上的最佳实践 — 高可用(混沌工程)

简介&#xff1a; 从上篇开始&#xff0c;我们进入到了高可用的章节&#xff0c;上篇提到的熔断能力&#xff0c;是历年保障大促当天晚上整个系统不被洪峰流量打垮的法宝&#xff0c;本篇介绍的措施与熔断有不一样的地方&#xff1f; 前言 从上篇开始&#xff0c;我们进入到了…

闲鱼对Flutter-Native混合工程解耦的探索

简介&#xff1a; 分手快乐&#xff0c;祝你快乐&#xff5e; 作者&#xff1a;祈晴 1. 闲鱼Flutter现状 闲鱼是第一个使用Flutter混合开发的大型应用&#xff0c;但闲鱼客户端开发最深入体会的痛点就是编译时长影响开发体验。在FlutterNative这种开发模式下&#xff0c;Nat…

学 Python 最大的 1 个误区,看看你中招了吗?

提起 Python&#xff0c;大家总觉得很简单。但是&#xff0c;能把 Python 用好的人&#xff0c;好像并没多少。随着 Python 火了之后&#xff0c;像“ 3 天带你学会 Python ”、“快速入门到全栈”这样的教程层出不穷。很多讲了一点基础语法后&#xff0c;还没讲 http 协议和异…

Unable to make public jdk.internal.loader.Resource jdk.internal.loader.URLClassPath.getResource(jav

文章目录1. 现象2. 异常截图2. 解决方案3. 执行命令4. 启动日志5. 浏览器效果图1. 现象 执行命令 xjar.exe java -jar unified-access-center-passwd.jar运行 sprinbgboot 打包的jar包报错 具体信息如下&#xff1a; C:\Users\gblfy\Desktop\xJarDir>xjar.exe java -jar…

win10安装go开发环境

文章目录1. 下载软件2. 安装3. 验证1. 下载软件 golang官网&#xff1a;https://golang.google.cn/dl/ 2. 安装 双击go1.19.1.windows-amd64.msi一路下一步 3. 验证 go version

3 张图带你走近蚂蚁mPaaS音视频通话组件

简介&#xff1a; 远程问诊、线上开户、车载语音通话……蚂蚁 mPaaS 正在“拥抱新技术&#xff0c;探索新未来”。 音视频技术的进步&#xff0c;让线上办公不再是一时权宜之计&#xff0c;也使得线上业务的“无接触”开展成为可能。近日&#xff0c;蚂蚁集团推出的移动开发平台…

立即生效!帕特·基辛格卸任 VMware 所有职务

整理 | 苏宓出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;心无二用&#xff0c;在帕特基辛格&#xff08;Pat Gelsinger&#xff09;成为英特尔新任 CEO 两个月后&#xff0c;他宣布辞去此前的 VMware 首席执行官的职务&#xff0c;全身心地帮助英特尔重建往日的…

云原生应用实现规范 - 初识 Operator

简介&#xff1a; 本文我们将首先了解到 Operator 是什么&#xff0c;之后逐步了解到 Operator 的生态建设&#xff0c;Operator 的关键组件及其基本的工作原理&#xff0c;下面让我们来一探究竟吧。 作者 | 匡大虎、阚俊宝 基于 Kubernetes 平台&#xff0c;我们可以轻松的…

如何基于 K8s 构建下一代 DevOps 平台?

简介&#xff1a; 当前云原生 DevOps 体系现状如何&#xff1f;面临哪些挑战&#xff1f;如何通过 OAM 解决云原生 DevOps 场景下的诸多问题&#xff1f;云原生开发应用模型 OAM(Open Application Model) 社区核心成员孙健波将为大家一一解答&#xff0c;并分享如何基于 OAM 和…

中国电子云发布专属云CECSTACK 以全栈信创赋能千行百业

2021年4月26日&#xff0c;第四届数字中国建设峰会召开之际&#xff0c;中国电子云在福州举办“云可信 创未来——中国电子云全系产品发布暨战略伙伴签约仪式”&#xff0c;重磅发布中国电子云“信创”实践和全栈自主专属云CECSTACK。中国电子副总经理、党组成员陈锡明&#xf…

idea 双击打不开了咋办

文章目录1. 文件内容还原2. 删除以前旧文件3. 删除以前缓存文件1. 文件内容还原 首先检查一下idea64.exe.vmoptions是否有改动 有的话可以把-javaagent的这一行删除&#xff0c;删除后保存再尝试看看能不能打开。 2. 删除以前旧文件 如果还是不行的话我们可以找打开 C:\Us…

使用日志审计查看MaxCompute执行过哪些操作

简介&#xff1a; MaxCompute完整地记录用户的各项操作行为&#xff0c;会自动将操作日志实时投递到ActionTrail中&#xff0c;ActionTrail针对作业&#xff08;Instance&#xff09;、表&#xff08;Table&#xff09;、函数&#xff08;Function&#xff09;、资源&#xff0…