【小贪】大数据处理常用:Pyspark, Pandas

近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

  • ✅数据库常用:MySQL, Hive SQL, Spark SQL
  • ✅大数据处理常用:Pyspark, Pandas
  • ⚪ 图像处理常用:OpenCV, matplotlib
  • ⚪ 机器学习常用:SciPy, Sklearn
  • ⚪ 深度学习常用:Pytorch, numpy
  • ⚪ 常用数据结构语法糖:itertools, collections
  • ⚪ 常用命令: Shell, Git, Vim

以下整理错误或者缺少的部分欢迎指正!!!

大数据处理常用:Pyspark, Pandas

性能对比

PysparkPandas
运行环境分布式计算集群(Hadoop/Apache Spark集群)单个计算机
数据规模亿级大规模百万级小规模
优势分布式计算->并行处理,处理速度快API简单->数据处理简单
延迟机制lazy execution, 执行动作之前不执行任务eager execution, 任务立即被执行
内存缓存persist()/cache()将转换的RDDs保存在内存单机缓存
DataFrame可变性不可变,修改则返回一个新的DataFrame可变
可扩展性
列名允许重复×

常用语法对比

# 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType  # 或者直接导入*
import pandas as pd# 创建SparkSession对象
spark = SparkSession.builder \.appName("username") \.getOrCreate()# 创建空表
schema = StructType([StructField('id', LongType()),StructField('type', StringType()),])  # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t")  # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id())  # 生成一个增长的id列
pandas_df.reset_index()# 切片
pandas_df['a':'c']  # a-c三行
pandas_df.iloc[1:3, 0:2]  # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True)  # inplace表示是否创建新对象# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10)  # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():print(row["c1"], row["c2"])# 排序
spark/pandas_df.sort()  # 按列值排序
pandas_df.sort_index()  # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True)  # 指定列升序/降序排序# 过滤
spark_df.filter(df['col_name'] > 1)     # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"])  # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull()))   # 选出列值不为空的行  isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()]  # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行# 统计
spark/pandas_df.count()  # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id)  # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")  # 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(F.collect_set('province').alias('set_province'),F.first('city').alias('set_city'),F.collect_list('district').alias('set_district'),F.max('report_user').alias('set_report_user'),F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')# 根据函数分组聚合
def func(x):return pd.DataFrame({"A": x["A"].tolist()[0],"B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)# spark udf函数和pandas apply函数
def func1(a, b):return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b))  # spark_df['a']或F.col("a")))
def func2(x,y):return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table')  # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table') 

其他常用设置

class SparkUtils:def __init__(self):self.spark = Nonedef get_spark(self):if self.spark is None:self.spark = SparkSession.builder.appName("username") \.enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \.config("spark.sql.broadcastTimeout", "3600") \.config("spark.driver.memory", "200g") \.config("spark.executor.memory", "40g") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \.config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest") \.getOrCreate()self.spark.sql('SET hive.exec.dynamic.partition=true')self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')return self.sparkspark = SparkUtils()# 生成dataframe
spark_data = spark.sql("""select id, usernamefrom tab1where status in (1, 2, 3)and dt = '{}'""".format(date))# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/821178.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日刷题】Day16

【每日刷题】Day16 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 2. 160. 相交链表 - 力扣&…

基于小程序实现的4s店管理系统

作者主页&#xff1a;Java码库 主营内容&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app等设计与开发。 收藏点赞不迷路 关注作者有好处 文末获取源码 技术选型 【后端】&#xff1a;Java 【框架】&#xff1a;ssm 【…

【C++11】智能指针

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;理解在C11中智能指针&#xff0c;自己能模拟实现 4 种智能指针 > 毒鸡汤&#xff1a;白日莫闲过&#xff0c;青春不再来。 > 专栏选自&#xff1…

【halcon】C# halcon 内存暴增 续,找到一个解决方案

这里写自定义目录标题 背景释放临时缓存具体的使用感受背景 在之前的文章《【halcon】C# halcon 内存暴增 》中我们提到了一些会导致内存暴增的原因。 其中一个就是使用了计算复杂的算子,且图片很大时,此时内存就会暴增,而且内存无法被释放。 这次,我在做一个项目时,用到…

深入理解负载均衡:原理及常用算法

摘要&#xff1a; 负载均衡在现代网络架构中扮演着至关重要的角色&#xff0c;它通过分配请求到多个服务器来提高系统的性能、可用性和可伸缩性。本文将介绍负载均衡的基本原理以及常用的负载均衡算法&#xff0c;帮助读者更好地理解和应用负载均衡技术。 引言 随着互联网的迅…

TP5使用group报错:1055 Expression #1 of SELECT list is not in GROUP

使用group报错 Mysql环境是5.7的, 使用了View进行了表连接, 进行了表连接 搬迁到本地后, 查询报错 Syntax error or access violation: 1055 Expression 解决方法1 配置 my.cnf(linux)文件 win下面是 mysql.ini文件 在 mysqld 里加上 sql_modeNO_ENGINE_SUBSTITUTION,STR…

L2-3 完全二叉树的层序遍历

