数据湖Paimon入门指南

一、主键表(Primary Key Table)

Merge Engines

sink-upsert-realize可能会导致不正常的现象。当输入乱序时,我们建议您使用序列字段来纠正无序。建议设置为None

set table.exec.sink.upsert-materialize = NONE   

Deduplicate(重复数据消除)

如果用户建表时不指定 merge-engine 配置,创建的 PK 表默认的 Merge Engine 是 deduplicate 即只保留最新的记录,其他的同 PK 数据则被丢弃,如果最新的记录是 DELETE 记录,那么相同 PK 的所有数据都将被删除。

'merge-engine' = 'Deduplicate'  

Partial Update

partial-update 必须跟 lookup 或者 full-compaction changelog producer结合使用。Partial无法接收DELETE消息,可以将partial-update.ignore-delete配置为忽略delete消息。

如果用户建表时指定'merge-engine' = 'partial-update',那么就会使用部分更新表引擎,可以做到多个 Flink 流任务去更新同一张表,每条流任务只更新一张表的部分列,最终实现一行完整的数据的更新,对于需要拉宽表的业务场景,partial-update 非常适合此场景,而且构建宽表的操作也相对简单。这里所说的多个 Flink 流任务并不是指多个 Flink Job 并发写同一张 Paimon 表,这样需要拆分 Compaction 任务,就不能在每个 Job 的 Writer 端做 Compaction, 需要一个独立的 Compaction 任务,比较麻烦。目前推荐将多条 Flink 流任务 UNION ALL 起来,启动一个 Job 写 Paimon 表。这里需要注意的是,对于流读场景,partial-update 表引擎需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,同时 partial-update 不能接收和处理 DELETE 消息,为了避免接收到 DELETE 消息报错,需要通过配置 'partial-update.ignore-delete' = 'true' 忽略 DELETE 消息。

--创建Partial update结果表  
CREATE TABLE if not EXISTS paimon.dw.order_detail  
(  `order_id` string   ,`product_type` string   ,`plat_name` string   ,`ref_id` bigint   ,`start_city_name` string   ,`end_city_name` string   ,`create_time` timestamp(3)  ,`update_time` timestamp(3)   ,`dispatch_time` timestamp(3)   ,`decision_time` timestamp(3)   ,`finish_time` timestamp(3)   ,`order_status` int   ,`binlog_time` bigint  ,PRIMARY KEY (order_id) NOT ENFORCED  
)   
WITH (  'bucket' = '20', -- 指定20个bucket  'bucket-key' = 'order_id',  'sequence.field' = 'binlog_time', -- 记录排序字段  'changelog-producer' = 'full-compaction',  -- 选择 full-compaction ,在compaction后产生完整的changelog  'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间  'merge-engine' = 'partial-update',  'partial-update.ignore-delete' = 'true' -- 忽略DELETE数据,避免运行报错  
);  

 

Aggregation

如果用户建表时指定 'merge-engine' = 'aggregation',此时使用聚合表引擎,可以通过聚合函数做一些预聚合,每个除主键以外的列都可以指定一个聚合函数,相同主键的数据就可以按照列字段指定的聚合函数进行相应的预聚合,如果不指定则默认为 last-non-null-value ,空值不会覆盖。Agg 表引擎也需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,需要注意的是除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配置 'fields.${field_name}.ignore-retract'='true' 忽略。

CREATE TABLE MyTable (  product_id BIGINT,  price DOUBLE,  sales BIGINT,  PRIMARY KEY (product_id) NOT ENFORCED  
) WITH (  'merge-engine' = 'aggregation',  'fields.price.aggregate-function' = 'max',  'fields.sales.aggregate-function' = 'sum'  
);  

change producer

Changelog 主要应用在流读场景。流式查询将不断产生最新的更改。这些更改可以来自底层表文件,也可以来自像Kafka这样的外部日志系统。与外部日志系统相比,表文件的更改成本较低,但延迟较高(取决于创建快照的频率)。

通过在创建表时指定变更日志生产者表属性,用户可以选择从文件中生成的变更模式。

目前数仓分层是在 Paimon 里做的,数据以 Table Format 的形式存储在文件系统上,如果下游的 Flink 任务要流读 Paimon 表数据,需要存储帮助生成 Changelog(成本较低,但延迟相对较高),以便下游流读的,这时就需要我们在建表时指定 Paimon 的 Changelog Producer 决定以何种方式在何时生成 Changelog。如果不指定则不会在写入 Paimon 表的时候生成 Changelog,那么下游任务需要在流读时生成一个物化节点来产生 Changelog。这种方式的成本相对较高,同时官方不建议这样使用,因为下游任务在 State 中存储一份全量的数据,即每条数据以及其变更记录都需要保存在状态中。

Paimon 支持的 Changelog Produer 包括:

