1)部署安装Postgres服务
jiangzhongzhou@ZBMac-C02CW08SM ~ % docker pull postgres
Using default tag: latest
latest: Pulling from library/postgres
latest: Pulling from library/postgres
24c63b8dcb66: Pull complete
2bb0b7dbd861: Pull complete
...
Digest: sha256:1bf73ccae25238fa555100080042f0b2f9be08eb757e200fe6afc1fc413a1b3c
Status: Downloaded newer image for postgres:latest
docker.io/library/postgres:latest

What's Next?
  View a summary of image vulnerabilities and recommendations → docker scout quickview postgres
jiangzhongzhou@ZBMac-C02CW08SM ~ % docker run -d -p 5432:5432 --name postgresql -e POSTGRES_PASSWORD=pass123 postgres
5bad03668df7dd010079ff9499c1751f83b73eba802a4972dbf3008550fe3213
此处生成PostgreSQL实例,账户postgres/pass123,通过IDEA创建Datasource数据源查看PostgreSQL的版本信息
> select version();
PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
> create database test;
-- NOTE(review): CREATE DATABASE does not switch the current session. The Flink
-- tables in step 3 read from database "test" ('database-name' = 'test'), so
-- reconnect the datasource to "test" (psql: \c test) before running the rest.
-- CDC source table: serial primary key, last_update (bigint, default -1), payload text.
> create table public.poc_cdc_src (
    id          serial PRIMARY KEY,
    last_update bigint NOT NULL DEFAULT -1,
    payload     varchar(200)
);
-- Give the connecting role the REPLICATION attribute used by logical replication connections.
> ALTER ROLE postgres replication;
-- Record the full old row on UPDATE/DELETE so change events carry all column values.
> ALTER TABLE public.poc_cdc_src REPLICA IDENTITY FULL;
2)开启PostgreSQL WAL逻辑复制日志
编辑/var/lib/postgresql/data/postgresql.conf文件,修改以下配置项,然后重启容器(docker restart postgresql)使其生效:
wal_level = logical
3)开启Flink建表映射
- 全量+增量
-- Full + incremental: snapshot the existing rows of public.poc_cdc_src first,
-- then keep streaming changes from the WAL.
-- 'debezium.snapshot.mode' = 'initial' performs that first snapshot; the previous
-- value 'never' skipped it, so rows that existed before the job started were never read.
-- 'decoding.plugin.name' = 'pgoutput' selects PostgreSQL's built-in logical decoding plugin.
CREATE TABLE IF NOT EXISTS postgresql_source (
    id          INT,
    last_update BIGINT,
    payload     STRING,
    PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'pass123',
    'schema-name' = 'public',
    'database-name' = 'test',
    'table-name' = 'poc_cdc_src',
    'debezium.snapshot.mode' = 'initial',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'cdcslot1'
)
- 增量
-- Incremental only: no snapshot ('debezium.snapshot.mode' = 'never'); reading starts
-- at the latest offset ('scan.startup.mode' = 'latest-offset') with
-- 'scan.incremental.snapshot.enabled' = 'true'.
-- NOTE(review): this reuses the table name and 'slot.name' of the full + incremental
-- variant; presumably the two definitions are alternatives and are not run together.
CREATE TABLE IF NOT EXISTS postgresql_source (
    id          INT,
    last_update BIGINT,
    payload     STRING,
    PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'pass123',
    'schema-name' = 'public',
    'database-name' = 'test',
    'table-name' = 'poc_cdc_src',
    'debezium.snapshot.mode' = 'never',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'cdcslot1',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'latest-offset'
)