dataframe 筛选_Spark.DataFrame与Spark.ML简介

本文是PySpark销量预测系列第一篇,后面会陆续通过实战案例详细介绍PySpark销量预测流程,包含特征工程、特征筛选、超参搜索、预测算法。

在零售销量预测领域,销售小票数据动辄上千万条,这个量级在单机版上进行数据分析/挖掘是非常困难的,所以我们需要借助大数据利器--Spark来完成。

Spark作为一个快速通用的分布式计算平台,可以高效的使用内存,向用户呈现高级API,这些API将转换为复杂的并行程序,用户无需深入底层。

由于数据挖掘或分析人员,大多数熟悉的编程语言是Python,所以本章我们介绍Spark的Python版--PySpark。本节先介绍必要的基础知识比如DataFrame和ML库,在后续章节中会给出基于Spark机器学习的特征生成/特征选择/超参数调优以及机器学习销量预测算法。

5ce32c9dbe394dfdaedc6fd5a2328363.png

(借用网上一张图)

1.Spark.DataFrame与Spark.ML简介

从Spark 2.0开始,Spark机器学习API是基于DataFrame的Spark.ML ,而之前基于RDD的Spark.MLlib已进入维护模式,不再更新加入新特性。基于DataFrame的Spark.ML是在RDD的基础上进一步的封装,也是更加强大方便的机器学习API,同时如果已经习惯了Python机器学习库如sklearn等,那么你会发现ML用起来很亲切。本节主要厘清一些概念为接下来的机器学习做准备,所以可能知识点比较密集且枯燥。


下面我们就开始介绍DataFrame和ML

DataFrame 从属于 Spark SQL 模块,适用于结构化/数据库表以及字典结构的数据,执行数据读取操作返回的数据格式就是DataFrame,同时熟悉Python的pandas库或者R语言的同学来说,更是觉得亲切,Spark.DataFrame正是借鉴了二者。DataFrame的主要优点是Spark引擎在一开始就为其提供了性能优化,与Java或者Scala相比,Python中的RDD非常慢。每当使用RDD执行PySpark程序时,在PySpark驱动器中,启动Py4j使用JavaSparkContext的JVM,PySpark将数据分发到多个节点的Python子进程中,此时Python和JVM之间是有很多上下文切换和通信开销,而DataFrame存在的意义就是优化PySpark的查询性能。

以上我们交代了Spark.DataFrame的由来,下面介绍其常见操作。


1.1 Spark.DataFrame生成

(1)使用toDF(基于RDD)

from pyspark import SparkConf,SparkContextfrom pyspark.sql import Rowconf = SparkConf().setMaster("local").setAppName("My App")sc = SparkContext(conf = conf)df = sc.parallelize([ \    Row(name='Alice', age=5, height=80), \    Row(name='Alice', age=5, height=80), \    Row(name='Alice', age=10, height=80)]).toDF()#查看数据类型df.dtypes#[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]查看df类型type(df)#class 'pyspark.sql.dataframe.DataFrame'>

可以将DataFrame视为关系数据表,在其上进行类似于SQL的操作,同时与平时建SQL表需要指定数据类型不同的是,此时数据列的类型是自动推断,这也是其强大之处。

(2)读取本地文件

from pyspark.sql import SparkSessionspark = SparkSession.builder \    .master("local") \    .appName("Test Create DataFrame") \    .config("spark.some.config.option", "some-value") \    .getOrCreate()df = spark.read.csv('python/test_spark/ts_dataset.csv')

同理还可以读取parquet/json文件

df_parquet=spark.read.parquet('....')df_json = spark.read.format('json').load('python/test_spark/ts_dataset.json')

以上两种方式中,第一种是Spark1.x版本中以RDD作为主要API的方式,第二种的SparkSession是随着spark2.x引入,封装了SparkContext、SparkConf、sqlContext等,为用户提供统一的接口来使用Spark各项功能的更高级抽象的启动方式。

强调一点是,我们通过会话SparkSession读取出来的数据类型就是DataFrame,而第一种还需要在RDD的基础上使用toDF进行转换。如果当前读者使用的spark版本是2,那么,推荐使用第二种方式。


(3)读取HIVE表

