dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化

Spark大数据分析中涉及到RDD、Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用。


1、在IPython Notebook运行Python Spark程序

IPython Notebook具备交互式界面,可以在Web界面输入Python命令后立刻看到结果,还可将数据分析的过程和运行后的命令与结果存储成笔记本,下次可以打开笔记本,重新执行这些命令,IPython Notebook笔记本可以包含文字、数学公式、程序代码、结果、图形等。

1.1 安装IPython

1)若无gcc,需先安装gcc

[root@tango-spark01 /]# gcc –v
[root@tango-spark01 /]# yum install gcc

2)若无pip,安装pip

[root@tango-spark01 /]# pip –v
[root@tango-spark01 /]# wget https://bootstrap.pypa.io/get-pip.py --no-check-certificate

3)安装Python开发包

[root@tango-spark01 /]# yum install python-devel

4)执行以下命令安装IPython和IPython Notebook:

[root@tango-spark01 /]# pip install ipython
[root@tango-spark01 /]# pip install urllib3
[root@tango-spark01 /]# pip install jupyter

5)输入ipython进入交互界面

3c2489cd0aeb8337c06a9d4ffbf2df34.png

6)输入jupyter notebook

43767c52d32993aec91a6af309970504.png

1.2 IPython配置

1)创建远程连接密码

In [2]: from notebook.auth import passwd;
In [3]: passwd()
Enter password:
Verify password:
Out[3]: 'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'

2)生成jupyter配置文件

[root@tango-spark01 /]# jupyter notebook --generate-config
Writing default config to: /root/.jupyter/jupyter_notebook_config.py

3)打开配置文件,设置以下内容

## The IP address the notebook server will listen on.
#c.NotebookApp.ip = 'localhost'
c.NotebookApp.ip = '0.0.0.0'
## The directory to use for notebooks and kernels.
#c.NotebookApp.notebook_dir = u''
c.NotebookApp.notebook_dir = u'/usr/local/spark/ipynotebook'
## Hashed password to use for web authentication.
# To generate, type in a python/IPython shell:
# from notebook.auth import passwd; passwd()
# The string should be of the form type:salt:hashed-password.
#c.NotebookApp.password = u''
c.NotebookApp.password = u'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'

4)打开jupyter notebook

[root@tango-spark01 /]# jupyter notebook --allow-root
[I 14:20:05.618 NotebookApp] Serving notebooks from local directory: /usr/local/spark/ipynotebook
[I 14:20:05.618 NotebookApp] The Jupyter Notebook is running at:
[I 14:20:05.619 NotebookApp] http://(tango-spark01 or 127.0.0.1):8888/
[I 14:20:05.619 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 14:20:05.619 NotebookApp] No web browser found: could not locate runnable browser.
[I 14:21:00.346 NotebookApp] 302 GET / (192.168.112.1) 2.50ms
[I 14:21:00.352 NotebookApp] 302 GET /tree? (192.168.112.1) 1.71ms
[I 14:22:16.241 NotebookApp] 302 POST /login?next=%2Ftree%3F (192.168.112.1) 1.58ms

5)浏览器输入地址和端口

baf5f33eac3042564598e3f2bfef83fa.png

输入密码登录进去

f387bfaf0a78ecdb199e7ab2a5cfb770.png

1.3 在IPython Notebook中使用Spark

1)进入ipynotebook工作目录

[root@tango-spark01 /]# cd /usr/local/spark/ipynotebook
[root@tango-spark01 ipynotebook]#

2)在IPython Notebook界面中运行pyspark

[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  pyspark

3)单击New选择Python 2,新建Notebook

7b882f871eda1ed1dccad7ef95712df1.png

4)新建Notebook后会出现新的页面,默认notebook名称为Untitled,单击后修改名称

54e7b439ac96ebe9c67efb88468a0f71.png

5)在Notebook运行程序代码

126c9db3f1a7638950e55c3f56c8ca38.png

6)保存Notebook下次可继续打开使用

2、Spark SQL、DataFrame、RDD数据统计与可视化

2.1 RDD、DataFrame和Spark SQL比较

