spark的学习-06

SparkSQL读写数据的方式

1)输入Source

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊参数:option,用于指定读取时的一些配置选项

spark.read.format("csv").option("sep", "\t").load(path)jdbcDF = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.load()

1、普通的文件读取方式:

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()deptDf = spark.read.csv("../../datas/zuoye/dept.csv").toDF("deptno","dept_name","dept_address")empDf = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("empno","ename","salary","comm","deptno")deptDf.createOrReplaceTempView("dept")empDf.createOrReplaceTempView("emp")# spark.sql("""#     select * from dept# """).show()## spark.sql("""#         select * from emp#     """).show()# 需求:查询统计每个部门薪资最高的前两名员工的信息以及员工所在的部门名称。spark.sql("""with t as (select e.*,d.dept_name,dense_rank() over(partition by e.deptno order by cast(salary as int) desc) paixu from emp e join dept d on d.deptno = e.deptno) select * from t where paixu <= 2""").show()spark.stop()

2、通过jdbc读取数据库数据

注意:使用jdbd读取之前,需要先将mysql5或者8 的驱动放在

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 处理逻辑# 读取json 数据df1 = spark.read.format("json").load("../../datas/sql/person.json")df1.show()# 另一种写法,推荐使用这一种df2 = spark.read.json("../../datas/sql/person.json")df2.show()df3 = spark.read.csv("../../datas/dept.csv")df4 = spark.read.format("csv").load("../../datas/dept.csv")# 读取分隔符为别的分隔符的文件user_schema = StructType([StructField(name="emp_id", dataType=StringType(), nullable=False),StructField(name="emp_name", dataType=StringType(), nullable=True),StructField(name="salary", dataType=DoubleType(), nullable=True),StructField(name="comm", dataType=DoubleType(), nullable=True),StructField(name="dept_id", dataType=LongType(), nullable=True)])# 使用csv 读取了一个 \t 为分隔符的文件,读取的数据字段名很随意,所以可以自定义df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)df5.show()# 昨天的作业是否也可以有另一个写法movie_schema = StructType([StructField(name="movie_id", dataType=LongType(), nullable=False),StructField(name="movie_name", dataType=StringType(), nullable=True),StructField(name="movie_type", dataType=StringType(), nullable=True)])movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)movieDF.show()spark.read.load(path="../../datas/zuoye/movies.dat",format="csv",sep="::",schema=movie_schema).show()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一种写法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 读取hive表中的数据# 关闭spark.stop()

3、读取集群中hive表中的数据

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession \.builder \.appName("测试spark链接") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from homework.game").show()spark.stop()

2)输出Sink

方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊参数:option,用于指定输出时的一些配置选项

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

append: 追加模式,当数据存在时,继续追加

overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;

error/errorifexists: 如果目标存在就报错,默认的模式

ignore: 忽略,数据存在时不做任何操作

df.write.mode(saveMode="append").format("csv").save(path)

保存为普通格式:

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.json("../../datas/person.json")df.createOrReplaceTempView("person")rsDf = spark.sql("""select name,age from person where age = (select max(age) from person)""")rsDf.write.csv("hdfs://bigdata01:9820/result")spark.stop()

如果你的hdfs没有关闭安全模式的话,会报一个错误,使用下面命令关闭安全模式即可

Hdfs 关闭安全模式
hdfs dfsadmin -safemode leave

保存到本地数据库:

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv") \.toDF("id","name","sal","comm","deptno")# 本地数据库jdbc连接df.write.format("jdbc") \.option("driver","com.mysql.cj.jdbc.Driver") \.option("url","jdbc:mysql://localhost:3306/homework") \.option("dbtable","emp1") \.option("user","root") \.option("password","123456") \.save(mode="overwrite")spark.stop()

保存到集群的hive中:

这里的spark环境是yarn,执行代码之前,需要先打开hdfs、yarn、hive(metastore和hiveserver2)