from pyspark.sql import SparkSessionspark = SparkSession. \    Builder(). \    config("spark.sql.crossJoin.enabled", "true"). \    config("spark.sql.execution.arrow.enabled", "true"). \    enableHiveSupport(). \    getOrCreate()df=spark.sql("""select regparam,fitIntercept, elasticNetParam from temp.model_best_param""")

这种类型和上文直接读取本地文件类似,Spark任务在创建时,是默认支持Hive,可以直接访问现有的 Hive支持的存储格式。解释一下,Apache Hive是Hadoop上一种常见的结构化数据源,支持包含HDFS在内的多种存储系统上的表,由于实际工作中我们使用spark.sql读取数据操作的机会更多,也是spark最核心组件之一,所以这里重点讲解一些Spark.SQL。与Spark其他的组件一样,在使用的时候是需要提前引入Spark.SQL,但也无需依赖大量的包,如果需要把Spark.SQL连接到一个部署好的Hive上,则需要把hive-site.xml复制到spark的配置文件目录中,该部分内容参考网络上其他的教程。以上代码中enableHiveSupport的调用使得SparkSession支持Hive。如果是Spark 1.x版本,则使用以下方式引用。

from pyspark.sql import HiveContexthiveCtx=HiveContext(sc)data=hiveCtx.sql("select regparam,fitIntercept, elasticNetParam from temp.model_best_para ")

(4)pandas.DataFrame转换而来

既然使用python进行数据处理,尤其是结构化数据,那么pandas一定绕不开,所以我们经常会有把做过一些处理的pandas.DataFrame数据转换为Spark.DataFrame的诉求,好在Spark.DataFrame在设计之初就参考并考虑到了这个问题,所以实现方式也相当简单。

import pandas as pddf = pd.read_csv('python/test_spark/ts_dataset.csv')#将pandas.Dataframe 转换成-->Spark.DataFrame spark_df=spark.createDataFrame(df)#将Spark.DataFrame 转换成--> pandas.Dataframepd_df = spark_df.toPandas()

以上将Spark.DataFrame 转换成--> pandas.Dataframe的过程,不建议对超过10G的数据执行该操作。

本节开头我们也说了Spark.DataFrame是从属于Spark.sql的,Spark.sql作为Spark最重要的组件,是可以从各种结构化数据格式和数据源读取和写入的,所以上面我们也展示了读取json/csv等本地以及数据库中的数据。同时spark还允许用户通过thrift的jdbc远程访问数据库。总的来说 Spark 隐藏了分布式计算的复杂性, Spark SQL 、DataFrame更近一步用统一而简洁的API接口隐藏了数据分析的复杂性。从开发速度和性能上来说,DataFrame + SQL 无疑是大数据分析的最好选择。


1.2 Spark.DataFrame操作

以上我们强调了Spark.DataFrame可以灵活的读取各种数据源,数据读取加载后就是对其进行处理了,下面介绍读取DataFrame格式的数据以后执行的一些简单的操作。

(1)展示DataFrame

spark_df.show()
  • 打印DataFrame的Schema信息

spark_df.printSchema()
  • 显示前n行

spark_df.head(5)
  • 显示数据长度与列名

df.count()df.columns

(2)操作DataFrame列

  • 选择列

ml_dataset=spark_df.select("features", "label")
  • 增加/产生新的一列

from pyspark.sql.functions import *#注意这个*号,这里是导入了sql.functions中所有的函数,所以下文的abs就是由此而来df2 = spark_df.withColumn("abs_age", abs(df2.age))
  • 删除列

df3= spark_df.drop("age")
  • 筛选

df4= spark_df.where(spark_df["age"]>20)

这里只是简单的展示了一小部分最为常见的DataFrame操作,更详尽的内容请查阅官方文档或者其他参考资料。

51ad911c9abb297b1e65911c80b2c3b2.png

1.3 Spark.ML简介

以上我们介绍了与Spark.ML机器学习密切相关的数据类型和基本操作--Spark.DataFrame

犹如我们通过pandas.DataFrame对数据做加工,下面我们看看用这些清洗过后的制作佳肴的工具包--机器学习建模。

ML包括三个主要的抽象类:转换器(Transformer)、评估器(Estimator)和管道(Pipeline)。