RDD和Data Frame都是Spark平台下分布式弹性数据集,都有惰性机制,在进行创建、转换时不会立即执行,等到Action时才会遍历运算。

  1. RDD API进行数据统计,主要使用map配合reduceByKey,需要有Map/Reduce概念

  2. 与RDD不同的是Data Frame更像是传统的数据库表格,除了数据以外,还记录了数据的结构信息

  3. Spark SQL则是由DataFrame派生出来,必须先创建DataFrame,然后通过登录Spark SQL temp table就可以使用Spark SQL语句,直接使用SQL语句进行查询

下表列出在进行数据统计计算时候,RDD、Data Frame和Spark SQL使用的不同方法。

Items功能描述
RDD APIuserRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y: x+y).collect()
DataFrameuser_df.select(“gender”).groupby(“gender”).count().show()
Spark SQLsqlContext.sql(“””SELECT gender,count(*) counts FROM user_table GROUP BY gender”””).show()
2.2 创建RDD、DataFrame和Spark SQL

在Hadoop YARN-client模式运行IPython Notebook

[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop  pyspark --master yarn --deploy-mode client
  • 创建RDD

1)配置文件读取路径

global Path
if sc.master[0:5] =="local":
Path="file:/usr/local/spark/ipynotebook/"
else:
Path="hdfs://tango-spark01:9000/input/"
  1. 如果sc.master[0:5]是“local”,代表当前在本地运行,读取本地文件

  2. 如果sc.master[0:5]不是“local”,有可能是YARN client或Spark Stand Alone,必须读取HDFS文件

2)读取文本文件并且查看数据项数

RawUserRDD=sc.textFile(Path+"data/u.user")
RawUserRDD.count()
RawUserRDD.take(5)

3)获取每一个字段

userRDD= RawUserRDD.map(lambda line:line.split("|"))
userRDD.take(5)

1cb2e8759c70dc9998f179e8a3a322bc.png

  • 创建Data Frame

1)创建sqlContext:在Spark早期版本中,spark context是Spark的入口、SQLContext是SQL入口、HiveContext是hive入口。在Spark 2.0中,使用Spark Session可同时具备spark context、sqlContext、HiveContext功能

sqlContext=SparkSession.builder.getOrCreate()

2)定义Schema:定义DataFrames的每个字段名与数据类型

from pyspark.sql import Row
user_Rows = userRDD.map(lambda p:
Row(userid=int(p[0]),age=int(p[1]),gender=p[2],occupation=p[3],zipcode=p[4]))
user_Rows.take(5)

8d3f226c46f63d57cf2715623f573fbb.png

3)创建DataFrames:使用sqlContext.createDataFrame()方法创建DataFrame

user_df=sqlContext.createDataFrame(user_Rows)
user_df.printSchema()

4)查看DataFrames数据

user_df.show(5)

5)为DataFrame创建别名:可以使用.alias帮DataFrame创建别名

df=user_df.alias("df")
df.show(5)

3b8450f978b8dbb4807aa9426b55ed31.png

  • 使用SparkSQL

创建DataFrame后,使用该DataFrame登录Spark SQL temp table,登录后可以使用Spark SQL

1)登录临时表

user_df.registerTempTable("user_table")

2)使用Spark SQL查看项数

sqlContext.sql("SELECT count(*) counts FROM user_table").show()

3)多行输入Spark SQL语句,需要使用3个双引号引住SQL

sqlContext.sql("""
SELECT count(*) counts
FROM user_table
""").show()

4)使用SparkSQL查看数据,限定数据项

sqlContext.sql("SELECT * FROM user_table").show()
sqlContext.sql("SELECT * FROM user_table").show(5)
sqlContext.sql("SELECT * FROM user_table LIMIT 5").show()

917fc9c01e52110078d343c4369e91c8.png

2.3 数据统计操作
2.3.1 筛选数据
  • 使用RDD筛选数据

RDD中使用filter方法筛选每一项数据,配合lambda语句创建匿名函数传入参数

userRDD.filter(lambda r:r[3]=='technician' and r[2]=='M' and r[1]=='24').take(5)
  • 输入DataFrames筛选数据

user_df.filter((df.occupation=='technician')&(df.gender=='M')&(df.age==24)).show()
  • 使用Spark SQL筛选数据

