Spark的DataFrame和Schema详解和实战案例Demo

1、概念介绍

Spark是一个分布式计算框架,用于处理大规模数据处理任务。在Spark中,DataFrame是一种分布式的数据集合,类似于关系型数据库中的表格。DataFrame提供了一种更高级别的抽象,允许用户以声明式的方式处理数据,而不需要关心底层数据的细节和分布式计算的复杂性。Schema在Spark中用于描述DataFrame中的数据结构,类似于表格中的列定义。

让我们分别介绍一下DataFrame和Schema:

DataFrame:

DataFrame是由行和列组成的分布式数据集合,类似于传统数据库或电子表格的结构。Spark的DataFrame具有以下特点:
分布式计算:DataFrame是分布式的,可以在集群中的多个节点上进行并行处理,以实现高性能的大规模数据处理。
不可变性:DataFrame是不可变的,这意味着一旦创建,就不能修改。相反,对DataFrame的操作会生成新的DataFrame。
延迟执行:Spark采用了延迟执行策略,即DataFrame上的操作并不立即执行,而是在需要输出结果时进行优化和执行。
用户可以使用SQL语句、Spark的API或Spark SQL来操作DataFrame,进行数据过滤、转换、聚合等操作。DataFrame的优势在于其易用性和优化能力,Spark会根据操作的执行计划来优化整个计算过程,以提高性能。

Schema:

Schema是DataFrame中数据的结构描述,它定义了DataFrame的列名和列的数据类型。在Spark中,Schema是一个包含列名和数据类型的元数据集合。DataFrame的Schema信息对于优化计算和数据类型的正确解释至关重要。
通常,Schema是在创建DataFrame时自动推断的,也可以通过编程方式显式指定。指定Schema的好处是可以确保数据被正确解释并且避免潜在的类型转换错误。如果数据源不包含Schema信息或者需要修改Schema,可以使用StructType和StructField来自定义Schema。例如,可以创建一个包含多个字段和数据类型的Schema,如字符串、整数、日期等。

在使用Spark读取数据源时,如CSV文件、JSON数据、数据库表等,Spark会尝试自动推断数据的Schema。如果数据源本身没有提供足够的信息,可以使用schema选项来指定或者通过后续的数据转换操作来调整DataFrame的Schema。

总结:DataFrame是Spark中一种强大的分布式数据结构,允许用户以声明式的方式处理数据,而Schema则用于描述DataFrame中数据的结构信息,确保数据被正确解释和处理。这两个概念共同构成了Spark强大的数据处理能力。

代码实战

package test.scalaimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}object TestSchema {def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start TestSchema")val spark: SparkSession = getSparkSession("TestSchema", 1)val structureData = Seq(Row("36636", "Finance", Row(3000, "USA")),Row("40288", "Finance", Row(5000, "IND")),Row("42114", "Sales", Row(3900, "USA")),Row("39192", "Marketing", Row(2500, "CAN")),Row("34534", "Sales", Row(6500, "USA")))val structureSchema = new StructType().add("id", StringType).add("dept", StringType).add("properties", new StructType().add("salary", IntegerType).add("location", StringType))val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), structureSchema)df.printSchema()df.show(false)val row = df.first()val schema = row.schemaval structTypeList = schema.toListprintln(structTypeList.size)for (i <- 0 to structTypeList.size - 1) {val structType = structTypeList(i)println(structType.name, row.getAs(structType.name), structType.dataType, structType.dataType)}}
}

输出