注意:如果没有使用schema,直接用toDf导入的时候,也可以导入成功,但是表中的字段类型就全都是String类型的,使用schema导入的时候可以指定字段的数据类型

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, IntegerTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession \.builder \.appName("测试本地连接hive") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \  #9083 metastore的端口号.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()# 如果没有这样指定表头的数据类型,导入hive中的数据类型全都是String 类型的# 指定了之后,表头的数据类型就是指定的了dept_schema = StructType([StructField(name="id", dataType=IntegerType(), nullable=False),StructField(name="name", dataType=StringType(), nullable=True),StructField(name="address", dataType=StringType(), nullable=True),])df = spark.read.format("csv").load("../../datas/zuoye/dept.csv",schema=dept_schema)df.show()df.write.saveAsTable("homework.dept1")spark.stop()
连接DataGrip

通过DataGrip连接 spark(读取的是hive中的数据)

/opt/installs/spark/sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host=bigdata01 --master yarn --conf spark.sql.shuffle.partitions=2

一个综合案例:

1-统计查询每个省份的总销售额【订单金额要小于1万】:省份、订单金额

2-统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 统计每个省份每个店铺每天的销售额超过1000的店铺个数

  • 省份、店铺id【去重】、销售额、天

3-统计查询销售额最高的前3个省份中,每个省份的平均订单金额

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 按照省份分组,求订单金额平均值

4-统计查询销售额最高的前3个省份中,每个省份的每种支付类型的占比

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 支付类型:微信、刷卡、支付宝、现金

  • 支付类型的占比 = 类型支付个数 / 总个数

  • 分组:每个省份每种类型支付的个数 / 每个省份总支付个数

  • 省份、支付类型

读取数据变成DataFrame,并对不合法的数据进行清洗【过滤、转换】

  • 订单金额超过10000的订单不参与统计

  • storeProvince不为空:None, 也不为 ‘null’值

  • 只保留需要用到的字段,将字段名称转换成Python规范:a_b_c

  • 并对时间戳进行转换成日期,获取天

  • 对订单金额转换为decimal类型

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.json("../../datas/function/retail.json")df.createOrReplaceTempView("orders")# spark.sql("""#     select * from orders# """).show()'''- 核心字段- "storeProvince":订单所在的省份信息,例如:湖南省- "storeID":订单所产生的店铺ID,例如:4064- "receivable":订单收款金额:例如:22.5- "payType":订单支付方式,例如:alipay- "dateTS":订单产生时间,例如:1563758583000'''#数据清洗clearDf = spark.sql("""select storeProvince store_province,storeID store_id,cast(receivable as decimal(10,2)) receivable,payType pay_type,from_unixtime(dateTS/1000,"yyyy-MM-dd") date_tSfrom orders where receivable < 10000 and storeProvince is not null and storeProvince != 'null'""")clearDf.createOrReplaceTempView("clear_orders")# 1-统计查询每个省份的总销售额【订单金额要小于1万】:省份、订单金额df1 = spark.sql("""select store_province,sum(receivable) total_money from clear_orders group by store_province""")df1.createOrReplaceTempView("province_total_money")# 统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数# 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来qiansanDf = spark.sql("""select * from clear_orders where store_province in (select store_province from province_total_money order by total_money desc limit 3)""")qiansanDf.createOrReplaceTempView("qs_province")# 统计每个省份每个店铺每天的销售额超过1000的店铺个数# spark.sql("""#     select store_province,count(distinct store_id) from qs_province group by store_province having sum(receivable) > 1000# """).show()spark.sql("""with t as(select date_ts,store_id,store_province from qs_province group by date_ts,store_id,store_province having sum(receivable) > 1000)select store_province,count(distinct store_id) store_num from t group by store_province""").show()qiansanDf.createOrReplaceTempView("qs_details")# 统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数spark.sql("""with t as(select date_ts,store_id,store_province from qs_details group by date_ts,store_id,store_province having sum(receivable) > 1000)select store_province,count(distinct store_id) store_num from t group by store_province""").show()# 每个省份的平均订单金额spark.sql("""select store_province,round(avg(receivable),2) avg_receivable from qs_details group by store_province""").show()# 每个省份的每种支付类型的占比spark.sql("""with t as(select store_province,pay_type,count(1) total_order from  qs_details group by store_province,pay_type)select store_province,pay_type,round(total_order/(sum(total_order) over(partition by store_province )),2) rate from t """).show()spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/885708.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react-markdown内容宽度溢出和换行不生效问题

