Apache Paimon系列之:Append Table和Append Queue

Apache Paimon系列之:Append Table和Append Queue

  • 一、Append Table
  • 二、Data Distribution
  • 三、自动小文件合并
  • 四、Append Queue
  • 五、压缩
  • 六、Streaming Source
  • 七、Watermark Definition
  • 八、Bounded Stream

一、Append Table

如果表没有定义主键,则默认为追加表。

您只能以流式方式将完整记录插入到表中。此类表适合不需要流式更新的用例(例如日志数据同步)。

CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
);

二、Data Distribution

默认情况下,append table没有bucket的概念。它的作用就像一个 Hive 表。数据文件放置在分区下,可以在其中重新组织和重新排序以加快查询速度。

三、自动小文件合并

在流式写入作业中,如果没有bucket定义,则writer中不会进行压缩,而是使用Compact Coordinator扫描小文件并将压缩任务传递给Compact Worker。在流模式下,如果在flink中运行insert sql,拓扑将是这样的:

在这里插入图片描述
不用担心反压,压实永远不会反压。

如果将 write-only 设置为 true,Compact Coordinator 和 Compact Worker 将在拓扑中删除。

自动压缩仅在 Flink 引擎流模式下支持。您还可以通过 paimon 中的 flink 操作在 flink 中启动压缩作业,并通过 set write-only 禁用所有其他压缩。

四、Append Queue

在这种模式下,您可以将append table视为一个由bucket分隔的队列。同一个桶中的每条记录都是严格排序的,流式读取会严格按照写入的顺序将记录传输到下游。使用此模式,不需要进行特殊配置,所有数据都会以队列的形式放入一个桶中。您还可以定义bucket和bucket-key以实现更大的并行性和分散数据。

在这里插入图片描述

CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('bucket' = '8','bucket-key' = 'product_id'
);

五、压缩

默认情况下,sink节点会自动进行compaction来控制文件数量。以下选项控制压缩策略:

KeyDefaultTypeDescription
write-onlyfalseBoolean如果设置为 true,将跳过压缩和快照过期。此选项与专用紧凑作业一起使用。
compaction.min.file-num5Integer对于文件集 [f_0,…,f_N],满足 sum(size(f_i)) >= targetFileSize 触发追加表压缩的最小文件号。该值避免了几乎完整的文件被压缩,这是不划算的。
compaction.max.file-num50Integer对于文件集 [f_0,…,f_N],触发追加表压缩的最大文件数,即使 sum(size(f_i)) < targetFileSize。该值可以避免挂起太多小文件,从而降低性能。
full-compaction.delta-commits(none)Integer增量提交后将不断触发完全压缩。

六、Streaming Source

目前仅 Flink 引擎支持流式源行为。

Streaming Read Order

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果 scan.plan-sort-partition 设置为 true,则首先生成分区值较小的记录。
    • 否则,将先产生分区创建时间较早的记录。
  • 对于来自同一分区、同一桶的任意两条记录,将首先产生第一条写入的记录。
  • 对于来自同一分区但两个不同桶的任意两条记录,不同的桶由不同的任务处理,它们之间没有顺序保证。

七、Watermark Definition

您可以定义读取 Paimon 表的水印:

