Flink Gauss CDC:深度剖析存量与增量同步的创新设计

目录

设计思路

1.为什么不直接用FlinkCDC要重写Flink Gauss CDC

2.存量同步的逻辑是什么

2.1、单主键的切片策略是什么

2.2、​​​​​复合主键作切片,怎么保证扫描到所有的数据

3、增量同步的逻辑是什么

4、存量同步结束之后如何无缝衔接增量同步 

5、下游数据如何落库

6、项目结构大概怎么样

总结


设计思路

1.为什么不直接用FlinkCDC要重写Flink Gauss CDC

GaussDB 是华为内部自研的一套数据库,提供了类似于PostgreSQL的逻辑复制插件。Gauss100 OLTP逻辑复制解析包含逻辑日志信息的REDO日志,只有当表逻辑复制开关和全局逻辑复制开关同时打开时,该表的数据才会被逻辑复制。变化的数据最终到kafka,假设对标USRSAMPLE.T1分别进行插入,更新,删除操作同步的消息格式如下:

[{"data": {"F2": "aaaaa","F1": 1},"dn": 0,"keys": null,"lsn": 295269,"msgTime": "2022-08-11 10:53:18.000999","opType": "I","scn": 598260140474369,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:18.000307"},{"data": {"F2": "bbbb"},"dn": 0,"keys": {"F1": 1},"lsn": 295299,"msgTime": "2022-08-11 10:53:52.000061","opType": "U","scn": 598401572352001,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:51.000234"},{"data": null,"dn": 0,"keys": {"F1": 1},"lsn": 295313,"msgTime": "2022-08-11 10:54:13.000824","opType": "D","scn": 598495963910145,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:54:13.000210"}
]
  • long scn:System Change Number 值递增。
  • long lsn: Log Sequence Num递增序号,对应Log_group,用于表示原子操作的
  • 顺序。
  • short dn:数据库实例节点序列号,暂时没有用。
  • int seq:一个事务中的每条数据的序列号。
  • String txTime:事务在数据库中的提交时间。
  • String msgTime:json序列化一行数据的当前时间。
  • String opType:操作数据的类型(I:insert;D:delete;U:update)。
  • String table:表名,格式为(用户名.表名)。
  • HashMap<String, Object> data:增删改的数据列信息(列名和列值)。
  • HashMap<String, Object> keys:kafka.msg.version为0时,keys字段为null;
  • kafka.msg.version为1时,该字段填充delete和update的主键或者唯一索引数据列
  • 信息(列名和列值)。

逻辑复制的局限性

  1. 只能捕获开启逻辑复制之后的数据,即存量数据无法同步
  2. Flink cdc 数据格式要求有 before 和 after的数据,逻辑复制工具只有 after 数据
  3. 只捕获发生变化的字段里的数据到kafka,而不是这一行的所有字段的数据

2.存量同步的逻辑是什么

 把分片轮询均匀的分配给多个读取器,每个读取器(多个线程)可以从各自的队列中获取多个分片,从而保证并行读取分片数据后写入RowData传递给下游。

2.1、单主键的切片策略是什么

算出主键的min_value和max_value,然后根据当前算子并行度进行切片后放入

Queue<HybridSourceSplit> splits = new ConcurrentLinkedQueue<>();

需要考虑如何保证每个分片负载均衡
 

2.2、​​​​​复合主键作切片,怎么保证扫描到所有的数据

要保证复合主键的分片能覆盖所有数据,需要对多个主键列的分片进行笛卡尔积组合。这样可以得到一组互不重叠、能覆盖整个主键空间的分片。

假设有一个表,主键由 (pk1, pk2) 组成,我们分别对 pk1 和 pk2 进行分片:

  • pk1 的分片为:[(pk1_min, pk1_a), (pk1_a, pk1_b), (pk1_b, pk1_max)]
  • pk2 的分片为:[(pk2_min, pk2_x), (pk2_x, pk2_max)]

为了覆盖所有的数据,我们需要将两个主键的分片进行笛卡尔积组合,得到以下分片:

[(pk1_min, pk1_a), (pk2_min, pk2_x)]
[(pk1_min, pk1_a), (pk2_x, pk2_max)]
[(pk1_a, pk1_b),   (pk2_min, pk2_x)]
[(pk1_a, pk1_b),   (pk2_x, pk2_max)]
[(pk1_b, pk1_max), (pk2_min, pk2_x)]
[(pk1_b, pk1_max), (pk2_x, pk2_max)]

全量同步之后如何无缝衔接增量同步? 

3、增量同步的逻辑是什么

  1. 通过参考Kafka Upsert Connector的源码,重写 Kafka Connector,把ChangeEvent转为RowData,

  2. RowData是Flink内核传输数据的基本格式,其中RowKind有 INSERT,UPDATE_BEFORE, UPDATE_AFTER, DELETE 四种枚举格式和ChangeEvent 的operType相呼应
public class ChangeEvent {private long scn;private long lsn;private short dn;private int seq;private String txTime;private String msgTime;private String opType;private String table;private HashMap<String, Object> data;private HashMap<String, Object> keys;
}
@PublicEvolving
public interface RowData {int getArity();RowKind getRowKind();void setRowKind(RowKind kind);...
}

具体细节可以参考这篇博客:

Flink与Kafka集成:跨版本兼容性与性能优化实战_flink1.18.1u与kafka哪个版本-CSDN博客

4、存量同步结束之后如何无缝衔接增量同步 

  1. HybridTableSource 根据配置决定是使用【存量增量一体化读取】还是【Kafka 增量数据】。
  2. 启用【存量增量一体化读取】时,HybridParallelSource 被使用,其中包括 HybridSourceReader 和 HybridSourceEnumerator 的创建。
  3. 如果未启用并行读取,使用单线程的 ChangeEventToRowDataSourceFunction进行【Kafka 增量数据】。
  4. 在【存量增量一体化读取】中,HybridSourceEnumerator 负责分配分片,而 HybridSourceReader 负责读取数据。
  5. 存量数据读取通过 SnapshotReader 完成,增量数据读取通过 Kafka 完成,由静态锁保证多个HybridSourceReader实例最后只有一个启动 Kafka读取。

5、下游数据如何落库

  1. 为了提高算子间的数据传输效率,上游传递过来的数据最初是个二进制数据,BinaryRowData需要手动再转为GenericRowData
  2. 参考jdbc connector 把GenericRowData数据根据自定义的字段拼接sql用jdbc落库

拼接sql语句的方式,会带来新的问题:

  • 比如表有id,name,age三个字段,正常情况的Flink source -> sink时,是传递一整行的数据(id:1,name:张三,age:18),但是如果是手动拼接sql的方式,当遇到把age手动设置为null时,逻辑复制捕获的格式为(id:1,age:null),此时通过flink cdc的方式传递给sink时,数据为(id:1,name:null,age:null),导致sink端无法判断name和age是手动置为null的还是因为该字段未发生改变,导致的为null。
  • 此问题在不更改flink内核源码的情况下,通过多传递一个字段px_gauss_marks到sink,sink使用该字段识别字段是否为手动设置为null,从而正确的拼接sql
  • px_gauss_marks:如果kafka没传对应的字段,则设置为 0;如果kafka传对应的字段但是值不为null,则设置为 1; 如果kafka传对应的字段但是值为null,则设置为 2,最终拼接成一个包含012这样的字符串,下游根据这个字段做进一步解析
-- source
CREATE TABLE sourceTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING, -- 此字段传递的值用于sink端判断以上的字段是否为手动置为null的情况PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-cdc','topic' = '逻辑复制配置的topic','properties.bootstrap.servers' = '','properties.group.id' = '','table-name'='','url' = 'db url','username' = 'db username','password' = 'db password','table-name' = '','scan.startup.mode' = 'earliest-offset','enable-parallel-read' = 'true'
);-- sink
CREATE TABLE SinkTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-jdbc','url' = '','username' = '','password' = '','table-name'='','sink.buffer-flush.max-rows'='1','sink.buffer-flush.interval' = '0','sink.max-retries' = '0'
);

