Pyspark

2、DataFrame

2.1 介绍

在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表,或者一个带有列名的Excel表格。它和RDD一样,有这样一些特点:

  • Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
  • Lazy Evaluations:只有action才会触发Transformation的执行
  • Distributed:DataFrame和RDD一样都是分布式的
  • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame

DataFrame vs RDD

  • RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
  • DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
  • DataFrame和普通的RDD的逻辑框架区别如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qMpnxi2C-1691306807806)(pics/s13.png)]

  • 左侧的RDD Spark框架本身不了解 Person类的内部结构。

  • 右侧的DataFrame提供了详细的结构信息(schema——每列的名称,类型)

  • DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where …)。

  • DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。

  • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

  • DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了

  • 通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

  • DataFrame相当于是一个带着schema的RDD

Pandas DataFrame vs Spark DataFrame

  • Cluster Parallel:集群并行执行
  • Lazy Evaluations: 只有action才会触发Transformation的执行
  • Immutable:不可更改
  • Pandas rich API:比Spark SQL api丰富

2.2 创建DataFrame

1,创建dataFrame的步骤

​ 调用方法例如:spark.read.xxx方法

2,其他方式创建dataframe

  • createDataFrame:pandas dataframe、list、RDD

  • 数据源:RDD、csv、json、parquet、orc、jdbc

    jsonDF = spark.read.json("xxx.json")jsonDF = spark.read.format('json').load('xxx.json')parquetDF = spark.read.parquet("xxx.parquet")jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
    
  • Transformation:延迟性操作

  • action:立即操作

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pdJJY02F-1691306807807)(pics/s14.png)]

2.3 DataFrame API实现

基于RDD创建

from pyspark.sql import SparkSession
from pyspark.sql import Rowspark = SparkSession.builder.appName('test').getOrCreate()
sc = spark.sparkContext
# spark.conf.set("spark.sql.shuffle.partitions", 6)
# ================直接创建==========================
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
#为数据添加列名
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
#创建DataFrame
schemaPeople = spark.createDataFrame(people)

从csv中读取数据

# ==================从csv读取======================
#加载csv类型的数据并转换为DataFrame
df = spark.read.format("csv"). \option("header", "true") \.load("iris.csv")
#显示数据结构
df.printSchema()
#显示前10条数据
df.show(10)
#统计总量
df.count()
#列名
df.columns

增加一列

# ===============增加一列(或者替换) withColumn===========
#定义一个新的列,数据为其他某列数据的两倍
#如果操作的是原有列,可以替换原有列的数据
df.withColumn('newWidth',df.SepalWidth * 2).show()

删除一列

# ==========删除一列  drop=========================
#删除一列
df.drop('cls').show()

统计信息

#================ 统计信息 describe================
df.describe().show()
#计算某一列的描述信息
df.describe('cls').show()   

提取部分列

# ===============提取部分列 select==============
df.select('SepalLength','SepalWidth').show()

基本统计功能

# ==================基本统计功能 distinct count=====
df.select('cls').distinct().count()

分组统计

# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()

自定义的汇总方法

# 自定义的汇总方法
import pyspark.sql.functions as fn
#调用函数并起一个别名
df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()

拆分数据集

#====================数据集拆成两部分 randomSplit ===========
#设置数据比例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])

采样数据

# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样比例
#seed:随机种子
sdf = df.sample(False,0.2,100)

查看两个数据集在类别上的差异

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()

交叉表

# ================交叉表 crosstab=============
df.crosstab('cls','SepalLength').show()

udf

udf:自定义函数

#================== 综合案例 + udf================
# 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理
trainDF,testDF = df.randomSplit([0.99,0.01])diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()#首先找到这些类,整理到一个列表
not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()#定义一个方法,用于检测
def should_remove(x):if x in not_exist_cls:return -1else :return x#创建udf,udf函数需要两个参数:
# Function
# Return type (in my case StringType())#在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行
#在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行from pyspark.sql.types import StringType
from pyspark.sql.functions import udfcheck = udf(should_remove,StringType())resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')resultDF.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/31881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ssm+vue基于java的少儿编程网上报名系统源码和论文PPT

