Spark SQL概述与基本操作

目录

一、Spark SQL概述

        (1)概念

        (2)特点

        (3)Spark SQL与Hive异同

        (4)Spark的数据抽象

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        (2)代码演示

三、DataFrame创建

        (1)DataFrame组成

        (2)DataFrame创建方式(转换)

        (3)DataFrame创建方式(标准API读取)

四、DataFrame编程

        (1)DSL语法风格

        (2)SQL语法风格

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        (2)代码示例


一、Spark SQL概述

        (1)概念

        Spark SQL是Apache Spark的一个模块,它用于处理结构化和半结构化的数据。Spark SQL允许用户使用SQL查询和操作数据,这种操作可以直接在Spark的DataFrame/Dataset API中进行。此外,Spark SQL还支持多种语言,包括Scala、Java、Python和R。

        (2)特点

        ①融合性:SQL可以无缝集成在代码中,随时用SQL处理数据。

        ②统一数据访问:一套标准API可读写不同的数据源。

        ③Hive兼容:可以使用Spark SQL直接计算生成Hive数据表。

        ④标准化连接:支持标准化JDBC \ ODBC连接,方便和各种数据库进行数据交互。

        (3)Spark SQL与Hive异同

        共同点:Hive和Spark均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

        (4)Spark的数据抽象

        Spark SQL的数据抽象:

        Data Frame与RDD:

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        在RDD阶段,程序的执行入口对象是:SparkContext。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

        Spark Session对象作用:

        ①用于SparkSQL编程作为入口对象。

        ②用于SparkCore编程,可以通过Spark Session对象中获取到Spark Context。

        (2)代码演示
# cording:utf8# Spark Session对象的导包,对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession
if __name__ == '__main__':# 构建Spark Session执行环境入口对象spark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 通过Spark Session对象 获取SparkContext对象sc = spark.sparkContext# SparkSQL测试df = spark.read.csv('../input/stu_score.txt', sep=',', header=False)df2 = df.toDF('id', 'name', 'score')# 打印表结构# df2.printSchema()# 打印数据内容# df2.show()df2.createTempView('score')# SQL风格spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5""").show()# DSL 风格df2.where("name='语文'").limit(5).show()

三、DataFrame创建

        (1)DataFrame组成

        DataFrame是一个二维表结构,表格结构的组成:

                ①行

                ②列

                ③表结构描述

        比如,在MySQL中的一个表:

                ①有许多列组成

                ②数据也被分为多个列

                ③表也有表结构信息(列、列名、列类型、列约束等)

        基于这个前提下,DataFrame的组成如下:

                在结构层面:

                        ①StructType对象描述整个DataFrame的表结构

                        ②StructField对象描述一个列的信息

                在数据层面:

                        ①Row对象记录一行数据

                        ②Column对象记录一列数据并包含列的信息

        (2)DataFrame创建方式(转换)

        ①基于RDD方式

# cording:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# 构建DataFrame对象# 参数1,被转换的RDD# 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd,schema=['name', 'age'])# 打印Data Frame的表结构df.printSchema()# 打印df中的数据# 参数1,表示 展示出多少条数据,默认不传的话是20# 参数2,表示是否对列进行截断,如果列的数据长度超过20个字符串长度,厚旬欸日不显示,以....代替# 如果给False 表示不截断全部显示,默认是Truedf.show(20,False)# 将DF对象转换成临时视图表,可供sql语句查询df.createOrReplaceTempView('people')spark.sql('SELECT * FROM people WHERE age < 30').show()

        ②通过StructType对象来定义DataFrame的 ‘ 表结构 ’ 转换RDD

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# 构建表结构的描述对象:StructType 对象# 参数1,列名# 参数2,列数据类型# 参数3,是否允许为空schema = StructType().add('name', StringType(), nullable=True).\add('age', IntegerType(), nullable=False)# 构建DataFrame对象# 参数1,被转换的RDD# 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd, schema=schema)df.printSchema()df.show()

        ③通过RDD的toDF方法创建RDD

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# toDF构建DataFrame# 第一种构建方式,只能设置列名,列类型靠RDD推断,默认允许为空df1 = rdd.toDF(['name', 'name'])df1.printSchema()df1.show()# toDF方式2:通过StructType来构造# 设置全面,能设置列名、列数据类型、是否为空# 构建表结构的描述对象:StructType 对象# 参数1,列名# 参数2,列数据类型# 参数3,是否允许为空schema = StructType().add('name', StringType(), nullable=True).\add('age', IntegerType(), nullable=False)df2 = rdd.toDF(schema=schema)df2.printSchema()df2.show()

        ④基于Pandas的DataFrame创建DataFrame