Start TestSchema
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
23/07/29 09:47:59 INFO SparkContext: Running Spark version 2.4.0
23/07/29 09:47:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
23/07/29 09:47:59 INFO SparkContext: Submitted application: TestSchema
23/07/29 09:47:59 INFO SecurityManager: Changing view acls to: Nebula
23/07/29 09:47:59 INFO SecurityManager: Changing modify acls to: Nebula
23/07/29 09:47:59 INFO SecurityManager: Changing view acls groups to:
23/07/29 09:47:59 INFO SecurityManager: Changing modify acls groups to:
23/07/29 09:47:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Nebula); groups with view permissions: Set(); users with modify permissions: Set(Nebula); groups with modify permissions: Set()
23/07/29 09:48:01 INFO Utils: Successfully started service ‘sparkDriver’ on port 60785.
23/07/29 09:48:01 INFO SparkEnv: Registering MapOutputTracker
23/07/29 09:48:01 INFO SparkEnv: Registering BlockManagerMaster
23/07/29 09:48:01 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/07/29 09:48:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/07/29 09:48:01 INFO DiskBlockManager: Created local directory at C:\Users\Nebula\AppData\Local\Temp\blockmgr-6f861361-4d98-4372-b78a-2949682bd557
23/07/29 09:48:01 INFO MemoryStore: MemoryStore started with capacity 8.3 GB
23/07/29 09:48:01 INFO SparkEnv: Registering OutputCommitCoordinator
23/07/29 09:48:01 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.
23/07/29 09:48:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://LAPTOP-PEA8R2PO:4040
23/07/29 09:48:01 INFO Executor: Starting executor ID driver on host localhost
23/07/29 09:48:01 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 60826.
23/07/29 09:48:01 INFO NettyBlockTransferService: Server created on LAPTOP-PEA8R2PO:60826
23/07/29 09:48:01 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/07/29 09:48:01 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, LAPTOP-PEA8R2PO, 60826, None)
23/07/29 09:48:01 INFO BlockManagerMasterEndpoint: Registering block manager LAPTOP-PEA8R2PO:60826 with 8.3 GB RAM, BlockManagerId(driver, LAPTOP-PEA8R2PO, 60826, None)
23/07/29 09:48:01 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, LAPTOP-PEA8R2PO, 60826, None)
23/07/29 09:48:01 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, LAPTOP-PEA8R2PO, 60826, None)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/12445.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis-Plus

1.入门案例 1.1 pom依赖引用 <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version> </dependency><dependency><groupId>mysql</grou…

Pytorch深度学习-----神经网络的基本骨架-nn.Module的使用

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…

python爬虫基础入门——利用requests和BeautifulSoup

(本文是自己学习爬虫的一点笔记和感悟) 经过python的初步学习,对字符串、列表、字典、元祖、条件语句、循环语句……等概念应该已经有了整体印象,终于可以着手做一些小练习来巩固知识点,写爬虫练习再适合不过。 1. 网页基础 爬虫的本质就是从网页中获取所需的信息,对网…

ClickHouse使用场景和案列分析

目录 一、ClickHouse 概述1. ClickHouse简介2. ClickHouse 发展历程3. ClickHouse 特点 二、ClickHouse 架构1. 数据存储层&#xff1a;2. SQL 解析层&#xff1a;3. 查询执行层&#xff1a;4. 数据压缩层&#xff1a; 三、ClickHouse 性能优化1. 查询优化&#xff1a;2. 数据压…

QT:qInstallMessageHandler打印日志重定向

目录 1、qInstallMessageHandler含义 2、实例&#xff1a; 3、调试级别Q包含用于警告和调试文本的全局宏&#xff1a; 4、打印日志&#xff0c;如何使用 1、qInstallMessageHandler含义 &#xff08;1&#xff09;此函数在使用Qt消息处理程序之前已定义。返回一个指向前一…

Python代理模式介绍、使用

一、Python代理模式介绍 Python代理模式&#xff08;Proxy Pattern&#xff09;是一种结构型设计模式。在代理模式中&#xff0c;代理对象充当了另一个对象的占位符&#xff0c;以控制对该对象的访问。 代理对象和被代理对象实现了相同的接口&#xff0c;因此它们可以互相替代…

类加载机制与类加载器

点击下方关注我&#xff0c;然后右上角点击...“设为星标”&#xff0c;就能第一时间收到更新推送啦~~~ Java 源码是如何形成类文件的&#xff0c;类文件又是如何加载到虚拟机的&#xff0c;类加载有哪些机制和原则呢&#xff1f;本文将为大家一一介绍。 1 Java 源码形成类文件…

基于拉格朗日-遗传算法的最优分布式能源DG选址与定容(Matlab代码实现)

目录 1 概述 2 数学模型 2.1 问题表述 2.2 DG的最佳位置和容量&#xff08;解析法&#xff09; 2.3 使用 GA 进行最佳功率因数确定和 DG 分配 3 仿真结果与讨论 3.1 33 节点测试配电系统的仿真 3.2 69 节点测试配电系统仿真 4 结论 1 概述 为了使系统网损达到最低值&a…