ssmvue基于java的少儿编程网上报名系统源码和论文PPT006 开发工具&#xff1a;idea 数据库mysql5.7(mysql5.7最佳) 数据库链接工具&#xff1a;navcat,小海豚等 开发技术&#xff1a;java ssm tomcat8.5 摘 要 在国家重视教育影响下&#xff0c;教育部门的密确配合下&#…

沐渥六门氮气柜技术参数详解

氮气柜是用来存储电子元器件、芯片、半导体器件、金属材料、电路板、精密仪器等物品的设备&#xff0c;通过充入氮气降低柜内湿度&#xff0c;达到防潮、防氧化、防静电、防锈和防霉效果。 六门氮气柜参数 1、容积&#xff1a;约1380L&#xff1b;外尺寸&#xff1a;W1200*D700…

100G光模块的应用案例分析:电信、云计算和大数据领域

100G光模块是一种高速光模块&#xff0c;由于其高速率和低延迟的特性&#xff0c;在电信、云计算和大数据领域得到了广泛的应用。在本文中&#xff0c;我们将深入探讨100G光模块在这三个领域的应用案例。 一、电信领域 在电信领域&#xff0c;100G光模块被广泛用于构建高速通…

Nginx使用proxy_cache指令设置反向代理缓存静态资源

场景 CentOS7中解压tar包的方式安装Nginx&#xff1a; CentOS7中解压tar包的方式安装Nginx_centos7 tar文件 怎么load_霸道流氓气质的博客-CSDN博客 参考上面流程实现搭建Nginx的基础上&#xff0c;实现静态资源的缓存设置。 注意上面安装时的目录是在/opt/nginx目录下&…

ELK中grok插件、mutate插件、multiline插件、date插件的相关配置

目录 一、grok 正则捕获插件 自定义表达式调用 二、mutate 数据修改插件 示例&#xff1a; ●将字段old_field重命名为new_field ●添加字段 ●将字段删除 ●将filedName1字段数据类型转换成string类型&#xff0c;filedName2字段数据类型转换成float类型 ●将filedNam…

Leetcode每日一题:1289. 下降路径最小和 II(2023.8.10 C++)

目录 1289. 下降路径最小和 II 题目描述&#xff1a; 实现代码与解析&#xff1a; 动态规划 原理思路&#xff1a; 1289. 下降路径最小和 II 题目描述&#xff1a; 给你一个 n x n 整数矩阵 grid &#xff0c;请你返回 非零偏移下降路径 数字和的最小值。 非零偏移下降路…

数据结构-1

1.2 线性结构树状结构网状结构&#xff08;表 数 图&#xff09; 数据&#xff1a;数值型 非数值型 1.2.3数据类型和抽象数据类型 1.3抽象数据类型 概念小结&#xff1a; 线性表: 如果在独立函数实现的 .c 文件中需要包含 stdlib.h 头文件&#xff0c;而主函数也需要包含 st…

短视频账号矩阵系统/技术开发搭建私有部署

本系统是基于短视频领域的新一代系统&#xff0c;旨在提供一个高效、全面的短视频管理与分发平台。系统采用先进的开发算法和技术&#xff0c;实现了智能化视频分类、推荐和用户互动功能。 目录 一、抖音SEO账号矩阵系统的开发和部署遵循以下原则&#xff1a; 二、账号矩阵绑…

谷歌全栈多平台应用开发神器Project IDX来了!PaLM 2加持,代码效率翻倍

