Apache Paimon 使用 Kafka CDC 获取数据

a.依赖准备

flink-sql-connector-kafka-*.jar

b.支持的文件格式

Flink提供了几种Kafka CDC格式:Canal、Debezium、Ogg和Maxwell JSON。

如果Kafka的Topic中的消息是使用Change Data Capture(CDC)工具从另一个数据库捕获的change event,那么可以使用Paimon Kafka CDC,将INSERT、UPDATE、DELETE消息写入到paimon表中。

注意

JSON源可能缺少一些信息。例如,Ogg和Maxwell格式标准不包含字段类型;当将JSON源写入Flink Kafka Sink时,它只会保留数据和行类型并删除其他信息。

通常,debezium-json包含“schema”字段,Paimon将从中检索数据类型。确保debezium json具有此字段,否则Paimon将使用“STRING”类型。

  • 如果缺少字段类型,Paimon将默认使用“STRING”类型。
  • 如果缺少数据库名称或表名,则无法进行数据库同步,但仍然可以进行表同步。
  • 如果缺少主键,该作业可能会创建非主键表,可以在表synchronization中提交作业时设置主键。

c.同步表

在Flink DataStream作业中使用 KafkaSyncTableAction 或直接通过flink run,可以将Kafka的一个Topic中的一个或多个表同步到一个Paimon表中。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_table--warehouse <warehouse-path> \--database <database-name> \--table <table-name> \[--partition_keys <partition_keys>] \[--primary_keys <primary-keys>] \[--type_mapping to-string] \[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
ConfigurationDescription
–warehouseThe path to Paimon warehouse.
–databaseThe database name in Paimon catalog.
–tableThe Paimon table name.
–partition_keysThe partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example “dt,hh,mm”.
–primary_keysThe primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example “buyer_id,seller_id”.
–type_mappingIt is used to specify how to map MySQL data type to Paimon type. Supported options:“tinyint1-not-bool”: maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.“to-nullable”: ignores all NOT NULL constraints (except for primary keys). This is used to solve the problem that Flink cannot accept the MySQL ‘ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x’ operation.“to-string”: maps all MySQL types to STRING.“char-to-string”: maps MySQL CHAR(length)/VARCHAR(length) types to STRING.“longtext-to-bytes”: maps MySQL LONGTEXT types to BYTES.“bigint-unsigned-to-bigint”: maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won’t occur when using this option.
–computed_columnThe definitions of computed columns. The argument field is from Kafka topic’s table field name. See here for a complete list of configurations.
–kafka_confThe configuration for Flink Kafka sources. Each configuration should be specified in the format key=value. properties.bootstrap.servers, topic/topic-pattern, properties.group.id, and value.format are required configurations, others are optional.See its document for a complete list of configurations.
–catalog_confThe configuration for Paimon catalog. Each configuration should be specified in the format “key=value”. See here for a complete list of catalog configurations.
–table_confThe configuration for Paimon table sink. Each configuration should be specified in the format “key=value”. See here for a complete list of table configurations.

如果指定的Paimon表不存在,此操作将自动创建该表,它的结构将从所有指定的Kafka的Topic的表中派生出来,它从Topic中获取最早的非DDL数据解析模式。

如果Paimon表已经存在,其模式将与所有指定的Kafka的Topic表的模式进行比较。

示例1:

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_table \--warehouse hdfs:///path/to/warehouse \--database test_db \--table test_table \--partition_keys pt \--primary_keys pt,uid \--computed_column '_year=year(age)' \--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \--kafka_conf topic=order \--kafka_conf properties.group.id=123456 \--kafka_conf value.format=canal-json \--catalog_conf metastore=hive \--catalog_conf uri=thrift://hive-metastore:9083 \--table_conf bucket=4 \--table_conf changelog-producer=input \--table_conf sink.parallelism=4

如果启动同步作业时kafka的Topic中不包含消息,则必须在提交作业之前手动创建表,只能定义分区键和主键,剩下的列将由同步作业添加。

注意:在这种情况下,不应该使用-partition_keys或-primary_keys,因为这些键是在创建表时定义的,不能修改。此外,如果指定了计算列,还应该定义用于计算列的所有参数列。

示例2:如果想同步具有主键“id INT”的表,并且要计算分区键“part=date_format(create_time,yyyy-MM-dd)”,可以先创建这样的表(其他列可以省略)

CREATE TABLE test_db.test_table (id INT,                 -- primary keycreate_time TIMESTAMP,  -- the argument of computed column partpart STRING,            -- partition keyPRIMARY KEY (id, part) NOT ENFORCED
) PARTITIONED BY (part);

启动同步作业

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_table \--warehouse hdfs:///path/to/warehouse \--database test_db \--table test_table \--computed_column 'part=date_format(create_time,yyyy-MM-dd)' \... (other conf)

d.同步数据库

通过在Flink DataStream作业中使用KafkaSyncDatabaseAction或直接通过flink run,可以将多个Topic或一个Topic同步到一个Paimon数据库中。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_database--warehouse <warehouse-path> \--database <database-name> \[--table_prefix <paimon-table-prefix>] \[--table_suffix <paimon-table-suffix>] \[--including_tables <table-name|name-regular-expr>] \[--excluding_tables <table-name|name-regular-expr>] \[--type_mapping to-string] \[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
ConfigurationDescription
–warehouseThe path to Paimon warehouse.
–databaseThe database name in Paimon catalog.
–ignore_incompatibleIt is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.
–table_prefixThe prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have “ods_” as prefix, you can specify “–table_prefix ods_”.
–table_suffixThe suffix of all Paimon tables to be synchronized. The usage is same as “–table_prefix”.
–including_tablesIt is used to specify which source tables are to be synchronized. You must use ‘|’ to separate multiple tables.Because ‘|’ is a special character, a comma is required, for example: ‘a|b|c’.Regular expression is supported, for example, specifying “–including_tables test|paimon.*” means to synchronize table ‘test’ and all tables start with ‘paimon’.
–excluding_tablesIt is used to specify which source tables are not to be synchronized. The usage is same as “–including_tables”. “–excluding_tables” has higher priority than “–including_tables” if you specified both.
–type_mappingIt is used to specify how to map MySQL data type to Paimon type. Supported options:“tinyint1-not-bool”: maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.“to-nullable”: ignores all NOT NULL constraints (except for primary keys). This is used to solve the problem that Flink cannot accept the MySQL ‘ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x’ operation.“to-string”: maps all MySQL types to STRING.“char-to-string”: maps MySQL CHAR(length)/VARCHAR(length) types to STRING.“longtext-to-bytes”: maps MySQL LONGTEXT types to BYTES.“bigint-unsigned-to-bigint”: maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won’t occur when using this option.
–kafka_confThe configuration for Flink Kafka sources. Each configuration should be specified in the format key=value. properties.bootstrap.servers, topic/topic-pattern, properties.group.id, and value.format are required configurations, others are optional.See its document for a complete list of configurations.
–catalog_confThe configuration for Paimon catalog. Each configuration should be specified in the format “key=value”. See here for a complete list of catalog configurations.
–table_confThe configuration for Paimon table sink. Each configuration should be specified in the format “key=value”. See here for a complete list of table configurations.

只有带有主键的表才会同步。

此操作将为所有表构建一个combined sink,对于要同步的每个Kafka Topic的表,如果相应的Paimon表不存在,此操作将自动创建该表,其模式将从所有指定的Kafka Topic的表中派生。

如果Paimon表已经存在,并且其模式与从Kafka记录中解析的模式不同,则此操作将尝试模式演变。

示例:从一个Kafka Topic同步到Paimon数据库。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_database \--warehouse hdfs:///path/to/warehouse \--database test_db \--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \--kafka_conf topic=order \--kafka_conf properties.group.id=123456 \--kafka_conf value.format=canal-json \--catalog_conf metastore=hive \--catalog_conf uri=thrift://hive-metastore:9083 \--table_conf bucket=4 \--table_conf changelog-producer=input \--table_conf sink.parallelism=4

从多个Kafka Topic同步到Paimon数据库。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \kafka_sync_database \--warehouse hdfs:///path/to/warehouse \--database test_db \--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \--kafka_conf topic=order\;logistic_order\;user \--kafka_conf properties.group.id=123456 \--kafka_conf value.format=canal-json \--catalog_conf metastore=hive \--catalog_conf uri=thrift://hive-metastore:9083 \--table_conf bucket=4 \--table_conf changelog-producer=input \--table_conf sink.parallelism=4

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/748008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT:突破写作限制,开启论文写作新境界

ChatGPT无限次数:点击直达 【引言】 在当今信息爆炸的时代&#xff0c;学术界对于高质量论文的需求日益增加。然而&#xff0c;对于许多研究者而言&#xff0c;论文写作是一个耗时且困难的过程。幸运的是&#xff0c;随着人工智能技术的进步&#xff0c;我们迎来了ChatGPT&…

C语言向C++过渡的基础知识(一)

目录 C关键字 C命名空间 命名空间的介绍 域作用限定符 命名空间的使用 C的输入以及输出 C中的缺省参数 缺省参数的介绍 缺省参数的使用 缺省参数的分类 全缺省参数 半缺省参数 C关键字 在C中&#xff0c;有63个关键字&#xff0c;而C语言只有32个关键字 asm do i…

鸿蒙车载原生开发,拓展新版图

一天内连发“五弹”、HiCar 4.0首次上车 华为鸿蒙狂扩“汽车朋友圈”-上游新闻 汇聚向上的力量 3月15日&#xff0c;在“华为云&华为终端云服务创新峰会2024”上&#xff0c;华为首批汽车行业伙伴广汽传祺、岚图汽车、零跑汽车、凯翼汽车加入鸿蒙生态合作&#xff0c;华为…

FPGA高端项目:FPGA基于GS2971+GS2972架构的SDI视频收发+OSD动态字符叠加,提供1套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收发送本方案的SDI接收图像缩放应用本方案的SDI接收纯verilog图像缩放纯verilog多路视频拼接应用本方案的SDI接收HLS图像缩放HLS多路视频拼接应用本方案的SDI接收HLS多路视频融合叠加应用本方案的S…

如何用 UDP 实现可靠传输?并以LabVIEW为例进行说明

UDP&#xff08;用户数据报协议&#xff09;本身是一个无连接的、不可靠的传输协议&#xff0c;它不提供数据包的到达确认、排序保证或重传机制。因此&#xff0c;如果要在UDP上实现可靠传输&#xff0c;就需要在应用层引入额外的机制。以下是一些常见的方法&#xff1a; 确认和…

在github上如何删除自己仓库里没用的项目

在GitHub上删除自己仓库里的无用项目可以通过以下步骤进行&#xff1a; 在GitHub上登录您的账号并进入要删除的仓库页面。 在仓库页面的右上角&#xff0c;点击“Settings”&#xff08;设置&#xff09;按钮。 在仓库设置页面的左侧导航栏中&#xff0c;点击“Options”&…

工业物联网平台在水务环保、暖通制冷、电力能源等行业的应用

随着科技的不断发展&#xff0c;工业物联网平台作为连接物理世界与数字世界的桥梁&#xff0c;正逐渐成为推动各行业智能化转型的关键力量。在水务环保、暖通制冷、电力能源等行业&#xff0c;工业物联网平台的应用尤为广泛&#xff0c;对于提升运营效率、降低能耗、优化管理等…

mac安装rust开发环境,使用brew安装和全局配置

mac下使用brew可以一键安装环境&#xff1a; brew install rustup 安装完成执行&#xff1a; rustup-init 按照提示配置即可&#xff1a; 出现&#xff1a; 想要全局生效&#xff1a; echo export PATH"$HOME/.cargo/bin:$PATH" >> ~/.bash_profile source…

【Unity+Vuforia】AR 发布安卓的设置

Player Settings > Resolution and Presentation > Default Orientation portrait Player Settings > Other Settings > Auto Graphics API 取消勾选 Player Settings > Other Settings > Graphics APIs 选择OpenGLES3删除其他的 Player Settings…

【elasticsearch实战】从零开始设计全站搜索引擎

业务需求 最近需要一个全站搜索的功能&#xff0c;我们的站点的特点是数据多源&#xff0c;即有我们本地数据库&#xff0c;也包含了第三方数据源&#xff0c;我们的数据类型除了网页&#xff0c;还包括了各种类型的文档&#xff0c;例如&#xff1a;doc、pdf、excel、ppt等格…

MapReduce的原理分析

1.概述 MapReduce的思想核心是“分而治之,先分再合”&#xff0c;适用于大量复杂任务处理场景(大规模数据处理场景)。 MapReduce分两个阶段: map阶段(分)&#xff1a;如果任何可以拆分并且没有依赖&#xff0c;那么就把复杂的任务拆分成小任务&#xff0c;拆分成小任务之后&a…

嵌入式学习day38 HTML

1.格式 <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>中文测试。。。。</title> </head> <body> 这里是测试body测试内容。。。 </body> </ht…

Ubuntu 安装 KVM 虚拟化

1. Ubuntu 安装 KVM 虚拟化 KVM 是 Linux 内核中一个基于 hypervisor 的虚拟化模块&#xff0c;它允许用户在 Linux 操作系统上创建和管理虚拟机。 如果机器的CPU不支持硬件虚拟化扩展&#xff0c;是无法使用KVM(基于内核的虚拟机)直接创建和运行虚拟机的。此时最多只能使用…

flink重温笔记(十七): flinkSQL 顶层 API ——SQLClient 及流批一体化

Flink学习笔记 前言&#xff1a;今天是学习 flink 的第 17 天啦&#xff01;学习了 flinkSQL 的客户端工具 flinkSQL-client&#xff0c;主要是解决大数据领域数据计算避免频繁提交jar包&#xff0c;而是简单编写sql即可测试数据&#xff0c;文章中主要结合 hive&#xff0c;即…

初探文件包含漏洞

目录 1.什么是文件包含漏洞2.漏洞分类3.php中常见的文件包含函数4.文件包含漏洞的绕过方法4.1本地文件包含&#xff08;LFI&#xff09;绕过方法&#xff1a;4.2远程文件包含&#xff08;RFI&#xff09;绕过方法&#xff1a; 5.对于文件包含漏洞的防御措施 1.什么是文件包含漏…

SpringBoot3整合Elasticsearch8.x之全面保姆级教程

整合ES 环境准备 安装配置ES&#xff1a;https://blog.csdn.net/qq_50864152/article/details/136724528安装配置Kibana&#xff1a;https://blog.csdn.net/qq_50864152/article/details/136727707新建项目&#xff1a;新建名为web的SpringBoot3项目 elasticsearch-java 公…

Hive实现查询左表有右表没有的记录

工作中遇到这样一个场景&#xff0c;业务逻辑是&#xff1a;如果一个主体发生了某一问题&#xff0c;就不再统计该主体的其他问题。 思路&#xff1a;首先想到的方法就是not in方法&#xff0c;但是Hive并不不支持。那么使用left join对两个表进行连接&#xff0c;右表主键为空…

uploads-labs靶场(1-10关)

一、搭建环境: 下载upload-labs源代码 下载链接&#xff1a;https://codeload.github.com/c0ny1/upload-labs/zip/refs/heads/master 将压缩包解压后的文件名改为upload-labs&#xff0c;然后放入phpstudy\www目录下 二、关卡通关: 1、pass-01&#xff08;前端绕过&#xf…

B. Array Fix

思路&#xff1a;我们倒着看&#xff0c;首先判断以下当前元素有没有被操作过&#xff0c;被操作过的话&#xff0c;那么需要改为操作后的数&#xff0c;然后跟当前数的前一个数进行比较&#xff0c;如果a[i] < a[i - 1]的话&#xff0c;那么需要将a[i - 1]拆分&#xff0c;…

【SpringBoot】头条新闻项目实现CRUD登录注册

文章目录 一、头条案例介绍二、技术栈介绍三、前端搭建四、基于SpringBoot搭建项目基础架构4.1 数据库脚本执行4.2 搭建SprintBoot工程4.2.1 导入依赖:4.2.2 编写配置4.2.3 工具类准备 4.3 MybatisX逆向工程 五、后台功能开发5.1 用户模块开发5.1.1 jwt 和 token 介绍5.1.2 jwt…