# cording:utf8from pyspark.sql import SparkSession
import pandas as pdif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 基于pandas的DataFrame构建SparkSQL的DataFrame对象pdf = pd.DataFrame({'id': [1, 2, 3],'name': ['张大仙', '王晓晓', '吕不韦'],'age': [1, 2, 3]})df = spark.createDataFrame(pdf)df.printSchema()df.show()

        (3)DataFrame创建方式(标准API读取)

        统一API示例代码:

        ①读取本地text文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 构建StructType,text数据源,# text读取数据的特点是:将一整行只作为一个列读取,默认列名是value 类型是Stringschema = StructType().add('data', StringType(),nullable=True)df = spark.read.format('text').\schema(schema=schema).\load('../input/people.txt')df.printSchema()df.show()

        ②读取json文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# json文件类型自带Schema信息df = spark.read.format('json').load('../input/people.json')df.printSchema()df.show()

        ③读取csv文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 读取csv文件df = spark.read.format('csv').\option('sep', ';').\option('header', True).\option('encoding', 'utf-8').\schema('name STRING, age INT, job STRING').\load('../input/people.csv')df.printSchema()df.show()

        ④读取parquet文件

        parquet文件:是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多,他们都是列存储格式。

        parquet对比普通的文本文件的区别:

                ①parquet内置schema(列名、列类型、是否为空)

                ②存储是以列作为存储格式

                ③存储是序列化存储在文件中的(有压缩属性体积小)

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 读取parquet文件df = spark.read.format('parquet').load('../input/users.parquet')df.printSchema()df.show()

四、DataFrame编程

        (1)DSL语法风格
# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContextdf = spark.read.format('csv').\schema('id INT, subject STRING, score INT').\load('../input/stu_score.txt')# Column对象的获取id_column = df['id']subject_column = df['subject']# DLS风格df.select(['id', 'subject']).show()df.select('id', 'subject').show()df.select(id_column, subject_column).show()# filter APIdf.filter('score < 99').show()df.filter(df['score'] < 99).show()# where APIdf.where('score < 99').show()df.where(df['score'] < 99).show()# group By API# df.groupBy API的返回值为 GroupedData类型1# GroupedData对象不是DataFrame# 它是一个 有分组关系的数据结构,有一些API供我们对分组做聚合# SQL:group by 后接上聚合: sum avg count min max# GroupedData 类似于SQL分组后的数据结构,同样由上述5中聚合方法# GroupedData 调用聚合方法后,返回值依旧是DayaFrame# GroupedData 只是一个中转的对象,最终还是会获得DataFrame的结果df.groupBy('subject').count().show()df.groupBy(df['subject']).count().show()
        (2)SQL语法风格

        DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql()来执行SQL语句查询,结果返回一个DataFrame。
        如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:        

df.createTempView( "score")            #注册一个临时视图(表)
df.create0rReplaceTempView("score")    #注册一个临时表,如果存在进行替换。
df.createGlobalTempView( "score")      #注册一个全局表

        全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
        global_temp.
        临时表:只在当前SparkSession中可用

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContextdf = spark.read.format('csv').\schema('id INT, subject STRING, score INT').\load('../input/stu_score.txt')# 注册成临时表df.createTempView('score')              # 注册临时视图(表)df.createOrReplaceTempView('score_2')   # 注册或者替换为临时视图df.createGlobalTempView('score_3')      # 注册全局临时视图 全局临时视图使用的时候 需要在前面带上global_temp. 前缀# 可以通过SparkSession对象的sql api来完成sql语句的执行spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        这个包里面提供了一系列的计算函数供SparkSQL使用

        导包:from pyspark.sql import functions as F

        这些函数返回值多数都是Column对象。

        (2)代码示例
# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName('wordcount').master('local[*]').getOrCreate()sc = spark.sparkContext# TODO 1:SQL风格进行处理rdd = sc.textFile('../input/words.txt').\flatMap(lambda x: x.split(' ')).\map(lambda x: [x])df = rdd.toDF(['word'])# 注册DF为表格df.createTempView('words')spark.sql('SELECT word,COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC').show()# TODO 2:DSL 风格处理df = spark.read.format('text').load('../input/words.txt')# withColumn 方法# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字和老列相同,那么替换,否则作为新列存在df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))df2.groupBy('value').\count().\withColumnRenamed('value', 'word').\withColumnRenamed('count', 'cnt').\orderBy('cnt', ascending=False).show()# withColumnRenamed() 对列名进行重命名# orderBy() 排序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hive使用中的参数优化与问题排查

1.使用hive的虚拟列排查错误案例 set hive.exec.rowoffsettrue; SELECT –输入文件名 INPUT__FILE__NAME, –文件中的块内偏移量 BLOCK__OFFSET__INSIDE__FILE, –文件行偏移量 ROW__OFFSET__INSIDE__BLOCK, * from hdp_lbg_zhaopin_defaultdb.zzdetail where dt‘20201117’…

07-定位布局

定位布局 1.定位布局- 定位流分类1.1.静态定位1.2.相对定位1.3.绝对定位1.4.固定定位1.5.粘滞定位1.6.z-index - 1.1.静态定位&#xff08; Static positioning&#xff09;- 1.2.什么是相对定位?&#xff08; Relative positioning &#xff09;- 相对定位注意点- 相对定位应…

Flink on yarn 加载失败plugins失效问题解决

Flink on yarn 加载失败plugins失效问题解决 flink版本&#xff1a;1.13.6 1. 问题 flink 任务运行在yarn集群,plugins加载失效,导致通过扩展资源获取任务参数失效 2. 问题定位 yarn容器的jar包及插件信息,jar包是正常上传 源码定位 加载plugins入口&#xff0c;TaskMana…

Unity的屏幕坐标获取

Screen.width public static int width ; 描述 屏幕窗口的当前宽度(以像素为单位)(只读)。 此为玩家窗口的实际宽度(在全屏模式下,它也是当前分辨率)。 using System.Collections; using System.Collections.Generic; using UnityEngine;public class Example : Mo…

TCP三次握手具体过程

四次挥手 1&#xff09;客户端进程发出连接释放报文&#xff0c;并且停止发送数据。释放数据报文首部&#xff0c;FIN1&#xff0c;其序列号为sequ&#xff08;等于前已经传送过来的数据的最后一个字节的序号加1)&#xff0c;此时&#xff0c;客户端进入FIN_WAIT_1&#xff08…

【Python机器学习】零基础掌握RandomTreesEmbedding集成学习