一直以来&#xff0c;从0开始构建应用&#xff0c;都是一项复杂的工作。尤其是跨越手机、Web和桌面平台的程序。 这是一片无尽的复杂海洋&#xff0c;需要把技术堆栈融合在一起&#xff0c;来引导、编译、测试、部署、监控应用程序。 多年来&#xff0c;谷歌一直致力于让多平…

EFLFK——ELK日志分析系统+kafka+filebeat架构

环境准备 node1节点192.168.40.16elasticsearch2c/4Gnode2节点192.168.40.17elasticsearch2c/4GApache节点192.168.40.170logstash/Apache/kibana2c/4Gfilebeat节点192.168.40.20filebeat2c/4G https://blog.csdn.net/m0_57554344/article/details/132059066?spm1001.2014.30…

HTTP代理授权方式介绍

在网络爬虫过程中&#xff0c;我们经常需要使用HTTP代理来实现IP隐藏、突破限制或提高抓取效率。而为了确保代理的正常使用&#xff0c;并避免被滥用&#xff0c;代理服务商通常会采用授权方式。在本文中&#xff0c;我们将介绍几种常见的HTTP代理授权方式&#xff0c;以帮助你…

matplotlib 设置legend的位置在轴最上方,长度与图的长度相同

import matplotlib.pyplot as plt import numpy as npx1 np.linspace(0, 10, 50) x2 [6,4,3]ax plt.subplot() ax.plot(x1, label"test1") ax.plot(x2, label"test2") # 设置图例的位置 # 将左下角放置在【0, 1.02】位置处&#xff0c;横为1&#xff0c…

9.2.1Socket(UDP)

一.传输层: 1.UDP:无连接,不可靠,面向数据报,全双工. 2.TCP:有连接,可靠,面向字节流,全双工. 注意:这里的可不可靠是相对的,并且和安不安全无关. 二.UDP数据报套接字编程: 1.socket文件:表示网卡的这类文件. 2.DatagramPacket:表示一个UDP数据报. 三.代码实现: 1.回显服务…

原型和原型链理解

这个图大概能概括原型和原型链的关系 1.对象都是通过 _proto_ 访问原型 2.原型都是通过constructor 访问构造函数 3.原型是构造函数的 prototype 4.原型也是对象实例 也是通过 _proto_ 访问原型(Object.prototype) 5.Object.prototype的原型通过 _proto_ 访问 为null 那么…

【ChatGPT】自我救赎

ChatGPT辅助学习C之【在C中如果大数据类型转小数据类型会发生什么呢?】&#xff0c;今天问ChatGPT一个问题&#xff0c;让它解析下面这个C程序&#xff1a; #include <iostream> #include <cstdio> using namespace std; int main() {int a;long long b532165478…

初学HTML:在线简易画板设计。

最近在HTML&#xff0c;记录下一点点成果。 设计了一个简易画板&#xff0c;通过HTML的Canvas元素实现一个在线画板&#xff0c;用户可以在上面绘制图形或涂鸦。 下面是运行效果&#xff1a; 下面是代码&#xff1a; <!DOCTYPE html> <html> <head><ti…

【Nginx】静态资源部署、反向代理、负载均衡

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ nginx静态资源部署、反向代理、负载均衡 &…

详细教程:如何搭建废品回收小程序

废品回收是一项环保举措&#xff0c;通过回收和再利用废弃物品&#xff0c;可以减少资源浪费和环境污染。近年来&#xff0c;随着智能手机的普及&#xff0c;小程序成为了推广和运营的重要工具。本文将详细介绍如何搭建一个废品回收小程序。 1. 进入乔拓云网后台 首先&#xf…

微信朋友圈置顶功能已大范围上线!

微信是目前全球最受欢迎的社交媒体应用之一&#xff0c;拥有数十亿的用户。作为一款持续发展和改进的应用&#xff0c;微信不断推出新的功能来提升用户体验。 近日&#xff0c;iOS微信8.0.41内测版迎来了更新&#xff0c;本次更新距离上个正式版间隔了大概10天的时间。 微信朋友…