Apache Paimon系列之:Append Table和Append Queue

Apache Paimon系列之:Append Table和Append Queue

  • 一、Append Table
  • 二、Data Distribution
  • 三、自动小文件合并
  • 四、Append Queue
  • 五、压缩
  • 六、Streaming Source
  • 七、Watermark Definition
  • 八、Bounded Stream

一、Append Table

如果表没有定义主键,则默认为追加表。

您只能以流式方式将完整记录插入到表中。此类表适合不需要流式更新的用例(例如日志数据同步)。

CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
);

二、Data Distribution

默认情况下,append table没有bucket的概念。它的作用就像一个 Hive 表。数据文件放置在分区下,可以在其中重新组织和重新排序以加快查询速度。

三、自动小文件合并

在流式写入作业中,如果没有bucket定义,则writer中不会进行压缩,而是使用Compact Coordinator扫描小文件并将压缩任务传递给Compact Worker。在流模式下,如果在flink中运行insert sql,拓扑将是这样的:

在这里插入图片描述
不用担心反压,压实永远不会反压。

如果将 write-only 设置为 true,Compact Coordinator 和 Compact Worker 将在拓扑中删除。

自动压缩仅在 Flink 引擎流模式下支持。您还可以通过 paimon 中的 flink 操作在 flink 中启动压缩作业,并通过 set write-only 禁用所有其他压缩。

四、Append Queue

在这种模式下,您可以将append table视为一个由bucket分隔的队列。同一个桶中的每条记录都是严格排序的,流式读取会严格按照写入的顺序将记录传输到下游。使用此模式,不需要进行特殊配置,所有数据都会以队列的形式放入一个桶中。您还可以定义bucket和bucket-key以实现更大的并行性和分散数据。

在这里插入图片描述

CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('bucket' = '8','bucket-key' = 'product_id'
);

五、压缩

默认情况下,sink节点会自动进行compaction来控制文件数量。以下选项控制压缩策略:

KeyDefaultTypeDescription
write-onlyfalseBoolean如果设置为 true,将跳过压缩和快照过期。此选项与专用紧凑作业一起使用。
compaction.min.file-num5Integer对于文件集 [f_0,…,f_N],满足 sum(size(f_i)) >= targetFileSize 触发追加表压缩的最小文件号。该值避免了几乎完整的文件被压缩,这是不划算的。
compaction.max.file-num50Integer对于文件集 [f_0,…,f_N],触发追加表压缩的最大文件数,即使 sum(size(f_i)) < targetFileSize。该值可以避免挂起太多小文件,从而降低性能。
full-compaction.delta-commits(none)Integer增量提交后将不断触发完全压缩。

六、Streaming Source

目前仅 Flink 引擎支持流式源行为。

Streaming Read Order

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果 scan.plan-sort-partition 设置为 true,则首先生成分区值较小的记录。
    • 否则,将先产生分区创建时间较早的记录。
  • 对于来自同一分区、同一桶的任意两条记录,将首先产生第一条写入的记录。
  • 对于来自同一分区但两个不同桶的任意两条记录,不同的桶由不同的任务处理,它们之间没有顺序保证。

七、Watermark Definition

您可以定义读取 Paimon 表的水印:

