从spark streaming与structured streaming看spark core与spark sql的区别

导读

Spark中针对流式数据处理的方案有:

  • Spark Streaming
  • Structured Streaming

本文通过对比spark streaming与structured streaming,来深入理解spark core与spark sql的区别。

Spark Streaming

基于微批(DStream)

Spark Streaming是基于微批(Micro batch)的,而Flink基于实时流,来一条数据处理一条数据

Spark Streaming是Spark生态系统当中一个重要的框架,它建立在Spark Core之上(而Structured Streaming则基于spark sql),只不过是划分了微批

具体来说,Spark Streaming将流式数据按照时间间隔BatchInterval划分为很多部分,每一部分Batch(批次),针对每批次数据Batch当做RDD进行快速分析和处理

核心概念是DStream,它实质上是一系列时间上连续的RDD的集合(Seq[RDD]),DStream可以按照、分等时间间隔将数据流进行批量的划分:首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等 ,如下图所示:

DStream中每批次数据RDD在处理时,各个RDD之间存在依赖关系,DStream之间也有依赖关系,RDD具有容错性,那么DStream也具有容错性

一个典型例子:

 

Spark Streaming的不足

  • 使用 Processing Time 而不是 Event Time
    • Spark Streaming是基于DStream模型的micro-batch模式,数据切割是基于Processing Time,这样就导致使用 Event Time 特别的困难。
  • Complex, low-level api
    • DStream(Spark Streaming 的数据模型)提供的API类似RDD的API,非常的low level;
    • 当编写Spark Streaming程序的时候,本质上就是要去构造RDD的DAG执行图,然后通过Spark Engine运行。这样导致一个问题是,DAG 可能会因为开发者的水平参差不齐而导致执行效率上的天壤之别
  • reason about end-to-end application
    • DStream 只能保证自己的一致性语义是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 输出到外部存储的语义往往需要用户自己来保证;
  • 延迟较大
    • 对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~5秒钟之间,所以Spark Streaming能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。
  • 批流代码不统一
    • Streaming尽管是对RDD的封装,但是要将DStream代码完全转换成RDD还是有一点工作量的,更何况现在Spark的批处理都用DataSet/DataFrameAPI

Structured Streaming

Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎

 Spark SQL 执行引擎做了非常多的优化工作,比如执行计划优化、codegen、内存管理等。这也是Structured Streaming取得高性能和高吞吐的一个原因

Structured Streaming有两种执行模型:默认使用微批处理执行模型,还有一种低延迟的

默认的micro-batch processing

Structured Streaming默认使用微批处理执行模型。 这意味着Spark流式计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询。

最核心的思想就是将实时到达的数据看作是一个不断追加的unbound table无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中用静态结构化数据的处理查询方式进行流计算(而spark streaming则是微批)使用类似对于静态表的批处理方式来表达流计算,然后 Spark 以在无限表上的增量计算来运行。

Structured Streaming 周期性或者连续不断的生成微小dataset,然后交由Spark SQL的增量引擎执行,跟Spark Sql的原有引擎相比,增加了增量处理的功能,增量就是为了状态和流表功能实现。由于也是微批处理,底层执行也是依赖Spark SQL的。

在这个体系结构中,Driver驱动程序通过将记录偏移量保存到预写日志中来对数据处理进度设置检查点,然后可以使用它来重新启动查询。 需要注意的是,为了获得确定性的重新执行(deterministic re-executions)和端到端语义,在处理下一个微批数据之前,要将该微批数据中的偏移范围保存到日志中。 所以,当前到达的数据需要等待当前的微批处理作业完成,且其中数据的偏移量范围被计入日志后,才能在下一个微批作业中得到处理。 在细粒度上,时间线看起来像这样。

这会导致数据到达和得到处理并输出结果之间的延时超过100ms。

一个典型的计数例子:

微批的数据延时对于大多数实际的流式工作负载(如ETL和监控)已经足够了。然而,一些场景确实需要更低的延时,所以设计并构建了连续处理模式。

Continuous Processing Mode

在连续处理模式中,Spark不再是启动周期性任务,而是启动一系列连续读取,处理和写入数据的长时间运行的任务。 在高层次上,设置和记录级时间线看起来像这些(与上述微量批处理执行图相对照)。

由于事件在到达时会被立即处理和写入结果,所以端到端延迟只有几毫秒

此外,利用著名的Chandy-Lamport算法对查询进度设置检查点。 特殊标记的记录被注入到每个任务的输入数据流中; 我们将它们称为“时间代标记(epoch marker)”,并将它们之间的差距称为“时间代(epoch)”。当任务遇到标记时,任务异步报告处理后的最后偏移量给driver。 一旦driver程序接收到写入接收器的所有任务的偏移量,它就会将它们写入前述的预写日志。 由于检查点的设置是完全异步的,任务可以不间断地持续并提供一致的毫秒级延迟。

