SparkSession对象操作--学习笔记

1,SparkSession对象操作

    from pyspark.sql import SparkSessionfrom pyspark import SparkConffrom pyspark.sql import functions as F"""创建ss对象时可以指定一些参数如果参数在脚本中不生效,就需要通过saprk-submit指令中进行设置spark sql 的分区数是由catalyst引擎的优化器决定发生shuffle过程(遇到宽依赖算子时)分区数会调整为200个,200个分区对应者200个task任务可以通过spark.sql.shuffffle.partitions调整shuffle过程中的分区数(根据实际业务情况调整)"""# conf = SparkConf().set('driver-mimory','2g').set('num-executors','3')conf = SparkConf().set('spark.sql.shuffle.partitions','6')ss = SparkSession.\builder.\master('yarn').\appName('yarn_demo').\config(conf=conf).\getOrCreate()# 创建sc对象sc= ss.sparkContext#读取hdfs上的文件数据转换成rdd对象rdd1 = sc.textFile('/test/stu.txt')rdd_split = rdd1.map(lambda x:x.split(','))print(rdd1.take(10))df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')df1.show()#通过cast()修改字段类型,格式为df.列名.cast('修改后的列名')df_select4 = df1.select(df1.id.cast('int'),df1.name,df1['age'].cast('int'),df1['gender'],df1['major'],df1['birthday'])# print(df_select4.collect())df_groupby= df_select4.groupby('gender').agg(F.avg('age').alias('avg_age'))df_groupby.show()

2,数据源和格式

1.1数据读取

from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()print('--------------------txt格式文件----------------')#将读取到的数据保存到value列中df1 = ss.read.text(paths='/test/words.txt')# df1.show(truncate=False)# df1.printSchema()df_txt =df1.select(F.split('value',',')[0].alias('id'),F.split('value', ',')[1].alias('name'),F.split('value', ',')[2].alias('age'),F.split('value', ',')[3].alias('gender'),)# df_txt.show()# df_txt.printSchema()print('--------------------csv格式文件----------------')#path:文件路径#sep:分隔符,默认时逗号# schema:表结构,列名,类型# header:加载第一行列名信息#inferSchema:自动解析表结构df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.show()#另一种书写方式ss.read.load(path= '/test/stu.csv',format='csv',schema='name string,age int,gender string,phone string,email string,city string,address string').show()# ss.read.format('csv')print('--------------------json格式文件----------------------')df_json = ss.read.json(path = '/test/x0.json')df_json.show(truncate=False)print('--------------------mysql格式文件----------------------')df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',table='test',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})df_mysql.show()print('--------------------orc格式文件读取----------------------')df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')df_orc.show()print('--------------------parquet格式文件读取----------------------')df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')df_parquet.show()

1.2数据写入

    """ss.write.text/json/csv/jdbc()mode:写入模式overwrite:覆盖写,append:追加写"""from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([[1,'张三',20,'男'],[2,'王五',21,'女']],schema='id int,name string,age int,gender string')df.show()print('-----------------text文件---------------')#对字符串进行处理,以字符串类型保存到value字段中df_text = df.select(F.concat_ws(',','id','name','age','gender').alias('value'))df_text.show()# path:目录路径  按照分区数据写入到目录下的文件中# df_text.write.text(path='/test/data_test')# df_text.write.save(path='/test/data_test',format='text',mode='append')df_text.write.mode('overwrite').format('text').save(path='/test/data_test')print('-----------------csv文件---------------')#header:是否将列名写入df.write.csv(path='/test/data_csv',mode='overwrite',header=True)print('-----------------json文件---------------')df.write.mode('overwrite').format('json').save(path='/test/data_json')print('-----------------mysql表文件---------------')#参数说明#table:表不存在的话会自动创建#mode:写入的模式有两种overwrite和append,需要指定,不指定第一遍创建可以成功第二遍创建会失败df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',table='test2',mode='append',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

3,自定义函数

业务中需求无法是使用内置函数处理数据时,可以来自己定义函数实现需求处理

3.1分类

  • UDF函数
    • 一对一关系, df中的一行数据经过函数处理返回一行计算结果
    • concat()/concat_ws()/split()…
    • 可以自定义
  • UDAF函数 聚合函数
    • 多对一关系, df中的多行数据经过函数处理返回一行计算结果
    • sum()/avg()/count()…
    • 可以自定义
  • UDTF函数
    • 一对多关系, df中的一行数据经过函数处理返回多行计算结果
    • explode() -> 爆炸函数, 接受容器类型(array or map type), 将容器中的元素拆分成多行