情景复现&#xff1a; 解决办法&#xff0c;添加样式进行限制 /* index.css */ .markdown-container {word-break: break-word; /* 强制长单词断行 */white-space: pre-wrap; /* 保留空白符序列&#xff0c;但是正常地进行换行 */overflow-wrap: break-word; /* 在长单词或…

java双向链表解析实现双向链表的创建含代码

双向链表 一.双向链表二.创建MyListCode类实现双向链表创建一.AddFirst创建&#xff08;头插法&#xff09;二.AddLast创建&#xff08;尾叉法&#xff09;三.size四.remove(指定任意节点的首位删除)五.removeAll(包含任意属性值的所有删除)六.AddIndex(给任意位置添加一个节点…

VMWare虚拟机NAT模式下与外部主机(非宿主机)通信

VMWare虚拟机NAT模式下与外部主机(非宿主机)通信 1. VMWare虚拟机网络 VMWare的三种网络工作模式&#xff1a; Bridged&#xff1a;桥接模式NAT&#xff1a;网络地址转换模式Host-Only &#xff1a;仅主机模式 VMWare 网络连接配置界面如下&#xff1a; 在本次测试环境中&a…

Mac保护电池健康,延长电池使用寿命的好方法

使用Mac的过程中&#xff0c;如何延长电池的使用寿命是大家非常关心的问题&#xff0c;而养成一个良好的充电习惯能够有效的延长电池的使用寿命 避免过度充电和过度放电能够有效的保护电池&#xff0c;因此长时间的充电与长时间放点都不可取&#xff0c;但是在日常的使用过程中…

显示器接口种类 | 附图片

显示器接口类型主要包括VGA、DVI、HDMI、DP和USB Type-C等。 VGA、DVI、HDMI、DP和USB Type-C 1. 观察 VGA接口:15针 DP接口&#xff1a;在DP接口旁&#xff0c;都有一个“D”型的标志。 电脑主机&#xff1a;DP(D) 显示器&#xff1a;VGA(15针) Ref https://cloud.tenc…

qt QUndoCommand 与 QUndoStack详解

1、概述 QUndoCommand 和 QUndoStack 是 Qt 框架中用于实现撤销/重做&#xff08;undo/redo&#xff09;功能的两个核心类。QUndoCommand 是表示单个可撤销操作的基类&#xff0c;而 QUndoStack 则负责管理这些命令的堆栈&#xff0c;提供撤销和重做操作的接口。 QUndoCommand…

双指针(二)双指针到底是怎么个事

