新一代开源流数据湖平台Apache Paimon入门实操-下

文章目录

  • 实战
    • 写表
      • 插入和覆盖数据
      • 更新数据
      • 删除数据
      • Merge Into
    • 查询表
      • 批量查询
        • 时间旅行
        • 批量增量查询
      • 流式查询
        • 时间旅行
        • ConsumerID
      • 查询优化
    • 系统表
      • 表指定系统表
      • 分区表
      • 全局系统表
      • 维表
    • CDC集成
      • MySQL
      • Kafka
      • 支持schema变更

实战

写表

插入和覆盖数据

可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。语法格式如下,其与标准sql语法一致

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
  • part_spec:一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,日期’ 2023-01-02 ')。语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )
  • column_list:一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。所有指定的列都应该存在于表中,并且不能相互复制。它包括除静态分区列之外的所有列。列列表的大小应该与VALUES子句或查询中数据的大小完全相同。语法: (col_name1 [, column_name2, …])
  • value_expr:指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。目前,Flink不支持直接使用NULL,因此NULL应该通过’ cast (NULL AS data_type) '转换为实际数据类型。语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
CREATE TABLE demo1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);insert into demo1 values(1,1,'order','2023-08-04','19'),(2,2,'pay','2023-08-04','20');
select * from demo1;CREATE TABLE demo_p1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
insert into demo_p1 partition(dt='2023-08-04',hh='21') values(3,3,'pv');
insert into demo_p1 select * from demo1;
select * from demo_p1;

image-20230804153022659

覆盖只支持batch模式。覆盖默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置流读覆盖。对于分区表,Paimon的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在覆盖数据中的分区)。可以配置动态分区覆盖来更改它。

RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'BATCH';
  • 覆盖未分区的表
insert overwrite demo1 values(3,3,'pv','2023-08-04','20');
  • 覆盖分区表
insert overwrite demo_p1 select * from demo1;
insert overwrite demo_p1 partition(dt='2023-08-04',hh='20') select user_id,item_id,behavior from demo1;

更新数据

目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。重要的表属性设置,只有主键表支持此特性。

要支持此特性,需要对MergeEngine进行重复数据删除或部分更新。不支持更新主键。语法:UPDATE table_identifier SET column1 = value1, column2 = value2, … WHERE condition;

update demo_p1 set item_id = 5,behavior='uv' where user_id = 1;# 比如下面,merge-engine默认就是deduplicate
CREATE TABLE MyTable (a STRING,b INT,c INT,PRIMARY KEY (a) NOT ENFORCED
) WITH ( 'write-mode' = 'change-log','merge-engine' = 'deduplicate' 
);
  • deduplicate:删除重复数据,保留最后一行。
  • Partial-update:部分更新非空字段。
  • aggregation:聚合具有相同主键的字段。
  • first-row:删除重复数据并保留第一行。

详细参数配置可以查看:https://paimon.apache.org/docs/master/maintenance/configurations/

删除数据

Flink1.17+以上SQL支持删除数据。且只有写模式设置为更改日志的表才支持此特性;如果表有主键,则需要对MergeEngine进行重复数据删除以支持此特性。

delete from demo_p1 where behavior = 'pv';
select * from demo_p1;

Merge Into

Paimon通过flink run提交“MERGE - INTO”作业来支持“MERGE INTO”。下载paimon-flink-action文件,无需放到lib目录,与普通开发jar包一样指定路径运行即可

wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-action/0.5-SNAPSHOT/paimon-flink-action-0.5-20230804.002229-95.jar

通过Merge Into实现行级别更新,只有主键表支持这个功能,该操作不会产生UPDATE_BEFORE,所以不建议设置’ changelog-producer ’ = ’ input '。合并操作使用“upsert”语义而不是“update”语义,这意味着如果行存在,则执行更新,否则执行插入。例如对于非主键表可以更新每一列,但对于主键表如果希望更新主键,则必须插入具有不同于表中行主键的新行。在这种情况下,“upsert”是有用的。