none:如果不指定,默认就是 none,成本较高,不建议使用。

总之,'changelog- producer' = 'none' 最适合数据库系统这样的消费者。Flink还有一个内置的“normalize”操作符,它将每个键的值保持在状态中。可以很容易地看出,这种操作成本非常高,应该避免。

input:如果我们的 Source 源是业务库的 Binlog ,即写入 Paimon 表 Writer 任务的输入是完整的 Changelog,此时能够完全依赖输入端的 Changelog, 并且将输入端的 Changelog 保存到 Paimon 的 Changelog 文件,由 Paimon Source 提供给下游流读。通过配置 'changelog-producer' = 'input',将 Changelog Producer 设置为 input 。

通过指定'changelog-producer' = 'input',Paimon编写器将其输入作为完整变更日志的来源。所有输入记录将保存在单独的变更日志文件中,并由Paimon来源提供给消费者。最适合cdc或者flink有状态计算场景。

以下为一个flink cdc如湖的demo

EXECUTE CDCSOURCE cdc_demo WITH (  'connector' = 'mysql-cdc',  'hostname' = 'localhost',  'port' = '3306',  'username' = 'username',  'password' = 'password',  'checkpoint' = '30000',  'scan.startup.mode' = 'initial',  'source.server-time-zone' = 'Asia/Tokyo',  'parallelism' = '4',  'database-name' = 'demo',  'sink.connector' = 'sql-catalog',  'sink.catalog.name' = 'fts_hive',  'sink.catalog.type' = 'fts_hive',  'sink.catalog.uri' = 'thrift://localhost:9083',  'sink.bucket' = '4',  'sink.snapshot.time-retained' = '24h',  'table-list' = 'A01,A02,A03,A04,A05',  'sink.changelog-producer' = 'input',  'sink.catalog.warehouse' = 'hdfs://cluster/warehouse/table_store',  'sink.sink.db' = 'fts_ods_db_demo'  
);  lookup:如果我们的输入不是完整的 Changelog, 并且不想在下游流读时通过 Normalize 节点生成 Changelog, 通过配置 'changelog-producer' = 'lookup',通过 Lookup 的方式在数据写入的时候生成 Changelog,此 Changelog Produer 目前处于实验状态,暂未经过大量的生产验证。

 Paimon将在提交数据写入之前通过'lookup'生成变更日志。

详细:

数据湖paimon入门指南(sink,upsert) - AI牛丝

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ArrayList和LinkedList的区别是什么

ArrayList 和 LinkedList 分别代表了两类不同的数据结构:动态数组和链表。它们都实现了 Java 的 List 接口,但是有着各自独特的特点和性能表现。 1. 数据结构 ArrayList 是基于可调整大小的数组实现的。它允许快速随机访问,因为内部元素可通…

使用useRoutes提示invalid hook call

包版本&#xff1a; 问题&#xff1a; 今天用vitereactts重新搭建项目时报错 代码&#xff1a; router.tsx import { useRoutes } from react-router-dom; import Home from "../pages/home/index";const routers[{path: /,element: <Home/> } ] // const…

c++ 语法 运算符重载

不要滥用运算符重载 内置类型不能使用运算符重载 号运算符重载 #include <iostream> #include "mathutil.hpp" #include <string> #include "People.hpp" #include "Phone.hpp" using namespace std; class Fclass{ public:int…

jmeter设置关联

一、为什么要设置关联&#xff1f; http协议本身是无状态的&#xff0c;客户端只需要简单向服务器请求下载某些文件&#xff0c;无论是客户端还是服务端都不去记录彼此过去的行为&#xff0c;每一次请求之间都是独立的。如果jmeter需要设置跨线程组脚本&#xff0c;就必须设置…

机器学习本科课程 实验4 支持向量机

第一题&#xff1a;支持向量机的核函数 实验内容&#xff1a; 了解核函数对SVM的影响绘制不同核函数的决策函数图像简述引入核函数的目的 1. 导入模型 import numpy as np import matplotlib.pyplot as plt %matplotlib inline from matplotlib.colors import ListedColorm…

【LeetCode: 292. Nim 游戏+ 博弈问题】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

vue绘制语音波形图---wavesurfer.js

文章目录 创建实例optionsmethod接收Blob流 https://wavesurfer.xyz/ 创建实例 引入插件&#xff1a;import WaveSurfer from "wavesurfer.js"创建实例对象&#xff1a;this.wavesurfer WaveSurfer.create(options); <div id"waveform"><!-- t…

MySQL 小技巧:xtrabackup 软件包的下载及安装

案例&#xff1a;xtrabackup 软件包的下载及安装 软件包下载&#xff1a;Index of /percona/centos/7/RPMS/x86_64/ CentOS7 默认的数据库版本比较老,因此建议使用 xtrabackup 2.4 版本 // CentOS7 默认的数据库版本比较老,因此建议使用 xtrabackup 2.4 版本 // 安装 CentOS7 默…