CREATE TABLE t (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

给定的代码创建了一个名为"t"的表,它包含了三个列:“user”(BIGINT类型),“product”(STRING类型),和"order_time"(TIMESTAMP类型,精确到毫秒)。它还为"order_time"列定义了一个水印(WATERMARK),表示事件的时间戳早于水印的事件被认为是延迟事件,可以被丢弃。

在创建表之后,该代码启动了一个有界的流作业来读取"t"表中的数据。它使用TUMBLE函数将数据按照"order_time"列分组为固定大小的滚动窗口,窗口大小为10分钟。window_start和window_end表示每个窗口的起始和结束时间戳,COUNT函数用于计算每个窗口内不同用户的数量。结果按照window_start和window_end进行分组。

您还可以启用 Flink Watermark 对齐,这将确保没有源/拆分/分片/分区将其水印增加得远远超出其他部分:

KeyDefaultTypeDescription
scan.watermark.alignment.group(none)String一组用于对齐水印的源。
scan.watermark.alignment.max-drift(none)Duration在我们暂停从源/任务/分区进行消耗之前,对齐水印的最大漂移。

八、Bounded Stream

Streaming Source 也可以是有界的,您可以指定 scan.bounded.watermark 来定义有界流模式的结束条件,流读取将结束,直到遇到更大的水印快照。

快照中的水印是由writer生成的,例如,您可以指定kafka源并声明水印的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的水印,以便您在流式读取此Paimon表时可以使用有界水印的功能。

CREATE TABLE kafka_table (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

这段代码包含了几个步骤:

  • 创建一个名为"kafka_table"的表,该表包含了三个列:“user”(BIGINT类型),“product”(STRING类型),和"order_time"(TIMESTAMP类型,精确到毫秒)。它还为"order_time"列定义了一个水印(WATERMARK),表示事件的时间戳早于水印的事件被认为是延迟事件,可以被丢弃。该表的连接器(connector)被指定为"kafka",意味着数据将从Kafka中读取。
  • 启动一个流式插入作业。这个作业将从"kafka_table"中选择所有的数据,并插入到名为"paimon_table"的表中。
  • 启动一个有界的流作业来读取"paimon_table"表中的数据。该作业将返回"paimon_table"表中的所有数据。注释中的"scan.bounded.watermark"选项可以指定有界流作业的水印,用于确定数据的处理范围。
  • 总的来说,这段代码创建了一个从Kafka读取数据的表,并通过流式插入将数据插入到另一个表中。然后,通过有界流作业从目标表中读取数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29011.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

离散数学复习

1.关系的介绍和性质 &#xff08;1&#xff09;序偶和笛卡尔积 两个元素按照一定的顺序组成的二元组就是序偶&#xff0c;使用尖括号进行表示&#xff0c;尖括号里面的元素一般都是有顺序的&#xff1b; 笛卡尔积就是有两个集合&#xff0c;从第一个集合里面选择一个元素&am…

github国内加速访问有效方法

这里只介绍实测最有效的一种方法&#xff0c;修改主机的Hosts文件&#xff0c;如果访问github网站慢或者根本无法访问的时候可以采用下面方法进行解决。 1、搜索一个IP查询网站 首先百度搜索选择一个IP查询的网站&#xff0c;这里我用下面这个网站&#xff08;如果该网站失效…

相约北京“信通院数据智能大会”

推动企业数智化转型发展&#xff0c;凝聚产业共识&#xff0c;引领行业发展方向&#xff0c;摩斯将参与信通院首届“数据智能大会”&#xff08;6月19-20日&#xff0c;北京&#xff09;。 本次大会设置多个主题论坛&#xff0c;将发布多项研究成果&#xff0c;分享产业最新实…

如何通过改善团队合作来提高招聘效率

当招聘顶尖人才时&#xff0c;时间就是一切。招聘效率取决于团队快速响应和完成任务的能力&#xff0c;但招聘经理和面试官并不总是最关心重要的招聘任务。更重要的是&#xff0c;求职者的经历取决于准备好的面试官是否准时出现。有时候最好的候选人会接受另一份工作&#xff0…

Spring Cloud Alibaba Nacos持久化配置

所谓的持久化就是将Nacos配置持久化存储到数据库里面&#xff0c;在0.7版本之前&#xff0c;在单机模式时nacos使用嵌入式数据库实现数据的存储&#xff0c;不方便观察数据存储的基本情况。0.7版本增加了支持mysql数据源能力。 ① 找到并执行sql脚本 这里路径为&#xff1a;n…

时间复杂度 空间复杂度分析

时间复杂度就是需要执行多少次&#xff0c;空间复杂度就是对象被创建了多少次。 O(1) < O(logn) < O(n) < O(nlogn) < O(n^2) < O(2^n) < O(n!) < O(n^n) 这里写目录标题 时间复杂度O(1)O(logn)、O(nlogn)O(mn)、O(m*n)最好、最坏情况时间复杂度平均情况…

32、循环语句while+until

一、循环控制语句 双层循环和循环语句的使用&#xff0c;while和until的语法使用 1、进入调试模式 在脚本里第一行写入set -x bash -x 脚本 1.1、echo 打印 continue&#xff1a;跳出当次&#xff0c;后续的条件成立&#xff0c;继续执行。 break&#xff1a;一旦break&am…

实时数仓Hologres V2.2发布,Serverless Computing降本20%

Highlight 新发布Serverless Computing&#xff0c;提升大任务稳定性&#xff0c;同时可降低20%计算成本 引擎性能优化&#xff0c;TPC-H 1TB测试相对V1.X 提升100% 实时湖仓加速架构升级&#xff0c;支持Paimon&#xff0c;直读ORC、Parquet数据性能提升5倍以上 新增实例监…

LLM中表格处理与多模态表格理解

文档处理中不可避免的遇到表格&#xff0c;关于表格的处理问题&#xff0c;整理如下&#xff0c;供各位参考。 问题描述 RAG中&#xff0c;对上传文档完成版式处理后进行切片&#xff0c;切片前如果识别文档元素是表格&#xff0c;那么则需要对表格进行处理。一般而言&#x…

JupyterLab使用指南(二):JupyterLab基础

第2章 JupyterLab基础 2.1 JupyterLab界面介绍 JupyterLab的用户界面非常直观和灵活。它包括文件浏览器、工作区、多标签页、命令面板和侧边栏等功能。以下是各个部分的详细介绍&#xff1a; 2.1.1 文件浏览器 文件浏览器位于界面左侧&#xff0c;用于导航和管理文件。你可…

白酒起源传说(三)——仪狄造酒说

在古代文献中&#xff0c;仪狄被记载为酿酒的始祖&#xff0c;这一创造更是与传说中的帝王夏禹相联系。据《吕氏春秋》记载&#xff1a;“仪狄作酒”。而在汉代刘向编纂的《战国策》中&#xff0c;也有着关于仪狄酿酒的故事。在这些古典文字里&#xff0c;仪狄被描述为一个致力…

计算机网络:网络层 - 虚拟专用网 VPN 网络地址转换 NAT

计算机网络&#xff1a;网络层 - 虚拟专用网 VPN & 网络地址转换 NAT 专用地址与全球地址虚拟专用网 VPN隧道技术 网络地址转换 NAT网络地址与端口号转换 NAPT 专用地址与全球地址 考虑到 IP 地址的紧缺&#xff0c;以及某些主机只需要和本机构内部的其他主机进行通信&…

【镜像制作】node.js+pm2的latest版镜像制作

文章目录 简介dockerfile代码 简介 本司的一些nodejs环境基本都运行在pm2的环境下&#xff0c;pm2是一个node环境下的多进程管理工具&#xff0c;通过pm2可以提升不少性能&#xff0c;管理起来也比较方便。在制作nodejspm2镜像时&#xff0c;建议指定版本进行安装&#xff0c;这…

cbsd创建ubuntu jail 时下载系统慢的问题解决

下载时速度慢 使用cbsd创建ubuntu jail的时候 cbsd jconstruct-tui 提示&#xff1a; no base dir in: /usr/jails/basejail/base_amd64_amd64_jammy Select base sources:0 .. CANCELa .. build b .. extract c .. pkg d .. repo 选了pkg没找到 fetch: https://pkg.convec…

【减法网络】Minusformer:通过逐步学习残差来改进时间序列预测

摘要 本文发现泛在时间序列(TS)预测模型容易出现严重的过拟合。为了解决这个问题&#xff0c;我们采用了一种去冗余的方法来逐步恢复TS的真实值。具体来说&#xff0c;我们引入了一种双流和减法机制&#xff0c;这是一种深度Boosting集成学习方法。通过将信息聚合机制从加法转…

Jenkins简要说明

Jenkins 是一个开源的持续集成和持续部署&#xff08;CI/CD&#xff09;工具&#xff0c;广泛用于自动化软件开发过程中的构建、测试和部署等任务。它是基于Java开发的&#xff0c;因此可以在任何支持Java的平台上运行&#xff0c;并且能够与各种操作系统、开发工具和插件无缝集…

【第16章】Vue实战篇之跨域解决

文章目录 前言一、浏览器跨域二、配置代理1.公共请求2.代理配置 总结 前言 前后端项目分离衍生出浏览器跨域问题&#xff0c;开发之前我们通过配置代理解决这个问题。 一、浏览器跨域 浏览器的跨域问题主要是由于浏览器的同源策略导致的。同源策略是浏览器的一个安全功能&…

OpenGL3.3_C++_Windows(11)

git submodule项目子模块 Git Submodule &#xff08;子模块的代码并不直接存储在父仓库中&#xff0c;而是通过一个指针来维护&#xff09;克隆含有子模块的仓库时&#xff0c;使用git管理Git Clone &#xff08;复制一份完整的Git仓库到本地&#xff09;若仓库包含子模块&am…

【设计模式-12】代理模式的代码实现及使用场景

&emsp&#xff1b;代理模式是一种应用很广发的结构性设计模式&#xff0c;它的设计初衷就是通过引入新的代理对象&#xff0c;在客户端和目标对象之间起到中介的作用&#xff0c;从而实现控制客户端对目标对象的访问&#xff0c;比如增强或者阉割某些能力。 1. 概述 代理模…

数据库-数据定义和操纵-DML语言的使用

为表的所有字段插入数据&#xff1a; INSERT INTO 表名 (字段名) VALUES (内容); 更新表中指定的内容: update语句三要素&#xff1a; 需要更新的表&#xff08;table&#xff09;名&#xff1b; 需要更新的字段&#xff08;column&#xff09;名和它的新内容&#xff08;valu…