RDD编程入口
RDD编程入口对象是SparkContext对象,想要调用相关的计算api都需要通过构造出的sparkcontext对象调用
RDD的创建
- 通过并行化集合创建RDD
- (本地集合转为分布式),api如下
rdd = sc.parrallize(param1, param2)
参数1是本地集合,参数2是分区数,不写的话默认是16
可以通过以下获取分区数:
rdd.getNumPartitions()
默认分区数一版与主机cpu数一致
计算之后的数据可以通过collect方法输出,该方法是将rdd每个分区中的数据都发送到driver中,形成一个python list对象,相当于将分布式数据集汇聚到本地对象中。
- 通过文件读取创建RDD
sparkcontext.testFile(参数1, 参数2)
参数1是文件名,支持本地文件、hdfs文件,参数2是最小分区数,该参数不一定生效,spark会根据实际情况选择一个合理的分区数
如果小文件比较多,使用
WholeTextFiles(参数1, 参数2)读取,参数1是文件路径,参数2是分区数,读出来的是文件名和文件内容的元组
RDD算子
分布式集合对象上的api就是算子,本地集合的api叫方法或函数,spark中算子分两类:转换算子和行动算子,返回值仍为RDD的就是转换算子,这类算子是懒加载的,没有action 算子,transformation算子不工作,action算子就是一个启动开关,开启rdd的计算。
map算子
map算子是将RDD的数据,通过map算子中参数所代表的函数,一条条地处理,返回新的RDD,函数的类型和返回值都是任意的。
def add(data):data * 10
rdd.map(add)
flatmap算子
对rdd执行map操作,然后执行解除嵌套操作
reduceByKey
针对K-V型RDD,自动按KEY进行分组,然后根据用户提供的聚合函数,完成组内数据的聚合操作
rdd.reduceByKey(func)
func(para1, para2) -> ret_value
func接收两个参数,这两个参数的类型要一致,返回他们聚合之后的值,类型也与参数类型一致,这里的类型一般是K-VRDD的value类型,下例按key分组,然后返回各分组内数据之和
需要注意组内数据计算是一个迭代计算,前两个相加的值再和第三个相加。
mapValues算子
针对二元元组,对其组内的二元元组的value执行map操作
mapValues(func)
func(V) -> U
这里的参数V是二元元组的value值
rdd = sc.parallize([('a', 1), ('a', 1), ('b', 2), ('b', 11)])
rdd.mapValues(lambda a : a * 10)
# 等价于以下操作
rdd.map(lambda x : (x[0], x[1] * 10))
一个简单的wordcount案例
# coding:utf8
from pyspark import SparkConf SparkContextif __name__ == '__main__':conf = SparkConf().setAppName('WordCount').setMaster("local[*]")sc = SparkContext(conf = conf)file_rdd = sc.textFile("mytext.txt")word_rdd = file_rdd.flatMap(lambda x : x.split(" "))# 生成计数元组word_count_rdd = word_rdd.map(lambda x : (x, 1))count_result_rdd = word_count_rdd.reduceByKey(lambda a, b : a + b)print(count_result_rdd.collect())
groupBy算子
对RDD的数据进行分组,分组规格和SQL一致
rdd.groupBy(func)
func(T) -> V
这里的参数和返回值类型都不限定,通过函数func,确定根据什么来分组,按谁分组返回谁,例如我们传入一个二元元组,返回value就是按value分组
result = rdd.groupBy(lambda a : a[0])
# 这里打印是将结果强转为list对象是因为result的value值是一个可迭代对象,不能直接展示value,强转后可
print(result.map(lambda x (x[0], list(x[1])).collect())
filter算子
rdd.fileter(func)
func(T) -> bool
传入任意参数类型,返回bool类型,返回为true,数据被保留,false丢弃
rdd.filter(lambda x : x % 2 == 1)
distinct算子
rdd.distinct()
这个算子不需要参数,可以对任意类型重复值进行去重,数字、元组、字符串等。
union算子
rdd.union(other_rdd)
将两个rdd合并成一个,这里只做合并,不做去重
join算子
对两个RDD执行join操作(可实现SQL的内外连接)
rdd.join(rdd2) # 内连接,就是把相同key的内容连接到一块
rdd.leftOuterJoin # 左外连接
rdd.rightOuterJoin # 右外连接
只能对元组进行操作,默认通过key关联,各种连接的定义可看
https://blog.csdn.net/qq_42936727/article/details/136848735?spm=1001.2014.3001.5501
rdd1 sc.parallelize([(1001,"zhangsan"),(1002,"lisi"),(1003,"wangwu"),(1004,"zhaoliu")]
rdd2=sc.parallelize([(1001,"销售部"),(1002,"科技部")])
#通过ioin算子来进行rdd之间的关联
#对于j0in算子来说关联条件按照二元元组的key来进行关联
print(rdd1.join(rdd2).collect())
上面内连接将key为1001,1002的rdd保留下来,1003和1004的数据会丢失,结果如下,会将匹配的value值组成一个元组,作为新的value
[(1001,(zhangsan','销售部')),(1002,('lisi','科技部'))]
下面是左外连接及其结果
print(rdd1.LeftouterJoin(rdd2).collect())
[(1004,('zhaoliu',None),(1001,('zhangsan','销售部')),(1002,('lisi','科技部')),(1003,('wangwu',None)]
可以看到,结果里面,rdd1(左边)的数据都有,但rdd2的数据若不存在时,则为None,右外连接大家自己去试试就知道了。
intersection算子
求两个rdd的交集,返回一个新的rdd
coding:utf8
from pyspark import SparkConf,SparkContext
if __name_=='main_':conf SparkConf().setAppName("test").setMaster("Local[*]")sc SparkContext(conf=conf)rdd1 sc.parallelize([('a',1),('a',3)])rdd2 sc.parallelize([('a',1),('b',3)])#通过intersection算子求RDD之间的交集,将交集取出返回新RDDrdd3 rdd1.intersection(rdd2)print(rdd3.collect())
结果:
[('a',1)]
glom算子
功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行,比如RDD数据[1,2,3,4,5]有2个分区,那么,被glom后,数据变成:
[1,2,3],[4,5]
使用方法:
rdd.glom()
通常用来查看数据的分区情况,如果想要解嵌套,用上面的rdd.flatMap(lambda x : x)即可
groupByKey
功能:针对KV型RDD,自动按照key分组
用法:
rdd.groupByKey()自动按照key分组
代码:
rdd = sc.parallelize([(a',1),(a',1),('b',1),(b',1),('b',1)])
grouped_rdd = rdd.groupByKey()
print(grouped_rdd.map(Lambda x:(x[0],list(x[1]))).collect))
#结果:
[(b',[1,1,1]),(a',[1,1])]
sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func,ascending=False,numPartitions=1)
#func:(T)→U:告知按照rdd中的哪个数据进行排序,比如Lambda x:x[a]表示按照rdd中的第二列元素进行排序
#ascending True升序False降序
#numPartitions:用多少分区排序
指定分区为1,则所有分区内数据有序(包括分区之间),不为1,则只保证在自己分区内有序,不保证分区外有序
coding:utf8
from pyspark import SparkConf,SparkContext
if __name__==__main__':conf = SparkConf).setAppName ("test").setMaster("Local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('c',3),('f',1),('b',11),('c',3),('a',1),('c',5),('e',1),('n',9),('a',1)],3)#使用sortBy对rdd执行排序#按照value数字进行排序#参数1函数,表示的是,告知Spark按照数据的哪个列进行排序#参数2:True表示升序False表示降序#参数3:排序的分区数"""注意:如果要全局有序,排序分区数请设置为1"""print(rdd.sortBy(Lambda x:x[1],ascending=True,numPartitions=1).collect())#按照key来进行排序print(rdd.sortBy (lambda x:x[0],ascending=False,numPartitions=1).collect())
sortByKey算子
功能:针对KV型RDD,按照key进行排序
语法:
sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD.>)
Oascending:升序or降序,True升序,Falsel降序,默认是升序
numPartitions:按照几个分区进行排序,如果全局有序,设置1
keyfunc:在排序前对key进行处理,语法是:(k)→U,一个参数传入,返回一个值,真正用来排序的是key处理后的值,常用来处理像需要忽略大小写,对key统一处理为小写后再排序的场景
一个提取json文件的综合小案例
# coding:utf8
from pyspark import SparkConf,SparkContext
import json
if __name__=='__main__'
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
#读取数据文件
file_rdd = sc.textFile("./data/input/order.text")
#进行rdd数据的split按照|符号进行,得到一个个的json数据
jsons_rdd = file_rdd.flatMap(lambda line:line.split("|"))
#通过Python内置的json库,完成json字符串到宁典对象的转换
dict_rdd = jsons_rdd.map(lambda json_str:json.loads(json_str))
#过滤数据,只保留北京的数据
beijing_rdd = dict_rdd.filter(Lambda d:d['areaName']="北京")
#组合北京和商品类型形成新的字符串
category_rdd = beijing_rdd.map(lambda x:x['areaName'+""x['category'])
#对结果集进行去重操作
result_rdd = category_rdd.distinct()
#输出
print(result_rdd.collect())
countByKey算子
上面介绍的都是转换算子,下面我们来看一些action 算子,action算子的返回值都不是RDD,
功能:统计key出现的次数(一般适用于KV型RDD)
代码:
rdd1 = sc.textFile("../data/words.txt")
rdd2 = rdd1.flatMap (Lambda x:x.split(''))
rdd3 = rdd2.map(Lambda x:(x,1))
#result不是rdd而是dict
result = rdd3.countByKey()
print(result)
注意上面的result已经不需要collect()
collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
这个算子,是将RDD各个分区数据都拉取到Driver
注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大
不然,会把Driverp内存撑爆
reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:rdd.reduce(func)
#func:(T, T)→T
#2参数传入1个返回值,返回值和参数要求类型一致.
执行流程如下图:
代码:
rdd = sc.parallelize(range (1,10))
#将rdd的数据进行累加求和
print(rdd.reduce(lambda a,b:a b))
该算子也是两两聚合之后将结果再与下一个值聚合,但它与reduceByKey不同,reduce算子是action算子,返回值不是RDD,reduceByKey算子是transformation算子,返回值是RDD
fold算子
功能:和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的,也就是说如果是累加,累加的初始值是从给定的初始值开始。另外,这个初始值聚合,会作用在分区内聚合和分区间聚合,
比如:[1,2,3],[4,5,6],[7,8,9]],假设初始值为10
数据分布在3个分区
分区1 123聚合的时候带上10作为初始值得到16
分区2 456聚合的时候带上10作为初始值得到25
分区3 789聚合的时候带上10作为初始值得到34
3个分区的结果做聚合也带上初始值10,所以结果是:10+16+25+34=85
first算子-Action
功能:取出RDD的第一个元素
用法:
sc.parallelize([3,2,1]).first()
take算子-Action
功能:取RDD的前N个元素,组合成list返回给你,即使只取1个
用法:
sc.parallelize([3,2,1,4,5,6]).take(5)
[3,2,1,4,5]
Action算子
功能:对RDD数据集进行降序排序,取前N个
用法:
sc.parallelize([3,2,1,4,5,6]).top(3)#top3表示降序取前3个
[6,5,4]
count算子
功能:计算RDD有多少条数据,返回值是一个数字
用法:
sc.parallelize([3,2,1,4,5,6]).count()
6
takeSample算子
功能:随机抽样RDD的数据
用法:
takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
-参数1:True表示允许取同一个数据,False表示不允许取同一个数据.和数据内容无关,是否重复表示的是同一个位置的数据
-参数2:抽样要几个
-参数3:随机数种子,这个参数传入一个数字即可,随意给
随机数种子数字可以随便传,如果传同一个数字那么取出的结果是一致的,
一般参数3我们不传,Spark会自动给与随机的种子
返回值是一个list
代码:
rdd sc.parallelize([1,1,1,1,1],1)
print(rdd.takeSample(True,8))
如果我们传入的抽样数大于rdd数据的个数,若允许重复,则会返回带重复项的数据,若不允许重复,则会返回原rdd数据的随机重排的值
takeOrdered
功能:对RDD进行排序取前N个
用法:
rdd.takeOrdered(参数1,参数2)
-参数1 标识要几个数据
-参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子,也就是说排序是根据第二个参数来排的,但第二个参数并不会修改数据本身值,只是调整位置)
这个方法使用安装元素自然顺序升序排序,如果你想玩倒序,需要用参数2来对排序的数据进行处理
代码:
rdd sc.parallelize([1,3,2,4,7,9,6],1)
print(rdd.takeOrdered(3))
#将数字转换成负数,那么,原本正数最大的,反而变成排序中最小的,固出现在最前端.
print(rdd.takeOrdered(3,lambda x:-x))
因为底层固定按升序,如果处理其他类型想倒序,非数字型的,可能参数2就需要考虑如何反转了
foreach 算子
功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值
用法:
rdd.foreach(func)
#func:(T)→None
代码:
rdd sc.parallelize([1,3,2,4,7,9,6],1)
#对数据执行乘以10的操作
#注意,函数不能给返回值,所以在里面直接打印了
r= rdd.foreach(lambda x:print(x * 10))
print(r)#这个r对象,是None因为foreach没有返回值
需要注意的是foreach算子结果是excutor直接执行的,不像其他算子还需要收集到Driver,这在一些场景下能提高效率
saveAsTextFile.算子
功能:将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统,
代码:
rdd sc.parallelize([1,3,2,4,7,9,6],3)
rdd.saveAsTextFile ("hdfs:/node1:8020/output/11111")
这个算子也是由executor直接执行的,多个分区会保存为多个文件
mapPartitions算子
这个算子是对整个分区做map操作,一次性发送一整个分区的数据,一次性得到map的结果,而map一次操作一个分区内一条数据,这是个transformation算子
from pyspark import SparkConf,SparkContext
if __name__=='__main__':conf = SparkConf().setAppName("test").setMaster("Local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,3,2,4,7,9,6],3)def process(iter):result = list()for it in iter:result.append(it 10)return resultprint(rdd.mapPartitions(process).collect())
mapPartitions算子性能是比map高的
foreachPartition.算子-Action
功能:和普通foreach一致,只是一次处理的是一整个分区数据
rdd sc.parallelize([1,3,2,4,7,9,6],3)
def ride10(data):print(--------)result = list())for i in data:result.append(i 10)print(result)
rdd.foreachPartition(ride10)
结果:
[10,30]
[20,40]
[70,90,60]
foreachPartition就是一个没有返回值的mapPartitions
partitionBy.算子-Transformation
功能:对RDD进行自定义分区操作
用法:
rdd.partitionBy(参数1,参数2)
-参数1重新分区后有几个分区
-参数2自定义分区规则,函数传入
参数2:(K)→int
一个M传入参数进来,类型无所谓,但是返回值一定是int类型
将key传给这个函数,你自己写逻辑,决定返回一个分区编号
分区编号从0开始,不要超出分区数-1
from pyspark import SparkConf,SparkContext
if __name__=='__main__':conf = SparkConf().setAppName("test").setMaster("Local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('hadoop',1),('spark',1),('hello',1),('flink',1),('hadoop',1),('spark',1)])#使用partitionBy自定义分区def = process(k):if 'hadoop'=k or 'hello'=k:return 0if 'spark'==k:return 1return 2print(rdd.partitionBy(3,process).glom().collect())
repartition.算子Transformation
功能:对RDD的分区执行重新分区(仅数量)
用法:
注意:对分区的数量进行操作,一定要慎重
rdd.repartition(N)
传入N决定新的分区数
一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外
多数时候,所有API中关于分区相关的代码我们都不太理会,重分区一般推荐减少分区,不推荐增加分区,改分区会影响并行计算(内存迭代的并行管道数量),分区如果增加,极大可能导致shuffle
代码:
#repartition重新分区(数量)
rdd2 = rdd.repartition(5)
print(rdd2.glom).collect))
rdd3 = rdd.repartition(1)
print(rdd3.glom().collect())
#coalesce也可以修改分区,但它有另外一个参数,默认false,标识是否允许shuffle,repartition底层调用的就是coalesce。
print(rdd.coalesce(1).getNumPartitions())
print(rdd.coalesce(5).getNumPartitions())