匹配解释如下:

  • 匹配:更改的行来自目标表,并且每个行都可以基于merge-condition和可选的Matched -condition (source∩target)匹配源表的行。
  • 不匹配:根据合并条件和可选的不匹配条件(source - target),更改的行来自源表,所有行不能匹配任何目标表行。
  • Not-matched-by-source:根据merge-condition和可选的Not-matched-by-source条件(target -source),更改的行来自目标表,并且所有行不能匹配任何源表行。

参数格式:

  • Matched-upsert-changes: col = .col | expression [, …] (Means setting .col with given value. Do not add ‘.’ before ‘col’.),可以使用’ * '来设置具有所有源列的列(要求目标表的模式等于源表的模式)。
  • Not-matched-upsert-changes:类似于matched-upsert-changes,但不能引用源表的列或使用’ * '。
  • insert-values:col1, col2, …, col_end。Must specify values of all columns. For each column, you can reference .col or use an expression。可以使用’ * '插入所有源列(要求目标表的模式等于源表的模式)。
  • 不匹配条件不能使用目标表的列来构造条件表达式。
  • 不匹配源条件不能使用源表的列来构造条件表达式。
create database test;use test;CREATE TABLE wstest1 (	id INT,	ts BIGINT,    vc INT,	PRIMARY KEY (id) NOT ENFORCED)insert into wstest1 values(1,1,1),(2,2,2),(3,3,3);select * from wstest1;

image-20230804180704978

CREATE TABLE wstest2 (	id INT,	ts BIGINT,    vc INT,	PRIMARY KEY (id) NOT ENFORCED)insert into wstest2 values(2,2,2),(3,3,3),(4,4,4),(5,5,5);select * from wstest2;

image-20230804180742128

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    merge-into \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table wstest2 \    --source-table test.wstest1 \    --on "wstest2.id = wstest1.id" \    --merge-actions matched-upsert,matched-delete \    --matched-upsert-condition "wstest2.ts > 2" \    --matched-upsert-set "vc = 100" \    --matched-delete-condition "wstest2.ts <= 2"
select * from wstest2;

image-20230804181219648

查询表

批量查询

Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。在sql-client中,设置执行模式为

RESET 'execution.checkpointing.interval';SET 'execution.runtime-mode' = 'batch';

时间旅行

带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。通过查看文件系统存储元数据和数据可以看下上面test库中wstest2表的快照目录可以查看目前有1和2两个快照版本

image-20230807094224697

-- 读取id为1的快照SELECT * FROM wstest2 /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 以Unix毫秒为单位从指定的时间戳读取快照SELECT * FROM wstest2 /*+ OPTIONS('scan.timestamp-millis' = '1691143583490') */;-- 读取标签“my-tag”SELECT * FROM wstest2 /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

读取id和时间戳读取数据如下

image-20230807093656105

批量增量查询

读取开始快照(不包含)和结束快照之间的增量变化。例如:

  • “5,10”表示快照5和快照10之间的变化。
  • ’ TAG1,TAG3 '表示TAG1和TAG3之间的变化。
SELECT * FROM wstest2 /*+ OPTIONS('incremental-between' = '1,2') */;

image-20230807094720754

流式查询

默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。

-- Flink SQLSET 'execution.checkpointing.interval'='30s';SET 'execution.runtime-mode' = 'streaming';

可以做流式读取没有快照数据,可以使用最新的扫描模式;连续读取最新更改,而不在开始时生成快照。

SELECT * FROM wstest2 /*+ OPTIONS('scan.mode' = 'latest') */;

如果只想处理今天及以后的数据,可以使用分区过滤器:

SELECT * FROM wstest2 WHERE dt > '2023-08-04'

时间旅行

如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。

