Iceberg从入门到精通系列之二十四:Spark Structured Streaming

Iceberg从入门到精通系列之二十四:Spark Structured Streaming

  • 一、Streaming Reads
  • 二、Streaming Writes
  • 三、Partitioned table
  • 四、流表的维护

Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark DSv2 是一个不断发展的 API,在 Spark 版本中提供不同级别的支持。

一、Streaming Reads

Iceberg 支持处理从历史时间戳开始的 Spark 结构化流作业中的增量数据:

val df = spark.readStream.format("iceberg").option("stream-from-timestamp", Long.toString(streamStartTimestamp)).load("database.table_name")

Iceberg 仅支持从追加快照中读取数据。覆盖快照无法处理,默认会引发异常。通过设置streaming-skip-overwrite-snapshots=true 可以忽略覆盖。同样,删除快照默认会引发异常,通过设置streaming-skip-delete-snapshots=true可以忽略删除。

二、Streaming Writes

要将流式查询中的值写入 Iceberg 表,请使用 DataStreamWriter:

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("checkpointLocation", checkpointPath).toTable("database.table_name")

如果您使用的是 Spark 3.0 或更早版本,则需要使用 .option(“path”, “database.table_name”).start(),而不是 .toTable(“database.table_name”)。

对于基于目录的 Hadoop 目录:

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", "hdfs://nn:8020/path/to/table") .option("checkpointLocation", checkpointPath).start()

Iceberg 支持追加和完整输出模式:

  • append:将每个微批次的行追加到表中
  • complete:替换每个微批次的表内容

在开始流式查询之前,请确保您创建了表。请参阅 SQL 创建表文档以了解如何创建 Iceberg 表。

Iceberg 不支持实验性连续处理,因为它不提供“提交”输出的接口。

三、Partitioned table

Iceberg 需要在写入数据之前按每个任务的分区对数据进行排序。在 Spark 中,任务按 Spark 分区进行分割。针对分区表。对于批量查询,建议您进行显式排序来满足要求(请参阅此处),但该方法会带来额外的延迟,因为重新分区和排序被视为流工作负载的繁重操作。为了避免额外的延迟,您可以启用扇出编写器来消除这一要求。

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("fanout-enabled", "true").option("checkpointLocation", checkpointPath).toTable("database.table_name")

扇出写入器打开每个分区值的文件,并且在写入任务完成之前不会关闭这些文件。避免使用扇出写入器进行批量写入,因为对输出行进行显式排序对于批量工作负载来说成本较低。

四、流表的维护

流式写入可以快速创建新的表版本,创建大量表元数据来跟踪这些版本。强烈建议通过调整提交率、使旧快照过期以及自动清理元数据文件来维护元数据。

调整提交率

高提交率会产生数据文件、清单和快照,从而导致额外的维护。建议触发间隔至少为 1 分钟,并根据需要增加间隔。

使旧快照过期

写入表的每个批次都会生成一个新快照。 Iceberg 跟踪表元数据中的快照,直到它们过期。快照会随着频繁提交而快速积累,因此强烈建议定期维护流式查询写入的表。快照过期是删除元数据和任何不再需要的数据文件的过程。默认情况下,该过程将使超过五天的快照过期。

压缩数据文件
从流处理写入的数据量通常很小,这可能会导致表元数据跟踪大量小文件。将小文件压缩为大文件可以减少表所需的元数据,并提高查询效率。 Iceberg 和 Spark 附带了 rewrite_data_files 过程。

重写清单
为了优化流工作负载的写入延迟,Iceberg 可以使用不会自动压缩清单的“快速”附加写入新快照。这可能会导致大量小的清单文件。 Iceberg可以重写清单文件的数量来提高查询性能。 Iceberg 和 Spark 附带了 rewrite_manifests 过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/664778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx简单阐述及安装配置

目录 一.什么是Nginx 二.Nginx优缺点 1.优点 2.缺点 三.正向代理与反向代理 1.正向代理 2.反向代理 四.安装配置 1.添加Nginx官方yum源 2.使用yum安装Nginx 3.配置防火墙 4.启动后效果 一.什么是Nginx Nginx(“engine x”)是一个高性能的HTTP…

Linux Zip解压缩命令

Zip 用法 $ zip [-选项] [-b 路径] [-t 日期] [-n 后缀名] [压缩文件列表] [-xi 列表] 默认操作是添加或替换压缩文件列表中的压缩文件条目,压缩文件列表可以包括特殊名称 -,压缩标准输入数据 Zip 是一个创建和管理 zip 文件的压缩工具 Unzip 是一个用…

算法day10

算法day10 20 有效的括号1047 删除字符串中的所有相邻重复性150 逆波兰表达式求值 20 有效的括号 拿到这个题的想法,首先我在想我能不能用数组的操作来扫描做。后来想想,如果这样做那特判也太多了,不好做。然后第二个想法就是用栈来做&…

冒泡排序(Bubble Sort)、快速排序(Quick Sort)和归并排序(Merge Sort)