转换器,顾名思义就是在原对象的基础上对DataFrame进行转换操作,常见的有spark.ml.feature中的对特征做归一化,分箱,降度,OneHot等数据处理,通过transform()方法将一个DataFrame转换成另一个DataFrame。

评估器,评估器是用于机器学习诸如预测或分类等算法,训练一个DataFrame并生成一个模型。用实现fit()方法来拟合模型。

from pyspark.ml.feature import MinMaxScaler#定义/引入转换类max_min_scaler = MinMaxScaler(inputCol="age", outputCol="age_scaler")#fit数据max_min_age = max_min_scaler.fit(df)#执行转换max_min_age_=max_min_age.transform(spark_df)

管道这一概念同样受Python的Scikit-Learn库的影响,PySpark ML中的管道指从转换到评估的端到端的过程,为简化机器学习过程并使其具备可扩展性,PySpark ML中的Pipelines API,类似于 Python 机器学习库 Scikit-Learn 中的 Pipeline,采用了一系列 API 定义并标准化的的机器学习工作流,包含数据读取、预处理、特征加工、特征选择、模型拟合、模型验证、模型评估等一系列工作,对DataFrame数据执行计算操作。Spark机器学习部分其他的如特征生成,模型训练,模型保存,数据集划分/超参数调优,后面我们会有实际案例进行详细阐述。另外,随着spark.3.0的发布,最近的ml简介可以通过此链接了解

http://spark.apache.org/docs/latest/ml-guide.html


最后,顺便介绍手头上几本密切相关书籍:

1.《Spark快速大数据分析》本书有些旧,主要是spark.1.x为主,少量的spark.2.X介绍,如果想要了解或者不得不使用rdd based  APIs进行数据分析或者想深入spark更底层学习一点scala等函数式编程入门的还是不错的选择,比较全面通俗,豆瓣评分7.9。

2.《PySpark实战指南》用Python进行Spark数据分析那就不得不提到这本书,倒不见得有多好,只是目前市面上也没有更好的专门使用Python介绍Spark的中文书籍,本书从rdd到mllib的介绍以及ml包的介绍,可以通过书中提供的API介绍了解使用python进行spark机器学习的过程,当然,机器学习的一些细节是没有涉及到的,总的来说更多的是展示流程和API的使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/446183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

密码学专题 证书和CA指令 证书和CA功能概述

为什么需要证书 实现了公钥和私钥的相互验证,但是任何人都可以生成很多的密钥对,密钥对并没有关联实体身份,因此诞生可数字证书前提是CA是所有用户都信任的用户需要将自己的信息和公钥交给CA进行认证生成一个属于自己并被其与用户认可的数字…

密码学专题 证书和CA指令 申请证书|建立CA|CA操作|使用证书|验证证书

Req指令介绍 功能概述和指令格式 req指令一般来说应该是提供给证书申请用户的工具,用来生成证书请求以便交给CA验证和签发证书。但是,OpenSSL的req指令的功能远比这样的要求强大得多,它不仅可以生成RSA密钥、DSA密钥,以及将它们…

密码学专题 OpenSSL标准转换指令

概述 繁多复杂的各种文件编码格式、证书格式和密钥格式等。事实上,并非OpenSSL开发者想要将数字世界弄得如此令人头疼,只是由于各种原因,数字世界存在各种不同的标准,为了尽量兼容这些不同的标准,OpenSSL开发者才相应…

java pakage、import关键字

package介绍 常用的包 import 案例

密码学专题 OpenSSL中SSL相关指令

再谈SSL和OpenSSL 由于SSL协议已经是密码学和PKI技术中非常具体的一个应用协议,为了实现它,OpenSSL在密码学基础应用和PKI技术的基础实现上做了大量的工作,才逐渐形成和奠定了OpenSSL在密码学应用和PKI技术开发中的重要基础软件包地位。Open…

使用MetaMask实现转账交易时附带Input Data数据

进入如下页面,点击View Assert in Explorer进入Etherscan网站,就可以看到本账户先前所有的交易信息。 字段分析 Txn Hash 是交易的HashMethod:交易的类型,一般是两种,如果是Transfer指定的是合约部署;如…

Ubuntu安装Google浏览器