6、项目结构大概怎么样

所有实现DynamicTableSourceFactory接口的类都是程序的入口

总结

源码涉及商业秘密目前不便公开,但是这篇博客的设计思想借鉴了很多开源组件源码相信可以承前启后。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893400.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea新增java快捷键代码片段

最近在写一些算法题&#xff0c;有很多的List<List这种编写&#xff0c;想着能否自定义一下快捷键 直接在写代码输入&#xff1a;lli&#xff0c;即可看见提示

深度学习-91-大语言模型LLM之基于langchain的模型IO的提示模板

文章目录 1 Model的输入输出2 提示模板2.1 提示模板的特点2.2 提示模板的类型3 使用提示模板3.1 设置环境变量3.2 PromptTemplate提示模板3.2.1 通过from_template方法3.2.2 直接生成提示模板3.2.3 使用提示模板3.2.4 复用提示模板3.3 ChatPromptTemplate聊天提示模板3.3.1 通过…

stm8s单片机(二)外部中断实验

中断优先级 stm8的中断优先级不是固定不变的&#xff0c;stm8的中断分为硬件优先级与软件优先级&#xff1b;当多个中断发生时&#xff0c;cpu会先响应软件优先级高的中断&#xff0c;若软件优先级相同会先响应硬件优先级高的&#xff1b; 其中软件优先级有四个 /*** brief …

