【小贪】大数据处理:Pyspark, Pandas对比及常用语法

近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

  • ✅数据库常用:MySQL, Hive SQL, Spark SQL
  • ✅大数据处理常用:Pyspark, Pandas
  • ⚪ 图像处理常用:OpenCV, matplotlib
  • ⚪ 机器学习常用:SciPy, Sklearn
  • ⚪ 深度学习常用:Pytorch, numpy
  • ⚪ 常用数据结构语法糖:itertools, collections
  • ⚪ 常用命令: Shell, Git, Vim

以下整理错误或者缺少的部分欢迎指正!!!

大数据处理常用:Pyspark, Pandas

性能对比

PysparkPandas
运行环境分布式计算集群(Hadoop/Apache Spark集群)单个计算机
数据规模亿级大规模百万级小规模
优势分布式计算->并行处理,处理速度快API简单->数据处理简单
延迟机制lazy execution, 执行动作之前不执行任务eager execution, 任务立即被执行
内存缓存persist()/cache()将转换的RDDs保存在内存单机缓存
DataFrame可变性不可变,修改则返回一个新的DataFrame可变
可扩展性
列名允许重复×

常用语法对比

# 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType  # 或者直接导入*
import pandas as pd# 创建SparkSession对象
spark = SparkSession.builder \.appName("username") \.getOrCreate()# 创建空表
schema = StructType([StructField('id', LongType()),StructField('type', StringType()),])  # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t")  # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id())  # 生成一个增长的id列
pandas_df.reset_index()# 切片
pandas_df['a':'c']  # a-c三行
pandas_df.iloc[1:3, 0:2]  # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True)  # inplace表示是否创建新对象# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10)  # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():print(row["c1"], row["c2"])# 排序
spark/pandas_df.sort()  # 按列值排序
pandas_df.sort_index()  # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True)  # 指定列升序/降序排序# 过滤
spark_df.filter(df['col_name'] > 1)     # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"])  # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull()))   # 选出列值不为空的行  isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()]  # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行# 统计
spark/pandas_df.count()  # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id)  # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")  # 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(F.collect_set('province').alias('set_province'),F.first('city').alias('set_city'),F.collect_list('district').alias('set_district'),F.max('report_user').alias('set_report_user'),F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')# 根据函数分组聚合
def func(x):return pd.DataFrame({"A": x["A"].tolist()[0],"B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)# spark udf函数和pandas apply函数
def func1(a, b):return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b))  # spark_df['a']或F.col("a")))
def func2(x,y):return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table')  # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table') 

其他常用设置

class SparkUtils:def __init__(self):self.spark = Nonedef get_spark(self):if self.spark is None:self.spark = SparkSession.builder.appName("username") \.enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \.config("spark.sql.broadcastTimeout", "3600") \.config("spark.driver.memory", "200g") \.config("spark.executor.memory", "40g") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.getOrCreate()self.spark.sql('SET hive.exec.dynamic.partition=true')self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')return self.sparkspark = SparkUtils()# 生成dataframe
spark_data = spark.sql("""select id, usernamefrom tab1where status in (1, 2, 3)and dt = '{}'""".format(date))# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/826.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何实现文件上传到阿里云OSS!!!(结合上传pdf使用)

一、开通阿里云OSS对象存储服务 对象存储 OSS_云存储服务_企业数据管理_存储-阿里云阿里云对象存储 OSS 是一款海量、安全、低成本、高可靠的云存储服务&#xff0c;提供 99.995 % 的服务可用性和多种存储类型&#xff0c;适用于数据湖存储&#xff0c;数据迁移&#xff0c;企…

数据结构- 顺序表-单链表-双链表 --【求个关注!】

文章目录 一 顺序表代码&#xff1a; 二 链表单链表双向链表 一 顺序表 顺序表是线性表的一种 所谓线性表指一串数据的组织存储在逻辑上是线性的&#xff0c;而在物理上不一定是线性的 顺序表的底层实现是数组&#xff0c;其由一群数据类型相同的元素组成&#xff0c;其在逻辑…

「Word 论文排版」插入分节符导致word转PDF后出现空白页

问题 word转PDF后出现空白页 解决 但是此方法会让页面页脚标记出错 TODO 如下图所示 在论文目录后有一个分节符&#xff0c;转成PDF之后就多了一个空白页 文件-打印-页面设置-选中封面那一页-版式-从偶数页开始 再导出空白页就没了

Java编程题 | 数组元素交换

大家可以关注一下专栏&#xff0c;方便大家需要的时候直接查找&#xff0c;专栏将持续更新~ 题目描述 编写一个Java程序&#xff0c;输入一个整数数组&#xff0c;将最大的元素与第一个元素交换&#xff0c;最小的元素与最后一个元素交换&#xff0c;然后输出修改后的数组…

3D抓取算法的优点及缺点

3D抓取算法作为三维点云数据上的物体抓取解决方案&#xff0c;具有一些显著的优点&#xff0c;但同时也存在一些潜在的缺点。以下是对其优点和缺点的详细分析&#xff1a; 优点&#xff1a; 基于深度学习&#xff1a;3D抓取算法利用深度学习的强大能力来处理和分析三维点云数据…

香港多IP服务器在建立高可用性网站架构中的作用?

香港多IP服务器在建立高可用性网站架构中的作用&#xff1f; 在构建高可用性的网站架构时&#xff0c;选择合适的服务器和配置是至关重要的。香港多IP服务器因其独特的地理和技术优势&#xff0c;成为了全球许多企业的首选。这些服务器由于拥有多个IP地址&#xff0c;能够提供…

OpenHarmony多媒体-mp3agic