CREATE TABLE t (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

给定的代码创建了一个名为"t"的表,它包含了三个列:“user”(BIGINT类型),“product”(STRING类型),和"order_time"(TIMESTAMP类型,精确到毫秒)。它还为"order_time"列定义了一个水印(WATERMARK),表示事件的时间戳早于水印的事件被认为是延迟事件,可以被丢弃。

在创建表之后,该代码启动了一个有界的流作业来读取"t"表中的数据。它使用TUMBLE函数将数据按照"order_time"列分组为固定大小的滚动窗口,窗口大小为10分钟。window_start和window_end表示每个窗口的起始和结束时间戳,COUNT函数用于计算每个窗口内不同用户的数量。结果按照window_start和window_end进行分组。

您还可以启用 Flink Watermark 对齐,这将确保没有源/拆分/分片/分区将其水印增加得远远超出其他部分:

KeyDefaultTypeDescription
scan.watermark.alignment.group(none)String一组用于对齐水印的源。
scan.watermark.alignment.max-drift(none)Duration在我们暂停从源/任务/分区进行消耗之前,对齐水印的最大漂移。

八、Bounded Stream

Streaming Source 也可以是有界的,您可以指定 scan.bounded.watermark 来定义有界流模式的结束条件,流读取将结束,直到遇到更大的水印快照。

快照中的水印是由writer生成的,例如,您可以指定kafka源并声明水印的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的水印,以便您在流式读取此Paimon表时可以使用有界水印的功能。

CREATE TABLE kafka_table (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

这段代码包含了几个步骤:

  • 创建一个名为"kafka_table"的表,该表包含了三个列:“user”(BIGINT类型),“product”(STRING类型),和"order_time"(TIMESTAMP类型,精确到毫秒)。它还为"order_time"列定义了一个水印(WATERMARK),表示事件的时间戳早于水印的事件被认为是延迟事件,可以被丢弃。该表的连接器(connector)被指定为"kafka",意味着数据将从Kafka中读取。
  • 启动一个流式插入作业。这个作业将从"kafka_table"中选择所有的数据,并插入到名为"paimon_table"的表中。
  • 启动一个有界的流作业来读取"paimon_table"表中的数据。该作业将返回"paimon_table"表中的所有数据。注释中的"scan.bounded.watermark"选项可以指定有界流作业的水印,用于确定数据的处理范围。
  • 总的来说,这段代码创建了一个从Kafka读取数据的表,并通过流式插入将数据插入到另一个表中。然后,通过有界流作业从目标表中读取数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29011.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

离散数学复习

1.关系的介绍和性质 &#xff08;1&#xff09;序偶和笛卡尔积 两个元素按照一定的顺序组成的二元组就是序偶&#xff0c;使用尖括号进行表示&#xff0c;尖括号里面的元素一般都是有顺序的&#xff1b; 笛卡尔积就是有两个集合&#xff0c;从第一个集合里面选择一个元素&am…

github国内加速访问有效方法

这里只介绍实测最有效的一种方法&#xff0c;修改主机的Hosts文件&#xff0c;如果访问github网站慢或者根本无法访问的时候可以采用下面方法进行解决。 1、搜索一个IP查询网站 首先百度搜索选择一个IP查询的网站&#xff0c;这里我用下面这个网站&#xff08;如果该网站失效…

相约北京“信通院数据智能大会”

推动企业数智化转型发展&#xff0c;凝聚产业共识&#xff0c;引领行业发展方向&#xff0c;摩斯将参与信通院首届“数据智能大会”&#xff08;6月19-20日&#xff0c;北京&#xff09;。 本次大会设置多个主题论坛&#xff0c;将发布多项研究成果&#xff0c;分享产业最新实…

如何通过改善团队合作来提高招聘效率

当招聘顶尖人才时&#xff0c;时间就是一切。招聘效率取决于团队快速响应和完成任务的能力&#xff0c;但招聘经理和面试官并不总是最关心重要的招聘任务。更重要的是&#xff0c;求职者的经历取决于准备好的面试官是否准时出现。有时候最好的候选人会接受另一份工作&#xff0…

Spring Cloud Alibaba Nacos持久化配置

所谓的持久化就是将Nacos配置持久化存储到数据库里面&#xff0c;在0.7版本之前&#xff0c;在单机模式时nacos使用嵌入式数据库实现数据的存储&#xff0c;不方便观察数据存储的基本情况。0.7版本增加了支持mysql数据源能力。 ① 找到并执行sql脚本 这里路径为&#xff1a;n…

时间复杂度 空间复杂度分析

时间复杂度就是需要执行多少次&#xff0c;空间复杂度就是对象被创建了多少次。 O(1) < O(logn) < O(n) < O(nlogn) < O(n^2) < O(2^n) < O(n!) < O(n^n) 这里写目录标题 时间复杂度O(1)O(logn)、O(nlogn)O(mn)、O(m*n)最好、最坏情况时间复杂度平均情况…

32、循环语句while+until

一、循环控制语句 双层循环和循环语句的使用&#xff0c;while和until的语法使用 1、进入调试模式 在脚本里第一行写入set -x bash -x 脚本 1.1、echo 打印 continue&#xff1a;跳出当次&#xff0c;后续的条件成立&#xff0c;继续执行。 break&#xff1a;一旦break&am…

实时数仓Hologres V2.2发布,Serverless Computing降本20%

Highlight 新发布Serverless Computing&#xff0c;提升大任务稳定性&#xff0c;同时可降低20%计算成本 引擎性能优化&#xff0c;TPC-H 1TB测试相对V1.X 提升100% 实时湖仓加速架构升级&#xff0c;支持Paimon&#xff0c;直读ORC、Parquet数据性能提升5倍以上 新增实例监…

LLM中表格处理与多模态表格理解

文档处理中不可避免的遇到表格&#xff0c;关于表格的处理问题&#xff0c;整理如下&#xff0c;供各位参考。 问题描述 RAG中&#xff0c;对上传文档完成版式处理后进行切片&#xff0c;切片前如果识别文档元素是表格&#xff0c;那么则需要对表格进行处理。一般而言&#x…

JupyterLab使用指南(二):JupyterLab基础

第2章 JupyterLab基础 2.1 JupyterLab界面介绍 JupyterLab的用户界面非常直观和灵活。它包括文件浏览器、工作区、多标签页、命令面板和侧边栏等功能。以下是各个部分的详细介绍&#xff1a; 2.1.1 文件浏览器 文件浏览器位于界面左侧&#xff0c;用于导航和管理文件。你可…

计算机网络:网络层 - 虚拟专用网 VPN 网络地址转换 NAT

计算机网络&#xff1a;网络层 - 虚拟专用网 VPN & 网络地址转换 NAT 专用地址与全球地址虚拟专用网 VPN隧道技术 网络地址转换 NAT网络地址与端口号转换 NAPT 专用地址与全球地址 考虑到 IP 地址的紧缺&#xff0c;以及某些主机只需要和本机构内部的其他主机进行通信&…

cbsd创建ubuntu jail 时下载系统慢的问题解决

下载时速度慢 使用cbsd创建ubuntu jail的时候 cbsd jconstruct-tui 提示&#xff1a; no base dir in: /usr/jails/basejail/base_amd64_amd64_jammy Select base sources:0 .. CANCELa .. build b .. extract c .. pkg d .. repo 选了pkg没找到 fetch: https://pkg.convec…

【减法网络】Minusformer:通过逐步学习残差来改进时间序列预测

摘要 本文发现泛在时间序列(TS)预测模型容易出现严重的过拟合。为了解决这个问题&#xff0c;我们采用了一种去冗余的方法来逐步恢复TS的真实值。具体来说&#xff0c;我们引入了一种双流和减法机制&#xff0c;这是一种深度Boosting集成学习方法。通过将信息聚合机制从加法转…

【第16章】Vue实战篇之跨域解决

文章目录 前言一、浏览器跨域二、配置代理1.公共请求2.代理配置 总结 前言 前后端项目分离衍生出浏览器跨域问题&#xff0c;开发之前我们通过配置代理解决这个问题。 一、浏览器跨域 浏览器的跨域问题主要是由于浏览器的同源策略导致的。同源策略是浏览器的一个安全功能&…

OpenGL3.3_C++_Windows(11)

git submodule项目子模块 Git Submodule &#xff08;子模块的代码并不直接存储在父仓库中&#xff0c;而是通过一个指针来维护&#xff09;克隆含有子模块的仓库时&#xff0c;使用git管理Git Clone &#xff08;复制一份完整的Git仓库到本地&#xff09;若仓库包含子模块&am…

【设计模式-12】代理模式的代码实现及使用场景

&emsp&#xff1b;代理模式是一种应用很广发的结构性设计模式&#xff0c;它的设计初衷就是通过引入新的代理对象&#xff0c;在客户端和目标对象之间起到中介的作用&#xff0c;从而实现控制客户端对目标对象的访问&#xff0c;比如增强或者阉割某些能力。 1. 概述 代理模…

《优化接口设计的思路》系列:第1篇—什么是接口缓存

一、缓存的定义&#xff1a; 缓存是一种存储数据的技术&#xff0c;用于提高数据访问的速度和效率。缓存通常存储在内存中&#xff0c;因为内存访问速度远快于磁盘和网络。数据接口通常会使用缓存技术&#xff0c;以降低对后端数据存储和处理的压力&#xff0c;提高系统性能。…

⭐ ▶《强化学习的数学原理》(2024春)_西湖大学赵世钰 Ch3 贝尔曼最优公式 【压缩映射定理】

PPT 截取必要信息。 课程网站做习题。总体 MOOC 过一遍 1、视频 学堂在线 习题 2、过 电子书&#xff0c;补充 【下载&#xff1a;本章 PDF 电子书 GitHub 界面链接】 [又看了一遍视频] 3、总体 MOOC 过一遍 习题 学堂在线 课程页面链接 中国大学MOOC 课程页面链接 B 站 视频链…

c++qt合并两张灰度图像

需求&#xff1a;将两张尺寸相同的灰度图像进行合并&#xff0c;合并后的图像&#xff0c;每个像素点灰度值为两张原图对应像素点灰度值之和。若超过255&#xff0c;则最大为255。 方法一&#xff1a; 将图像读取为cv::Mat&#xff0c;再调用opencv的cv::add方法&#xff0c;进…

【ai】初识pytorch

初识PyTorch 大神的例子运行: 【ai】openai-quickstart 配置pycharm工程 简单例子初识一下Pytorch 好像直接点击下载比较慢? 大神的代码 在这个例子中,首先定义一个线性模型,该模型有一个输入特征和一个输出特征。然后定义一个损失函数和一个优化器,接着生成一些简单的线性…