社区版Dify实现文生视频 LLM+ComfyUI+混元视频

社区版Dify实现文生视频 LLMComfyUI混元视频 一、 社区版Dify实现私有化混元视频效果二、为什么社区版Dify可以在对话框实现文生视频&#xff1f;LLMComfyUI混元视频 实现流程图&#xff08;重点&#xff09;1. 文生视频模型支持ComfyUI2. ComfyUI可以轻松导出API实现封装3. Di…

helm推送到harbor私有库--http: server gave HTTP response to HTTPS client

harbor私有库访问的是http模式 harbor 2.8版本以上可以存储helm镜像 docker镜像推送的时候需要docker端配置insecure-registries 发现helm推送只能在harbor部署的本机使用localhost才能推送成功&#xff0c;即 helm push xxx.tgz oci://localhost:80/library 使用helm pus…

transformers使用过程问题

transfomers新旧版本冲突&#xff0c;和accelerate、datasets、evaluate这些库直接也经常会发生冲突 我使用了下面的版本&#xff0c;暂时没有冲突&#xff0c;如果有冲突再更新 transformers4.41.2 datasets2.20.0 accelerate0.31.0 evaluate0.4.2pip install transformers安…

svn tag

一般发布版本前&#xff0c;需要在svn上打个tag。步骤如下&#xff1a; 1、空白处右击&#xff0c;选择TortoiseSVN->Branch/tag; 2、填写To path&#xff0c;即tag的路基以及tag命名&#xff08;一般用版本号来命名&#xff09;&#xff1b;填写tag信息&#xff1b;勾选cr…

【JavaSE】(8) String 类

一、String 类常用方法 1、构造方法 常用的这4种构造方法&#xff1a;直接法&#xff0c;或者传参字符串字面量、字符数组、字节数组。 在 JDK1.8 中&#xff0c;String 类的字符串实际存储在 char 数组中&#xff1a; String 类也重写了 toString 方法&#xff0c;所以可以直…

【理解工具调用的流程,本质体现了大模型智能性】

1、工具调用 调用完结果看里面tool_calls 是否为空&#xff0c;不为空就调用工具函数处理&#xff0c; 如果为空就中断循环。大模型返回的message结果智能判断是否继续调用 输入输出如下&#xff1a; 请输入&#xff1a;深圳西安天气 ------------------------------------…

excel实用工具

持续更新… 文章目录 1. 快捷键1.1 求和 2. 命令2.1 查找 vloopup 1. 快捷键 1.1 求和 windows: alt mac : command shift T 2. 命令 2.1 查找 vloopup vlookup 四个入参数 要查找的内容 &#xff08;A2 6xx1&#xff09;查找的备选集 &#xff08;C2:C19&#xff09;…

