Spark-SQL3

Spark-SQL

一.Spark-SQL核心编程(四)

1.数据加载与保存:

1)通用方式:

SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为parquet。

2)加载数据:

spark.read.load 是加载数据的通用方法。如果读取不同格式的数据,可以对不同的数据格式进行设定。

spark.read.format("…")[.option("…")].load("…")

format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。

option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable

我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.`文件路径`

spark.sql("select * from json.’ Spark-SQL/input/user.json’").show

3)保存数据:

df.write.save 是保存数据的通用方法。如果保存不同格式的数据,可以对不同的数据格式进行设定。

df.write.format("…")[.option("…")].save("…")

format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。

option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable

保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。

 

例如:df.write.mode("append").json("Spark-SQL/output")

 

 

2.Parquet

Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式

存储格式。数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。修改配置项 spark.sql.sources.default,可修改默认数据源格式。

1)加载数据:

Val df =s park.read.load("examples/src/main/resources/users.parquet")

2)保存数据:

var df = spark.read.json("/opt/module/data/input/people.json")

df.write.mode("append").save("/opt/module/data/output")

3.JSON

Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以

通过 SparkSession.read.json()去加载 JSON 文件。

注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串

 

加载json文件

val path = "/opt/module/spark-local/people.json" 

val peopleDF = spark.read.json(path)

创建临时表

peopleDF.createOrReplaceTempView("people")

数据查询

val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

4.CSV

Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为

数据列。

spark.read.format("csv").option("sep",";").option("inferSchema","true")

.option("header", "true").load("data/user.csv")

5.MySQL

Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对

DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。

IDEA通过JDBC对MySQL进行操作:

1)导入依赖

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>5.1.27</version>

</dependency>

MySQL8 <version>8.0.11</version>

 

2)读取数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

 

import spark.implicits._

//通用的load方式读取

 

spark.read.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user")

  .load().show()

 

spark.stop()

 

 

 

 

//通用的load方法的另一种形式

spark.read.format("jdbc")

  .options(

    Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))

  .load().show()

 

 

//通过JDBC

val pros :Properties = new Properties()

pros.setProperty("user","root")

pros.setProperty("password","123456")

val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)

df.show()

 

1)写入数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

 

import spark.implicits._

val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),

  Stu("zs", 30)))

val ds:Dataset[Stu] = rdd.toDS()

 

ds.write.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user2")

  .mode(SaveMode.Append)

  .save()

 

spark.stop()

 

二.Spark-SQL核心编程(五)

1.Spark-SQL连接Hive

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)、Hive 查询语言(HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译 Spark SQL 时引入 Hive支持,这样就可以使用这些特性了。

使用方式分为内嵌Hive、外部Hive、Spark-SQL CLI、Spark beeline 以及代码操作。

1)内嵌的 HIVE

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可。但是在实际生产活动当中,几乎没有人去使用内嵌Hive这一模式。

2)外部的 HIVE

在虚拟机中下载以下配置文件:

 

如果想在spark-shell中连接外部已经部署好的 Hive,需要通过以下几个步骤:

Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下,并将url中的localhost改为node01

 

 

 

把 MySQL 的驱动 copy 到 jars/目录下

 

把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下

重启 spark-shell

 

 

3)运行 Spark beeline(了解)

Spark Thrift Server 是 Spark 社区基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和 HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依旧可以和 Hive Metastore进行交互,获取到 hive 的元数据。

如果想连接 Thrift Server,需要通过以下几个步骤:

Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下

把 Mysql 的驱动 copy 到 jars/目录下

把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下

启动 Thrift Server

使用 beeline 连接 Thrift Server

beeline -u jdbc:hive2://node01:10000 -n root

4)运行Spark-SQL CLI

Spark SQL CLI 可以很方便的在本地运行 Hive 元数据服务以及从命令行执行查询任务。在 Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似于 Hive 窗口。

操作步骤:

1.将mysql的驱动放入jars/当中;

2.将hive-site.xml文件放入conf/当中;

3.运行bin/目录下的spark-sql.cmd 或者打开cmd,在D:\spark\spark-3.0.0-bin-hadoop3.2\bin当中直接运行spark-sql

 

可以直接运行SQL语句,如下所示:

5)代码操作Hive

1.导入依赖。

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-hive_2.12</artifactId>

    <version>3.0.0</version>

</dependency>

<dependency>

    <groupId>org.apache.hive</groupId>

    <artifactId>hive-exec</artifactId>

    <version>2.3.3</version>

</dependency>

 

可能出现下载jar包的问题:

D:\maven\repository\org\pentaho\pentaho-aggdesigner-algorithm\5.1.5-jhyde

 