完全二叉树的层序遍历 一个二叉树&#xff0c;如果每一个层的结点数都达到最大值&#xff0c;则这个二叉树就是完美二叉树。对于深度为 D 的&#xff0c;有 N 个结点的二叉树&#xff0c;若其结点对应于相同深度完美二叉树的层序遍历的前 N 个结点&#xff0c;这样的树就是完全…

网络爬虫:定义、应用及法律道德考量

网络爬虫技术在当今数据驱动的世界中发挥着重要作用。本文将从网络爬虫的定义和主要功能&#xff0c;其在业界的应用实例&#xff0c;以及涉及的法律和道德问题三个方面进行深入探讨。 1. 爬虫的定义和主要功能 网络爬虫&#xff0c;也称为网页爬虫或蜘蛛&#xff0c;是一种…

RocketMQ 01 Linux安装

RocketMQ 01 主要内容&#xff1a; 编译安装HelloWorld官网名词 官方网站 http://rocketmq.apache.org GitHub https://github.com/apache/rocketmq Quick Start Linux下使用Maven编译源码安装 Rocketmq4.6需要jdk1.8环境编译和运行 各版本要求 VersionClientBroke…

SGI_STL和Nginx内存池源码剖析--源码移植

将SGISTL内存配置器和Nginx内存池源码&#xff0c;移植到自己的项目中。 源码文件复杂&#xff0c;并且有很多项目中使用不到的宏定义&#xff0c;所以通过改写和移植&#xff0c;可以很好的适应C的其他项目。 SGI-STL源码移植资源-CSDN文库 nginx内存池源码移植资源-CSDN文库…

游戏动画技术:从传统到深度学习

一、传统游戏动画技术简介 3D游戏动画的骨骼动画和蒙皮技术动画交互控制&#xff1a;状态机、动作融合和IK基于状态机的动画控制原理和问题 二、Motion Matching技术简介 传统状态机动画的缺陷Motion Matching的原理&#xff1a;根据角色状态自动匹配动画Dance Card动捕流程…

android不同版本(支持>10)获取当前连接的wifi名称

1、AndroidManifest.xml 配置权限 <uses-permission android:name"android.permission.ACCESS_COARSE_LOCATION" /> <uses-permission android:name"android.permission.CHANGE_NETWORK_STATE" /> <uses-permission android:name&q…

《大话数据结构》02 算法

算法是解决特定问题求解步骤的描述&#xff0c;在计算机中表现为指令的有限序列&#xff0c;并且每条指令表示一个或多个操作。 1. 两种算法的比较 大家都已经学过一门计算机语言&#xff0c;不管学的是哪一种&#xff0c;学得好不好&#xff0c;好歹是可以写点小程序了。现在…

MySQL事务(学习)

1.事务的特性是什么&#xff1f; 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求&#xff0c;即这些操作要么同时成功&#xff0c;要么同时失败。 事务包含了4个特性ACID 事务4个特性…

开发需求总结14-数组对象中根据相同的id进行排序

需求描述: 大家都知道element里el-table组件实现单选、全选,只需要在el-table-column,设type属性为selection,通过selection-change事件就可以得到选中的数组。一般像批量发布、批量审批、批量拒绝等场景,不涉及选中后排序,是可以直接将选中后的数据的id作为参数,传递给…

windows编译xlnt,获取Excel表里的数据

用git拉取项目 这个文件是空的 要用git拉下来&#xff0c;使用终端编译xlnt库 点击解决方案 运行生成 然后新建项目&#xff0c;配置好库&#xff0c; #include <iostream> #include <xlnt/xlnt.hpp>int main() {// 打开 Excel 文件xlnt::workbook workbook;workb…

优斯特:防静电包装解决方案的巧妙运用

在现代电子产品生产与运输领域&#xff0c;防静电包装已成为保障产品安全的必备环节。优斯特凭借其创新的防静电包装解决方案&#xff0c;为客户提供了一种巧妙的方式来确保产品在存储和运输过程中不受静电影响&#xff0c;并且不会被刮花或损坏。 静电对产品的影响 静电对电子…

大数据行业英语单词巩固20240413

Integration - 整合 Example: The integration of new software into our system will improve efficiency. 示例&#xff1a;将新软件集成到我们的系统中将提高效率。 Automation - 自动化 Example: Automation of repetitive tasks can save time and reduce errors. 示例&a…

MacOS Github Push项目 精简版步骤

大白菜教程&#xff1a;小白菜 macOS github提交代码-CSDN博客 步骤1&#xff1a;git init步骤2&#xff1a; touch .gitignore 创建ignore文件 open .gitignore 打开ignore文件 编写ignore文件.idea/ 是文件夹的意思.git/ 也是自动生成的文件夹 也不上传.DS_St…

ENVI实战—一文学会使用传感器自带信息配准工具进行几何校正

实验1&#xff1a;学会使用传感器自带信息配准工具 目的&#xff1a;利用ENVI的传感器自带信息配准工具&#xff0c;掌握几何校正的一般方法。 过程&#xff1a; 1.对MODIS影像进行校正&#xff1a; ①读取影像&#xff1a;打开文件&#xff0c;点击“打开为”&#xff0c;…