SparkSql读取数据的方式

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df01 = spark.read.json("../../datas/resources/people.json")df01.printSchema()df02 = spark.read.format("json").load("../../datas/resources/people.json")df02.printSchema()df03 = spark.read.parquet("../../datas/resources/users.parquet")df03.printSchema()#spark.read.orc("")df04 = spark.read.format("orc").load("../../datas/resources/users.orc")df04.printSchema()df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")df05.printSchema()df06 = spark.read.load(path="../../datas/resources/people.csv",format="csv",sep=";")df06.printSchema()spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (`empno` int(11) NULL DEFAULT NULL,`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`mgr` int(11) NULL DEFAULT NULL,`hiredate` date NULL DEFAULT NULL,`sal` decimal(7, 2) NULL DEFAULT NULL,`comm` decimal(7, 2) NULL DEFAULT NULL,`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (`deptno` int(11) NULL DEFAULT NULL,`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongTypeif __name__ == '__main__':# 获取sparkSession对象# 设置 任务的环境变量os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一种写法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 关闭spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://bigdata01:9083</value></property>
</configuration>接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 防止在本地操作hdfs的时候,出现权限问题os.environ['HADOOP_USER_NAME'] = 'root'# 获取sparkSession对象spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from yhdb.student").show()spark.read.table("yhdb.student").show()spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用特征构建进行连续变量的特征提取

特征构建(Feature Engineering)是机器学习过程中至关重要的一步,它直接影响模型的性能和准确性。通过对原始数据进行转换、处理和扩展,可以为模型提供更加丰富的信息,提升预测效果。特征构建的核心思想是利用现有的数据来生成新的特征,以便模型可以更好地捕捉潜在的规律和…

使用Python实现图像的手绘风格效果

使用Python实现图像的手绘风格效果 一、引言二、代码详细解释与示例三、完整框架流程四、运行五、结论附&#xff1a;完整代码 一、引言 在数字图像处理领域&#xff0c;模拟手绘风格是一项有趣且具有挑战性的任务。手绘风格图像通常具有独特的纹理和深浅变化&#xff0c;给人…

Oracle Select语句

SELECT语句使用方法 在Oracle中&#xff0c;表是由列和行组成。 例如&#xff0c;示例数据库中的customers表具有以下列&#xff1a;customer_id&#xff0c;name&#xff0c;address&#xff0c;website和credit_limit。customers表中这些列中也有对应的数据。 要从表的一个或…

Scala的集合。

定义&#xff1a;set表示没有重复元素的集合 特点&#xff1a;唯一&#xff0c;无序 Set有可变mutable和不可变immutable 两种类型。不可变set创建后元素不能修改&#xff0c;可变set可对元素进行添加&#xff0c;删除等操作&#xff0c;这两种类型能满足不同场景需求。 pack…

w~大模型~合集21

我自己的原文哦~ https://blog.51cto.com/whaosoft/12459590 #大模型~微调~用带反馈的自训练 面对当前微调大模型主要依赖人类生成数据的普遍做法&#xff0c;谷歌 DeepMind 探索出了一种减少这种依赖的更高效方法。大模型微调非得依赖人类数据吗&#xff1f;用带反馈的自训…

opencv 中 threshold 函数作用

在 OpenCV 中&#xff0c;threshold 函数用于将图像转换为二值图像&#xff0c;它通过设置一个阈值来将像素值分类为两类&#xff1a;低于阈值的像素设置为 0&#xff08;或黑色&#xff09;&#xff0c;高于阈值的像素设置为最大值&#xff08;通常是 255 或白色&#xff09;。…

ctfshow(316,317,318)--XSS漏洞--反射性XSS

反射型XSS相关知识 Web316 进入界面&#xff1a; 审计 显示是关于反射性XSS的题目。 思路 首先想到利用XSS平台解题&#xff0c;看其他师傅的wp提示flag是在cookie中。 当前页面的cookie是flagyou%20are%20not%20admin%20no%20flag。 但是这里我使用XSS平台&#xff0c;…

java 容器的快速失败(fast-fail)机制

Java容器的快速失败&#xff08;fail-fast&#xff09;机制是Java集合框架中的一种重要特性&#xff0c;它主要用于在迭代过程中检测并处理集合的并发修改。以下是对该机制的详细解释&#xff1a; 一、定义与原理 快速失败机制的核心思想是在迭代过程中&#xff0c;一旦检测到…