3.1UDTF函数

    from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.getOrCreate()#读取文件df1 = ss.read.text(paths='/test/words.txt')df1.show(truncate=False)#对value字段中的字符串数据以逗号进行分割,返回列表df_split = df1.select(F.split('value',',').alias('words_list'))df_split.show(truncate=False)#使用udtf函数对列表进行拆分df3 = df_split.select(F.explode('words_list').alias('word'))df3.show()df3.groupby('word').count().orderBy('count',ascending=False).show()#sql方式df1.createTempView('test')res_df = ss.sql('select split(value,',') from test')res_df.show()res_df.printSchema()

3.2,UDF函数使用

自定义udf函数需要啊先注册才能够使用

两种注册方式:

普通注册:
    import refrom pyspark.sql import SparkSession, functions as Ffrom pyspark.sql.types import StringType, ArrayTypess = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.show()# 需求获取用户名称公司名称信息def get_emial(x):# print('x的值是'+x)# 通过正则表达式获取想要的部分res = re.match("(.*?)@(.*?)\.(.*)", x)# print(res)name = res.group(1)company = res.group(2)# 返回列表return [name, company]# 方式一:普通注册email_func = ss.udf.register(name='new_func', f=get_emial, returnType=ArrayType(StringType()))# 在df对象中使用自定义函数new_df = df_csv.select('name', 'age', email_func('email'))new_df.show()# Sql方式df_csv.createTempView('stu')sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")sql_df.show()
装饰器注册方式

UDF只能在DSL方式中使用

    import refrom pyspark.sql import SparkSession, functions as Ffrom pyspark.sql.types import *ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')# 需求获取用户名称公司名称信息#步骤一:定义函数#步骤2,将自定义@F.udf(returnType=ArrayType(StringType()))def get_emial(x):# print('x的值是'+x)# 通过正则表达式获取想要的部分res = re.match("(.*?)@(.*?)\.(.*)", x)# print(res)name = res.group(1)company = res.group(2)# 返回列表return [name, company]# 在df对象中使用自定义函数new_df = df_csv.select('name', 'age', get_emial('email'))new_df.show()# # Sql方式,装饰器方式不能在sql方式中使用# df_csv.createTempView('stu')# sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")# sql_df.show()

4,UDAF函数

注意:UDAF函数需要借助pandas中的series类型进行操作

UDAF函数中多行数就是pandas中的series类型数据

pandas介绍:

pandas是python中一个数据分析包,需要通过pip install pandas进行安装

4.1pandas有两种数据类型:Series和DataFrame

    import pandas as pd#创建series对象s1 = pd.Series(data=[1,2,3,4])#不指定索引时默认生成0,1,2,3,4print(s1)#指定行索引 index=s2 = pd.Series(data=(5,6,7,8),index=['a','b','c','d'])print(s2)print(type(s2))print(type(s1))#获取具体值#根据行索引获取对应位置的值print(s1[0])#通过key获取值print(s2['a'])#使用聚合函数print(s1.sum())print(s1.mean())# print(s1.cumsum())#获取行索引print(s1.index)

4.2dataFrame对象操作

import pandas as pd#创建对象
df = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23]])
print(df)#指定行列索引
df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],index=['a','b','c'],columns=['id','name','age'])
print(df2)
#获取df中的数据值
#通过df[列名]->获取列数据
print(df2['id'])
print(df2.age)
#得到一个df对象
print(df2[['id','name']])
"""
loc:通过索引标签值获取数据
iloc:通过索引下标值获取数据
"""
#获取行数据
print(df2.loc['a'])
print(df2.iloc[0])#获取列数据
print(df2.loc[:,'id'])
print(df2.loc[:,['id','name']])
print(df2.iloc[:,0])
print(df2.iloc[:,[0,2]])
#获取行列数据
print(df2.loc['b','name'])
print(df2.iloc[1,1])#聚合函数
print(df2.sum())
print(df2['age'].mean())#分组聚合
print(df2.groupby('id')['age'].sum())

4.3pandas和spark的df相互转换

