Spark介绍及RDD操作
- PySpark简介
- spark特点
- 运行原理
- spark实例化
- SparkCore-RDD
- RDD创建
- 转换(Transformation)
- 行动(Action)
PySpark简介
spark特点
- 运行速度快:DAG+内存运算
- 容易使用:Java、Scala、Python、R
- 通用性强:完整而强大的技术栈(Graphx:图计算,SparkSQL:结构化数据处理,MLib/ML:机器学习,Streaming:流式计算框架(微批处理)这方面Flink做的比较好)
- 运行模式多样:Kubernetes、Standalone、YARN、Mesos
- 兼容多种数据源:
备注:使用python调Spark性能稍微不如Java、Scala
运行原理
spark封装了python接口,使用python调用spark工作原理如下
spark实例化
#初始化spark上下文
spark = (SparkSession.builder
# .master("spark://192.168.2.123:7077") # 程序运行环境.master("local[4]") # 如果有集群,这里填写集群资源信息。例如:spark://IP:port.appName("SparkCoreDemo") # 运行程序任务名
# .config("youkey","youvalue") \ # 配置信息
# .config('spark.dynamicAllocation.enabled',True)\ 开启动态分配资源
# .config('spark.dynamicAllocation.initialExecutors',1)\ 动态分配初始executor个数默认值
# .config('spark.dynamicAllocation.minExecutors',1)\ 最少分配1个
# .config('spark.dynamicAllocation.maxExecutors',12)\ 最多分配12个
# .config('spark.executor.memory','12G') \ # 设置每个执行器 内存大小
# .config('spark.executor.cores','4') \ # 设置每个执行器 CPU核数 .getOrCreate())
sc = spark.sparkContext # 用于RDD操作
sc.setLogLevel('ERROR') # 设置日志输出等级
spark.version # 查看spark版本
# spark.stop() # 释放资源
SparkCore-RDD
RDD(Resilient Distributed DataSes)指一个只读的,可分区的分布式数据集。这个数据集放在内存或缓存中,可在计算中重复读取,RDD特点:
- 它是在集群节点上的不可变的、已分区的集合对象
- 通过并行转换的方式来创建,如map、filter、join
- 失败自动重建
- 可以控制存储级别(内存、磁盘等)来进行重用
- 必须是可序列化的
- 是静态类型的
基于RDD的操作:转换、行动
RDD创建
- 基于内存对象
rdd1=sc.parallelize([(“A”,23),(“B”,24),(“C”,29)])
- 读取文件属于转换操作
RDD_A = sc.textFile("./data/A.txt")
转换(Transformation)
返回值还是一个RDD,如Map、GroupBy操作。转换操作是延迟操作的,只有遇到后续的行动(Action)操作才会执行,这也为代码优化提供可能。转换操作如下:
- map(func):对RDD中的每个element都使用func,返回一个新的RDD
- filter(func) : 对RDD中的每个元素都使用func,返回每个执行func为true的元素构成的RDD
- flatMap(func):和map差不多,但是flatMap生成的是多个结果,扁平化的结果。
- mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
- mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
- sample(withReplacement,faction,seed):抽样
- union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
- distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
- groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
- reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
- sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
- join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
- cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks 为并发的任务数
- cartesian(otherDataset):笛卡尔积
对部分方法举例介绍使用 - map实现数据的迭代转换,传入的函数接受的参数为RDD中的每个元素
RDD_A.map(lambda x:x.upper()).collect()
- filter 实现数据的过滤筛选,传入的函数接受的参数为rdd的每个元素,函数的返回值必须为布尔型
# 筛选RDD_A最后一个字符是C的数据
RDD_A.filter(lambda x:x[-1]=="c").collect()
-
flatMp会将每次返回的结果扁平化,例如:map 返回的 是 [1,2] 那么flatmap返回的是1,2
-
reduceByKey可用于文件去重问题
行动(Action)
行动(action)操作经常有返回结果或者将数据写入某个地址,比如count、save等,常用方法如下:
- reduce(func):按照func对数据进行约减
- collect():将RDD封装成数组返回
- count():计算数据集中element的个数
- first():返回数据集中的第一个元素
- take(n):返回前n个elements
- takeSample(withReplacement,num,seed):抽样返回数据集中的num个元素,随机种子seed
- saveAsTextFile(path):写入文本文件到path中
- saveAsSequenceFile(path):将key-value型数据保存到path中
- countByKey():返回的是key对应的个数
- foreach(func):对dataset中的每个元素都使用func
使用介绍如下 - take
rdd_a.take(2)
- countByKey