SparkSql读取数据的方式

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df01 = spark.read.json("../../datas/resources/people.json")df01.printSchema()df02 = spark.read.format("json").load("../../datas/resources/people.json")df02.printSchema()df03 = spark.read.parquet("../../datas/resources/users.parquet")df03.printSchema()#spark.read.orc("")df04 = spark.read.format("orc").load("../../datas/resources/users.orc")df04.printSchema()df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")df05.printSchema()df06 = spark.read.load(path="../../datas/resources/people.csv",format="csv",sep=";")df06.printSchema()spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (`empno` int(11) NULL DEFAULT NULL,`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`mgr` int(11) NULL DEFAULT NULL,`hiredate` date NULL DEFAULT NULL,`sal` decimal(7, 2) NULL DEFAULT NULL,`comm` decimal(7, 2) NULL DEFAULT NULL,`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (`deptno` int(11) NULL DEFAULT NULL,`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongTypeif __name__ == '__main__':# 获取sparkSession对象# 设置 任务的环境变量os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一种写法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 关闭spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://bigdata01:9083</value></property>
</configuration>接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 防止在本地操作hdfs的时候,出现权限问题os.environ['HADOOP_USER_NAME'] = 'root'# 获取sparkSession对象spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from yhdb.student").show()spark.read.table("yhdb.student").show()spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Python实现图像的手绘风格效果

使用Python实现图像的手绘风格效果 一、引言二、代码详细解释与示例三、完整框架流程四、运行五、结论附&#xff1a;完整代码 一、引言 在数字图像处理领域&#xff0c;模拟手绘风格是一项有趣且具有挑战性的任务。手绘风格图像通常具有独特的纹理和深浅变化&#xff0c;给人…

Oracle Select语句

SELECT语句使用方法 在Oracle中&#xff0c;表是由列和行组成。 例如&#xff0c;示例数据库中的customers表具有以下列&#xff1a;customer_id&#xff0c;name&#xff0c;address&#xff0c;website和credit_limit。customers表中这些列中也有对应的数据。 要从表的一个或…

w~大模型~合集21

我自己的原文哦~ https://blog.51cto.com/whaosoft/12459590 #大模型~微调~用带反馈的自训练 面对当前微调大模型主要依赖人类生成数据的普遍做法&#xff0c;谷歌 DeepMind 探索出了一种减少这种依赖的更高效方法。大模型微调非得依赖人类数据吗&#xff1f;用带反馈的自训…

ctfshow(316,317,318)--XSS漏洞--反射性XSS

反射型XSS相关知识 Web316 进入界面&#xff1a; 审计 显示是关于反射性XSS的题目。 思路 首先想到利用XSS平台解题&#xff0c;看其他师傅的wp提示flag是在cookie中。 当前页面的cookie是flagyou%20are%20not%20admin%20no%20flag。 但是这里我使用XSS平台&#xff0c;…

【案例】Excel使用宏来批量插入图片

一、场景介绍 我有一个excel文件&#xff0c;需要通过一列的文件名称&#xff0c;按照规则给批量上传图片附件。 原始文件&#xff1a; 成功后文件&#xff1a; 二、实现方法 1. 使用【wps】工具打开Excel文件&#xff0c;将其保存为启用宏的文件。 2.找到编辑宏的【VB编辑器…

ENSP OSPF和BGP引入

路由协议分为&#xff1a;内部网关协议和外部网关协议。内部网关协议用于自治系统内部的路由&#xff0c;包括&#xff1a;RIP和OSPF。外部网关协议用于自治系统之间的路由&#xff0c;包括BGP。内部网关协议和外部网关协议配合来共同完成网络的路由。 BGP:边界网关路由协议(b…

Linux磁盘存储

磁盘存储 设备文件 设备文件是类Unix操作系统&#xff08;包括Linux&#xff09;中一种特殊的文件类型&#xff0c;它代表了设备接口&#xff0c;使得用户空间的程序可以通过标准的文件操作来访问和控制硬件设备。设备文件为周边设备提供了简单的接口&#xff0c;如打印机、硬…

阿里云ECS服务器使用限制及不允许做的事情

阿里云ECS&#xff08;Elastic Compute Service&#xff09;是一种高性能的弹性计算服务&#xff0c;允许用户在云端创建和管理虚拟服务器。尽管ECS提供了强大的功能&#xff0c;但在使用过程中&#xff0c;阿里云有一些限制和不允许的行为。以下是一些主要的使用限制和禁止行为…

【手撕排序4】计数排序+快速排序(非递归)