sqlContext.sql("""
SELECT *
FROM user_table
where occupation='technician' and gender='M' and age=24""").show(5)
2.3.2 按字段给数据排序
  • RDD按字段给数据排序

userRDD.takeOrdered(5,key=lambda x:int(x[1]))——升序排序
userRDD.takeOrdered(5,key=lambda x:-1*int(x[1]))——降序排序
userRDD.takeOrdered(5,key=lambda x:(-int(x[1]),x[2]))——多个字段排序
  • 使用DataFrame排序

user_df.select("userid","occupation","gender","age").orderBy("age").show(5)——升序
user_df.select("userid","occupation","gender","age").orderBy("age",ascending=0).show(5)
df.orderBy(["age","gender"],ascending=[0,1]).show(5)——多个字段排序
  • 使用Spark SQL排序

sqlContext.sql("""
SELECT userid,occupation,gender,age FROM user_table
order by age desc,gender""").show(5)
2.3.3 显示不重复数据
  • RDD显示不重复数据

userRDD.map(lambda x:x[2]).distinct().collect()
  • DataFrame显示不重复数据

user_df.select("gender").distinct().show()
  • Spark SQL显示不重复数据

sqlContext.sql("select distinct gender FROM user_table").show()
2.3.4 分组统计数据

1)RDD分组统计数据

userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y:x+y).collect()

2)DataFrames分组统计数据

user_df.select("gender").groupby("gender").count().show()

3)Spark SQL分组统计数据

sqlContext.sql("""
SELECT gender,count(*) counts FROM user_table
group by gender""").show()
2.3.5 Join联接数据
  • 准备zipcode数据

1)拷贝数据到HDFS目录下

[root@tango-spark01 data]# hadoop fs -copyFromLocal -f /usr/local/spark/ipynotebook/data/free-zipcode-database-Primary.csv  /input/data

2)读取并查看数据

Path="hdfs://tango-spark01:9000/input/"
rawDataWithHeader=sc.textFile(Path+"data/free-zipcode-database-Primary.csv")
rawDataWithHeader.take(5)

3)删除第一项数据

header = rawDataWithHeader.first()
rawData = rawDataWithHeader.filter(lambda x:x !=header)

4)删除特殊符号

rawData.first()
rData=rawData.map(lambda x:x.replace("\"",""))
rData.first()

5)获取每一个字段

zipRDD=rData.map(lambda x:x.split(","))
zipRDD.first()
  • 创建zipcode_tab

1)创建zipCode Row的schema

from pyspark.sql import Row
zipcode_data = zipRDD.map(lambda p:
Row(zipcode=int(p[0]),zipCodeType=p[1],city=p[2],state=p[3]))
zipcode_data.take(5)

2)Row类型数据创建DataFrames

zipcode_df=sqlContext.createDataFrame(zipcode_data)
zipcode_df.printSchema()

3)创建登录临时表

zipcode_df.registerTempTable("zipcode_table")
zipcode_df.show(10)
  • Spark SQL联接zipcode_table

sqlContext.sql("""
select u.*,z.city,z.state from user_table u
left join zipcode_table z ON u.zipcode=z.zipcode
where z.state='NY'
""").show(10)

6ba640151111b4588f9fa2e554a20a6c.png

2.3.6 使用Pandas DataFrame绘图
  • 按照不同的州统计并以直方图显示

1)转换为Pandas DataFrames

import pandas as pd
GroupByState_pandas_df = GroupByState_df.toPandas().set_index('state')
GroupByState_pandas_df

9091c150d0c6584b75b8bfb77e300220.png

2)使用Pandas DataFrames绘出直方图

import matplotlib.pyplot as plt
#matplotlib inline
ax=GroupByState_pandas_df['count'].plot(kind='bar',title='State',figsize=(12,6),legend=True,fontsize=12)
plt.show()

febd4eb49865ae41424102fe6991db82.png

  • 按照不同的职业统计并以饼图显示

1)创建Occupation_df

Occupation_df=sqlContext.sql("""
SELECT u.occupation,count(*) counts
FROM user_table u
GROUP BY occupation
""")
Occupation_df.show(5)

2)创建Occupation_pandas_df

Occupation_pandas_df=Occupation_df.toPandas().set_index('occupation')
Occupation_pandas_df