简介 mp3agic 用于读取 mp3 文件和读取/操作 ID3 标签&#xff08;ID3v1 和 ID3v2.2 到 ID3v2.4&#xff09;,协助开发者处理繁琐的文件操作相关&#xff0c;多用于操作文件场景的业务应用。 效果展示&#xff1a; 下载安装 ohpm install ohos/mp3agicOpenHarmony ohpm环境配…

qt 信号槽

信号槽 前提&#xff1a;如果没有消息循环&#xff0c;那么Qt的信号和槽无法完全使用。最开始的Qt消息循环开始于QCoreApplication::exec。在绝大部分GUI程序中&#xff0c;GetMessage, DispatchMessage是写在一个死循环中的&#xff0c;除非程序退出&#xff0c;否则会一直处…

Docker Desktop打开一直转圈的解决办法

安装Docker Desktop之前确保你的Hyper-V已经打开 开启后需要重新安装重新安装重新安装这是最关键的一步&#xff0c;博主自己看了很多教程&#xff0c;最后试着重装了一下解决了 安装DockerDesktop的时候我的电脑根本就没有Hyper-V这个功能选项&#xff0c;可能是这个问题 如…

达梦数据库的DMRMAN工具介绍

达梦数据库的DMRMAN工具介绍 DMRMAN&#xff08;DM RECOVERY MANAGER&#xff09;是 DM 的脱机备份还原管理工具&#xff0c;由它来统一负责库级脱机备份、脱机还原、脱机恢复等相关操作&#xff0c;该工具支持命令行指定参数方式和控制台交互方式执行&#xff0c;降低了用户的…

域名信息查询同款WHOIS源码

域名查询一般是指查询域名的whois注册信息&#xff0c;域名WHOIS是当前域名系统中不可或缺的一项信息服务。在使用域名进行Internet冲浪时&#xff0c;很多用户希望进一步了解域名、名字服务器详细信息 源码免费下载地址抄笔记 (chaobiji.cn)https://chaobiji.cn/

部署轻量级Gitea替代GitLab进行版本控制(二)

version: 3.9 # 创建自定义网络 networks:gitea:name: giteadriver: bridgeservices:## 数据库服务db:image: postgres:latestcontainer_name: gitea_dbrestart: alwaysnetworks:- gitea # 加入到gitea网络ports:- 3003:5432environment:- POSTGRES_USERgitea # PGSQL默认用户-…

ubuntu系统下opencv的编译安装

ubuntu系统下opencv的编译安装 参考https://blog.csdn.net/KIK9973/article/details/118830187 1 安装准备 1.1安装依赖环境(Ubuntu18.04) 下载opencv的依赖&#xff0c;其中第三行的依赖是可选的&#xff0c;前两行的依赖则是必要的。 sudo apt-get install build-essent…

力扣打卡第一天

101. 对称二叉树 C&#xff1a; class Solution { public:bool isSymmetric(TreeNode* root) {return check(root->left,root->right);}bool check(TreeNode *p,TreeNode *q){ /**定义check方法用来检查两棵树是否是镜像的*/if (!p && !q) return true; /* 如…

鸿蒙开发语言_ArkTS开发语言体验_TypeScript语言环境搭建_TS声明和数据类型---HarmonyOS4.0+鸿蒙NEXT工作笔记003

可以看到我们新建的这个项目,有个 @State message: String =Hello ArkTS 这个就是定义了一个变量,可以看到 message是变量名,String是变量类型. 然后我们可以看看它的结构可以看到 build() 下面有个Row,然后再下面有个Column方法,然后,里面就是具体的内容了,首先就是显示了一…

python模拟股票交易

模拟股票交易是一个相对复杂的任务,涉及许多因素,如市场数据获取、交易策略、风险管理等。在Python中,你可以使用多种库和工具来模拟股票交易。以下是一个简单的示例,展示如何使用Python来模拟基本的股票交易。 1. 准备环境 首先,你需要安装一些必要的Python库。你可以使…

Python数据结构【四】排序(二)难度:困难

文章目录 前言一、书接上回二、快速排序&#xff08;Quick Sort&#xff09;2.1 快速排序思想2.2 快速排序代码实现2.3 快速排序复杂度分析 三、堆排序&#xff08;Heap Sort&#xff09;3.1 堆排序思想3.2 堆排序代码实现3.3 堆排序复杂度分析 结语 前言 可私聊进一千多人Pyth…

文件名批量改名,高效将文件名里的符号进行替换删除掉,实现文件名的高效管理

在信息爆炸的时代&#xff0c;我们每天都在与大量的文件打交道。从工作文档到个人照片&#xff0c;从视频剪辑到音频录音&#xff0c;每个文件背后都承载着我们的辛勤付出和美好回忆。然而&#xff0c;随着文件数量的不断增加&#xff0c;如何高效管理这些文件成为了一个亟待解…

MongoDB安装及集成

MongoDB安装及集成 前言 MongoDB是一个开源的、面向文档的 NoSQL 数据库&#xff0c;它采用了 JSON 风格的文档来存储数据&#xff0c;而不是传统的表格形式。MongoDB在数据存储方面具有灵活性和可扩展性&#xff0c;使得它成为了当今流行的数据库之一。 MongoDB的主要特点和…

SQL语言初步认识

1. SQL简介 2. 基本的数据定义 2.1 创建基本表 CREATE TABLE <表名> <列名><数据类型>[<默认值>|<标识列设置>][<该列的完整性约束>] 完整性约束&#xff1a; ①NOT NULL &#xff1a;该列值不能为空 ②NULL &#xff1a;该列值可以为…