2. 将hive-site.xml 文件拷贝到项目的 resources 目录中。

3.代码实现。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hive")

val spark:SparkSession = SparkSession.builder()

  .enableHiveSupport()

  .config(sparkConf)

  .getOrCreate()

 

spark.sql("show databases").show()

spark.sql("create database spark_sql")

spark.sql("show databases").show()

 

 

注意:

1.如果在执行操作时,出现如下错误:

 

可以在代码最前面增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "node01")

此处的 node01 改为自己的 hadoop 用户名称

2.在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址: config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/77610.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DeepSeek与Napkin:信息可视化领域的创新利器

摘要 在数字化信息爆炸的时代&#xff0c;如何高效地组织思路并将其转化为直观、清晰的可视化图表&#xff0c;成为众多领域面临的关键问题。本文深入剖析了DeepSeek与Napkin这两款工具&#xff0c;详细探讨它们在信息处理与可视化过程中的功能特性、协同工作机制、应用场景、…

conda 创建、激活、退出、删除环境命令

参考博客&#xff1a;Anaconda创建环境、删除环境、激活环境、退出环境 使用起来觉得有些不方便可以改进&#xff0c;故写此文。 1. 创建环境 使用 -y 跳过确认 conda create -n 你的环境名 -y 也可以直接选择特定版本 python 安装&#xff0c;以 3.10 为例&#xff1a; co…

嵌入式芯片中的 低功耗模式 内容细讲

电源域与运行级别概述 电源域&#xff08;Power Domain&#xff09; 核心域&#xff08;Core Domain&#xff09;&#xff1a;包括 CPU 核心和关键架构模块&#xff08;如 NVIC、CPU 内核寄存器&#xff09;。 外设域&#xff08;Peripheral Domain&#xff09;&#xff1a;…

Java中常见的锁synchronized、ReentrantLock、ReentrantReadWriteLock、StampedLock

在Java中&#xff0c;锁是实现多线程同步的核心机制。不同的锁适用于不同的场景&#xff0c;理解其实现原理和使用方法对优化性能和避免并发问题至关重要。 一、隐式锁&#xff1a;synchronized 关键字 实现原理 基于对象监视器&#xff08;Monitor&#xff09;&#xff1a;每…

@JsonView + 单一 DTO:如何实现多场景 JSON 字段动态渲染

JsonView 单一 DTO&#xff1a;如何实现多场景 JSON 字段动态渲染 JsonView 单一 DTO&#xff1a;如何实现多场景 JSON 字段动态渲染1、JsonView 注解产生的背景2、为了满足不同场景下返回对应的属性的做法有哪些&#xff1f;2.1 最快速的实现则是针对不同场景新建不同的 DTO…

Etcd 压缩整理

etcd数据存储 在实际生产中使用 ETCD 存储元数据&#xff0c;起初集群规模不大的时候元数据信息不多没有发现什么问题。随着集群规模越来越大&#xff0c;可能引发存储问题。 —auto-compaction-retention 由于ETCD数据存储多版本数据&#xff0c;随着写入的主键增加历史版本需…

【更新完毕】2025妈妈杯C题 mathercup数学建模挑战赛C题数学建模思路代码文章教学:音频文件的高质量读写与去噪优化

完整内容请看文章最下面的推广群 我将先给出文章、代码、结果的完整展示, 再给出四个问题详细的模型 面向音频质量优化与存储效率提升的自适应编码与去噪模型研究 摘 要 随着数字媒体技术的迅速发展&#xff0c;音频处理技术在信息时代的应用愈加广泛&#xff0c;特别是在存储…

React-请勿在循环或者条件语句中使用hooks

这是React Hooks的首要规则&#xff0c;这是因为React Hooks 是以单向循环链表的形式存储&#xff0c;即是有序的。循环是为了从最后一个节点移到一个节点的时候&#xff0c;只需通过next一步就可以拿到第一个节点&#xff0c;而不需要一层层回溯。React Hooks的执行&#xff0…

【大模型】 LangChain框架 -LangChain实现问答系统

LangChain 介绍与使用方法 1. 什么是 LangChain&#xff1f;2. LangChain 的主要功能3. 如何使用 LangChain&#xff1f;3.1 环境准备3.2 基本使用示例3.2.1 简单的问答系统3.2.2 结合外部工具 3.3 高级用法 4. 常见问题及解决方法4.1 安装问题4.2 运行问题4.3 性能问题 5. 实战…

企业级HAProxy高可用离线部署实战(附Kubernetes APIServer负载均衡配置)

