接上期《FlinkCDC 基础篇章 1 - 安装使用》。
下载 Flink 和所需的依赖包
- 下载 Flink 1.17.0 并将其解压至目录 `flink-1.17.0`。
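- 下载下面列出的依赖包，并将它们放到目录 `flink-1.17.0/lib/` 下（注意：下载链接只对已发布的版本有效，SNAPSHOT 版本需要本地编译）：
  - flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  - flink-sql-connector-mysql-cdc-2.4.0.jar
  - flink-sql-connector-postgres-cdc-2.4.0.jar

  下文的 SQL Server → MySQL 同步示例还需要以下依赖，上面的列表中并不包含：
  - flink-sql-connector-sqlserver-cdc-2.4.0.jar（提供 `sqlserver-cdc` 连接器）
  - 与 Flink 1.17 匹配的 JDBC 连接器，例如 flink-connector-jdbc-3.1.0-1.17.jar（提供 `jdbc` 连接器）
  - MySQL JDBC 驱动，例如 mysql-connector-j-8.0.33.jar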
首先，在 Flink SQL Client 中开启 checkpoint，每隔 3 秒做一次 checkpoint：
-- Run in the Flink SQL Client: take a checkpoint every 3 seconds.
-- ("Flink SQL>" is only the client's prompt and is not typed. The statement
-- must end with ';' or the client keeps waiting for more input.)
SET 'execution.checkpointing.interval' = '3s';
-- Source table t_source_sqlserver: reads change data (CDC) from a SQL Server
-- database via the 'sqlserver-cdc' connector.
-- Every option sits on its own line on purpose. A '--' comment runs to the end
-- of the line, so any option written after a comment on the same line is
-- silently dropped from the WITH clause.
CREATE TABLE t_source_sqlserver (
    id         INT,
    order_date DATE,
    purchaser  INT,
    quantity   INT,
    product_id INT,
    -- Primary key declaration (marked optional in the original notes).
    -- NOT ENFORCED: Flink does not check uniqueness itself.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- Use the SQL Server CDC connector.
    'connector'     = 'sqlserver-cdc',
    'hostname'      = '10.194.183.120',
    'port'          = '30027',
    'username'      = 'sa',
    'password'      = 'abc@123456',
    'database-name' = 'cdc_test',
    -- NOTE(review): 'schema-name' is accepted by older sqlserver-cdc releases.
    -- Newer releases (e.g. 2.4.x) instead expect 'table-name' = 'dbo.orders'
    -- and reject an unknown 'schema-name' option. Confirm against the
    -- connector jar actually placed in lib/.
    'schema-name'   = 'dbo',
    -- Table whose changes are captured.
    'table-name'    = 'orders'
);
-- Sink table table_sink_mysql: writes rows into a MySQL database via the JDBC connector.
CREATE TABLE table_sink_mysql (
    id         INT,
    order_date DATE,
    purchaser  INT,
    quantity   INT,
    product_id INT,
    -- With a primary key declared, the JDBC sink runs in upsert mode, so
    -- updates and deletes coming from the CDC source are applied by key.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- One option per line: a '--' comment on the same line would swallow
    -- every option that follows it.
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://10.194.183.120:30025/test',
    'username'   = 'root',
    'password'   = 'root',
    -- Target MySQL table (test.orders). NOTE(review): presumably it must
    -- already exist with a matching schema, since the JDBC connector does
    -- not create tables. Confirm before running.
    'table-name' = 'orders'
);
-- Continuously copy every change from t_source_sqlserver into table_sink_mysql.
-- Columns are listed explicitly rather than with SELECT *. A schema change on
-- either table then fails loudly instead of silently shifting values between
-- columns by position.
INSERT INTO table_sink_mysql
SELECT
    id,
    order_date,
    purchaser,
    quantity,
    product_id
FROM t_source_sqlserver;
-- Elasticsearch 7 sink table writing to index 'income_distribution'.
-- NOTE(review): nothing in these notes inserts into this table. Presumably a
-- separate INSERT INTO income_distribution ... feeds it. Confirm.
CREATE TABLE income_distribution (
    serviceCode   STRING,
    accountPeriod STRING,
    subjectCode   STRING,
    subjectName   STRING,
    amt           DECIMAL(13, 2),
    -- With a primary key the sink works in upsert mode. The document id is
    -- built from these three columns joined by the key delimiter.
    PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    -- Placeholder host: replace with the real Elasticsearch address(es).
    'hosts'     = 'http://xxxx:9200',
    'index'     = 'income_distribution',
    -- A non-DISABLED backoff strategy needs a retry count and a delay. Neither
    -- option has a default, so both are set explicitly. The values are
    -- examples; tune them for the cluster.
    'sink.bulk-flush.backoff.strategy'    = 'EXPONENTIAL',
    'sink.bulk-flush.backoff.max-retries' = '3',
    'sink.bulk-flush.backoff.delay'       = '1s'
);
可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
参考文献:
使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC
flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客