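This test sets out to answer two questions:
- Can Flink CDC write the CDC data (in debezium-json format) of multiple tables into the same Kafka topic?
- When Flink SQL is used to write multiple tables into Kafka at the same time, how many Flink jobs are created?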
First, prepare a Flink SQL script that syncs two tables into a single Kafka topic:
create catalog mysql_datasource with (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://10.0.13.30:3306',
    'default-database' = 'gmall',
    'username' = 'root',
    'password' = 'Admin1234!'
);
create database if not exists src;
create database if not exists ods;

-- sync table: activity_info
drop table if exists src.activity_info;
create table if not exists src.activity_info with (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3306',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'gmall',
    'table-name' = 'activity_info'
) like mysql_datasource.gmall.activity_info (excluding options);
drop table if exists ods.activity_info;
create table if not exists ods.activity_info with (
    'connector' = 'kafka',
    'topic' = 'ods',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'ods',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
) like src.activity_info (excluding options);

-- sync table: activity_rule
drop table if exists src.activity_rule;
create table if not exists src.activity_rule with (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3306',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'gmall',
    'table-name' = 'activity_rule'
) like mysql_datasource.gmall.activity_rule (excluding options);
drop table if exists ods.activity_rule;
create table if not exists ods.activity_rule with (
    'connector' = 'kafka',
    'topic' = 'ods',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'ods',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
) like src.activity_rule (excluding options);

-- sync all tables
insert into ods.activity_info select * from src.activity_info;
insert into ods.activity_rule select * from src.activity_rule;
Save the script above as gmall-sync.sql and submit it with the following command:
/usr/lib/flink/bin/sql-client.sh embedded -f gmall-sync.sql
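The two per-table blocks in gmall-sync.sql differ only in the table name. For a longer table list, you can generate the file instead of writing it by hand. Below is a minimal Bash sketch under these assumptions:
- `generate_sync_sql` is a hypothetical helper.
- `KAFKA_BROKERS` is a placeholder to replace with your own bootstrap servers.
- The remaining connection values are copied from the script above.

It writes gmall-sync.sql, which can then be submitted with the same sql-client.sh command.

```shell
#!/usr/bin/env bash
# Hypothetical helper: generates a Flink SQL file shaped like gmall-sync.sql
# (one mysql-cdc source + one kafka sink per table, all sinks writing to the
# same topic). KAFKA_BROKERS is a placeholder; other values mirror the example.
set -euo pipefail

MYSQL_HOST='10.0.13.30'
MYSQL_PORT='3306'
MYSQL_USER='root'
MYSQL_PASSWORD='Admin1234!'
DATABASE='gmall'
KAFKA_BROKERS='broker-1:9092,broker-2:9092,broker-3:9092'  # placeholder
TOPIC='ods'
TABLES=(activity_info activity_rule)  # add more table names here
OUTPUT_FILE="${OUTPUT_FILE:-gmall-sync.sql}"

generate_sync_sql() {
  # Catalog and target databases, emitted once.
  cat <<EOF
create catalog mysql_datasource with (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}',
    'default-database' = '${DATABASE}',
    'username' = '${MYSQL_USER}',
    'password' = '${MYSQL_PASSWORD}'
);
create database if not exists src;
create database if not exists ods;
EOF
  # One CDC source table and one Kafka sink table per MySQL table.
  local t
  for t in "${TABLES[@]}"; do
    cat <<EOF

-- sync table: ${t}
drop table if exists src.${t};
create table if not exists src.${t} with (
    'connector' = 'mysql-cdc',
    'hostname' = '${MYSQL_HOST}',
    'port' = '${MYSQL_PORT}',
    'username' = '${MYSQL_USER}',
    'password' = '${MYSQL_PASSWORD}',
    'database-name' = '${DATABASE}',
    'table-name' = '${t}'
) like mysql_datasource.${DATABASE}.${t} (excluding options);
drop table if exists ods.${t};
create table if not exists ods.${t} with (
    'connector' = 'kafka',
    'topic' = '${TOPIC}',
    'properties.bootstrap.servers' = '${KAFKA_BROKERS}',
    'properties.group.id' = '${TOPIC}',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
) like src.${t} (excluding options);
EOF
  done
  # The INSERT statements go last, one per table.
  printf '\n-- sync all tables\n'
  for t in "${TABLES[@]}"; do
    printf 'insert into ods.%s select * from src.%s;\n' "$t" "$t"
  done
}

generate_sync_sql > "$OUTPUT_FILE"
echo "wrote ${OUTPUT_FILE} (${#TABLES[@]} tables)"
```

Generating the file only removes the copy-and-paste work. The output still contains one INSERT INTO per table, exactly like the hand-written version.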
Open the Flink UI and you will see the following result:
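Conclusions:
- Flink CDC can write the CDC data (in debezium-json format) of multiple tables into the same Kafka topic.
- In Flink SQL, each INSERT INTO statement submitted as above runs as a separate job. So although the SQL above writes the CDC data of multiple tables into the same Kafka topic, it does not put multiple tables into one Flink job: each table still has its own database connection and its own Flink job.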
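The job count above was read off the Flink UI. The same number can be checked from the command line through the JobManager REST endpoint `GET /jobs/overview`. Per the conclusion above, the two INSERT INTO statements should appear as two RUNNING jobs, alongside any other jobs already on the cluster.

Below is a minimal Bash sketch under these assumptions:
- `FLINK_REST` is your JobManager's REST address. On YARN/EMR, this is the Flink application's web UI address, not necessarily localhost.
- The response is Flink's default compact JSON.
- `count_running_jobs` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: count RUNNING Flink jobs via the JobManager REST API
# (GET /jobs/overview) instead of reading them off the Flink UI.
# FLINK_REST is an assumption, e.g. FLINK_REST=http://<jobmanager-host>:8081
set -euo pipefail

# Reads /jobs/overview JSON on stdin and prints how many jobs are RUNNING.
# Crude text matching that assumes compact JSON ("state":"RUNNING" with no
# spaces); use a real JSON parser such as jq for anything more involved.
count_running_jobs() {
  { grep -o '"state":"RUNNING"' || true; } | wc -l | tr -d ' '
}

# Only query the cluster when FLINK_REST is set, so the function can also be
# sourced and tested on its own.
if [[ -n "${FLINK_REST:-}" ]]; then
  echo "running jobs: $(curl -fsS "${FLINK_REST}/jobs/overview" | count_running_jobs)"
fi
```

Usage: `FLINK_REST=http://<jobmanager-host>:8081 ./count-running-jobs.sh` (hypothetical file name).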