Spark Structured Streaming 分流或双写多表 / 多数据源(Multi Sinks / Writes)

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在 Spark Structured Streaming 中,我们有时候需要将最后的处理结果分流或双写到多张表或多个数据源(Multi Sinks / Writes),一个典型的例子是:在 CDC 数据入湖场景里,一个 Kafka Topic 上存放着整库或多张表的 CDC 消息,使用 Spark 从 Kafka 中摄取这些消息后,需要根据消息中提供的数据库名和数据表名对 CDC 消息分流,然后写到数据湖上对应的 ODS 表中,这就是一种典型的“数据分流”场景。在 Spark Structured Streaming 中,实现多表 / 多数据源的分流或双写主要依赖 foreachBatchforeach 这两个方法,本文就围绕它们介绍一下分流或双写多表 / 多数据源的具体实现。

首先,要明确 foreachforeachBatch 都是 action,也就意味着使用它们时已经到了流的末端,绝大数情况下,就是要将记录写入目标数据源了,这也是foreachforeachBatch 这两个方法绝大多数的应用场景。通常,在 Spark 中将数据写入一个数据源是这样做的(以写 parquet 文件为例):

writeStream.format("parquet").option("path", "path/to/destination/dir").start()

由于 Spark 内置了 parquet 格式的 data writer, 所以我们只需填写一些相应的配置,就可以直接把 DF 按对应的格式写到目标位置了,那什么情况下我们要使用 foreachforeachBatch 呢?下面展开来介绍一下。

1. foreachBatch 的应用场景


大多数情况下,一条的流处理的 pipeline 都是从一个 Source 开始,中间经历各种处理后,最终写入了一个 Sink,但是,在某些场景下,我们流的重点可能需要写入的并不是一个 Sink,而是多个,典型的情形有:

  • 数据分流:需要将数据“分流”写入不同的数据源或数据表( 简单说就是 dispatch )

  • 数据多写:需要同时向多个下游数据源相同相同数据( 简单说就是 duplicate )

虽然我们可以非常“粗暴”地通过 for 循环构建多个 writer 实现上述两种典型的写入需求,但是这种做法会让每一个 sink 变成独立的 streaming query(作业),是代价很高的应对方法,并不实用。最好的做法就是通过 foreachBatch 来实现,实际上上面两种需求正是 foreachBatch 的典型应用场景。我们看一下 foreachBatch 的接口声明:

def foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

我们需要为 foreachBatch 传入一个函数字面量,它有两参数,第一个对应一个 micro-batch 的 DataFrame, 第二个是这个 micro-batch 的 ID,拿到 micro-batch 的 DataFrame 后,我们可以在这个 DataFrame 上作相应的转换处理,最后调用现成的 writer 写入目标端。这里涉及到 Spark Streaming 的 Micro-Batch,也就是上述参数列表中的 Dataset[T] 类型的那个 DataFrame ,关于 Micro-Batch 在流上运行方式,下图给出了非常形象的描绘:

在这里插入图片描述

简单地说,Micro-Batch 模式下需要收集齐一定量(或一小段时间范围内)的数据,整理成一个 DataFrame 去处理,它的延迟是在秒级。上图下方时间轴上的每一小撮数据就是 foreachBatch 中传入的那个 DataFrame。

这里,我们特别澄清一个容易误解的地方: foreachBatch 是没有“循环”语义的,这里的 foreach 其实是意在针对每一个 micro-batch 的,不是空间维度上迭代多个 micro-batch, 而是时间维度上针对每一个流经的 micro-batch 进行处理。这里也能提现从 source 构建出的 DF 和这个方法里的 micro-batch 的 DF 的差异,前者是一个无界的 DF,本质上是一个流,更加“实体”的 DF 其实是 foreachBatch 中的这个 DF,它是较短时间内聚齐的“一小撮”数据,边界是确定的!

下面,我们针对分流和双写两种典型场景给出详细的示例代码。

1.1. 通过 foreachBatch 实现数据“分流”


我们以向两种不同的 Hudi 表写入数据为例,先将数据过滤,得到分流后的 DF,然后向对应的 Hudi 表中写入:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>// 分流 table_1 的数据并写入filteredDF1 = batchDF.filter(...)filteredDF1.write.format("hudi").option(TABLE_NAME, "table_1").mode(SaveMode.Append).save("/path/1")// 分流 table_2 的数据并写入filteredDF2 = batchDF.filter(...)filteredDF2.write.format("hudi").option(TABLE_NAME, "table_2").mode(SaveMode.Append).save("/path/2")
}

1.2. 通过 foreachBatch 实现数据“双写”


我们以向两种不同的数据源写入数据为例,可以调用多次 write 操作,但是,由于每次写入都会导致数据被 recomputed,流本身可能不再存在或状态发生了改变,所以,必须要在写入前使用 persist, 保证向下游多次写入的数据是完全一样,最后记得再执行一遍 unpersist 即可。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>batchDF.persist()batchDF.write.format("csv").save(...)  // location 1, 对应数据源的 data writer 是已存在的batchDF.write.format("hudi").save(...)  // location 2, 对应数据源的 data writer 是已存在的batchDF.unpersist()
}