import pandas as pddf2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],index=['a','b','c'],columns=['id','name','age'])from pyspark.sql import SparkSession# 创建ss对象ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(data=df2,schema='id int,name string,age int')spark_df.show()#saprk的df对象转换成pandas的df对象
new_pandas_dfd = spark_df.toPandas()
print(new_pandas_dfd)

4.4UDAF函数使用

注意点:需要安装pyspark模块

pyspark代码是会转换成java代码, 而pandas是python中特有的模块, java中没有此模块

自定义UDAF函数只能通过装饰器方式注册

自定义UDAF函数只能在DSL方式中使用

import pandas as pd
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.groupby('gender').agg(F.mean('age').alias('avg_age')).show()
#手写聚合函数mean()
"""
注意点:
①:需要指定自定义函数的参数类型,pandas的series类型
②:需要指定自定义函数的返回值类型python的类型
"""
@F.pandas_udf(returnType=FloatType())
def avg_age(age:pd.Series) ->float:print('age的值',age)res = age.mean()return res#第三步在df对象中使用udaf函数
df_csv.select(avg_age('age')).show()#sql方式
#将自定义rdaf函数
new_func = ss.udf.register('new_func',avg_age)
df_csv.createTempView('stu')
ss.sql("select gender,new_func(age) from stu group by gender").show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/630788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 vsCode创建GO项目

最近回顾了一下go的使用:具体操作看下面的参考连接,下面只描述一些踩过的坑: 1. go安装配置 安装go->配置go环境变量 推荐官网下载,速度很快; 这里需要配置五个参数:GOPATH/GOROOT/Path、GO111MODULE/…

基于kubernetes部署MySQL主从环境

部署方式 通过部署mysql主从容器,配置主从pod之间数据同步。 配置数据库访问的密码 创建 Mysql 密码的 Secret [rootk8s-master1 master]# kubectl create secret generic mysql-password --namespaceapp --from-literalmysql_root_passwordroot secret/mysql-pas…

Vue2:给组件绑定自定义事件

一、场景描述 我们在页面开发中,难免要使用事件。 在之前的学习中,我们学过click、keyup、change等事件,这些是Vue自带的事件。 它一般是用在原生的HTML元素上的。在组件上使用需要加native修饰 比如: h1绑定一个click事件&…

如何快速打造属于自己的接口自动化测试框架

1 接口测试 接口测试是对系统或组件之间的接口进行测试,主要是校验数据的交换,传递和控制管理过程,以及相互逻辑依赖关系。 接口自动化相对于UI自动化来说,属于更底层的测试,这样带来的好处就是测试收益更大&#xff…

WordPress设置回收站自动清理天数的插件Change Empty Trash Time

前面boke112百科跟大家分享的『WordPress回收站自动清空时间?如何关闭回收站或设置自动清理天数?』一文,就介绍了可以添加一行代码实现关闭或设置回收站自动清理时间,也可以通过安装Change Empty Trash Time插件来实现。 今天bok…

【论文阅读】One For All: Toward Training One Graph Model for All Classification Tasks

目录 0、基本信息1、研究动机2、创新点——One For All :unique features3、准备4、具体实现4.1、用TAGs统一来自不同领域的图数据4.2、用NOI(NODES-OF-INTEREST)统一不同图任务4.2.1、NOI子图4.2.2、NOI提示结点 4.3、用于图的上下文学习&am…

TypeError the JSON object must be str, bytes or bytearray, not ‘list‘