下载谷歌浏览器 直接使用Ubuntu自带的火狐浏览器进行下载 默认下载到/tmp临时文件夹里面,考虑到权限问题,需要将其移动到 Downloads文件夹下面使用命令 sudo mv ./goo(Tab)~/Down(Tab)使用命令行的方式进行下载 wget https://dl.g…

普通类创建获取session 方式_猿蜕变11——一文搞懂mybatis花式使用方式

看过之前的蜕变系列文章,相信你对mybatis有了初步的认识。但是这些还不够,我们今天进一步来了解下mybatis的一些用法。猿蜕变同样是一个原创系列文章,帮助你从一个普通的小白,开始掌握一些行业内通用的框架技术知识以及锻炼你对系…

Ubuntu配置IPFS的环境

参考链接 Ubuntu上IPFS环境搭建 - 简书 下载安装包 下载地址:https://dist.ipfs.io/#go-ipfs页面会自动根据你的操作系统提供适合的下载安装包,所以需要在Ubuntu环境下点开上面那个链接,网页自动识别当前的平台并提供对应的版本&#xff0c…

composer升级_Composer-命令简介

简介Composer 是一个用于 PHP 依赖管理的工具。它实现了让你声明项目所依赖的库,并帮你完成安装/更新过程。以下命令来自 composer version 1.8.0。翻译使用【百度翻译】。通过在命令窗口执行:composer或者:composer list得到 composer 的全部…

Ubuntu搭建联盟链,实现节点之间数据同步

安装go环境 从参考链接选择Linux版本的go的安装包 使用命令 mv go(Tab补全)/usr/local 移动go安装包到/usr/local目录下使用命令解压 sudo tar -xvzf go(Tab补全) 配置环境 sudo gedit ~/.profile export PATH$PATH:/usr/local/go/bin激活生效 sou…

火狐浏览器添加MetaMask钱包和本地开启私有链开发

火狐浏览器添加MetaMask钱包 因为对其配置了代理工具,所以直接使用谷歌引擎搜索MetaMask钱包即可第一次使用,立即开始设置 点击我同意,进行密码的创建 牢记助记词,助记词及其关键,将其存储在安全的地方 区块链-开发 M…

CLion导入用户自己的lib和头文件

文件的层级结构如上面所示对应的CMakeLists.txt配置文件如下面所示 cmake_minimum_required(VERSION 3.15) project(smart_shap)set(CMAKE_CXX_STANDARD 14)add_executable(${PROJECT_NAME} main.cpp )#target_link_directories(${PROJECT_NAME} PRIVATE ${PROJECT_SOURCE_DIR}…

东芝移动硬盘驱动_传输数据不用等,高速移动硬盘数据线畅享快传体验

不管你是设计师,摄影师亦或是办公一族,几乎都能用到电脑,而电脑里的文件如果很多的话,为了安全起见都会备份一份数据,以免电脑储存容量过大导致电脑卡顿。另一方面,为了保护数据防止丢失造成不必要的麻烦&a…

使用国密浏览器和使用Wireshark进行国密抓包

使用的软件 信密浏览器 密信浏览器发布Windows正式版 - 密信技术国密Wireshark GMSSL - 国密SSL实验室支持国密算法的网站 https://www.wotrus.com/ 流程操作 打开windows终端,使用命令ping沃通网站,找到这个网站的ip地址 ping www.wotrus.com打…

django框架学习文档_Python四大主流网络编程框架,你知道么?

高并发处理框架—— TornadoTornado 是使用 Python 编写的一个强大的可扩展的 Web 服务器。它在处理高网络流量时表现得足够强健,却在创建和编写时有着足够的轻量级,并能够被用在大量的应用和工具中。Tornado 作为 FriendFeed 网站的基础框架&#xff0c…

VS Studio报错无法解析的外部符号 _imp_XXXXXXXXX

出现字符_imp,说明不是真正的静态库,而是某个动态库的导入库,导入函数和自己不同名,所以加了字符_imp。比如说_imp_GetUserNameA就是GetUserNameA函数。会报这种错误的原因: 1、说明注册表函数没有相关的lib库&#xf…

hashmap put过程_阿里十年技术大咖,教你如何分析1.7中HashMap死循环

在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry…