资源:flink 1.17.0、dinky 1.0.2
问题:对于kafka相关的包内类找不到的情况
解决:使用 flink-sql-connector- 胖包即可,去掉 flink-connector- 相关瘦包,解决胖瘦包冲突
source使用 flink-sql-connector- 胖包,sink使用 flink-connector-jdbc-3.1.0-1.17.jar、mysql-connector-java 包
lib中则添加公共包 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 、mysql-connector-java-8.0.28.jar
结果:运行成功
可实现insert、update、delete 的DML语句-增加、修改、删除语句的CDC变更数据捕获,而注意truncate语句变更的数据不可捕获
mysql sink表会在首次执行自动建表
FlinkSQL:
EXECUTE CDCSOURCE demo_mysql WITH ('connector' = 'mysql-cdc','hostname' = '172.xxxx','port' = '3306','username' = 'xxx','password' = 'xxx','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'test\.student,','sink.connector' = 'jdbc','sink.url' = 'jdbc:mysql://172.xxx:3306/test?characterEncoding=utf-8&useSSL=false','sink.username' = 'xxx','sink.password' = 'xxx','sink.sink.db' = 'test','sink.table.prefix' = 'test_','sink.table.lower' = 'true','sink.table-name' = '#{tableName}','sink.driver' = 'com.mysql.jdbc.Driver','sink.sink.buffer-flush.interval' = '2s','sink.sink.buffer-flush.max-rows' = '100','sink.sink.max-retries' = '5','sink.auto.create' = 'true'
)