2. foreach 的应用场景


foreachBatch 很实用,但是在如下两种场景下无法工作的:

  • 没有现成的支持目标数据源的 data writer;
  • 当前流运行于 continuous processing 模式,不支持 micro-batch

如果是上述情形,我们就得使用 foreach 了,因为 foreach 要自行实现对目标数据源的链接和读写,同时,它的自定义处理逻辑又是作用到每一行上的,所以它能解决上述两种场景的问题。某种程度上,foreach 相比 foreachBatch 是一种更底层的 API。使用 foreach 需要提供一个 ForeachWriter,实现 open, process, 和close 三个方法,不过要注意的是这三个方案的调用时机是不同的,open / close 显然是 per-partition 要调用一次的, proess 则是要针对每条记录进行处理的。以下是 一个自行实现 foreach 的代码模板:

streamingDatasetOfString.writeStream.foreach(// 没有现成的 DataStreamWriter,需要自行实现行级别的存储逻辑。new ForeachWriter[String] {// 在 partition 这个粒度上创建针对目标数据源的连接,这比较符合常规def open(partitionId: Long, version: Long): Boolean = {// Open connection}// 数据梳理逻辑会作用到记录级别,而不是 miro-batch 的 df 级别。def process(record: String): Unit = {// Write string to connection}// 关闭连接,释放资源def close(errorOrNull: Throwable): Unit = {// Close the connection}}
).start()

关于 foreach 更多信息可以参考官方文档,这里就不再深究了,大多数情况,我们更多使用的还是 foreachBatch

3. 小结


foreachforeachBatch 都能在向目标数据源写入数据时实现定制化的逻辑,它们之间的差别在于:

  • foreachBatch多应用于数据分流或双写场景,目标数据源往往是已经有线程的 data writer 了
  • foreach 则要自行实现对目标数据的连接和读写处理
  • 两者操纵数据的颗粒度不同,foreach 对数据的梳理逻辑(process 方法)作用到 DF 中的每一行上,而 foreachBatch 则直接操纵的是每一个 micro-batch 对应的 DF。

参考资料

  • Spark 关于 foreach 和 foreachBatch 的官方文档
  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/5492.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索潜力:中心化交易所平台币的对比分析

核心观点 平台币在过去一年里表现差异显著: 在过去的一年里,只有少数几个平台币如BMX、BGB和MX的涨幅超过了100%。相比之下,由于市值较高,BNB和OKB的涨幅相对较低。 回购和销毁机制在平台币价值中起决定性作用: 像M…

2024五一数学建模竞赛(五一赛)选题建议+初步分析

提示&#xff1a;DS C君认为的难度&#xff1a;B>A>C&#xff0c;开放度&#xff1a;AB<C。 以下为A-C题选题建议及初步分析&#xff1a; A题&#xff1a;钢板最优切割路径问题 l 难度评估&#xff1a;中等难度。涉及数学建模和优化算法&#xff0c;需要设计最优的…

前后端数据加密代码实战(vue3.4+springboot 2.7.18)

简述&#xff1a; 文章主要讲述了在vue3与springboot交互数据的个人使用的一个加密形式 SHA256不可逆加密AES对称加密RSA非对称加密 加密算法就不带大家深入了&#xff0c;对于它的使用文章中有明确的案例 数据加密的大概流程为&#xff1a;&#xff08;有更优秀的方案可以…

Springboot+Vue项目-基于Java+MySQL的入校申报审批系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

Python用KNN处理缺失值(4月30-5月1日)

首先试验KNN的简单示例代码 #方法3&#xff1a; # 本论文拟采用的填充缺失值的方法为KNN: import pandas as pd from sklearn.impute import KNNImputer #创建一个包含缺失值的数据集 data_KNN{第一列:[1,2,None,4,5],第二列:[3,None,5,7,9],第三列:[2,4,6,None,10] } dfpd.Da…

有哪些ai自动生成图片软件?AI绘画工具推荐

AI绘画工具是近年来快速发展的一种创新技术&#xff0c;它可以通过算法和机器学习技术来自动生成图片。那么又有有哪些ai自动生成图片软件呢&#xff1f;下面是小编给大家的AI绘画工具推荐。 一、爱制作AI 爱制作AI是一款多功能的人工智能助手&#xff0c;具备AI问答、AI写作、…

【FPGA】优化设计指南(一):设计原则

目录 避免采用不可综合的语句设计时采用同步的时钟组合逻辑与毛刺异步复位与同步复位动态分析与静态分析功能流水线时序违例乒乓操作面积和速度的平衡避免采用不可综合的语句 1.#1000延时语句 2.除法运算/,除非除数为2的整次幂 3.实数类型不可综合(real) 4.综上,使用可综合…