-- 从id为1的快照读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 从指定时间戳的快照读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.timestamp-millis' = '1691143583490') */;-- 在第一次启动时读取快照id 1,并继续读取更改SELECT * FROM wstest2 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

读取数据如下

image-20230807095452502

ConsumerID

这是一个实验性的功能。可以在流式读表时指定消费者id:

SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'myid') */;

当流读取Paimon表时,要记录到文件系统中的下一个快照id。这有几个好处:

  • 当前一个作业停止时,新启动的作业可以继续使用前一个进度,而无需从状态恢复。新的读取将从消费者文件中找到的下一个快照id开始读取。
  • 在确定快照是否过期时,Paimon查看文件系统中表的所有消费者,如果仍然有消费者依赖于该快照,则该快照将不会在到期时被删除。
  • 当没有水印定义时,Paimon表会将快照中的水印传递给下游的Paimon表,这意味着您可以跟踪整个管道的水印进度。

注意:消费者将阻止快照过期,可以指定consumer.expiration-time来管理消费者的生命周期。可以使用给定的消费者ID和下一个快照ID重置消费者。

SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'itxiaoshen') */;insert into wstest2 values(6,6,6),(7,7,7);SELECT * FROM wstest2 /*+ OPTIONS('consumer-id' = 'itxiaoshen') */;

image-20230807101227738

查询优化

强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。可以加速数据跳转的过滤函数有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。

假设一个表具有以下表结构:

CREATE TABLE orders (    catalog_id BIGINT,    order_id BIGINT,    .....,    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key)

通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;SELECT * FROM orders  WHERE catalog_id=1025  AND order_id>2035 AND order_id<6000;

但是下面的过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

系统表

表指定系统表

表指定的系统表包含每个表的元数据和信息,例如创建的快照和正在使用的选项。用户可以通过批量查询访问系统表。

目前,Flink、Spark和Trino都支持查询系统表。在某些情况下,表名需要用反引号括起来以避免语法解析冲突,例如三重访问模式:

SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
  • 快照表
# 通过快照表可以查询该表的快照历史信息,包括快照中发生的记录计数。select * from wstest2$snapshots;

image-20230807094350201

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

  • 表模式

可以通过schemas表查询该表的历史模式。

SELECT * FROM wstest2$schemas;

image-20230807103013073

可以连接快照表和模式表以获得给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields     FROM wstest2$snapshots s JOIN MyTable$schemas t     ON s.schema_id=t.schema_id where s.snapshot_id=1;
  • 表选项

可以查询表的选项信息,这些信息是通过选项表从DDL指定的。未显示的选项将是默认值。可以参考[Configuration]。

SELECT * FROM wstest2$options;
  • 审计日志表

如果需要审计表的变更日志,可以使用audit_log系统表。通过audit_log表,可以在获取表的增量数据时获取rowkind列。您可以使用该列进行过滤和其他操作,以完成审计。

SELECT * FROM wstest2$audit_log;
> +I:插入操作,新增数据。> -U:使用更新行之前的内容进行更新操作,一条数据的修改会产生两个U 标识符数据。其中-U 含义为修改前数据。> +U:使用更新行的新内容进行更新操作,修改之后的数据。> -D:删除操作,删除的数据。
  • 表文件

可以查询指定快照表的文件。

-- 查询最新快照的文件SELECT * FROM wstest2$files;

image-20230807103636295

-- 还可以查询指定快照的文件SELECT * FROM wstest2$files /*+ OPTIONS('scan.snapshot-id'='1') */;

image-20230807103757946

  • 表标签

通过标签表可以查询该表的标签历史信息,包括标签基于哪些快照,以及快照的一些历史信息。还可以获得所有标签名称和时间旅行到特定的标签数据名称。

SELECT * FROM wstest2$tags;
  • 表消费者

可以查询包含下一个快照的所有消费者。

SELECT * FROM wstest2$consumers;

image-20230807103944792

  • 表清单文件

可以查询当前表的最新快照或指定快照中包含的所有清单文件。