如何在高维数据中找到隐藏的结构? 面临大量复杂、高维的数据,例如社交网络分析、电子商务推荐系统或医疗诊断,如何有效地分析和解读这些数据成为一大挑战。一个有效的方法是使用嵌入技术将高维数据转化为低维形式,同时保留其内在结构。这次将介绍一种称为“随机树嵌入”(…

AI新能量!FortiGate NGFW面向数据中心全面集成FortiGuard AI 安全服务

企业IT技术正在以惊人的速度发展&#xff0c;转型最大的领域之一是下一代防火墙&#xff08;NGFW&#xff09;市场。如今&#xff0c;混合云、多云、边缘等多种基础设施形态共存&#xff0c;已经成为大部分企业的常态&#xff0c;不断扩张的攻击面需要不同形态防火墙的安全防护…

若依ruoyi-nbcio如何做一个仿钉钉流程设计器的思考

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 看到有些流程图采用仿钉钉的流程设计&#xff0c;比如下面界面&#xff1a; 这种方式虽然简单&#xff0c…

kafka安装配置

Kafka的安装配置可以按照以下步骤进行&#xff1a; 确保已安装Java运行环境&#xff1a;Kafka是使用Java语言编写的&#xff0c;因此需要在安装Kafka之前先安装Java运行环境。Kafka支持Java 8及以上版本。硬件要求&#xff1a;Kafka可以在任何硬件上运行&#xff0c;但是在生产…

计算机网络文章荟萃

脑残式网络编程入门(二)&#xff1a;我们在读写Socket时&#xff0c;究竟在读写什么&#xff1f;-网络编程/专项技术区 - 即时通讯开发者社区! 1.什么是 socket - 掘金2.socket 的实现原理 - 掘金本文讲述了 socket 在 linux 操作系统下的数据结构&#xff0c;以及阻塞 IO 利用…

【Java】PAT Basic Level 1023 组个最小数

题目 1024 组个最小数 作者 CAO, Peng 单位 Google 给定数字 0-9 各若干个。你可以以任意顺序排列这些数字&#xff0c;但必须全部使用。目标是使得最后得到的数尽可能小&#xff08;注意 0 不能做首位&#xff09;。例如&#xff1a;给定两个 0&#xff0c;两个 1&#xff…

檢測項目簡體字

某些項目可能要求代碼中不允許使用簡體字 安裝stcheck檢查 yarn add stcheck --dev在項目根目錄創建 st.config.json 文件 {"patterns": ["./**/*.(ts|js|tsx|jsx|vue|html)","!**/node_modules/**","!.git/**"],"gitignore&q…

Express框架开发接口之登录与注册API

我们利用nodeexpressmysql开发接口&#xff0c;对数据库数据进行简单增、删、查改等操作。 接口是什么&#xff1f; 接口是 前后端通信的桥梁 简单理解&#xff1a;一个接口就是 服务中的一个路由规则 &#xff0c;根据请求响应结果 接口的英文单词是 API (Application Progra…

【socket】网卡内部缓冲区、socket缓冲区、滑动窗口

一、网卡内部缓冲区 网卡内部的缓冲区&#xff0c;通常称为环形缓冲区&#xff08;Ring Buffer&#xff09;。环形缓冲区是一种用于数据存储和数据传输的结构&#xff0c;它允许数据在网络接口卡&#xff08;网卡&#xff09;和计算机操作系统之间进行高效传输。 当网卡接收到…

Java工具库——FastJson的40个常用方法

那些想看却没看的书&#xff0c;在心里摆满一个图书馆… 工具库介绍 阿里巴巴的 FastJSON&#xff0c;也被称为 Alibaba FastJSON 或阿里巴巴 JSON&#xff0c;是一个高性能的 Java JSON 处理库&#xff0c;用于在 Java 应用程序中解析和生成 JSON 数据。FastJSON 以其卓越的性…

C++:string的使用

目录 1、string的介绍 2、string的初始化 2.1、方法1 2.2、方法2 2.3、方法3 2.4、方法4 2.5、方法5 2.6、方法6 3、string的赋值运算符重载 4、 string的常用内置函数使用 5、string的遍历 4.1数组下标访问 4.2迭代器 4.3范围for 6、模拟实现MyString 6.1、头文…

HTTP发起请求与收到响应的大致过程

可以《《透视 HTTP 协议》Windows 10 搭建最小实验环境》搭建环境&#xff0c;之后才能进行下边的操作。 1.鼠标左键点击两下www目录下的start.bat批处理文件。 2.打开Wireshark&#xff0c;然后选择Adapter for loopback traffic capture。 3.然后把tcp.port 80 || udp.…

碳排放数据,各地区的(直辖市数据细分到区县),含shp和xlsx格式,带符号化

这两天推了道路相关的数据&#xff0c;道路线路、客运飞机场、航空、地铁、火车站点等等交通出行类的数据之前都已发过&#xff0c;需要的可以自己翻一翻。 交通运输行业还比较关注的碳排放数据&#xff0c;也整理出来了&#xff0c;有需要的自取。 数据地址&#xff1a; 全…

项目管理工具ConceptDraw PROJECT mac中文版自定义列功能

ConceptDraw PROJECT Mac是一款专业的项目管理工具&#xff0c;适用于MacOS平台。它提供了成功规划和执行项目所需的完整功能&#xff0c;包括任务和资源管理、报告和变更控制。 这款软件可以与ConceptDraw office集成&#xff0c;利用思维导图和数据可视化的强大功能来改进项目…

Microsoft.Extensions 简介

Microsoft.Extensions 简介 一、Microsoft.Extensions 简介 .NET Extensions 是一套官方的、开源的、跨平台的 API 集合&#xff0c;提供了一些常用的编程模式和实用工具&#xff0c;例如依赖项注入、日志记录、缓存、Host以及配置等等。该项目的大多数 API 都被用在 .NET 平…