a. Prepare dependencies
flink-connector-postgres-cdc-*.jar
b. Synchronizing Tables
Use PostgresSyncTableAction in a Flink DataStream job, or submit it directly with flink run, to synchronize one or more PostgreSQL tables into a single Paimon table.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse <warehouse_path> \
    --database <database_name> \
    --table <table_name> \
    [--partition_keys <partition_keys>] \
    [--primary_keys <primary_keys>] \
    [--type_mapping <option1,option2...>] \
    [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
    [--metadata_column <metadata_column>] \
    [--postgres_conf <postgres_cdc_source_conf> [--postgres_conf <postgres_cdc_source_conf> ...]] \
    [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \
    [--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]
The configuration options are as follows:
Configuration | Description |
---|---|
--warehouse | The path to the Paimon warehouse. |
--database | The database name in the Paimon catalog. |
--table | The Paimon table name. |
--partition_keys | The partition keys for the Paimon table. Separate multiple partition keys with commas, for example "dt,hh,mm". |
--primary_keys | The primary keys for the Paimon table. Separate multiple primary keys with commas, for example "buyer_id,seller_id". |
--type_mapping | Specifies how PostgreSQL data types are mapped to Paimon types. Supported options: "to-string", which maps all PostgreSQL types to STRING. |
--computed_column | The definitions of computed columns. The argument field is a PostgreSQL table field name. See the Paimon documentation for a complete list of configurations. |
--metadata_column | Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See the Flink CDC Postgres connector documentation for a complete list of available metadata. |
--postgres_conf | The configuration for Flink CDC Postgres sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name, schema-name, table-name and slot.name are required; the others are optional. See the Flink CDC Postgres connector documentation for a complete list of configurations. |
--catalog_conf | The configuration for the Paimon catalog. Each configuration should be specified in the format "key=value". See the Paimon documentation for a complete list of catalog configurations. |
--table_conf | The configuration for the Paimon table sink. Each configuration should be specified in the format "key=value". See the Paimon documentation for a complete list of table configurations. |
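Because seven of the --postgres_conf keys are required, it can help to check them before submitting the job. The following is only a minimal sketch: missing_postgres_conf is a hypothetical helper written for this note, not part of Paimon or Flink CDC, and it only checks that each required key appears as "key=value", not that the value is valid.

```shell
# Hypothetical helper: not part of Paimon or Flink CDC.
# Prints every required --postgres_conf key that is absent from the given
# key=value arguments. Returns 0 only when no required key is missing.
missing_postgres_conf() {
    local required="hostname username password database-name schema-name table-name slot.name"
    local key arg found status=0
    for key in $required; do
        found=0
        for arg in "$@"; do
            # The quoted pattern is literal, so "slot.name=" matches only that exact prefix.
            case "$arg" in
                "$key="*) found=1; break ;;
            esac
        done
        if [ "$found" -eq 0 ]; then
            echo "$key"
            status=1
        fi
    done
    return "$status"
}

# Usage: slot.name is deliberately left out here.
if ! missing=$(missing_postgres_conf hostname=127.0.0.1 username=root password=123456 \
        database-name=source_db schema-name=public table-name=source_table1); then
    echo "missing required postgres_conf keys: $missing"
fi
```

The helper takes the same "key=value" strings that follow each --postgres_conf flag, so the list can be kept in one place and reused when assembling the flink run command.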
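If the specified Paimon table does not exist, it is created automatically, and its schema is derived from all of the specified PostgreSQL tables.
If the Paimon table already exists, its schema is compared against the schemas of all of the specified PostgreSQL tables.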
Example 1: Synchronize tables into one Paimon table
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='public' \
    --postgres_conf table-name='source_table1|source_table2' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
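As the example shows, the table-name in postgres_conf accepts a regular expression, so every table whose name matches it is monitored. The schemas of all matched tables are merged into a single Paimon table schema.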
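Example 2: Synchronize sharded tables into one Paimon table
Set "schema-name" to a regular expression to capture multiple schemas.
Typical scenario: a table "source_table" is split across the schemas "source_schema1", "source_schema2", …, and the data of all "source_table" tables is synchronized into one Paimon table.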
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='source_schema.+' \
    --postgres_conf table-name='source_table' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
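Before starting a job, the patterns used in the two examples can be tried against a list of candidate names. This is a rough sketch under two assumptions: that the source requires a pattern to match the whole name, and that the pattern behaves the same under POSIX extended regular expressions (what grep -E implements) as under the regex dialect of the CDC source. Both hold for simple patterns such as the ones here but are not guaranteed in general. filter_names is a hypothetical helper, not part of Paimon or Flink CDC.

```shell
# Hypothetical helper: not part of Paimon or Flink CDC.
# Reads candidate names on stdin (one per line) and prints those that the
# pattern in $1 matches in full (-x). "|| true" keeps the exit status 0
# when nothing matches, since grep would otherwise return 1.
filter_names() {
    grep -E -x -e "$1" || true
}

# Example 1 pattern: table-name='source_table1|source_table2'
printf '%s\n' source_table1 source_table2 source_table10 other_table |
    filter_names 'source_table1|source_table2'

# Example 2 pattern: schema-name='source_schema.+'
printf '%s\n' source_schema source_schema1 source_schema2 public |
    filter_names 'source_schema.+'
```

The first call prints source_table1 and source_table2 but not source_table10, because the whole name must match. The second prints source_schema1 and source_schema2 but not source_schema, because ".+" needs at least one more character.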