-- 查询最新快照的清单信息SELECT * FROM wstest2$manifests;

image-20230807104649956

-- 也可以查询带有指定快照的清单SELECT * FROM wstest2$manifests /*+ OPTIONS('scan.snapshot-id'='1') */;

image-20230807104709319

分区表

可以查询表的分区文件。

SELECT * FROM demo_p1$partitions;

image-20230807105913198

全局系统表

全局系统表包含当前存在的所有表的统计信息;为了方便检索,创建了一个参考系统数据库sys,可以用sql在flink中显示所有全局系统表:

所有选项表,这个表类似于Options table,但是它显示所有的表选项都是all database。

SELECT * FROM sys.all_table_options;

维表

Paimon支持Lookup Join,它用于从Paimon查询数据来补充维度字段,是流查询中的一种连接。连接要求一个表具有处理时间属性,另一个表由查找源连接器提供支持。

在Flink中,Paimon支持对带有主键的表和仅追加表进行查找连接。下面的示例说明了这个特性,创建一个Paimon表并实时更新它。

USE CATALOG fs_catalog;CREATE TABLE customers (    id INT PRIMARY KEY NOT ENFORCED,    name STRING,    country STRING,    zip STRING);-- 启动一个流作业来更新客户表INSERT INTO customers values(1,'zhangsan','china','aaa'),(2,'lisi','china','bbb'),(3,'wangwu','china','ccc');select * from customers;-- 创建一个临时左表,就像从kafkaCREATE TEMPORARY TABLE Orders (    order_id INT,    total INT,    customer_id INT,    proc_time AS PROCTIME()) WITH (  'connector' = 'datagen',   'rows-per-second'='1',   'fields.order_id.kind'='random',   'fields.order_id.min'='1',   'fields.order_id.max'='1000000',   'fields.total.kind'='sequence',   'fields.total.start'='1',   'fields.total.end'='1000',   'fields.customer_id.kind'='random',   'fields.customer_id.min'='1',   'fields.customer_id.max'='3');select * from Orders;

现在可以在查找连接查询中使用客户。

-- 用客户信息填充每个订单SELECT o.order_id, o.total, c.country, c.zipFROM Orders AS oJOIN customersFOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

image-20230807124324530

Lookup Join将在本地维护一个RocksDB缓存,并实时提取表的最新更新。查找连接操作符将只提取必要的数据,因此筛选条件对性能非常重要。此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。

如果Orders(主表)join的记录缺失,因为客户(查找表)的相应数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup进行查找。以下选项允许用户微调RocksDB以获得更好的性能,可以在表属性或动态表提示中指定它们。

-- 动态表提示示例SELECT o.order_id, o.total, c.country, c.zipFROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

CDC集成

通过模式演化,Paimon支持多种方式将数据摄取到Paimon表中;这意味着添加的列将实时同步到Paimon表,并且不会为此重新启动同步作业。目前支持以下同步方式:

  • MySQL同步表:将MySQL中的一个或多个表同步到一个Paimon表中。
  • MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库。
  • API同步表:自定义数据流输入到一个Paimon表。
  • Kafka同步表:将一个Kafka主题的表同步到一个Paimon表。
  • Kafka同步数据库:同步一个包含多个表的Kafka主题或多个包含一个表的主题到一个Paimon数据库。

MySQL

Paimon支持使用更改数据捕获(CDC)同步来自不同数据库的更改,此功能需要Flink及其CDC连接器,准备CDC Bundled Jar。在上一篇我们已经将flink-sql-connector-mysql-cdc-2.4.1.jar拷贝到Flink的Lib目录。

  • 同步表:通过在Flink数据流作业中使用mysql-sync-table动作或直接通过Flink运行,用户可以将一个或多个MySQL表同步到一个Paimon表中。