STM32进入睡眠模式的方法

#STM32进入睡眠模式的方法 今天学习了如何控制STM32进入睡眠模式&#xff0c;进入睡眠模式的好处就是省电&#xff0c;今天学习的只是浅度睡眠&#xff0c;通过中断就能唤醒。比如单片机在那一放&#xff0c;也许好几天好几个月都不用一次&#xff0c;整天的在那空跑while循环…

C#应用程序实现多屏显示

前言 随着业务发展&#xff0c;应用程序在一些特定场景下&#xff0c;只在一个显示器上展示信息已经不能满足用户需求。我们如何把主屏运行程序中多个窗体移动到各个扩展屏幕位置显示呢&#xff1f;C# 是通过什么方式来实现的&#xff0c;下面介绍 C# 使用 Screen 类的方式来实…

64、二分-搜索二维矩阵

思路&#xff1a; 通过使用二分方式&#xff0c;对于每行进行二分&#xff0c;因为每行的最后一个数小于下一行的第一个数&#xff0c;我们就可以依次二分。首先取出行数N&#xff0c;然后从0-N进行二分&#xff0c;如果mid最后一个数小于目标值说明0-mid中没有&#xff0c;舍弃…

jenkins转载文本

基于Docker容器DevOps应用方案 企业业务代码发布系统 一、企业业务代码发布方式 1.1 传统方式 以物理机或虚拟机为颗粒度部署部署环境比较复杂&#xff0c;需要有先进的自动化运维手段出现问题后重新部署成本大&#xff0c;一般采用集群方式部署部署后以静态方式展现 1.2 容…

鸿蒙开发接口Ability框架:【@ohos.ability.wantConstant (wantConstant)】

wantConstant wantConstant模块提供want中action和entity的权限列表的能力&#xff0c;包括系统公共事件宏&#xff0c;系统公共事件名称等。 说明&#xff1a; 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导…

基于电磁激励原理利用视触觉传感器估计抓取力矩的方法

由于触觉感知能使机器人通过其触觉传递获取丰富的接触信息&#xff0c;触觉感知已经成为机器人机械臂的一种流行的感知方式。而在触觉传感器可获取的各种信息中&#xff0c;通过外界接触从抓取物体传递到机器人手指的力矩等信息&#xff0c;在完成各种指令的实现尤为重要。如图…

可重构柔性装配产线:AI边缘控制技术的崭新探索

在信息化和智能化浪潮的推动下&#xff0c;制造业正面临着前所未有的转型升级挑战。其中&#xff0c;可重构柔性装配产线以其独特的AI边缘控制技术&#xff0c;为制造业的智能化转型提供了新的解决方案。 可重构柔性装配产线是基于AI工业控制与决策平台打造的智能化生产系统。…

Spring Security介绍(三)过滤器(2)自定义

除了使用security自带的过滤器链&#xff0c;我们还可以自定义过滤器拦截器。 下面看下自定义的和security自带的执行顺序。 一、总结 1、自定义过滤器&#xff1a; 一般自定义fliter都是&#xff1a; import lombok.extern.slf4j.Slf4j; import org.springframework.ster…

QT - 创建Qt Widgets Application项目

在Qt中结合OpenGL使用&#xff0c;可以创建一个Qt Widgets应用程序项目。在创建项目时&#xff0c;您可以选择使用OpenGL模板来生成一个已经集成了OpenGL的项目。这个模板会自动帮助您集成OpenGL和Qt&#xff0c;并生成一个基本的OpenGL窗口。您可以在这个窗口中进行OpenGL的开…

闭嘴,如果你遇到偏执型人格!头脑风暴:王阳明心学向内求——早读(逆天打工人爬取热门微信文章解读)

看我极限头脑风暴 引言Python 代码第一篇 洞见 偏执型人格&#xff1a;跟谁在一起&#xff0c;谁痛苦第二篇 人民日报 来啦新闻早班车要闻社会政策 结尾 若天意未许晴好时&#xff0c; 勿将雨声作悲泣。 不向外界寻怨尤&#xff0c; 反求诸己养性灵。 引言 五一劳动节 第一天就…

C语言(操作符)1

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;关注收藏&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#x…

区块链论文总结速读--CCF B会议 ICDCS 2023 共8篇

Conference&#xff1a;IEEE 43rd International Conference on Distributed Computing Systems (ICDCS) CCF level&#xff1a;CCF B Categories&#xff1a;Computer Architecture/Parallel and Distributed Computing/Storage Systems 计算机体系结构/并行与分布计算/存储…

【C语言】深入了解文件:简明指南

&#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔记 &#x1f308;C笔记专栏&#xff1a; C笔记 &#x1f308;喜欢的诗句:无人扶我青云志 我自踏雪至山巅 文章目录 一、文件的概念1.1 文件名:1.2 程序文件和数据文件 二、数据文…