AI Chat 设计模式:10. 组合模式

本文是该系列的第八篇&#xff0c;采用问答式的方式展开&#xff0c;问题由我提出&#xff0c;答案由 Chat AI 作出&#xff0c;灰色背景的文字则主要是我的一些思考和补充。 问题列表 Q.1 给我介绍一下组合模式A.1Q.2 好的&#xff0c;给我举一个组合模式的例子&#xff0c;使…

idea导入maven项目问题

问题产生原因&#xff1a; ①idea加载maven项目&#xff0c;如果网络不通畅&#xff0c;会在maven仓库中产生一个文件&#xff0c;如下图所示: ②当网络通畅时&#xff0c;在下载就会因为此文件导致无法下载正确的maven依赖 解决方案&#xff1a; ①打开maven仓库的根目录 ②…

SpringDataJpa 实体类—主键生成策略

主键配置 IdGeneratedValue(strategy GenerationType.IDENTITY)Column(name "cust_id")private Long custId;//主键 Id&#xff1a;表示这个注解表示此属性对应数据表中的主键GeneratedValue(strategy GenerationType.IDENTITY) 此注解表示配置主键的生成策…

Ubuntu 安装 Wireshark

Ubuntu 安装 Wireshark_ubuntu wireshark_iBlackAngel的博客-CSDN博客

ts中声明引入未使用的报错——解决方案

在编写ts项目的时候&#xff0c;经常会出现如下报错&#xff1a; 导入声明中的所有导入都未使用 这是因为导入的模块暂时没有使用&#xff0c;ts给的一个提示信息 解决方案&#xff1a; 在ts.config.json中 把noUnusedLocals 设置为false即可 {"compilerOptions"…

【雕爷学编程】Arduino动手做(175)---机智云ESP8266开发板模块4

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

【UE5 多人联机教程】03-创建游戏

效果 步骤 打开“UMG_MainMenu”&#xff0c;增加创建房间按钮的点击事件 添加如下节点 其中&#xff0c;“FUNL Fast Create Widget”是插件自带的函数节点&#xff0c;内容如下&#xff1a; “创建会话”节点指游戏成功创建一个会话后&#xff0c;游戏的其他实例即可发现&am…

微服务体系<1>

我们的微服务架构 我们的微服务架构和单体架构的区别 什么是微服务架构 微服务就是吧我们传统的单体服务分成 订单模块 库存模块 账户模块单体模块 是本地调用 从订单模块 调用到库存模块 再到账户模块 这三个模块都是调用的同一个数据库 这就是我们的单体架构微服务 就是…

微信小程序如何实现页面传参?

前言 只要你的小程序超过一个页面那么可能会需要涉及到页面参数的传递&#xff0c;下面我总结了 4 种页面方法。 路径传递 通过在url后面拼接参数&#xff0c;参数与路径之间使用 ? 分隔&#xff0c;参数键与参数值用 相连&#xff0c;不同参数用 & 分隔&#xff1b;如…

Training-Time-Friendly Network for Real-Time Object Detection 论文学习

1. 解决了什么问题&#xff1f; 目前的目标检测器很少能做到快速训练、快速推理&#xff0c;并同时保持准确率。直觉上&#xff0c;推理越快的检测器应该训练也很快&#xff0c;但大多数的实时检测器反而需要更长的训练时间。准确率高的检测器大致可分为两类&#xff1a;推理时…

银河麒麟安装mysql数据库(mariadb)-银河麒麟安装JDK-银河麒麟安装nginx(附安装包)

银河麒麟离线全套安装教程&#xff08;手把手教程&#xff09; 1.银河麒麟服务器系统安装mysql数据库&#xff08;mariadb&#xff09; 2.银河麒麟桌面系统安装mysql数据库&#xff08;mariadb&#xff09; 3.银河麒麟服务器系统安装JDK 4.银河麒麟桌面系统安装JDK 5.银河麒麟…

青大数据结构【2021】

一、单选&#xff08;17&#xff01;&#xff09; 根据中序遍历得到降序序列可以知道&#xff0c;每个结点的左子树的结点的值比该结点的值小&#xff0c;因为没有重复的关键字&#xff0c;所以拥有最大值的结点没有左子树。 二、简答 三、分析计算 四、算法分析 3.迪杰斯特拉…