如果指定的Paimon表不存在,则此操作将自动创建表。它的模式将从所有指定的MySQL表派生。如果Paimon表已经存在,它的模式将与所有指定的MySQL表的模式进行比较。

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-table \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table my_users_cdc \    --primary-keys id \    --mysql-conf hostname=192.168.50.95 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name='test' \    --mysql-conf table-name='my_users' \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

启动后查看下表的信息已经同步

image-20230807140531281

修改MySQL数据库my_users表中id为4的age字段的值从40改为55

image-20230807140750362

可以看到已经获取变更的数据

image-20230807140826925

还可以使用正则表达式设置’ database-name ‘来捕获多个数据库。通过对–mysql-conf database-name=‘source_db.+’ ;一个典型的场景是,一个表’ source_table ‘被分成数据库’ source_db1 ', ’ source_db2 ‘…,然后可以同步所有’ source_table '的数据到一个Paimon表。

  • 同步数据库:通过在Flink数据流作业中使用mysql-sync-database动作或直接通过Flink运行,用户可以将整个MySQL数据库同步到一个Paimon数据库。

只有具有主键的表才会被同步。对于每个要同步的MySQL表,如果对应的Paimon表不存在,该操作将自动创建表。它的模式将从所有指定的MySQL表派生。如果Paimon表已经存在,它的模式将与所有指定的MySQL表的模式进行比较。

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-database \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table-prefix "ods_" \    --table-suffix "_cdc" \    --mysql-conf hostname=192.168.50.95 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name=test \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后查看已经有对应整库的表了

image-20230807143057749

修改MySQL数据库new_users表中id为1的age字段的值从当前33改为43

image-20230807143418520

可以看到已经获取变更的数据

image-20230807143431515

希望该作业同步包含历史数据的表[order, custom];可以通过从作业的前一个快照中恢复,从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后继续自动从以前的位置读取变更日志。

./bin/flink run \   --fromSavepoint savepointPath \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    mysql-sync-database \    --warehouse hdfs:///path/to/warehouse \    --database test_db \    --mysql-conf hostname=127.0.0.1 \    --mysql-conf username=root \    --mysql-conf password=123456 \    --mysql-conf database-name=source_db \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hive-metastore:9083 \    --table-conf bucket=4 \    --including-tables 'product|user|address|order|custom'

可以设置——mode组合,以启用同步新添加的表而无需重新启动作业。

--including-tables 'tbl.+'

通过将database-name设置为正则表达式,同步作业将捕获匹配数据库下的所有表,并将同名的表合并到一个表中。可以设置——merge-shards false来阻止合并碎片。同步表将被命名为’ databaseName_tableName ',以避免潜在的名称冲突。

Kafka

在上一篇我们已经将flink-sql-connector-kafka-1.17.1.jar拷贝到Flink的Lib目录。Flink提供了几种Kafka CDC格式:canal-json, debezium-json,ogg-json,maxwell-json。如果Kafka主题中的消息是使用更改数据捕获(CDC)工具从另一个数据库捕获的更改事件,那么您可以使用Paimon Kafka CDC。将解析后的INSERT、UPDATE、DELETE消息写入paimon表。也即是目前只支持CanalCDC,其他在未来应该会支持

image-20230807144558427

准备