f1e18afb94ea415329d72369ff8143c0.png

3)用Pandas DataFrame是绘出饼图PieChart

ax=Occupation_pandas_df['counts'].plot(kind='pie',
title='occupation',figsize=(8,8),startangle=90,autopct='%1.1f%%')
ax.legend(bbox_to_anchor=(1.05,1),loc=2,borderaxespad=0.)
plt.show()
  1. kind='pie':绘制饼图

  2. startangle=90:设置图形旋转角度

  3. autopct='%1.1f%%':设置显示饼图%

3bf71f9a2f2df71b69efef7c4ea57f4d.png


参考资料

  1. Python+Spark 2.0+Hadoop机器学习与大数据实战,林大贵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/555091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

bug的生命周期、bug状态转换图

当我们发现一个bug的时候,应该怎么理清他们之间的关系呢?一个bug 从open到close的所有状态 都是我们测试人员需要注意的。 一、bug的状态 新建(New) 新发现的bug,未经评审决定是否指派给开发人员进行修改。 确认&…

wps居中对齐不在中间_WPS文字快捷键总结(Windows版本)--值得收藏

WPS Office是一款国产的办公软件套装,有WPS文字、WPS表格和WPS演示三个板块,可以实现办公软件最常见的文字、表格、演示等多种功能,支持阅读和输出PDF文件,全面兼容Microsoft Office97-2010格式。想要熟练地使用WPS办公软件&#…

bugzilla使用规范分享

bugzilla使用规范分享 1.new/confirmed 测试人员将Bug提交给任务分发人员(研发模块负责人), 此时Bug状态为new/confirmed,开始Bug的生命周期,如果测试人员知道具体负责的研发人员,也可以直接指定&#x…

编程语言_如何正确地学习编程语言

首先,当前学生和职场人学习编程已经成为了一个大的趋势,掌握编程语言不仅能够提升自身获取信息的能力,同时也能够拓展自身的能力边界,这一点在工业互联网时代会有更加明显的体现。编程语言本身并不难,但是要想形成自己…

测试游戏帧率电脑温度的软件,游戏中显示帧数和温度方法_游戏画面中实时显示FPS帧数温度技巧...

相信很多游戏玩家平时都是在用电脑玩游戏,而且大家也喜欢看一看自己在在游戏画面中实时的FPS帧数和温度信息。但是很网友对这个游戏画面中如何实时显示FPS帧数、频率、硬件温度不太清楚,下面智能手机网分享一下具体的操作方法,以便大家在玩游…

12面魔方公式图解法_一位建筑工程师:多年渴望就是学会魔方还原,只按这七步就可以!...

本人性别男,年龄47岁,一位建筑工程师,性格开朗,喜欢学习,2013年在网上搜索记忆关键词,从此开始了学习超级记忆和思维导图之路!也因此,接触了魔方!初学魔方,我…

navicat运行db文件_使用 YAML 文件配置 Jenkins 流水线

本文转载自:Jenkins 中文社区这也是一种自定义流水线 DSL 的方法几年前,我们的 CTO 写了一篇关于 使用 Jenkins 和 Docker 为 Ruby On Rails 应用提供持续集成服务 的文章。这些年,我们一直使用这个 CI 流水线解决方案,直到我们最…

Mybatis-Plus实现逻辑删除

数据库中的数据删除会分为两种:物理删除 和 逻辑删除 物理删除 物理删除就是我们删除数据库中的一条数据时,数据会真的被删除 逻辑删除 逻辑删除指的是我们删除一条数据时,数据不会在数据库中消息,逻辑删除是我们现在开发中经…

完美国际单机修改服务器端,完美国际改国内版单机一键服务端

最喜欢的一款网游,曾经出来的时候可以说是划时代的,3D游戏,空战,大地图无缝对接等。从比较早的113版本,经典的六职业136版本,到现在的155版本。都保存的有。玩官服就不说啦,从13年接触服务端到现…

开发 数组里面的字典_Redis字典结构与rehash解读

关注公众号:后端技术漫谈,技术之路不迷路~字典是一种用于保存键值对的抽象数据结构,也被称为查找表、映射或关联表。在字典中,一个键(key)可以和一个值(value)进行关联,这些关联的键和值就称之为键值对。抽象数据结构&…