【C++】模板(进阶)

本篇我们来介绍更多关于C模板的知识。模板初阶移步至&#xff1a;【C】模板&#xff08;初阶&#xff09; 1.非类型模板参数 1.1 非类型模板参数介绍 模板参数可以是类型形参&#xff0c;也可以是非类型形参。类型形参就是我们目前接触到的一些模板参数。 //类型模板参数 …

一文学会YOLO系列算法(从V3到11)实现遥感图像目标检测

目录 前言 数据集介绍 数据集转换 YOLO代码的下载 YOLO的配置 1.数据集的配置 2.模型的配置 YOLO11模型的训练 其它版本YOLO的训练 前言 遥感技术的快速发展&#xff0c;特别是在高分辨率遥感图像的获取能力上的显著提升&#xff0c;已经大大拓宽了遥感数据在环境监测…

图解Git——分布式Git《Pro Git》

分布式工作流程 Centralized Workflow&#xff08;集中式工作流&#xff09; 所有开发者都与同一个中央仓库同步代码&#xff0c;每个人通过拉取、提交来合作。如果两个开发者同时修改了相同的文件&#xff0c;后一个开发者必须在推送之前合并其他人的更改。 Integration-Mana…

【高阶数据结构】布隆过滤器(BloomFilter)

1. 概念 1.1 背景引入 背景&#xff1a;在计算机软件中&#xff0c;一个常见的需求就是 在一个集合中查找一个元素是否存在 &#xff0c;比如&#xff1a;1. Word 等打字软件需要判断用户键入的单词是否在字典中存在 2. 浏览器等网络爬虫程序需要保存一个列表来记录已经遍历过…

【json_object】mysql中json_object函数过长,显示不全

问题&#xff1a;json只显示部分 解决&#xff1a; SET GLOBAL group_concat_max_len 1000000; -- 设置为1MB&#xff0c;根据需要调整如果当前在navicat上修改&#xff0c;只有效本次连接和后续会话&#xff0c;重新连接还是会恢复默认值1024 在my.ini配置文件中新增或者修…

计算机网络 (52)秘钥分配

一、重要性 在计算机网络中&#xff0c;密钥分配是密钥管理中的一个核心问题。由于密码算法通常是公开的&#xff0c;因此网络的安全性主要依赖于密钥的安全保护。密钥分配的目的是确保密钥在传输过程中不被窃取或篡改&#xff0c;同时确保只有合法的用户才能获得密钥。 二、方…

第35天:安全开发-JavaEE应用原生反序列化重写方法链条分析触发类类加载

时间轴&#xff1a; 序列化与反序列化图解&#xff1a; 演示案例&#xff1a; Java-原生使用-序列化&反序列化 Java-安全问题-重写方法&触发方法 Java-安全问题-可控其他类重写方法 Java-原生使用-序列化&反序列化 1.为什么进行序列化和反序列化&#xff1…

MindAgent:基于大型语言模型的多智能体协作基础设施

2023-09-18 &#xff0c;加州大学洛杉矶分校&#xff08;UCLA&#xff09;、微软研究院、斯坦福大学等机构共同创建的新型基础设施&#xff0c;目的在评估大型语言模型在游戏互动中的规划和协调能力。MindAgent通过CuisineWorld这一新的游戏场景和相关基准&#xff0c;调度多智…

Excel 技巧17 - 如何计算倒计时,并添加该倒计时的数据条(★)

本文讲如何计算倒计时&#xff0c;并添加该倒计时的数据条。 1&#xff0c;如何计算倒计时 这里也要用公式 D3 - TODAY() 显示为下面这个样子的 然后右键该单元格&#xff0c;选 设置单元格格式 然后点 常规 这样就能显示出还书倒计时的日数了。 下拉适用到其他单元格。 2&a…

rocketmq基本架构

简介 Name server 负责broker注册、心跳&#xff0c;路由等功能&#xff0c;类似Kafka的ZKname server节点之间不互相通信&#xff0c;broker需要和所有name server进行通信。扩容name server需要重启broker&#xff0c;不然broker不会和name server建立连接producer和consum…