一.有效的三角形个数 有效的三角形个数 class Solution {public int triangleNumber(int[] nums) {Arrays.sort(nums);int i0,end nums.length-1;int count 0;for( i end;i>2;i--){int left 0;int right i-1;while(left<right){if(nums[left]nums[right]>nums…

springboot的增删改查商城小实践(b to c)

首先准备一张表&#xff0c;根据业务去设计表 订单编号是参与业务的&#xff0c;他那订单编号里面是有特殊意义的&#xff0c;比如说像什么一些年月日什么的&#xff0c;一些用户的ID都在那编号里面呢&#xff1f;不能拿这种东西当主件啊 根据数据量去决定数据类型 价格需要注意…

动态规划-背包问题——416.分割等和子集

1.题目解析 题目来源 416.分割等和子集——力扣 测试用例 2.算法原理 1.状态表示 这里背包问题基本上和母题的思路大相径庭&#xff0c;母题请见 [模板]01.背包 &#xff0c;这里的状态表示与装满背包的情况类似&#xff0c;第二个下标就是当选择的物品体积直接等于j时是否可…

开源 - Ideal库 -获取特殊时间扩展方法(四)

书接上回&#xff0c;我们继续来分享一些关于特殊时间获取的常用扩展方法。 01、获取当前日期所在月的第一个指定星期几 该方法和前面介绍的获取当前日期所在周的第一天&#xff08;周一&#xff09;核心思想是一样的&#xff0c;只是把求周一改成求周几而已&#xff0c;当然其…

移位寄存器设计—FDRE、SRL16E及原语约束

信号处理中&#xff0c;实现数据对齐时&#xff0c;常常对单bit或多bit信号进行打拍操作&#xff0c;这个可以通过移位寄存器实现&#xff0c;SLICEM中的SRL即为移位寄存器。 这里主要记录下不同写法的效果。 1 //同步复位2 module static_multi_bit_sreg_poor #(3 parame…

AFK架构设计思想概述

一、AKF的核心思想 AKF架构设计的核心思想源于对系统可扩展性、可用性和灵活性的深刻理解。AKF&#xff08;Availability, Scalability, Flexibility&#xff09;架构模型由Martin L. Abbott和Michael T. Fisher在《The Art of Scalability》一书中提出&#xff0c;旨在帮助工…

【layui】echart的简单使用

图表类型切换&#xff08;柱形图和折线图相互切换&#xff09; <title>会员数据</title><div class"layui-card layadmin-header"><div class"layui-breadcrumb" lay-filter"breadcrumb"><a lay-href""&g…

期权开户难不难?期权开户成功后当天是否能交易

期权开户难不难&#xff1f;这取决于投资者的准备情况和所选的开户途径。对于满足一定资金和经验要求的投资者来说&#xff0c;通过正规期货公司或期权交易平台进行开户&#xff0c;虽然流程相对复杂&#xff0c;但只要遵循步骤&#xff0c;仍然可以顺利完成&#xff0c;下文为…

Java并发无锁篇--乐观锁(非阻塞)

共享模型之无锁 问题提出 package com.zjy.unlock;import java.util.ArrayList; import java.util.List;public class AccountDemo {public static void main(String[] args) {Account account new AccountUnsafe(10000);Account.demo(account);} }class AccountUnsafe impl…

分享三个python爬虫案例

一、爬取豆瓣电影排行榜Top250存储到Excel文件 近年来&#xff0c;Python在数据爬取和处理方面的应用越来越广泛。本文将介绍一个基于Python的爬虫程序&#xff0c;用于抓取豆瓣电影Top250的相关信息&#xff0c;并将其保存为Excel文件。 获取网页数据的函数&#xff0c;包括以…

第8章利用CSS制作导航菜单

8.1 水平顶部导航栏 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title></title></head><body><center><h3>简单水平菜单导航栏</h3></center><hr /><nav><ul&g…

Spring Plugin与策略模式:打造动态可扩展的应用

目录 一、策略模式 二、Spring Plugin 2.1 Spring Plugin 实现策略模式开发 2.2 策略模式优缺点 三、Spring Plugin 原理 一、策略模式 策略模式是一种设计模式&#xff0c;它允许程序在运行中动态的选择不同的行为方式进行动态执行。策略模式的核心思想是将行为封装在一个个…

spark的学习-05

SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据&#xff08;统计的都是结构化的数据&#xff09;一般都使用sparkSql处理结构化的数据 结构化的文件&#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表&#xff1a;…

Android 源码的下载与编译

Android 源码的下载与编译 本章节主要介绍安卓系统的编译以及编译产物&#xff0c;根据我自己的经验只总结个人觉得重要的部分。 有价值的博客&#xff1a; https://blog.csdn.net/wuye110/article/details/8463409 https://juejin.cn/post/7288166472131018786 值得一看的…