【案例】Excel使用宏来批量插入图片

一、场景介绍 我有一个excel文件&#xff0c;需要通过一列的文件名称&#xff0c;按照规则给批量上传图片附件。 原始文件&#xff1a; 成功后文件&#xff1a; 二、实现方法 1. 使用【wps】工具打开Excel文件&#xff0c;将其保存为启用宏的文件。 2.找到编辑宏的【VB编辑器…

ENSP OSPF和BGP引入

路由协议分为&#xff1a;内部网关协议和外部网关协议。内部网关协议用于自治系统内部的路由&#xff0c;包括&#xff1a;RIP和OSPF。外部网关协议用于自治系统之间的路由&#xff0c;包括BGP。内部网关协议和外部网关协议配合来共同完成网络的路由。 BGP:边界网关路由协议(b…

Linux磁盘存储

磁盘存储 设备文件 设备文件是类Unix操作系统&#xff08;包括Linux&#xff09;中一种特殊的文件类型&#xff0c;它代表了设备接口&#xff0c;使得用户空间的程序可以通过标准的文件操作来访问和控制硬件设备。设备文件为周边设备提供了简单的接口&#xff0c;如打印机、硬…

xcode更新完最新版本无法运行调试

‌Xcode更新后无法运行调试的原因可能包括以下几个方面‌&#xff1a; 1.‌版本兼容性问题‌&#xff1a;Xcode更新后&#xff0c;某些旧版本的代码可能不再兼容新版本的Xcode&#xff0c;导致出现错误。解决方法是根据错误提示逐个修复代码&#xff0c;或者尝试使用兼容新版本…

Windows下Python环境安装GDAL

最近发现在Windows环境下安装gdal,rasterio等都会出现头文件缺失的问题,比如: pip install gdal -i https://pypi.tuna.tsinghua.edu.cn/simple pip install rasterio -i https://pypi.tuna.tsinghua.edu.cn/simple 安装会下载tar.gz格式的gdal包,回报一些类似找不到头文件…

vscode生成项目目录结构

在博客中经常看到目录结构如下&#xff1a; project-tree ├─ .git ├─ .gitignore ├─ .vscodeignore ├─ images ├─ node_modules ├─ src │ ├─ config.ts │ ├─ index.ts │ └─ utils.ts ├─ tsconfig.json ├─ tslint.json └─ webpack.config.js 应该如何…

阿里云ECS服务器使用限制及不允许做的事情

阿里云ECS&#xff08;Elastic Compute Service&#xff09;是一种高性能的弹性计算服务&#xff0c;允许用户在云端创建和管理虚拟服务器。尽管ECS提供了强大的功能&#xff0c;但在使用过程中&#xff0c;阿里云有一些限制和不允许的行为。以下是一些主要的使用限制和禁止行为…

el-date-picker 设置开始时间和结束时间

<el-date-picker v-model"ruleForm.RECORDDATE" type"date" placeholder"日期" format"YYYY/M/D" value-format"YYYY/M/D" style"width: 100%;" :disabled-date"publishDateAfter" > </el-dat…

【LeetCode】【算法】240. 搜索二维矩阵II

LeetCode 240. 搜索二维矩阵II 题目描述 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 思路 思路&#xff1a;K神真强啊240.搜索二维矩阵II&#xff0…

【手撕排序4】计数排序+快速排序(非递归)

> &#x1f343; 本系列包括常见的各种排序算法&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e;…

FPGA实现以太网(一)、以太网基础知识

系列文章目录 FPGA实现以太网&#xff08;二&#xff09;、初始化和配置PHY芯片 文章目录 系列文章目录一、以太网简介二、OSI七层模型三、TCP/IP五层模型四、MAC-PHY接口五、MAC帧格式六、IP帧格式6.1 IP首部校验和计算6.2 IP首部校验和校验 七、UDP帧格式7.1 UDP头部校验和…

Linux平台C99与C++11获取系统时间

源码: #include <iostream> #include <chrono> #include <ctime> #include <thread>using namespace std; int main() {cout << "===使用C99方式获取系统时间===" << endl;time_t now = time(nullptr);struct tm *tm_c99 = lo…