MyBatisPlus 学习笔记_MP的AR模式

狂神说 MyBatisPlus 学习笔记 一、快速入门 文档:https://mp.baomidou.com/ 使用第三方组件: 导入对应依赖研究依赖如何配置代码如何编写提高扩展技术能力 步骤: 1、创建数据库 mybatis_plus 2、创建user表 DROP TABLE IF EXISTS user;CREATE…

ajax调用java程序,从微信小程序到鸿蒙JS开发-JS调用Java

除轻量级智能穿戴设备,现鸿蒙支持的手机、汽车、TV、手表、平板等属于富鸿蒙,在JS语言的项目中也有Java模块,并提供了JS跨语言调用Java方法的技术。现需要实现查看商品评论时,统计出长评、中评和短评的比例,这里将评论…

文本删除空行_010 Editor for mac(文本和十六进制编辑器)

为大家带来最新版本的010 Editor for mac,这是一款专业的文本和十六进制编辑器,新版本的010 editor mac版包含了语法突出显示、更多字符集支持、添加了删除行和删除空行命令等新功能,另外修复了各种错误,功能更加全面。010editor …

Mybatis-Plus之四种lambda方式LambdaQueryWrapper,QueryWrapper<实体>().lambda(),LambdaQueryChainWrapper<实体>

Mybatis-Plus之四种lambda方式 lambda四种表达形式 前言 使用了lambda表达式 可以通过方法引用的方式来使用实体字段名的操作&#xff0c;避免直接写数据库表字段名时的错写名字&#xff1b; 一、LambdaQueryWrapper<> /*** lambda 条件构造器* 生成的sql语句 SELECT…

sql怎么修改服务器角色,sql角色服务器的设置

sql角色服务器的设置 内容精选换一换如果您需要对华为云上购买的DDM资源&#xff0c;为企业中的员工设置不同的访问权限&#xff0c;为达到不同员工之间的权限隔离&#xff0c;您可以使用统一身份认证服务(Identity and Access Management&#xff0c;简称IAM)进行精细的权限管…

MyBatis-Plus——字段类型处理器TypeHandler

字段类型处理器&#xff08;TypeHandler&#xff09; 1&#xff0c;准备工作 &#xff08;1&#xff09;MyBatis 中的 TypeHandler 类型处理器用于 JavaType 与 JdbcType 之间的转换&#xff0c;假设我们用户表中有一个联系方式字段&#xff0c;类型为字符串&#xff1a; &am…

额外参数_Pytorch获取模型参数情况的方法

分享人工智能技术干货&#xff0c;专注深度学习与计算机视觉领域&#xff01;相较于Tensorflow&#xff0c;Pytorch一开始就是以动态图构建神经网络图的&#xff0c;其获取模型参数的方法也比较容易&#xff0c;既可以根据其内建接口自己写代码获取模型参数情况&#xff0c;也可…

Mybatis-Plus之逻辑删除

概念 什么是逻辑删除 逻辑删除:假删除。将对应数据中代表是否被删除字段状态修改为“被删除状态”,之后在数据库中仍旧能看到此条数据记录。 数据库实现思路:插入数据时,标记为未删除状态;查询、修改时,只获取未删除状态的数据进行操作;删除时则更新删除状态为已删除…

查看分支编码_MySQL分支数据库MariaDB之CentOS安装教程

MariaDB数据库管理系统是MySQL的一个分支&#xff0c;由MySQL的创始人Michael Widenius主持开发。采用GPL授权许可 MariaDB的目的是完全兼容MySQL&#xff0c;包括API和命令行&#xff0c;在存储引擎方面&#xff0c;使用XtraDB(英语&#xff1a;XtraDB)来代替MySQL的InnoDB。1…

关联规则算法c语言样例及分析_推荐系统总结系列-关联规则算法(四)

基于关联规则的推荐有三种方法&#xff1a;Apriori关联规则算法FP Tree关联规则算法&#xff1b;PrefixSpan关联规则算法&#xff1b;关联规则挖掘推荐算法&#xff1a;关联规则挖掘是一种在大规模交易中识别类似规则关系模式的通用技术&#xff0c;可以应用到推荐系统中。交易…