对比

在Spark2.3.0中,流数据的连续处理模式还是一种实验性功能,在此模式下支持Structured Streaming所支持的所有流数据源以及DataFrame / Dataset / SQL操作的子集。它支持低延迟(~1 ms)端到端,并保证at-least-once。与默认的微批处理引擎相比,默认的micro-batch processing可以保证exactly-once语义,但最多只能实现约100ms的延迟。


由于spark Streaming是基于spark core,而Structured Streaming是基于spark sql,所以我们自然而然想看下spark core与spark sql的区别,以及为什么会这么设计。

Spark Core

Spark Core实现了 Spark 的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。

其中关键是RDD,还记得上面说spark streaming是基于DStream,而Dsteam是一系列时间上连续的RDD集合RDD的转换和聚合算子都是高阶函数高阶函数指的是形参包含函数的函数,或是返回结果包含函数的函数对于这些高阶算子,开发者需要以Lambda函数的形式自行提供具体的计算逻辑。以map为例,我们需要明确对哪些字段做映射,以什么规则映射。再以filter为例,我们需要指明以什么条件在哪些字段上过滤。

比如:

// 创建一个 RDD
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))// 对 RDD 进行转换
val result = rdd.map(x => x * x).collect()

但这样一来,Spark只知道开发者要做map、filter,但并不知道开发者打算怎么做map和filter。也就是说,在RDD的开发模式下,Spark Core只知道“做什么”,而不知道“怎么做”。这会让Spark Core两眼一抹黑,除了把Lambda函数用闭包的形式打发到Executors以外,实在是没有什么额外的优化空间。

对于Spark Core来说,优化空间受限最主要的影响,莫过于让应用的执行性能变得低下,不同人开发的同一功能,性能可能天差地别。

 

Spark SQL

DataFrame

针对优化空间受限这个核心问题,Spark在2013年在1.3版本中发布了DataFrame。

DataFrame就是携带数据模式(Data Schema)的结构化分布式数据集,而RDD是不带Schema的分布式数据集。另外,RDD算子多是高阶函数,这些算子允许开发者灵活地实现业务逻辑,表达能力极强。而DataFrame的表达能力却很弱。一来,它定义了一套DSL(Domain Specific Language)算子,如select、filter、agg、groupBy等等。由于DSL语言是为解决某一类任务而专门设计的计算机语言,非图灵完备,因此,语言表达能力非常有限。二来,DataFrame中的绝大多数算子都是标量函数(Scalar Functions),它们的形参往往是结构化的数据列(Columns),表达能力也很弱。

例如:

// 创建一个 DataFrame
val df = spark.read.json("path/to/data.json")// 使用 SQL 查询 DataFrame
val result = df.select("name", "age").where("age > 18").collect()

DataFrame API为Spark引擎的内核优化打开了全新的空间。

首先,DataFrame中Schema所携带的类型信息,让Spark可以根据明确的字段类型设计定制化的数据结构,从而大幅提升数据的存储和访问效率

其次,DataFrame中标量算子确定的计算逻辑,让Spark可以基于启发式的规则和策略,甚至是动态的运行时信息去优化DataFrame的计算过程。

另外,为了支持DataFrame开发模式,Spark从1.3版本开始推出Spark SQL。Spark SQL的核心组件有二,其一是Catalyst优化器,其二是Tungsten

Catalyst:执行过程优化

Catalyst是Apache Spark的核心优化器,负责将用户查询计划转换为高效的执行计划。它是一个基于规则的优化器,使用一组规则来分析和转换查询计划,以提高性能。Catalyst的主要功能包括:

  • 语法解析:将SQL或其他查询语言解析为抽象语法树(AST)。
  • 逻辑优化:在AST级别进行优化,例如常量折叠、谓词下推和连接重写。
  • 物理优化:将逻辑计划转换为物理计划,选择最佳的执行策略,例如选择算法、连接顺序和分区策略。
  • 代码生成:将物理计划转换为高效的字节码,用于在分布式集群上执行查询。

而这一系列的优化,首先就需要结合DataFrame的Schema信息,确认计划中的表名、字段名、字段类型与实际数据是否一致。这个过程也叫做把“Unresolved Logical Plan”转换成“Analyzed Logical Plan”。也就是说,Catalyst的优化空间来源DataFrame的开发模式。

具体过程可以看spark的Catalyst到底做了什么-CSDN博客

为什么启发式的规则一定要先转为Analyzed Logical Plan?

