Apache Paimon 使用 Postgres CDC 获取数据

a.依赖准备

flink-connector-postgres-cdc-*.jar

b.Synchronizing Tables(同步表)

在Flink DataStream作业中使用 PostgresSyncTableAction 或直接通过flink run,可以将PostgreSQL中的一个或多个表同步到一个Paimon表中。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \postgres_sync_table--warehouse <warehouse_path> \--database <database_name> \--table <table_name> \[--partition_keys <partition_keys>] \[--primary_keys <primary_keys>] \[--type_mapping <option1,option2...>] \[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \[--metadata_column <metadata_column>] \[--postgres_conf <postgres_cdc_source_conf> [--postgres_conf <postgres_cdc_source_conf> ...]] \[--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \[--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]

配置信息如下

ConfigurationDescription
–warehouseThe path to Paimon warehouse.
–databaseThe database name in Paimon catalog.
–tableThe Paimon table name.
–partition_keysThe partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example “dt,hh,mm”.
–primary_keysThe primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example “buyer_id,seller_id”.
–type_mappingIt is used to specify how to map PostgreSQL data type to Paimon type. Supported options:“to-string”: maps all PostgreSQL types to STRING.
–computed_columnThe definitions of computed columns. The argument field is from PostgreSQL table field name. See here for a complete list of configurations.
–metadata_column–metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its document for a complete list of available metadata.
–postgres_confThe configuration for Flink CDC Postgres sources. Each configuration should be specified in the format “key=value”. hostname, username, password, database-name, schema-name, table-name and slot.name are required configurations, others are optional. See its document for a complete list of configurations.
–catalog_confThe configuration for Paimon catalog. Each configuration should be specified in the format “key=value”. See here for a complete list of catalog configurations.
–table_confThe configuration for Paimon table sink. Each configuration should be specified in the format “key=value”. See here for a complete list of table configurations.

如果指定的Paimon表不存在,将自动创建该表,表结构将从所有指定的PostgreSQL表中派生出来。

如果Paimon表已经存在,其表结构将与所有指定PostgreSQL表的结构进行比较。

示例1:将表同步到一个Paimon表中

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \postgres_sync_table \--warehouse hdfs:///path/to/warehouse \--database test_db \--table test_table \--partition_keys pt \--primary_keys pt,uid \--computed_column '_year=year(age)' \--postgres_conf hostname=127.0.0.1 \--postgres_conf username=root \--postgres_conf password=123456 \--postgres_conf database-name='source_db' \--postgres_conf schema-name='public' \--postgres_conf table-name='source_table1|source_table2' \--postgres_conf slot.name='paimon_cdc' \--catalog_conf metastore=hive \--catalog_conf uri=thrift://hive-metastore:9083 \--table_conf bucket=4 \--table_conf changelog-producer=input \--table_conf sink.parallelism=4

如示例所示,postgres_conf的表名支持正则表达式,以监控满足正则表达式的多个表。所有表的结构将合并到一个Paimon表结构中。

示例2:将分片的表同步到一个Paimon表中

使用正则表达式设置“schema-name”来捕获多个schemas。

典型场景:表“source_table”被拆分为模式“source_schema1”,“source_schema2”…,然后将所有“source_table”的数据同步到一个Paimon表中。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \postgres_sync_table \--warehouse hdfs:///path/to/warehouse \--database test_db \--table test_table \--partition_keys pt \--primary_keys pt,uid \--computed_column '_year=year(age)' \--postgres_conf hostname=127.0.0.1 \--postgres_conf username=root \--postgres_conf password=123456 \--postgres_conf database-name='source_db' \--postgres_conf schema-name='source_schema.+' \--postgres_conf table-name='source_table' \--postgres_conf slot.name='paimon_cdc' \--catalog_conf metastore=hive \--catalog_conf uri=thrift://hive-metastore:9083 \--table_conf bucket=4 \--table_conf changelog-producer=input \--table_conf sink.parallelism=4

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/746000.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

reduce()方法的应用

reduce() 是 JavaScript 数组&#xff08;Array&#xff09;对象的一个方法&#xff0c;它接收一个函数作为累加器&#xff08;accumulator&#xff09;&#xff0c;数组中的每个值&#xff08;从左到右&#xff09;开始缩减&#xff0c;最终为一个值。 reduce() 方法的基本语…

Redis 除了做缓存,还能做什么?

分布式锁&#xff1a;通过 Redis 来做分布式锁是一种比较常见的方式。通常情况下&#xff0c;我们都是基于 Redisson 来实现分布式锁。关于 Redis 实现分布式锁的详细介绍&#xff0c;可以看这篇文章&#xff1a;分布式锁详解open in new window 。限流&#xff1a;一般是通过 …

json-server 安装成功,查看版本直接报错。安装默认版本埋下的一个坑,和node版本不匹配

文章目录 一、作者的错误二、作者安装的过程三、版本问题的解决方式四、安装成功&#xff0c;显示命令不存在的解决思路五、安装失败的解决思路六、json-server运行命令参考文档 一、作者的错误 安装成功 错误原文 file:///C:/Users/ljj/AppData/Roaming/nvm/v14.18.1/node_g…

go语言基础笔记

1.基本类型 1.1. 基本类型 bool int: int8, int16, int32(rune), int64 uint: uint8(byte), uint16, uint32, uint64 float32, float64 string 复数&#xff1a;complex64, complex128 复数有实部和虚部&#xff0c;complex64的实部和虚部为32位&#xff0c;complex128的实部…

Vue首屏优化方案

在Vue项目中&#xff0c;引入到工程中的所有js、css文件&#xff0c;编译时都会被打包进vendor.js&#xff0c;浏览器在加载该文件之后才能开始显示首屏。若是引入的库众多&#xff0c;那么vendor.js文件体积将会相当的大&#xff0c;影响首屏的体验。可以看个例子&#xff1a;…

Unload-labs

function checkFile() {var file document.getElementsByName(upload_file)[0].value;if (file null || file "") {alert("请选择要上传的文件!");return false;}//定义允许上传的文件类型var allow_ext ".jpg|.png|.gif";//提取上传文件的类…

初见Dynamo2.13 for Revit2023~

Hello大家好&#xff01;我是九哥~ 今天我们来聊聊Dynamo2.13 for Revit有哪些新功能&#xff08;后台回复"Revit2013"获取&#xff09;~ 首先&#xff0c;Dynamo2.13版本其实早就发布了&#xff0c;官方博客更是花了三篇文章的篇幅来详细介绍&#xff0c;小伙伴…

C# EPPlus导出dataset----Excel1

仅限XLSX 2007以后版本(2007之前版本不支持) 目录 一、安装EPPlus程序包 二、全局参数 二、配置许可证

Hack The Box-Monitored

目录 信息收集 rustscan dirsearch WEB web信息收集 snmpwalk curl POST身份验证 漏洞探索 漏洞挖掘 sqlmap 登录后台 提权 get user get root 信息收集 rustscan ┌──(root㉿ru)-[~/kali/hackthebox] └─# rustscan -b 2250 10.10.11.248 --range0-65535 --…

今天我们来学习一下关于MySQL数据库

目录 前言: 1.MySQL定义&#xff1a; 1.1基础概念&#xff1a; 1.1.1数据库&#xff08;Database&#xff09;&#xff1a; 1.1.2表&#xff08;Table&#xff09;&#xff1a; 1.1.3记录&#xff08;Record&#xff09;与字段&#xff08;Field&#xff09;&#xff1a; …

C#,入门教程(27)——应用程序(Application)的基础知识

上一篇: C#,入门教程(26)——数据的基本概念与使用方法https://blog.csdn.net/beijinghorn/article/details/124952589 一、什么是应用程序 Application? 应用程序是编程的结果。一般把代码经过编译(等)过程,最终形成的可执行 或 可再用 的文件称为应用程序。可执行文…

IIFE函数

IIFE&#xff08;Immediately Invoked Function Expression&#xff09;是立即调用函数表达式的缩写。它是一种 JavaScript 函数执行方式&#xff0c;定义一个匿名函数并立即调用它&#xff0c;通常用于创建一个私有作用域以避免变量污染全局作用域。 (function() {var messag…

GaussDB数据库的索引管理

目录 一、引言 二、GaussDB数据库中的索引基本概念 1. 什么是GaussDB索引&#xff1f; 2. GaussDB索引的作用 三、GaussDB支持的索引类型 1. B-Tree索引 2. GIN索引 3. GiST索引 4. SP-GiST索引 四、创建和管理GaussDB索引 1. 创建索引 2. 删除索引 3. 索引的优化…

【AI论文阅读笔记】ResNet残差网络

论文地址&#xff1a;https://arxiv.org/abs/1512.03385 摘要 重新定义了网络的学习方式 让网络直接学习输入信息与输出信息的差异(即残差) 比赛第一名1 介绍 不同级别的特征可以通过网络堆叠的方式来进行丰富 梯度爆炸、梯度消失解决办法&#xff1a;1.网络参数的初始标准化…

RabbitMQ详解与常见问题解决方案

文章目录 什么是 RabbitMQ&#xff1f;RabbitMQ 和 AMQP 是什么关系&#xff1f;RabbitMQ 的核心组件有哪些&#xff1f;RabbitMQ 中有哪几种交换机类型&#xff1f;Direct Exchange(直连交换机)Topic Exchange(主题交换机)Headers Exchange(头部交换机)Fanout Exchange(广播交…

安装linux_centos7虚拟机_开启网络_ssh_防火墙

文章目录 安装linux_centos7虚拟机_开启网络_ssh_防火墙安装centos7虚拟机1. 进入VMware --> 点击文件 --> 新建虚拟机2. 选择典型 --> 选择下一步3. 选择--> 稍后安装操作系统4. 选择--> Linux --> CentOS 7 64位5. 在虚拟机名称输入(虚拟机名) --> 选择…

李三清研究引领力学定律新篇章,光子模型图揭秘

一周期内&#xff0c;垂直&#xff0c;曲率不变&#xff0c;方向转向互变&#xff0c;正向反向互变&#xff0c;左旋右旋互变。变无限粗或变无限厚才发生质变&#xff0c;且属于由内向外变换&#xff0c;所以对应变换就是由内点向外点变换。 由于方向转向不能分割&#xff0c;…

[数据集][目标检测]昆虫检测数据集VOC+YOLO格式1873张7类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1873 标注数量(xml文件个数)&#xff1a;1873 标注数量(txt文件个数)&#xff1a;1873 标注…

【Vue2】组件通信

父子通信 父 -> 子 子 -> 父 props 校验 props: {校验的属性名: {type: 类型, // Number String Boolean ...required: true, // 是否必填default: 默认值, // 默认值validator (value) {// 自定义校验逻辑return 是否通过校验}} },data 的数据是自己的 → 随便改pr…

浅浅探索Memcached

一、NoSQL介绍 NoSQL是对 Not Only SQL、非传统关系型数据库的统称。 NoSQL一词诞生于1998年&#xff0c;2009年这个词汇被再次提出指非关系型、分布式、不提供ACID的数据库设计模式。 随着互联网时代的到来&#xff0c;数据爆发式增长&#xff0c;数据库技术发展日新月异&a…