冒泡排序 冒泡排序是一种简单的排序算法,它重复地遍历要排序的列表,依次比较相邻两个元素,如果它们的顺序错误就交换它们。重复多次,直到没有任何一对数字需要交换为止,最终得到有序列表。 冒泡排序的时间复杂度为 O(…

机器学习-基础分类算法-KNN详解

KNN-k近邻算法 k-Nearest Neighbors 思想极度简单应用数学只是少效果好可以解释机器学习算法使用过程中的很多细节问题更完整的刻画机器学习应用的流程 创建简单测试用例 import numpy as np import matplotlib.pyplot as plt raw_data_X [[3.393533211, 2.331273381],[3.1…

ubuntu20.04安装sumo

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 有问题,请大家指出,争取使方法更完善。这只是ubuntu安装sumo的一种方法。一、注意事项1、首先明确你的ubuntu的用户名是什么 二、sumo安装1.…

1.3.。

1、无名管道:是一个特殊的文件,存储于内存中,不在文件系统中展示,适合亲缘进程间的通信; 2、有名管道:与无名管道类似,但该特殊文件能在文件系统中查看,并且时候亲缘和非亲缘进程间的…

基于springboot篮球论坛系统源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

国家博物馆逆向抢票协议

逆向工程的具体步骤可以因项目和目标系统的不同而有所变化。然而,以下是一般逆向工程的一般步骤: 1. 分析目标系统:对待逆向的系统进行调研和了解,包括其架构、功能、使用的技术等方面的信息。 2. 反汇编或反编译:使…

保姆级CDN使用教程,解决可能出现的各种问题

如果你还没有注册雨云(rainyun),可以通过该链接进行注册雨云 - 新一代云服务提供商 通过该链接注册是可以白嫖到优惠券的,可以白嫖CDN。 比直接注册划算。 雨云的cdn在国内表现是非常不错的,而且不需要域名进行备案 在…

从编程中理解:大脑的并行处理与多任务

在编程领域,多线程并行处理是一个重要概念,它允许程序同时执行多个任务以提高效率。这一原理与大脑的并行处理和多任务能力有着异曲同工之妙。现在让我们用Unity C#代码结合金庸武侠小说中的角色来形象地展现这一点。 设想《天龙八部》中的主角段誉,在江湖中身负多种绝世武…

如何创建长期有效的文件二维码?支持多文件批量生成单独二维码

现在很多人为了避免文件通过文件有时效的问题,会将文件生成二维码之后来使用,将文件存入二维码中通过扫码调取云端的储存的数据来实现文件的预览或者下载。那么对于想要学习文件二维码制作的小伙伴,可以学习下面的方法来制作,利用…

Day25 Golang (回溯) 216.组合总和III 17.电话号码的字母组合

//02 216.组合总和III package mainimport "fmt"func combinationSum3(k int, n int) [][]int {// 全局变量path : []int{}res : [][]int{}// 回溯函数var backtracking func(targetSum, sum, k, startindex int)backtracking func(targetSum, sum, k, startindex i…

flask基于django大数据的证券股票分析系统python可视化大屏

证券分析系统采用B/S架构,数据库是MySQL。网站的搭建与开发采用了先进的Python进行编写,使用了Django框架。该系统从两个对象:由管理员和用户来对系统进行设计构建。主要功能包括:个人信息修改,对股票信息、股票买入、…

XML传参方式

export function groupLoginAPI(xmlData) {return http.post(/tis/group/1.0/login, xmlData, {headers: {Content-Type: application/xml,X-Requested-With: AAServer/4.0,}}) }import {groupLoginAPI} from "../api/user"; function (e) { //xml格式传参let groupX…

讲解selenium 获取href find_element_by_xpath

目录 讲解selenium获取href - find_element_by_xpath 什么是XPath? 使用find_element_by_xpath获取href Selenium的特点和优势 Selenium的应用场景 Selenium的核心组件 总结 讲解selenium获取href - find_element_by_xpath Selenium是一个常用的自动化测试工…

网络安全全栈培训笔记(60-服务攻防-中间件安全CVE复现WeblogicJenkinsGlassFish)

第60天 服务攻防-中间件安全&CVE复现&Weblogic&Jenkins&GlassFish 知识点: 中间件及框架列表: lIS,Apache,Nginx,Tomcat,Docker,Weblogic,JBoos,WebSphere,Jenkins, GlassFish,Jira,Struts2,Laravel,Solr,Shiro,Thinkphp,Sprng,Flask,jQuery 1、中间件-Web…

【C/C++ 10】扫雷小游戏

一、题目 写一个扫雷小游戏,每次输入一个坐标,若该处是地雷,则游戏失败,若该处不是地雷,则显示周围地雷数量,若扫除全部非地雷区域,则扫雷成功。 二、算法 设置两张地图(二维数组&…

chisel decoupled

Decoupled 即为接口包装一层valid 和 ready ,decoupled 默认方向为输出,如果需要输入,可以加.flip, Decoupled 可以直接调用Bundle或者Bits,Bundle 内的端口也应该用bits定义 注意: ready和valid不能组合耦合,否则可能…

C# SSH.NET 长命令及时返回

在SSH中执行长时间的命令,SSH.NET及时在文本框中返回连续显示结果。 c# - Execute long time command in SSH.NET and display the results continuously in TextBox - Stack Overflow 博主管理了一个服务器集群,准备上自动巡检工具,测试在…