企业级HAProxy高可用离线部署实战&#xff08;附Kubernetes APIServer负载均衡配置&#xff09; 摘要&#xff1a;本文深入讲解在离线环境下部署HAProxy 3.1.1的全流程&#xff0c;涵盖源码编译、系统服务封装、K8S APIServer四层负载配置等核心环节&#xff0c;并提供生产级高…

Python网络爬虫设计(一)

目录 一、网络爬虫 1、基本的爬虫 2、获取URL 3、查找网页源码关键字 4、代码实现 二、requests库 1、requests的优势和劣势 2、获取网页的其他库 &#xff08;1&#xff09;selenium库 &#xff08;2&#xff09;pyppeteer库 三、pyppeteer库 1、pyppeteer库的来历…

BR_频谱20dB 带宽(RF/TRM/CA/BV-05-C [TX Output Spectrum – 20 dB Bandwidth])

目录 一、规范要求 1、协议章节 2、测试目的 二、测试方法 1、样机初值条件&#xff1a; 2、测试步骤: 方法一&#xff1a;频谱仪 方法二&#xff1a;综测仪CMW500 3、预期结果 一、规范要求 1、协议章节 4.5.5 RF/TRM/CA/BV-05-C [TX Output Spectrum – 20 dB Ba…

【橘子大模型】初探rag知识库的构建

一、简介 我们在实现了一系列功能之后&#xff0c;终于来到了rag的部分&#xff0c;下面我们将基于langchain来实现一个rag检索。 关于rag方面的知识&#xff0c;可以查看这两篇文章&#xff1a; 大模型应用之RAG详解 什么是 RAG&#xff08;检索增强生成&#xff09; 或者是去…

CentOS7执行yum命令报错 Could not retrieve mirrorlist http://mirrorlist.centos.org

CentOS7执行yum命令报错 引更新yum源备份原有源创建新的源文件清理并重建缓存 引 CentOS 7 系统无法连接到 CentOS 的官方镜像站点。这通常是由于网络问题或 CentOS 7 已停止维护导致的&#xff08;2024年6月30日后 CentOS 7 已进入 EOL&#xff09; 报错明细&#xff1a; 已…

VSCode安装与环境配置(Mac环境)

20250419 - 概述 大概是非常久之前了&#xff0c;装了VSCode&#xff0c;估计都得21的时候了&#xff0c;电脑上也没更新过。当时安装也直接装上就完事了。这次把版本更新一下&#xff0c;同时记录一下这个安装过程。 安装 mac下安装非常简单&#xff0c;直接从官网下载&am…

QML动画--ParallelAnimation和SequentialAnimation

一、ParallelAnimation ParallelAnimation 是 QML 中用于并行执行多个动画的容器动画类型&#xff0c;可以同时运行多个子动画。 基本用法 qml import QtQuick 2.15Rectangle {id: rectwidth: 100; height: 100color: "red"x: 0; y: 0; opacity: 1.0ParallelAnim…

NLP高频面试题(四十三)——什么是人类偏好对齐中的「对齐税」(Alignment Tax)?如何缓解?

一、什么是「对齐税」(Alignment Tax)? 所谓「对齐税」(Alignment Tax),指的是在使人工智能系统符合人类偏好的过程中,所不可避免付出的性能损失或代价。换句话说,当我们迫使AI遵循人类价值观和规范时,AI系统往往无法达到其最大理论性能。这种性能上的妥协和折衷,就…

速查手册:TA-Lib 超过150种量化技术指标计算全解 - 1. Overlap Studies(重叠指标)

速查手册&#xff1a;TA-Lib 超过150种量化技术指标计算全解 - 1. Overlap Studies&#xff08;重叠指标&#xff09; TA-Lib&#xff08;Technical Analysis Library&#xff09;是广泛使用的金融技术分析库&#xff0c;实现了超过150种技术指标计算函数&#xff0c;适用于股票…

重构未来智能:Anthropic 解码Agent设计哲学三重奏

第一章 智能体进化论&#xff1a;从工具到自主体的认知跃迁 1.1 LLM应用范式演进图谱 阶段技术形态应用特征代表场景初级阶段单功能模型硬编码规则执行文本摘要/分类进阶阶段工作流编排多模型协同调度跨语言翻译流水线高级阶段自主智能体动态决策交互编程调试/客服对话 1.1.…

Git 中修改某个特定的commit提交内容

在 Git 中修改某个特定的提交&#xff08;commit&#xff09;通常需要使用 交互式变基&#xff08;Interactive Rebase&#xff09; 或 修改提交&#xff08;Commit Amend&#xff09;。以下是不同场景下的具体操作步骤&#xff1a; 一、修改最近的提交&#xff08;最新提交&am…