性能评测|虚拟化和裸金属 K8s 哪个性能更好?

本文重点 整体而言&#xff0c;SKS&#xff08;虚拟机 Kubernetes&#xff09;可以达到裸金属 Kubernetes 性能的 82% – 96%&#xff0c;满足绝大部分场景下生产容器应用的性能需求。更多虚拟化与裸金属 Kubernetes 架构、特性、适用场景与性能对比&#xff0c;欢迎阅读文末电…

L1-040 最佳情侣身高差-java

输入样例&#xff1a; 2 M 1.75 F 1.8输出样例&#xff1a; 1.61 1.96 import java.util.Scanner;public class Main { public static void main(String[] args) {Scanner scanner new Scanner(System.in);int nscanner.nextInt();String[][] strnew String[n][2];for(int i…

自定义Dockerfile构建运行springboot

自定义Dockerfile构建运行springboot 通过dockerfile生成自定义nginx镜像 &#xff01;&#xff01;&#xff01;docker 必须在linux环境下才能进行如果你是window则需要装虚拟机 新建一个文件名字为Dockerfile&#xff0c;无需后缀 文件完整名就是Dockerfile,也可以自定义d…

相同的树[简单]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你两棵二叉树的根节点p和q&#xff0c;编写一个函数来检验这两棵树是否相同。如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3], q [1,…

敏捷软件研发管理流程- scrum

Leangoo领歌是一款永久免费的专业的敏捷开发管理工具&#xff0c;提供端到端敏捷研发管理解决方案&#xff0c;涵盖敏捷需求管理、任务协同、进展跟踪、统计度量等。 Leangoo领歌上手快、实施成本低&#xff0c;可帮助企业快速落地敏捷&#xff0c;提质增效、缩短周期、加速创新…

了解 WebSocket 和 TCP :有何不同

WebSocket — 双向通讯的艺术 简要概述 WebSocket 代表着WebSocket通讯协议&#xff0c;提供了一条用于客户端和服务器间实现实时、双向、全双工通信的渠道。在WebSocket引入之前&#xff0c;网页应用的数据更新依赖于频繁的轮询&#xff0c;这种做法不仅效率低下&#xff0c;…

[python] 过年燃放烟花

目录 新年祝福语 一、作品展示 二、作品所用资源 三、代码与资源说明 四、代码库 五、完整代码 六、总结 新年祝福语 岁月总是悄然流转&#xff0c;让人感叹时间的飞逝&#xff0c;转眼间又快到了中国传统的新年&#xff08;龙年&#xff09;。 回首过去&#xf…

大模型 AI Agent 详细介绍

"大模型 AI agent" 通常指的是基于大型预训练模型的人工智能助手或智能代理。这些 AI 代理利用了大规模的语言模型&#xff08;如 GPT-3、BERT、T5 等&#xff09;或其他类型的模型&#xff08;如图像识别模型、多模态模型等&#xff09;来模拟人类行为和决策过程。这…

异地办公必不可缺的远程控制软件,原理到底是什么?

目录 引言远程桌面连接软件的作用与重要性 基本概念与架构客户端-服务器模型网络通信协议 核心技术组件图形界面捕获与传输输入转发会话管理 性能优化策略带宽优化延迟优化 引言 远程桌面连接软件的作用与重要性 在当今这个高度数字化和网络化的时代&#xff0c;远程桌面连接软…

【动态规划】【状态压缩】【2次选择】【广度搜索】1494. 并行课程 II

作者推荐 视频算法专题 本文涉及知识点 动态规划汇总 状态压缩 广度优先搜索 LeetCode1494. 并行课程 II 给你一个整数 n 表示某所大学里课程的数目&#xff0c;编号为 1 到 n &#xff0c;数组 relations 中&#xff0c; relations[i] [xi, yi] 表示一个先修课的关系&am…

关于服务器解析A记录和CNAME记录的分析

内容提要: 大致讲下理解,dns域名解析这一块 0 . 问题来源 最近搞了一个七牛云上传,然后需要配置融合cdn加速,也就是可以加速域名,中间有一部需要CNAME 域名,也就是将七牛云提供的域名CNAME一下,查阅资料其实就是起一个别名,好访问而已. 方便我们访问云存储,达到加速的效果. …

论文阅读-一个用于云计算中自我优化的通用工作负载预测框架

论文标题&#xff1a;A Self-Optimized Generic Workload Prediction Framework for Cloud Computing 概述 准确地预测未来的工作负载&#xff0c;如作业到达率和用户请求率&#xff0c;对于云计算中的资源管理和弹性非常关键。然而&#xff0c;设计一个通用的工作负载预测器…