目录
设计思路
1.为什么不直接用FlinkCDC要重写Flink Gauss CDC
2.存量同步的逻辑是什么
2.1、单主键的切片策略是什么
2.2、复合主键作切片,怎么保证扫描到所有的数据
3、增量同步的逻辑是什么
4、存量同步结束之后如何无缝衔接增量同步
5、下游数据如何落库
6、项目结构大概怎么样
总结
设计思路
1.为什么不直接用FlinkCDC要重写Flink Gauss CDC
GaussDB 是华为内部自研的一套数据库,提供了类似于PostgreSQL的逻辑复制插件。Gauss100 OLTP逻辑复制解析包含逻辑日志信息的REDO日志,只有当表逻辑复制开关和全局逻辑复制开关同时打开时,该表的数据才会被逻辑复制。变化的数据最终到kafka,假设对标USRSAMPLE.T1分别进行插入,更新,删除操作同步的消息格式如下:
[{"data": {"F2": "aaaaa","F1": 1},"dn": 0,"keys": null,"lsn": 295269,"msgTime": "2022-08-11 10:53:18.000999","opType": "I","scn": 598260140474369,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:18.000307"},{"data": {"F2": "bbbb"},"dn": 0,"keys": {"F1": 1},"lsn": 295299,"msgTime": "2022-08-11 10:53:52.000061","opType": "U","scn": 598401572352001,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:51.000234"},{"data": null,"dn": 0,"keys": {"F1": 1},"lsn": 295313,"msgTime": "2022-08-11 10:54:13.000824","opType": "D","scn": 598495963910145,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:54:13.000210"}
]
- long scn:System Change Number 值递增。
- long lsn: Log Sequence Num递增序号,对应Log_group,用于表示原子操作的
- 顺序。
- short dn:数据库实例节点序列号,暂时没有用。
- int seq:一个事务中的每条数据的序列号。
- String txTime:事务在数据库中的提交时间。
- String msgTime:json序列化一行数据的当前时间。
- String opType:操作数据的类型(I:insert;D:delete;U:update)。
- String table:表名,格式为(用户名.表名)。
- HashMap<String, Object> data:增删改的数据列信息(列名和列值)。
- HashMap<String, Object> keys:kafka.msg.version为0时,keys字段为null;
- kafka.msg.version为1时,该字段填充delete和update的主键或者唯一索引数据列
- 信息(列名和列值)。
逻辑复制的局限性
- 只能捕获开启逻辑复制之后的数据,即存量数据无法同步
- Flink cdc 数据格式要求有 before 和 after的数据,逻辑复制工具只有 after 数据
- 只捕获发生变化的字段里的数据到kafka,而不是这一行的所有字段的数据
2.存量同步的逻辑是什么
把分片轮询均匀的分配给多个读取器,每个读取器(多个线程)可以从各自的队列中获取多个分片,从而保证并行读取分片数据后写入RowData传递给下游。
2.1、单主键的切片策略是什么
算出主键的min_value和max_value,然后根据当前算子并行度进行切片后放入
Queue<HybridSourceSplit> splits = new ConcurrentLinkedQueue<>();
需要考虑如何保证每个分片负载均衡
2.2、复合主键作切片,怎么保证扫描到所有的数据
要保证复合主键的分片能覆盖所有数据,需要对多个主键列的分片进行笛卡尔积组合。这样可以得到一组互不重叠、能覆盖整个主键空间的分片。
假设有一个表,主键由 (pk1, pk2) 组成,我们分别对 pk1 和 pk2 进行分片:
- pk1 的分片为:[(pk1_min, pk1_a), (pk1_a, pk1_b), (pk1_b, pk1_max)]
- pk2 的分片为:[(pk2_min, pk2_x), (pk2_x, pk2_max)]
为了覆盖所有的数据,我们需要将两个主键的分片进行笛卡尔积组合,得到以下分片:
[(pk1_min, pk1_a), (pk2_min, pk2_x)]
[(pk1_min, pk1_a), (pk2_x, pk2_max)]
[(pk1_a, pk1_b), (pk2_min, pk2_x)]
[(pk1_a, pk1_b), (pk2_x, pk2_max)]
[(pk1_b, pk1_max), (pk2_min, pk2_x)]
[(pk1_b, pk1_max), (pk2_x, pk2_max)]全量同步之后如何无缝衔接增量同步?
3、增量同步的逻辑是什么
-
通过参考Kafka Upsert Connector的源码,重写 Kafka Connector,把ChangeEvent转为RowData,
- RowData是Flink内核传输数据的基本格式,其中RowKind有 INSERT,UPDATE_BEFORE, UPDATE_AFTER, DELETE 四种枚举格式和ChangeEvent 的operType相呼应
public class ChangeEvent {private long scn;private long lsn;private short dn;private int seq;private String txTime;private String msgTime;private String opType;private String table;private HashMap<String, Object> data;private HashMap<String, Object> keys;
}
@PublicEvolving
public interface RowData {int getArity();RowKind getRowKind();void setRowKind(RowKind kind);...
}
具体细节可以参考这篇博客:
Flink与Kafka集成:跨版本兼容性与性能优化实战_flink1.18.1u与kafka哪个版本-CSDN博客
4、存量同步结束之后如何无缝衔接增量同步
- HybridTableSource 根据配置决定是使用【存量增量一体化读取】还是【Kafka 增量数据】。
- 启用【存量增量一体化读取】时,HybridParallelSource 被使用,其中包括 HybridSourceReader 和 HybridSourceEnumerator 的创建。
- 如果未启用并行读取,使用单线程的 ChangeEventToRowDataSourceFunction进行【Kafka 增量数据】。
- 在【存量增量一体化读取】中,HybridSourceEnumerator 负责分配分片,而 HybridSourceReader 负责读取数据。
- 存量数据读取通过 SnapshotReader 完成,增量数据读取通过 Kafka 完成,由静态锁保证多个HybridSourceReader实例最后只有一个启动 Kafka读取。
5、下游数据如何落库
- 为了提高算子间的数据传输效率,上游传递过来的数据最初是个二进制数据,BinaryRowData需要手动再转为GenericRowData
- 参考jdbc connector 把GenericRowData数据根据自定义的字段拼接sql用jdbc落库
拼接sql语句的方式,会带来新的问题:
- 比如表有id,name,age三个字段,正常情况的Flink source -> sink时,是传递一整行的数据(id:1,name:张三,age:18),但是如果是手动拼接sql的方式,当遇到把age手动设置为null时,逻辑复制捕获的格式为(id:1,age:null),此时通过flink cdc的方式传递给sink时,数据为(id:1,name:null,age:null),导致sink端无法判断name和age是手动置为null的还是因为该字段未发生改变,导致的为null。
- 此问题在不更改flink内核源码的情况下,通过多传递一个字段px_gauss_marks到sink,sink使用该字段识别字段是否为手动设置为null,从而正确的拼接sql
- px_gauss_marks:如果kafka没传对应的字段,则设置为 0;如果kafka传对应的字段但是值不为null,则设置为 1; 如果kafka传对应的字段但是值为null,则设置为 2,最终拼接成一个包含012这样的字符串,下游根据这个字段做进一步解析
-- source
CREATE TABLE sourceTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING, -- 此字段传递的值用于sink端判断以上的字段是否为手动置为null的情况PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-cdc','topic' = '逻辑复制配置的topic','properties.bootstrap.servers' = '','properties.group.id' = '','table-name'='','url' = 'db url','username' = 'db username','password' = 'db password','table-name' = '','scan.startup.mode' = 'earliest-offset','enable-parallel-read' = 'true'
);-- sink
CREATE TABLE SinkTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-jdbc','url' = '','username' = '','password' = '','table-name'='','sink.buffer-flush.max-rows'='1','sink.buffer-flush.interval' = '0','sink.max-retries' = '0'
);
6、项目结构大概怎么样
总结
源码涉及商业秘密目前不便公开,但是这篇博客的设计思想借鉴了很多开源组件源码相信可以承前启后。