因为启发式的规则例如剪枝谓词下推常量替换依赖于字段实际数据是一致如果不一致进行这些优化就是无效甚至导致错误结果因此必须先转为Analyzed Logical Plan这个过程结合DataFrame的Schema信息,确认计划中的表名、字段名、字段类型与实际数据是否一致

Tungsten:数据结构优化

Tungsten使用定制化的数据结构Unsafe Row来存储数据,Unsafe Row的优点是存储效率高、GC效率高Tungsten之所以能够设计这样的数据结构,仰仗的也是DataFrame携带的Schema。

Tungsten是用二进制字节序列来存储每一条用户数据的,因此在存储效率上完胜Java Object

要想实现上图中的二进制序列,Tungsten必须要知道数据条目的Schema才行。也就是说,它需要知道每一个字段的数据类型,才能决定在什么位置安放定长字段、安插Offset,以及存放变长字段的数据值。DataFrame刚好能满足这个前提条件。

虽然RDD也带类型,如RDD[Int]、RDD[(Int, String)],但如果RDD中携带的是开发者自定义的数据类型,如RDD[User]或是RDD[Product],Tungsten就会两眼一抹黑,完全不知道你的User和Product抽象到底是什么。RDD的通用性是一柄双刃剑,在提供开发灵活性的同时,也让引擎内核的优化变得无比困难。

总结

Spark Core实现了 Spark 的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。

Spark SQL是Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL操作数据。Spark SQL is Apache Spark’s module for working with structured data

spark core包含了最核心的分布式计算、数据处理,弹性分布式数据集 (RDD) ;但是rdd很多都是高阶函数(参数、返回类型都可以是函数,例如filter也可以用函数参数),开发者使用rdd的函数自由度很高,但是缺点是无法做计划优化(rdd是数据模式,不带schema),而且不同水平的人开发出来的性能差异很大

而spark sql是用dataframe/dataset(带schema的rdd)做抽象,都是简单的标量函数如select,filter,可以用catalyst和钨丝计划进行优化

联系

Spark Core 与 Spark SQL 的关系可以比作汽车的引擎和车身。Spark Core 是引擎,提供动力和功能,而 Spark SQL 是车身,提供用户界面和特定于领域的特性。

Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL)。即Spark SQL是Spark Core封装而来的,Spark SQL在Spark Core的基础上针对结构化数据处理进行很多优化和改进,

Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取数据。当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,SparkSQL可以智能地只扫描这些用到的字段,而不是像SparkContext.hadoopFile中那样简单粗暴地扫描全部数据。

当然,最终spark sql编写后也会转换为rdd

目前所有子框架的源码实现都已从 RDD 切换到 DataFrame

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726365.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.【Labview白话系列】Labview数组精讲

题主经过写文章一段时间的发现,许多同学对该软件的理解和编程能力是不太一样的,有些知识相对一些同学较为简单,但是有些同学提问就比较困难。那么针对这个问题,题主打算出一期说白话系列的专栏,在该栏目中用最通俗的大…

pycharm手动安装常用插件

下载插件 (1)下载地址:JetBrains Marketplace 这里以语言包为例子 2、中文语言包 进入pycharm中的设置,点击plugins,选从磁盘中安装插件

六、矩阵问题

73、矩阵置零(中等) 题目描述 给定一个 m x n 的矩阵,如果一个元素为 0 ,则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1: 输入:matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a…

应用案例 | Softing echocollect e网关助力汽车零部件制造商构建企业数据库,提升生产效率和质量

为了提高生产质量和效率,某知名汽车零部件制造商采用了Softing echocollect e多协议数据采集网关——从机器和设备中获取相关数据,并直接将数据存储在中央SQL数据库系统中用于分析处理,从而实现了持续监控和生产过程的改进。 一 背景 该企业…

【国家机关办公建筑 大型公共建筑的能耗监测、集中统一管理】安科瑞能耗监测系统整体解决方案

背景 为全面推进大型公建节能管理工作,需建立大型公建节能监管体系,逐步建立起全国联网的大型公建能耗监测平台,在大型公建安装分项计量装置,通过远程传输等手段及时采集分析能耗数据,实现对大型公建的实时动态监测、汇…

Docker数据卷的挂载

目录 1 概念 2 常用命令 3 操作步骤(主要讲在创建容器时的挂载) 3.1 挂载在默认目录 3.2 挂载在自定义目录 4 附加内容(查看容器的挂载情况) 1 概念 数据卷(volume)是一个虚拟目录,是容器内目录与宿主机目录之间映射的桥梁。这样容器内…

微服务day05-Gateway网关

Gateway网关 为了防止微服务能被任何身份的人访问,需要对访问微服务的人做身份认证和权限校验。网关的功能就是对访问用户进行身份认证和权限校验。网关具有3种功能: 身份验证和权限校验:网关作为微服务入口,需要校验用户是是否…