> &#x1f343; 本系列包括常见的各种排序算法&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e;…

FPGA实现以太网(一)、以太网基础知识

系列文章目录 FPGA实现以太网&#xff08;二&#xff09;、初始化和配置PHY芯片 文章目录 系列文章目录一、以太网简介二、OSI七层模型三、TCP/IP五层模型四、MAC-PHY接口五、MAC帧格式六、IP帧格式6.1 IP首部校验和计算6.2 IP首部校验和校验 七、UDP帧格式7.1 UDP头部校验和…

Linux平台C99与C++11获取系统时间

源码: #include <iostream> #include <chrono> #include <ctime> #include <thread>using namespace std; int main() {cout << "===使用C99方式获取系统时间===" << endl;time_t now = time(nullptr);struct tm *tm_c99 = lo…

QT版发送邮件程序

简单的TCP邮箱程序 **教学与实践目的&#xff1a;**学会网络邮件发送的程序设计技术。 1.SMTP协议 邮件传输协议包括 SMTP&#xff08;简单邮件传输协议&#xff0c;RFC821&#xff09;及其扩充协议 MIME&#xff1b; 邮件接收协议包括 POP3 和功能更强大的 IMAP 协议。 服务…

【学习记录】使用CARLA录制双目摄像头SLAM数据

一、数据录制 数据录制的部分参考了网上的部分代码&#xff0c;代码本身并不复杂&#xff0c;基本都是简单的CARLA语法&#xff0c;关键的一点在于&#xff0c;CARLA内部本身并没有预设的双目摄像头&#xff0c;需要我们添加两个朝向相同的摄像头来组成双目系统&#xff0c;这…

第02章 CentOS基本操作

2.文件基本操作【文件操作&#xff08;一&#xff09;】 目标 理解Linux下路径的表示方法能够使用命令(mkdir和touch)在指定位置创建目录和文件能够使用命令(rm)删除指定的目录和文件能够使用命令(ls)列出目录里的文件能够使用命令(cat,head,tail,less,more)查看文件内容理解标…

【数字图像处理+MATLAB】解决 imshow 函数图像显示亮度异常问题:自动调整亮度范围使图像能正确显示

问题描述 在MATLAB中&#xff0c;使用imshow函数进行图像显示时&#xff0c;图片亮度显示异常。 imshow(im_avg);执行上述代码后&#xff0c;得到的图片亮度异常&#xff0c;如下图所示&#xff1a; 原因分析 在MATLAB中&#xff0c;imshow函数的亮度显示规则是基于图像数据…

探索 Python 的新边疆:sh 库的革命性功能

文章目录 **探索 Python 的新边疆&#xff1a;sh 库的革命性功能**第一部分&#xff1a;背景介绍第二部分&#xff1a;sh 库是什么&#xff1f;第三部分&#xff1a;如何安装 sh 库&#xff1f;第四部分&#xff1a;简单库函数使用方法1. 执行 ls 命令2. 使用 grep 搜索文件内容…

外贸管理利器7选,助力高效办公

推荐7款外贸管理软件&#xff0c;包括ZohoBooks、ZohoCRM、富通天下等&#xff0c;各具特色&#xff0c;满足外贸企业不同需求&#xff0c;提高管理效率&#xff0c;助力企业全球化竞争。、 一、Zoho Books Zoho Books是一款外贸财务管理软件&#xff0c;不仅为用户提供了一个…

SQL中的内连接(inner join)、外连接(left|right join、full join)以及on关键字中涉及分区筛选、null解释

一、简介 本篇幅主要介绍了&#xff1a; SQL中内连接&#xff08;inner join&#xff09;、外连接&#xff08;left join、right join、full join&#xff09;的机制;连接关键字on上涉及表分区筛选的物理执行及引擎优化&#xff1b;null在表关联时的情况与执行&#xff1b; …

机器学习(一)——基本概念、模型的评估与选择

目录 1 关于2 概念2.1 基础概念2.2 学习过程2.3 预测与评估2.4 标记与分类2.4.1 标记2.4.2 分类 2.5 回归分析2.6 聚类分析2.7 学习类型2.8 泛化能力2.9 统计学概念 3 模型评估与选择3.1 经验误差与过拟合3.2 评估方法3.2.1 留出法3.2.2 交叉验证法3.2.3 自助法3.2.4 调参与最终…

ssm060基于SSM的高校共享单车管理系统的设计与实现+vue(论文+源码)_kaic

设计题目&#xff1a;高校共享单车管理系统的设计与实现 摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0…