在使用python的jason库时,偶然碰到以下问题 TypeError: the JSON object must be str, bytes or bytearray, not ‘list’ 通过如下代码可复现问题 >>> a [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> import json >>> ra json.loads(a) Trac…

java大数据hadoop2.9.2 Java编写Hadoop分析平均成绩

1、准备文件&#xff0c;例如score.txt&#xff0c;内容如下&#xff1a; zs k1 88 ls k1 98 ww k1 78 zs k2 88 ls k2 98 ww k2 78 zs k3 88 ls k3 98 ww k3 78 2、创建maven项目 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --><d…

【XTuner 大模型单卡低成本微调实战】学习笔记

参考学习教程【XTuner 大模型单卡低成本微调实战】 理论 Finetune简介 大语言模型 微调模式 增量预训练 指令跟随微调 LoRA和QLoRA Xtuner介绍 实战 自定义微调 用 Medication QA 数据集进行微调 将数据转为 XTuner 的数据格式 目标格式&#xff1a;(.jsonL) 写提示词请C…

ChatGPT 商业提示词攻略书

原文&#xff1a;ChatGPT Business Prompt Playbook 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 一、书系介绍 人工智能发展迅速。非常迅速。 所以我希望你做两件事&#xff1a; (1) 在 Twitter 上关注我&#xff1a;iamkylebalmer (2) 订阅我的免费电子邮件通…

react+antd,Table表头文字颜色设置

1、创建一个自定义的TableHeaderCell组件&#xff0c;并设置其样式为红色 const CustomTableHeaderCell ({ children }) > (<th style{{ color: "red" }}>{children}</th> ); 2、将CustomTableHeaderCell组件传递到Table组件的columns属性中的titl…

SpringMVC JSON数据处理见解6

6.JSON数据处理 6.1.添加json依赖 springmvc 默认使用jackson作为json类库,不需要修改applicationContext-servlet.xml任何配置&#xff0c;只需引入以下类库springmvc就可以处理json数据&#xff1a; <!--spring-json依赖--> <dependency><groupId>com.f…

flutter 播放SVGA动图

SVGAPlayer-Flutter&#xff1a;这是一个轻量级的动画渲染库&#xff0c;可以通过Flutter CustomPainter原生渲染动画&#xff0c;为您带来高性能&#xff0c;低成本的动画体验123。 您可以按照以下步骤使用 SVGAPlayer-Flutter 插件&#xff1a; 1.在 pubspec.yaml 文件中添…

美易官方:苹果承认GPU安全漏洞存在:iPhone 12和M2系列受影响

苹果承认GPU安全漏洞存在&#xff1a; iPhone 12和M2 MacBook Air受影响 近日&#xff0c;苹果公司承认其部分产品存在GPU安全漏洞&#xff0c;这些漏洞可能会影响iPhone 12和M2 MacBook Air等设备的安全性。这一消息引起了广泛的关注和担忧&#xff0c;因为这些设备是许多用户…

MySQL窗口函数(MySQL Window Functions)

1、窗口函数基本概念 官网地址&#xff1a;https://dev.mysql.com/doc/refman/8.0/en/window-functions.html 窗口可以理解为 记录集合&#xff0c;窗口函数就是在满足某种条件的记录集合上执行的特殊函数。 即&#xff1a;每条记录都要在此窗口内执行函数。 静态窗口&#x…

springboot集成shiro+前端vue,前后端分离项目遇到跨域以及sessionid拿不到等问题

近期在写前后端分离的项目&#xff0c;由于前后端分离导致原来使用的shiro配置无法满足现有系统要求。同时在前后端分离项目中存在的一些问题。例如&#xff0c;一些用户信息需要存储在后端方便进行安全性判断&#xff0c;但这些存储在后端的session前端却获取不到&#xff08;…

Docker本地私有仓库搭建配置指导

一、说明 因内网主机需要拉取镜像进行Docker应用&#xff0c;因此需要一台带外主机作为内网私有仓库来提供内外其他docker业务主机使用。参考架构如下&#xff1a; 相关资源&#xff1a;加密、Distribution registry、Create and Configure Docker Registry、Registry部署、D…

K8s-架构

一、K8s节点划分 K8s集群包含Master(控制节点)和Node(工作节点)&#xff0c;应用部署在Node节点上。 集群架构图&#xff1a; 二、Master节点 Master节点分成四个组件&#xff1a;scheduler、ApiServer、Controller Manager、ETCD。类似三层结构&#xff0c;controller&#…

2024年外贸新兴市场有哪些 | 箱讯科技国际贸易平台

当前欧美市场经济增速放缓&#xff0c;通胀持续高位导致物价普遍上涨&#xff0c;进一步引发消费疲软。此外&#xff0c;受原材料价格、劳动力、土地等经营成本上升影响&#xff0c;外贸出口企业利润被进一步压缩。 困顿之中&#xff0c;新兴市场成为破局关键&#xff0c;巨大的…

Mysql流程控制函数

1概述 Mysql中的流程控制函数非常重要&#xff0c;可以根据不同的条件&#xff0c;执行不同的流程转换&#xff0c;可以在SQL语句中实现不同的条件选择。MySQL中的流程处理函数主要包括IF()、IFNULL()和CASE()函数。 1.1 IF函数 SELECT IF(1 > 0, 正确, 错误);1.2 IFNULL…