git 如何将多个提交点合并为一个提交点 commit

文章目录 核心命令详细使用模式总结示例 核心命令 git merge branch2 是将分支branch2的提交点合并到本地当前分支。 而在执行这条命令的时候,加一个选项--squash就表示在合并的时候将多个提交点合并为一个提交点。 git merge --squash branch2 先看squash单词的意…

React Hooks 完全指南:无类组件革命

目录 ​编辑 前言 Hooks的前世 函数组件 类组件 状态和生命周期的管理 Hooks用途以及相应代码 状态管理 用于生命周期管理和副作用操作的 Hooks 用于上下文管理的 Hooks 其他用途的 Hooks 前言 React Hooks 是在 React 16.8 版本中引入的一个非常强大的新特性&…

建筑外窗遮阳系数测试的太阳光模拟器

太阳光模拟器是一种用于测试建筑外窗遮阳系数的高科技设备。它能够模拟太阳光照射房屋的情景,帮助建筑师和设计师更好地了解建筑外窗的遮阳性能,从而提高建筑的能源效率和舒适度。 这种模拟器的工作原理非常简单,它通过使用高亮度的光源和精…

scrapy 爬虫:多线程爬取去微博热搜排行榜数据信息,进入详情页面拿取第一条微博信息,保存到本地text文件、保存到excel

如果想要保存到excel中可以看我的这个爬虫 使用Scrapy 框架开启多进程爬取贝壳网数据保存到excel文件中,包括分页数据、详情页数据,新手保护期快来看!!仅供学习参考,别乱搞_爬取贝壳成交数据c端用户登录-CSDN博客 最终…

Bee Mobile组件库重磅升级

Bee Mobile组件库重磅升级! 丰富强大的组件移动预览快速上手create-bee-mobile Bee Mobile组件库重磅升级! Bee Mobile组件库最新 v1.0.0 版本,支持最新的 React v18。 主页:Bee Mobile 丰富强大的组件 一共拥有50多个组件&…

Java面向对象总结 ( 知识点 | 代码详解 )

类和对象 什么是类? ● 概念:具有相同特征(同一类)事物的抽象描述,如人类,车类,学生类等。 类的结构: ● 变量: 事物属性的描述(名词) ● 方法: 事物的行为(可以做…

基于Flask的宠物领养系统的设计与实现

基于Flask的宠物领养系统的设计与实现 涉及技术:python3.10flaskmysql8.0 系统分为普通用户和管理员两种角色,普通用户可以浏览搜索宠物,申请领养宠物;管理员可以分布宠物信息,管理系统等。 采用ORM模型创建数据&am…

QT----云服务器部署Mysql,Navicat连接1698 -Access denied for user ‘root‘@‘‘

阿里云有活动,白嫖了一年的新加坡轻量级服务器,有点卡,有时候要开梯子 白嫖300元优惠券 目录 1 安装启动Mysql服务2 更改连接权限2.1 Navicat连接报错1698 -Access denied for user root 3 qt连接云服务器数据库 1 安装启动Mysql服务 我使用…

f5——>字符串三角

暴力破解,双层循环,注意复制到新列表用append,这样更不容易出错 格式还是“”.join(str)

Grid网格布局的基本使用

文章目录 什么是网格布局属性display 属性grid-row-gap 属性, grid-column-gap 属性, grid-gap 属性grid-template-areas 属性grid-auto-flow 属性justify-items 属性 , align-items 属性, place-items 属性justify-content 属性 …

Java高频面试之集合篇

Java 中常用的容器有哪些? ArrayList 和 LinkedList 的区别? ArrayList 是基于数组实现的,LinkedList 是基于链表实现的. ArrayList实现了RandomAccess接口,可基于下标访问. LinkedList 实现了Deque /dek/,可以当做双端队列使用. 插入效率对比 如果从头部…

Unity的滑动控制相机跟随和第三人称视角三

Unity的相机跟随和第三人称视角三 第三人称相机优化介绍讲解拖动事件相机逻辑人物移动逻辑总结 第三人称相机优化 Unity第三人称相机视角一 Unity第三人称相机视角二 介绍 之前相机视角讲过了两篇文章了,但是都是自动旋转视角,今天来了新需求&#xf…

支部管理系统微信小程序(管理端+用户端)flask+vue+mysql+微信小程序

系统架构如图所示 高校D支部管理系统 由web端和微信小程序端组成,由web端负责管理,能够收缴费用、发布信息、发布问卷、发布通知等功能 部分功能页面如图所示 微信小程序端 包含所有源码和远程部署,可作为毕设课设