[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

先安装canal

# 先下载canalwget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz# 解压tar -xvf canal.deployer-1.1.6.tar.gz

修改canal.properties和instance.properties两个配置文件,vim conf/canal.properties

canal.serverMode = kafkakafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

vim conf/example/instance.properties

canal.instance.master.address=mysqlserver:3306canal.instance.dbUsername=canalcanal.instance.dbPassword=canal# mq configcanal.mq.topic=cc_test2

启动canal

# 启动canal./bin/startup.sh

修改MySQL的account数据库的account_tbl表的数据,先通过

image-20230807173244489

./kafka-console-producer.sh --broker-list kafka1:9092 --topic cc_test2

消费kafka的主题cc_test2数据成功如下,验证canal的配置正确。

{"data":[{"id":"1","user_id":"6","money":"100"}],"database":"account","es":1691400547000,"id":2,"isDdl":false,"mysqlType":{"id":"int","user_id":"varchar(255)","money":"int"},"old":[{"user_id":"5"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"user_id":12,"money":4},"table":"account_tbl","ts":1691400547176,"type":"UPDATE"}{"data":[{"id":"1","user_id":"7","money":"100"}],"database":"account","es":1691400566000,"id":3,"isDdl":false,"mysqlType":{"id":"int","user_id":"varchar(255)","money":"int"},"old":[{"user_id":"6"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"user_id":12,"money":4},"table":"account_tbl","ts":1691400566482,"type":"UPDATE"}

image-20230807173110048

  • 同步表:通过在Flink数据流作业中使用kafka-sync-table动作或直接通过Flink运行,用户可以将Kafka的一个主题中的一个或多个表同步到一个Paimon表中。
./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    kafka-sync-table \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table kafka_account_tbl_cdc \    --primary-keys id \    --kafka-conf properties.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 \    --kafka-conf topic=cc_test2 \    --kafka-conf properties.group.id=itxs \    --kafka-conf value.format=canal-json \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后可以到通过Kafka已经将对应表和数据都同步到

image-20230807173538278

如果指定的Paimon表不存在,则此操作将自动创建表。它的模式将从所有指定的Kafka主题的表中派生,它从主题中获得最早的非ddl数据解析模式。如果Paimon表已经存在,它的模式将与所有指定Kafka主题表的模式进行比较。

  • 同步库:通过在Flink数据流作业中使用KafkaSyncDatabaseAction或直接通过Flink运行,用户可以将多主题或一个主题同步到一个Paimon数据库。只有具有主键的表才会被同步。

此操作将为所有表构建单个合并接收器。对于每个要同步的Kafka主题的表,如果对应的Paimon表不存在,这个动作将自动创建表,并且它的模式将从所有指定的Kafka主题的表中派生。如果Paimon表已经存在,并且它的模式与从Kafka记录解析的模式不同,这个动作将尝试进行模式进化。

修改canal.mq.topic=cc_test3后重启启动canal

./bin/flink run \    ./lib/paimon-flink-action-0.5-20230804.002229-95.jar \    kafka-sync-database \    --warehouse hdfs://myns/paimon/hive \    --database test \    --table-prefix "ods_" \    --table-suffix "_cdc" \    --kafka-conf properties.bootstrap.servers=192.168.5.120:9092 \    --kafka-conf topic=cc_test3 \    --kafka-conf properties.group.id=itxs \    --kafka-conf scan.startup.mode=earliest-offset \    --kafka-conf value.format=canal-json \    --catalog-conf metastore=hive \    --catalog-conf uri=thrift://hadoop2:9083 \    --table-conf bucket=4 \    --table-conf changelog-producer=input \    --table-conf sink.parallelism=4

运行后,也可以看下后很多ods开发和cdc结尾的表,可以看到

image-20230807175320517

在MySQL的student表修改数据

image-20230807175508068

可以看到在sql-client中查询表已经捕获到最新变更的数据,至此基于Kafka通过MySQL多表已验证完毕。

select * from ods_student_cdc;

image-20230807175547388

支持schema变更

CDC摄取支持有限数量的模式更改,也即是可以自动同步表结构信息。目前,框架不能重命名表,删除列,所以rename table和drop COLUMN的行为将被忽略,rename COLUMN将添加一个新的列。当前支持的模式更改包括:

  • 添加列。

  • 修改列类型。更具体地说

    • 将字符串类型(char, varchar, text)转换为另一种长度更长的字符串类型
    • 从二进制类型(binary, varbinary, blob)转换为另一种长度更长的二进制类型
    • 从整数类型(tinyint, smallint, int, bigint)转换为另一个范围更大的整数类型
    • 从浮点类型(float, double)转换为另一种范围更大的浮点类型;
  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RISC-V公测平台发布:如何在SG2042上玩转OpenMPI

About HS-2 HS-2 RISC-V通用主板是澎峰科技与合作伙伴共同研发的一款专为开发者设计的标准mATX主板&#xff0c;它预装了澎峰科技为RISC-V高性能服务器定制开发的软件包&#xff0c;包括各种标准bencmark、支持V扩展的GCC编译器、计算库、中间件以及多种典型服务器应用程序。…

C语言内嵌汇编

反编译&#xff08;二进制文件或者so库&#xff09; objdump --help objdump -M intel -j .text -ld -C -S out > out.txt #显示源代码同时显示行号, 代码段反汇编-M intel 英特尔语法-M x86-64-C:将C符号名逆向解析-S 反汇编的同时&#xff0c;将反汇编代码和源代码交替显…

机器学习深度学习——非NVIDIA显卡怎么做深度学习(坑点排查)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——数值稳定性和模型化参数&#xff08;详细数学推导&#xff09; &#x1f4da;订阅专栏&#xff1a;机器…

conda install 和pip install有什么区别?

本篇为分享贴&#xff0c;截图部分选自知乎&#xff0c;部分选自csdn&#xff0c;文字内容是结合自己实践进行总结。 环境引用的包在哪&#xff1f; 首先&#xff0c;一条命令&#xff1a; python -m site 这条命令可以定位引用的包在哪里 &#xff0c;当然也可以自己设置默认…

JavaWeb(9)——前端综合案例3(悬停显示下拉列表)

一、实例需求 ⌛ 实现类似百度首页的“一个简单的鼠标悬停显示的下拉列表效果”。 二、代码实现 ☕ <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>.dropdown-cont…

iframe 标签的作用是什么?用法是什么?属性有什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ iframe 标签是什么&#xff1f;⭐ iframe 标签的作用什么&#xff1f;⭐ iframe 标签的用法⭐ iframe 标签的属性⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你…

【单片机】51单片机,TLC2543,驱动程序,读取adc

TLC2543 是一款 12 位精密模数转换器 (ADC)。 1~9、11、12——AIN0&#xff5e;AIN10为模拟输入端&#xff1b; 15——CS 为片选端&#xff1b; 17——DIN 为串行数据输入端&#xff1b;&#xff08;控制字输入端&#xff0c;用于选择转换及输出数据格式&#xff09; 16——…

机器学习复习题

1 单选题 ID3算法、C4.5算法、CART算法都是&#xff08; &#xff09;研究方向的算法。 A . 决策树 B. 随机森林 C. 人工神经网络 D. 贝叶斯学习 参考答案&#xff1a;A &#xff08; &#xff09;作为机器学习重要算法之一&#xff0c;是一种利用多个树分类器进行分类和预测…

chatGPT能力培训,客户最关注的99个方向

前言&#xff1a; chatGPT的主要应用&#xff0c;包括文本生成、图像生成和图文关联三大核心方向&#xff1a; 用户的在实际的工作和学习过程中&#xff0c;最关心的内容&#xff0c;可以按照上述类别进行划分&#xff0c;我们总结了&#xff0c;相关的插头GPT能力培训的相关主…

DAY04_SpringMVC—SpringMVC简介PostMan和ApiFox工具使用SpringMVC请求与响应REST风格

目录 一 SpringMVC简介1 SpringMVC概述问题导入1.1 SpringMVC概述 2 入门案例问题导入2.0 回顾Servlet技术开发web程序流程2.1 使用SpringMVC技术开发web程序流程2.2 代码实现【第一步】创建web工程&#xff08;Maven结构&#xff09;【第二步】设置tomcat服务器&#xff0c;加…

【iOS安全】开启任意app的WebView远程调试

参考&#xff1a;https://mp.weixin.qq.com/s/bNKxQaVrPaXsZ5BPbsXy7w &#xff08;来自周智老师的公众号&#xff09; 概述 Safari 有一个内置的前端调试器&#xff0c; 在iPhone通过局域网或者USB连接MacBook 并启用Safari 远程调试之后&#xff0c;前端调试器默认情况下对…

构建Docker容器监控系统 (1)(Cadvisor +InfluxDB+Grafana)

目录 Cadvisor InfluxDBGrafana 1. Cadvisor 2.InfluxDB 3.Grafana 开始部署&#xff1a; 下载组件镜像 创建自定义网络 创建influxdb容器 创建数据库和数据库用户 创建Cadvisor 容器 准备测试镜像 创建granafa容器 访问granfana 添加数据源 Add data source 新建 …

java.sql.SQLFeatureNotSupportedException 问题及可能的解决方法

目录 问题 分析&#xff1a; 解决方法 问题 java.sql.SQLFeatureNotSupportedException 分析&#xff1a; 可能是你的 druid的maven依赖版本太低了&#xff0c;我的以前是1.1.16&#xff0c;就出现了异常&#xff01; 解决方法 把druid的maven依赖版本调高&#xff01; 运…

unity海康威视原生SDK拉取网络摄像头画面,并展示在一个Material上

原理是使用sdk获取视频流&#xff0c;格式为YUV&#xff0c;然后分离YUV通道到三张不同的Texture2D上&#xff0c;通过shader将三个通道重新输出为原始图像。 我将所用的各个部分已经整理成一个压缩包&#xff0c;免积分下载 压缩包结构如下 使用步骤 1 DLL:放在Plugins文件…

湘大oj1138爱你一生一世题解:最大公约数 逆向思维 int整除会向下取整

一、链接 爱你一生一世 二、题目 题目描述 在2013年1月4日&#xff0c;这个“爱你一生一世”的特别日子&#xff0c;男生都想向自己的喜欢的女生表达爱意。 你准备在该死的C语言考试后&#xff0c;去向她&#xff08;或者他&#xff1f;&#xff09;告白。告白怎么能缺了礼…

渗透攻击方法:原型链污染

目录 一、什么是原型链 1、原型对象 2、prototype属性 3、原型链 1、显示原型 2、隐式原型 3、原型链 4、constructor属性 二、原型链污染重现 实例 Nodejs沙箱逃逸 1、什么是沙箱&#xff08;sandbox&#xff09; 2、vm模块 一、什么是原型链 1、原型对象 JavaS…

不只是Axure,这5 个也能轻松画原型图!

在设计和开发过程中&#xff0c;原型图是一个至关重要的工具。它是将设计理念转化为可视化、交互式的形式&#xff0c;使团队成员和利益相关者更好地理解和评估产品的功能和用户体验。选择适合的软件工具对于画原型图至关重要&#xff0c;本文将介绍 5 种常用的画原型图软件&am…

spring 面试题

一、Spring面试题 专题部分 1.1、什么是spring? Spring是一个轻量级Java开发框架&#xff0c;最早有Rod Johnson创建&#xff0c;目的是为了解决企业级应用开发的业务逻辑层和其他各层的耦合问题。它是一个分层的JavaSE/JavaEE full-stack&#xff08;一站式&#xff09;轻量…

【LeetCode 75】第二十三题(2352)相等行列对

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码运行结果&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目很简洁&#xff0c;就是要我们寻找行与列相同的对数。相同行与列不仅是要元素相同&#xff0c;还需要顺序也一样&#xff08…

tomcat虚拟主机配置演示

一.新建用于显示的index.jsp文件&#xff0c;写入内容 二.修改tomcat/apache-tomcat-8.5.70/conf/server.xml配置文件 匹配到Host那部分&#xff0c;按上面格式在后面添加自己的域名和文件目录信息 主要是修改name和docBase 保存退出重启tomcat&#xff0c;确保tomcat运行…