Spark离线开发指南(详细版)

文章目录

  • 1--Spark—core
      • **2.1--RDD的创建**
        • 2.1.1--并行化创建
        • 2.1.2--获取分区数
        • 2.1.3--读取文件创建RDD
          • textFile
          • wholeTextFile
      • 2.2--RDD算子
        • 2.2.1--算子概念
        • 2.2.2--Transformation算子
          • map
          • flatMap
          • reduceByKey
          • mapValues
          • groupBy
          • filter
          • distinct
          • union
          • join
          • intersection
          • glom
          • groupByKey
          • sortBy
          • sortByKey
          • keys
          • values
        • 2.2.3--Action算子
          • countByKey
          • collect
          • reduce
          • fold
          • aggregate
          • first
          • take
          • top
          • count
          • takeSample
          • takeOrdered
          • foreach
          • saveAsTextFile
          • collectAsMap
        • 2.2.4--分区操作算子
          • mapPartition
          • foreachPartition
          • partitionBy
          • repartition
          • coalesce
      • 2.3--RDD缓存
        • 2.3.1--RDD缓存的目的
        • 2.3.1--RDD缓存的特点
        • 2.3.2--RDD缓存的API
        • 2.3.3--RDD的CheckPoint
      • 2.4--广播变量
        • 2.4.1--概念
        • 2.4.2--API
      • 2.5--累加器
        • 2.5.1--需求
        • 2.5.2--没有累加器的代码演示
        • 2.5.3--增加累加器的代码
      • 2.6--python程序对hdfs文件操作
  • 2--SparkSQL
      • 3.1--DataFrame
        • 3.1.1--SparkSession对象环境的创建
        • 3.1.2--DataFrame的组成
        • 3.1.3--DataFrame代码构建
          • 基于RDD方式1
          • 基于RDD方式2
          • 基于RDD方式3
          • 基于Pandas的DataFrame
          • 读取外部数据
          • 读取mysql数据
          • 读取hive数据
          • 读取es数据
        • 3.1.4--DataFrame DSL风格演示
        • 3.1.5--DataFrame SQL风格演示
        • 3.1.6--数据清理
          • 数据去重
          • 缺失值处理
        • 3.1.7--数据写出
          • 写出到文件
          • 写出到mysql
          • 写出到hive
          • 写出到es
      • 3.2--DataFrameAPI
          • printSchema
          • show
          • createTempView
          • createOrReplaceTempView
          • createGlobalTempView
          • select
          • join
          • filter
          • where
          • groupBy
          • orderBy
          • withColumn
          • withColumnRenamed
          • first
          • limit
          • dropDuplicates
          • dropna
          • fillna
          • write
      • 3.3 类似于RDD的API
          • count
          • collect
          • take
          • first
          • head
          • tail
          • foreach
          • foreachPartition
          • distinct
          • union/unionAll
          • coalesce/repartition
          • cache/persist
          • unpersist
          • columns
          • schema
          • rdd
          • printSchema
      • 3.4--GroupDataAPI
          • count
          • avg
          • sum
          • min
          • max
          • round
          • agg
      • 3.5--column的常用API
          • alias
          • astype
          • between
          • cast
          • astype
          • contains
          • endswith(other)
          • eqNullSafe(other)
          • lit
      • 3.6--UDF
        • 3.6.1--UDF的定义
        • 3.6.2--UDF的返回值
        • 3.6.3--UDAF by RDD
      • 3.7--窗口函数
        • DSL风格
        • SQL风格
  • 3--spark优化

1–Spark—core

2.1–RDD的创建

2.1.1–并行化创建

概念:并行化创建是指:将本地集合=>分布式RDD,这一步就是分布式的开端:本地转分布式

API:

  • parallelize(参数1,参数2)
  • 参数1 集合对象即可 比如list
  • 参数2 分区数,不写默认是电脑的线程数
# 导入spark相关的包
from pyspark import SparkConf,SparkContextif __name__ == '__main__':# 初始化sparkcontext的对象conf = SparkConf().setAppName("text").setMaster("local[*]")sc = SparkContext(conf=conf)# 通过并行化的方式创建RDDrdd= sc.parallelize([1,2,3,4,5,6,7,8,9])# parallelize没有给分区数 默认是电脑cpu的线程print("默认分区",rdd.getNumPartitions())rdd_par = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)print("分区数",rdd_par.getNumPartitions())# 打印rdd的内容# collect方法是将rdd中的每个分区的数据都发送到Driver,# 形成一个python的list的对象 分布式对象==>本地集合print("内容",rdd_par.collect())
2.1.2–获取分区数

getNumPartitions() 获取RDD的分区数量 返回值是Int数字

API:

  • rdd.getNumPartitions()
2.1.3–读取文件创建RDD
textFile

textFile():可以读取本地数据,也可以读取hdfs数据

API:

  • sparkcontext.textFile(参数1,参数2)

  • 参数1:必填,文件路径支持本地,支持HDFS,也支持一些比如S3协议

  • 参数2:可选,表示最小分区数量

  • 注意:参数2话语权不足,spark有自己的判断,在它的允许的范围内,参数2才有效果,超出spark允许的范围,参数2就失效

读取本地文件:

file_rdd1 = sc.textFile("../data/input/a.txt")
print("默认读取分区数:",file_rdd1.getNumPartitions())
print("内容",file_rdd1.collect())

参数2的用法:

# 加最小分区数测试 最小分区数只是个参考值,spark有自己的判断
file_rdd2 = sc.textFile("../data/input/a.txt",5)
file_rdd3 = sc.textFile("../data/input/a.txt",100)
print("rdd2的分区数:",file_rdd2.getNumPartitions())
print("rdd3的分区数",file_rdd3.getNumPartitions())

读取HDFS文件:

hdfs_rdd = sc.textFile("hdfs://node1:8020/input/b.txt")
print("hdfs_rdd分区数:",hdfs_rdd.getNumPartitions())
print("hdfs_rdd内容:",hdfs_rdd.collect())
wholeTextFile

wholeTextFile()适合读取一堆小文件,这个API适合读取小文件,因此文件的数据很小,分区很多,导致shuffle的几率更高,所以尽量少分区读取数据

API:

  • sparkcontext.wholeTextFile(参数1,参数2)
  • 参数1:必填,文件夹的目录,支持本地,支持HDFS,也支持一些比如S3协议
  • 参数2:可选,表示最小分区数量
  • 注意:参数2话语权不足,这API的分区数量最多只能开到文件数量
rdd = sc.wholeTextFiles("../data/input/tiny_files")
print(rdd.map(lambda x:x[1]).collect())

返回结果为二元组的形式展示, 前一个值是文件路径, 后一个值为文件内容

2.2–RDD算子

2.2.1–算子概念

算子:分布式集合对象上的API称之为算子

分类:

  • Transformation:转换算子
  • Action:动作(行动)算子

转换算子:

  • 定义:RDD的算子,返回值仍然是一个RDD的,称之为转换算子
  • 特性:这类算子是lazy 懒加载的,如果没有action算子,Transformation算子是不工作的

动作算子:

  • 定义:返回值不是RDD的就是action算子
2.2.2–Transformation算子
map

map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

API:

  • rdd.map(func)
  • func : f:(T)=>U
  • f:表示这是一个函数(方法)
  • (T)=> U 表示方法传入参数为任意了类型,返回值也是任意类型
  • (A)=> A 表示传入参数为任意类型,返回值也是任意类型。但是传入值和返回值类型相同
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
# 定义方法作为算子的传入
def add(data):return data*10print(rdd.map(add).collect())# 更简单的方式是定义lamda表达式来写的匿名函数
print(rdd.map(lambda data: data * 10).collect())
# 两种方式都可以,第一种适合复杂函数,第二种适合一行解决的函数#结果
[10, 20, 30, 40, 50, 60, 70, 80, 90]
[10, 20, 30, 40, 50, 60, 70, 80, 90]
flatMap

flatMap对rdd执行map操作,然后进行解除嵌套操作.

API:

  • rdd.flatMap(func)
  • func : f:(T)=>U

eg:

  • 嵌套的list: list[[1,2,3,4,5,6],[7,8],[9]]
  • 解除嵌套的list: list[1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(["hadoop spark hadoop","hadoop flink","spark hadoop"])rdd2 = rdd.map(lambda line: line.split(" "))# flatmap无需传参数就可以直接解除嵌套
rdd3 = rdd.flatMap(lambda line:line.split(" "))#结果
['hadoop', 'spark', 'hadoop', 'hadoop', 'flink', 'spark', 'hadoop']
reduceByKey

针对(K,V)型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

API:

  • rdd.reduceByKey(func)
  • func:(v,v)=>v
  • 接受两个传入参数(类型一致),返回一个值,类型和传入的要求一致
rdd = sc.parallelize([('a',1),('b',1),('c',1),('a',1),('a',1)])print(rdd.reduceByKey(lambda a, b: a + b).collect())#结果
[('b', 1), ('c', 1), ('a', 3)]
mapValues

分区操作算子 就只对value 进行操作

API:

rdd.mapValue(func)

rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('a',5)])# 就只对value 进行操作
print(rdd.mapValues(lambda value: value * 10).collect())#结果
groupBy

将RDD的数据进行分组

API:

  • rdd.groupBy(func)
  • func(T)=>K
  • 函数要求传入一个参数,返回一个返回值,类型无所谓
  • 这个函数是拿到你的输入值后,将所有相同输入值的放在一个组里
  • 分组完成后,每个组是一个二元组,key就是输入值,所有的数据都放入一个迭代器里作为value
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])# groupby 传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
result_rdd = rdd.groupBy(lambda t:t[0])
print(result_rdd.collect())# 将value的值强制转换,才能查看迭代器的内容
print(result_rdd.map(lambda t: (t[0], list(t[1]))).collect())#结果
filter

过滤想要的数据进行保留

API:

  • rdd.filter(func)
  • func(T)=>bool
  • 传入任意类型的参数,返回值是False或者True
rdd = sc.parallelize([1,2,3,4,5])# 通过filter算子过滤出奇数
result_rdd = rdd.filter(lambda x:x%2 == 1)
print(result_rdd.collect())#结果
distinct

对RDD数据进行去重,返回新的RDD

API:

  • rdd.distinct(参数1)
  • 参数1,去重分区数量,一般不用传
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 1, 2, 5, 9, 3])# 去重
print(rdd.distinct().collect())#结果
union

2个RDD合并成1个RDD返回

API:

  • rdd.union(other_rdd)
  • 注意只会合并不会去重
  • 不同类型的RDD也可以混合
  • 合并后的分区数是rdd1和rdd2的分区数和
rdd = sc.parallelize([1, 2, 3, 1, 2, 5, 9, 3])
rdd1 = sc.parallelize(["c", "b", "a"])# 将两个rdd进行合并
print(rdd.union(rdd1).collect())#结果
join

分区操作算子 对两个RDD进行Join操作(可实现sql的内\外连接)

API:

  • 注意:join算子只能用于二元元组

  • rdd.join(other_rdd) 内连接

  • rdd.leftOuterJoin(other_rdd)左外连接

  • rdd.rightOuterJoin(other_rdd)右外连接

rdd1 = sc.parallelize([(1001,"zhangshan"),(1002,"lisi"),(1003,"wangwu"),(1004,"zhaoliu")])
rdd2 = sc.parallelize([(1001,"销售部"),(1002,"科技部")])# 通过join来进行rdd间的关联
# 对于join来说,关联条件按照二元组key来进行关联
print(rdd1.join(rdd2).collect())# 左外连接
print(rdd1.leftOuterJoin(rdd2).collect())# 右外连接
print(rdd1.rightOuterJoin(rdd2).collect())#结果
intersection

求2个RDD的交集,返回一个新的rdd

API:

  • rdd.intersection(other_rdd)
rdd1 = sc.parallelize([1,2,3,4,'a','b'])
rdd2 = sc.parallelize([2,3,4,'b'])# 求rdd间的交集
print(rdd1.intersection(rdd2).collect())#结果
glom

将RDD的数据加上嵌套,这个嵌套按照分区来进行

API:

  • rdd.glom()

eg:

  • rdd数据为[1,2,3,4,5]有两个分区,那么glom后,数据就有可能变成[[1,2,3],[4,5]]
rdd = sc.parallelize([1,2,3,4,5,6,7,8],3)# glom可以查看分区排布
print(rdd.glom().collect())#结果
groupByKey

针对KV型RDD,自动按照key分组

API:

  • rdd.groupByKey()
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])# 与groupby相同,只是不需要指定 直接就是按照key分组的,
# 并且第二个值是相同key的value的集合
rdd1 = rdd.groupByKey()
rdd2 = rdd1.map(lambda x:(x[0],list(x[1])))print(rdd2.collect())#结果
[('b', [1, 1, 1]), ('a', [1, 1])]
sortBy

对RDD数据进行排序,基于你指定的排序依据

API:

  • rdd.sortBy(func,ascending=False,numPartition=1)

  • func: (T)=>U 告知按照rdd中的哪个数据进行排序 比如:lambda x:x[1] 表示按照rdd中的第二列元素进行排序

  • ascending True升序 False降序

  • numPartition 用多少分区进行排序

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c', 1)],3)# 使用sortby对数据进行排序
# 按照value进行排序
# 注意 如果要全局有序 排序分区数应设为1
rdd1 = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1)
print(rdd1.collect())#结果
[('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 3)]
sortByKey

针对KV型RDD,按照key进行排序

API:

  • sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD.>
  • ascending:升序or降序,True升序, False降序,默认是升序
  • numPartitions:按照几个分区进行排序,如果全局有序,设置1
  • keyfunc:在排序前对key进行处理,语法是: (k)→ U ,一个参数传入,返回一个值
rdd = sc.parallelize([('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)],3)# keyfunc = lambda key: str(key).lower() 忽略大小写的影响
rdd1 = rdd.sortByKey(ascending=True,numPartitions=3,keyfunc=lambda key:str(key).lower())
print(rdd1.collect())#结果
[('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)]
keys

概述: rdd必须是键值对类型的数据, 获取键值对所有的键(key).


values

概述: rdd必须是键值对类型的数据, 获取键值对所有的值(value).


2.2.3–Action算子
countByKey

统计key出现的次数(一般适用于KV型的RDD)

rdd = sc.textFile("../data/input/a.txt")rdd2 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))# 通过countbykey来对key进行计数 这是一个Action算子 不能colloct
result = rdd2.countByKey()
print(result)
print(type(result))#结果
defaultdict(<class 'int'>, {'hadoop': 2, 'hello': 2, 'zxz': 2, 'wrx': 2})
<class 'collections.defaultdict'>
collect

将RDD各个分区内的数据,统一收集到Driver中,形成List对象

API:

  • rdd.collect()
  • 这个算子,是将RDD各个分区数据都拉取到Driver
  • 注意的是, RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大。不然,会把Driver内存撑爆
reduce

对RDD数据集按照传入的逻辑进行聚合

API:

  • rdd.reduce(func)
  • func:(T,T)=>T
  • 两个参数传入,一个返回值,返回值和参数要求类型一致
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])# reduce是action算子result = rdd.reduce(lambda a,b:a+b)print(result)#结果
45
fold

和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的,这个初始值聚合会作用在分区内和分区间

API:

  • rdd,fold(初始值,func)

eg:

  • 比如:[[1,2,3],[4,5,6],[7,8,9]]数据分布在3个分区
  • 分区1 123聚合的时候带上10作为初始值得到16
  • 分区2 456聚合的时候带上10作为初始值得到25
  • 分区3 789聚合的时候带上10作为初始值得到34
  • 3个分区的结果做聚合也带上初始值10,所以结果是: 10+16+ 25+34=85
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
# print(rdd.glom().collect())result = rdd.fold(10,lambda a,b:a+b)
print(result)#结果
85
aggregate

和fold算子相同, 唯一不同的是可以分别定义分区内计算函数和分区外计算函数

    rdd1 = sc.parallelize(range(5), 2)print(rdd1.glom().collect())# aggregate算子演示seqOp = (lambda x,y:x+y)comOp = (lambda x,y:x+y+1)print(rdd1.aggregate(2, seqOp, comOp))
  • 下图所示的是三个算子执行时的计算过程, 这里使用的是累加的过程

  • fold和aggregate计算过程是一样的, 但是唯一区别是aggregate可以指定分区内外的计算方式不同.

  • 设置初始值的算子,初始值被加分区数n+1次

first

取出RDD第一个元素

rdd = sc.parallelize([1,4,3,2,7,6,2,1,9])
print(rdd.first())#结果
1
take

取RDD的前N个元素,组合成List返回

注意: take是将rdd中前n个元素拉取到Driver所在主机的内存中, 由于driver的内存是有限的, 数据过多会造成driver的内存溢出.

print(rdd.take(4))#结果
[1, 4, 3, 2]
top

对RDD数据集进行降序排序,取前N个

print(rdd.top(3))#结果
[9, 7, 6]
count

计算RDD有多少个数据,返回值是个数字

print(rdd.count())#结果
9
takeSample

随机抽样RDD的数据

API:

  • takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
  • 参数1:True表示运行取同一个数据,False表示不允许取同一个数据. 和数据内容无关,是否重复表示的是同一个位置的数据
  • 参数2:抽样要几个
  • 参数3∶随机数种子,这个参数传入—个数字即可,随意给
rdd = sc.parallelize([1,2,1,4,1,6,7,1,2])
print(rdd.takeSample(False, 5,1))#结果
[6, 7, 1, 1, 4]
takeOrdered

对RDD进行排序取前N个

API:

  • rdd.takeordered(参数1,参数2)
  • 参数1要几个数据
  • 参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
  • 这个方法使用按照元素自然顺序升序排序,如果你想玩倒叙,需要用参数2来对排序的数据进行处理
rdd = sc.parallelize([1, 2, 9, 2, 1, 5, 7, 3, 9])# 默认是升序并选取前n个元素
print(rdd.takeOrdered(3))# 增加匿名函数可以使其降序排序并选取前n个 == top
print(rdd.takeOrdered(3, lambda x: -x))#结果
[1, 1, 2]
[9, 9, 7]
foreach

对RDD的每一个元素,执行提供的逻辑操作(和map一样),但这个方法没有返回值

注意: 但是它执行的时候, 每个分区会抢占cpu资源,从而会导致数据乱序的情况

API:

  • rdd.foreach(func)
  • func(T)=>None
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])result = rdd.foreach(lambda x: x * 10)# foreach对元素进行处理后没有返回值
print(result)# 这种方式的输出是由executor输出的而不是drive
result = rdd.foreach(lambda x: print(x * 10))#结果
None
50
60
70
80
90
10
20
30
40
saveAsTextFile

将RDD的数据写入文本文件,支持本地写出,hdfs等文件系统

  • 注意:写出的时候,每个分区所在的Task直接控制数据写出到目标文件系统中,所以才会一个分区产生1个结果文件.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)# 将内容写到本地
# rdd.saveAsTextFile("../data/output/out2")# 将内容保存到hdfs
rdd.saveAsTextFile("hdfs://node1:8020/input/output/out1")# saveastextfile中是由executor来写入的
  • action算子中foreach和saveAsTextFile算子是分区直接执行的,其余的Action算子都回将结果发送至Driver
collectAsMap

概述: 将二元组的RDD转化为字典(dict). 将它转为rdd,就相当于将其转为本地,不再是弹性分布式数据集rdd了.


2.2.4–分区操作算子
mapPartition

Transformation算子 mapPartition—次被传递的是一整个分区的数据作为一个迭代器(一次性list)对象传入过来

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)def process(iter):result = list()for it in iter:result.append(it*10)return result# mapParttion 是将一个分区的值按照list或者迭代器送到计算的地方
# 相比map 可以提高网络传送的性能
print(rdd.mapPartitions(process).collect())#结果
[10, 20, 30, 40, 50, 60, 70, 80, 90]
foreachPartition

Action算子 和普通的foreach一致,但是一次处理的是一整个分区数据

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)def process(iter):result = list()for it in iter:result.append(it*10)return print(result)# 与mapPartition相同,但是不返回值
rdd.foreachPartition(process)#结果
[10, 20, 30]
[40, 50, 60]
[70, 80, 90]
  • foreachPartition就是一个没有返回值的mapPartitions
partitionBy

Transformation算子 对RDD进行自定义分区

API:

  • rdd.partitionBy(参数1,参数2)
  • 参数1重新分区后有几个分区
  • 参数2自定义分区规则,函数传入
  • 参数2:(K)→ int 一个传入参数进来,类型无所谓,但是返回值一定是int类型将key传给这个函数,你自己写逻辑,决定返回一个分区编号
  • 分区编号从0开始,不要超出分区数-1
rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('d',5)])# 使用partitionby实现自定义分区
def process(key):if 'a' == key or 'b' ==key:return 0if 'c' == key:return 1return 2print(rdd.partitionBy(3, process).glom().collect())#结果
repartition

Transformation算子 对RDD的分区执行重新分区(仅数量)

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)# repartition修改分区
# 使用该API时,尽量降分区数,因为用该api会影响内存迭代管道
print(rdd.repartition(1).getNumPartitions())
print(rdd.repartition(5).getNumPartitions())
coalesce

Transformation算子 对分区数量增减

API:

  • rdd.coalesce(参数1,参数2)
  • 参数1,分区数
  • 参数2,True or False
  • True表示允许shuffle,也就是可以加分区
  • False表示不允许shuffle,也就是不能加分区,False是默认
# coalesce 修改分区 只有将shuffle改为true才能按照所给的数分区,
# 要不然会自动进行安全鉴定而进行分区
print(rdd.coalesce(1).getNumPartitions())print(rdd.coalesce(5,shuffle=True).getNumPartitions())

2.3–RDD缓存

2.3.1–RDD缓存的目的
  • RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。
  • RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
  • 这个特性可以最大化的利用资源,老l日RDD没用了就从内存中清理,给后续的计算腾出内存空间。
  • RDD的缓存技术: Spark提供了缓存APIl,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上缓存的API
2.3.1–RDD缓存的特点
  • 缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上,但是,这个保存在设定上是认为不安全的。

  • 缓存的数据在设计上是认为有丢失风险的,所以缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系
    一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据

  • 缓存如何丢失:在内存中的缓存是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路
    硬盘中因为硬盘损坏也是可能丢失的。

  • RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上,这是分散存储

2.3.2–RDD缓存的API

from pyspark.storagelevel import StorageLevel

rdd3.cache() 缓存到内存中

rdd3. persist(StorageLevel.MEMORY_ONLY) 仅内存缓存

rdd3. persist(StorageLevel.MEMORY_ONLY_2) 仅内存缓存,2个副本

rdd3. persist(StorageLevel.DISK_ONLY) 仅缓存硬盘上

rdd3.persist(StorageLevel.DISK_ONLY_2) 仅缓存硬盘上,2个副本

rdd3.persist(storageLevel.DISK_ONLY_3) 仅缓存硬盘上,3个副本

rdd3.persist(StorageLevel.MEMORY_AND_DISK) 先放内存,不够放硬盘

rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)先放内存,不够放硬盘,2个副本

rdd3.persist(StorageLevel.OFF_HEAP) 堆外内存(系统内存)

如上API,自行选择使用即可

一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)

如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了用CheckPoint

主动清理缓存的API rdd . unpersist()

from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("Text").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/a.txt")rdd2 = rdd1.flatMap(lambda x:x.split(" "))rdd3 = rdd2.map(lambda x:(x,1))# 增加缓存rdd3.cache()# 自定义缓存rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a,b:a+b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x:sum(x))print(rdd6.collect())# 解除缓存rdd3.unpersist()
2.3.3–RDD的CheckPoint

CheckPoint技术,也是将RDD的数据,保存起来,但是它仅支持硬盘存储,并且,它被设计认为是安全的不保留血缘关系

特点:

  • CheckPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储
  • CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
  • CheckPoint支持写入HDFS,缓存不行, HDFS是高可靠存储, CheckPoint被认为是安全的.
  • CheckPoint不支持内存,缓存可以,缓存如果写内存性能比CheckPoint要好一些
  • CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留

使用:

CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适,或者数据量很大,用CheckPoint比较合适.

如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可.

Cache和CheckPoint两个API都不是Action类型,所以想要它俩工作,必须在后面接上Action,接上Action的目的,是让RDD有数据,而不是为了让checkPoint和Cache工作。

from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("Text").setMaster("local[*]")sc = SparkContext(conf=conf)# 1、告知spark开启checkpoint功能sc.setCheckpointDir("hdfs://node1:8020/input/output/ckp")rdd1 = sc.textFile("../data/input/a.txt")rdd2 = rdd1.flatMap(lambda x:x.split(" "))rdd3 = rdd2.map(lambda x:(x,1))# 2、调用checkpoint的api 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 解除缓存rdd3.unpersist()

2.4–广播变量

2.4.1–概念

本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。
executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。

如果将本地list对象标记为广播变量对象,那么当上述场景出现的时候,Spark只会,给每个Executor来一份数据,而不是像原本那样,每一个分区的处理线程都来一份,节省内存.

2.4.2–API

1.将本地list标记成广播变量是即可

broadcast = sc. broadcast(stu_info_list)

2.使用广播变量,从broadcast对象中取出本地list对象即可value = broadcast.value

  • 也就是先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
  • 只要中间传翰的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区要都给.
from pyspark import SparkContext,SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("text").setMaster("local[*]")sc = SparkContext(conf=conf)stu_info_list = {(1,'zxz',11),(2,'wrx',13),(3,'小明',11),(4,'王大力',11)}# 1.将本地python list对象标记为广播变量broadcast = sc.broadcast(stu_info_list)score_info_rdd = sc.parallelize([(1,'语文',99),(2,'数学',99),(3,'英语',99),(4,'数学',99),(2,'编程',99),(4,'语文',99),(1,'数学',99)])def map_func(data):id = data[0]name = ''# 匹配分布list和rdd分布式的id# 2.在使用到本地集合对象的地方,从广播变量中取出来用即可for stu_info in broadcast.value:stu_id = stu_info[0]if id == stu_id:name = stu_info[1]return (name,data[1],data[2])print(score_info_rdd.map(map_func).collect())"""场景:本地集合对象和分布式对象(RDD)进行关联的时候需要将本地集合封装为广播变量可以节省:1.网络io的次数2.Executor的内存占用"""#结果

2.5–累加器

2.5.1–需求

想要对map算子计算中的数据,进行计数累加,得到全部数据计算完后的累加结果

2.5.2–没有累加器的代码演示
from pyspark import SparkContext,SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("text").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)count = 0def map_func(data):global countcount += 1print(count)rdd.map(map_func).collect()print(count)# 可以看出count最外面的值没有变化#结果0
2.5.3–增加累加器的代码

原因:

count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送),每个executor各自收到一个,在最后执行print(count)的时候,这个被打印的count依旧是driver的那个,所以不管executor中累加到多少,都和driver这个count无关。

API:

  • sc.accumulator(初始值)
  • 这个对象唯一和前面提到的count不同的是
  • 这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上
from pyspark import SparkContext,SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("text").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)# spark提供的累加器变量,参数是初始值scmlt = sc.accumulator(0)def map_func(data):global scmltscmlt += 1# print(scmlt)rdd2 = rdd.map(map_func)rdd2.collect()rdd3 = rdd2.map(lambda x:x)rdd3.collect()print(scmlt)#结果20

如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了),当rdd3构建出来的时候,是依赖rdd2的, rdd2没数据,那么rdd2就要重新生成.重新生成就导致累加器累加数据的代码再次被执行,所以代码的结果是20。

注意事项:

也就是,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用可能会重新构建此rdd如果累加器累加代码,存在重新构建的步骤中,累加器累加代码就可能被多次执行。

如何解决:加缓存或者checkPoint即可.

2.6–python程序对hdfs文件操作

from hdfs.client import Clientclient = Client('http://node1:9870')# 创建文件夹
client.makedirs('/datas/aaa')# 删除文件夹
client.delete('/datas/output')# 上传文件
client.upload('/datas/aaa/a.txt','E:/aa/a.txt')# 下载文件
client.download('/datas/a.txt','E:/a.txt')

2–SparkSQL

3.1–DataFrame

3.1.1–SparkSession对象环境的创建
  • SparkSession对象
# Sparksession对象的导包
from pyspark.sql import SparkSessionif __name__ == '__main__':# 构建session环境入口spark = SparkSession.builder.appName("test").master('local[*]') \.getOrCreate()
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
# 通过SparkSession对象获取SparkContext对象
sc = spark.sparkContext
  • 用于SparkSQL编程作为入口对象
# SparkSql的helloworld
df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
df2 = df.toDF("id", "name", "score")
df2.printSchema()
df2.show()df2.createTempView("score")
# SQL风格
spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5   
""").show()
  • 所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象
3.1.2–DataFrame的组成

DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:


  • 表结构描述

比如,在MySQL中的一张表:

  • 由许多行组成
  • 数据也被分成多个列
  • 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:
在结构层面:

  • StructType对象描述整个DataFrame的表结构
  • StructField对象描述一个列的信息

在数据层面:

  • Row对象记录一行数据
  • Column对象记录一列数据并包含列的信息

一个StructField记录:列名、列类型、列是否运行为空

多个StructField组成一个StructType对象。

一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

同时,一行数据描述为Row对象,如Row(1, 张三, 11)

一列数据描述为Column对象,Column对象包含一列数据和列的信息

3.1.3–DataFrame代码构建
基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构

API:

  • spark.createDataFrame(data_rdd,schema=[‘name’,‘age’])
  • 参数1 被转换的rdd
  • 参数2 通过list的形式按照顺序依次提供字符串名称即可
from pyspark.sql import SparkSessionif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.appName('test').master('local[*]').\getOrCreate()sc = spark.sparkContext# 基于rdd转换成Dataframe类型file_rdd = sc.textFile('../data/input/sql/people.txt')data_rdd = file_rdd.map(lambda x:x.split(', ')).map(lambda x:(x[0],int(x[1])))# 构建Dataframe对象# 参数1 被转换的rdd# 参数2 通过list的形式按照顺序依次提供字符串名称即可df = spark.createDataFrame(data_rdd,schema=['name','age'])# 打印DataFrame的表结构df.printSchema()# 打印df的数据# 参数1:展示多少条数据,默认不传的话是20# 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度,# 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是Truedf.show(20,False)# 将df对象转换成临时视图表,可供sql语句查询df.createOrReplaceTempView("people")spark.sql("""SELECT * FROM people WHERE age<30""").show()
基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

API:

  • StructType().add(“name”,StringType(),nullable=True)
  • 参数1 列名 参数2 类型 参数3 是否能为空
  • 如果是多个列就多次调用add()
    # 构建表结构的描述对象,structType对象# 参数1 列名 参数2 类型  参数3 是否能为空schema = StructType().add("name",StringType(),nullable=True).\add("age",IntegerType(),nullable=False)# 基于StructType对象去构建dataframe的对象的rdd转化df = spark.createDataFrame(data_rdd,schema = schema)df.printSchema()df.show()
基于RDD方式3

使用RDD的toDF方法转换RDD

API:

  • data_rdd.toDF(参数)
  • 参数可以是一个列表,也可以是schema对象
# todf 的方式构建Dataframe
df1 = data_rdd.toDF(["name","age"])
df1.printSchema()
df1.show()# todf方式2 通过structType来构建
schema = StructType().add('name',StringType(),nullable=True).\add('age',IntegerType(),nullable=False)df2 = data_rdd.toDF(schema=schema)
df2.printSchema()
df2.show()
基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

API:

  • spark.createDataFrame(参数)
  • 参数为pandas的DataFrame数据
# 基于pandas的dataframe构建sparksql的dataframe对象
pdf = pd.DataFrame({"id":[1,2,3],"name":['张大仙','王小名','张三'],"age":[11,18,16]
})df = spark.createDataFrame(pdf)
df.printSchema()
df.show()
读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame

  • 读取text数据源
  • 使用format(“text”)读取文本数据
  • 读取到的DataFrame只会有一个列,列名默认称之为:value

API:

  • spark.read.format(“读取文件类型”). schema().load(读取文件位置)
# 构建StructType,text,
# 读取数据的特点是将一整行当成一个列。
# 默认列名是value 类型是String
schema = StructType().add('data',StringType(),nullable=True)
df = spark.read.format("text").\schema(schema=schema).\load("../data/input/sql/people.txt")df.printSchema()
df.show()
  • 读取json数据源,使用format(“json”)读取json数据
df = spark.read.format('json').load("../data/input/sql/people.json")
df.printSchema()
df.show()
  • 读取csv数据源,使用format(“csv”)读取csv数据
df =spark.read.format('csv').\option("sep",";").\  按照什么切割option("harder",True).\  是否有表头option("encoding","utf-8").\ 编码schema("name STRING,age INT,job STRING").\ load("../data/input/sql/people.csv")df.printSchema()
df.show()
  • 读取parquet数据源,使用format(“parquet”)读取parquet数据
df = spark.read.format("parquet").load("../data/input/sql/users.parquet")
df.printSchema()
df.show()
读取mysql数据
    df2 = spark.read.format("jdbc").option("url","jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \.option("dbtable", "pyspark_info.stu_info") \.option("user", "root") \.option("password", "123456") \.load()df2.show()
读取hive数据
  • 要求开启hive的metastore服务

  • 配置文件

#----------------local模式:在node1配置--------------------------
# 进入配置文件目录,创建配置文件
cd /export/server/spark-local/conf
vim hive-site.xml# 添加以下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://node1.itcast.cn:9083</value></property>
</configuration>#----------------Spark On Yarn模式:在node1,node2,node3配置------------------------
# 进入配置文件目录创建配置文件
cd /export/server/spark-yarn/conf
vim hive-site.xml# 添加以下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://node1.itcast.cn:9083</value></property>
</configuration>
import timefrom pyspark import SparkContext, SparkConf, StorageLevel
import os
import sys
from pyspark.sql import SparkSession, Window# spark入门案例 ---  WordCount
# 1、设置环境变量
from pyspark.sql.functions import *os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark'  # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"if __name__ == '__main__':#创建用于操作Hive的SparkSession"""spark.sql.warehouse.dir:用来指定Hive表数据在HDFS的位置hive.metastore.uris    :用来指定hive的metastore服务(runjar)在哪台主机enableHiveSupport      :开启spark支持hive"""spark = SparkSession \.builder \.appName("SparkSQLAppName") \.master("local[2]") \.config("spark.sql.shuffle.partitions", 2) \.config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\.config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\.enableHiveSupport()\.getOrCreate()# 设置日志级别为WARNspark.sparkContext.setLogLevel("WARN")#--------------------从hive读取-SQL风格--------------------------------#使用spark操作hive的表spark.sql("""select * from  db_hive.emp;""").show()spark.sql("""select deptno,count(*) as cnt from  db_hive.emp group by deptno;""").show()# --------------------从hive读取-DSL风格--------------------------------dataFrame1 = spark.read.table("db_hive.emp")dataFrame2 = dataFrame1 \.select("deptno", "sal") \.groupBy("deptno") \.agg(count("*").alias("cnt"))dataFrame2.show()
读取es数据
df_es_source = spark.read.format('es')\.option("es.node",f"{rule['esNodes']}")\.option("es.resource",f"{rule['esIndex']}/{rule['esType']}")\.option("es.index.read.missing.as.empty","yes")\.option("es.query","?q=*") \.option("es.read.field.include",f"{rule['selectFields']}")\.load()
3.1.4–DataFrame DSL风格演示
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
import pandas as pdif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.appName('test').master('local[*]').\getOrCreate()df = spark.read.format("csv").\schema("id INT,subject STRING,score INT").\load("../data/input/sql/stu_score.txt")# column对象的获取id_column = df['id']subject_column = df['subject']# Dsl风格演示# df.select(["id","subject"]).show()# df.select("id","subject").show()# df.select(id_column,subject_column).show()# filter API# df.filter("score<99").show()# df.filter(df["score"]<99).show()# where API# df.where("score<99").show()# df.where(df["score"] < 99).show()# group by APIdf.groupBy("subject").count().show()df.groupBy(df["subject"]).count().show()# groupby API的返回值不是dataframe类型# groupdata对象是一个关于分组的数据结构,有统一的API供我们分组做聚合# SQL: groupby 后接聚合:sum avg count min max# groupdata对象也类似Sql分组的数据结构,同样有上述的5中聚合方式# groupdata对象其实只是一个中转的一个对象,最终还是要获取dataframe对象r = df.groupBy("subject")print(type(r))
3.1.5–DataFrame SQL风格演示
from pyspark.sql import SparkSessionif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.appName('test').master('local[*]').\getOrCreate()df = spark.read.format("csv").\schema("id INT,subject STRING,score INT").\load("../data/input/sql/stu_score.txt")# 注册成临时表df.createTempView("score")# 注册成临时表df.createOrReplaceTempView("score_2")# 注册或替换临时表# 注册全网临时视图,使用的时候需要在前面带上global_temp.前缀df.createGlobalTempView("score_3")# 可以通过SparkSession对象的sql_api完成sql语句执行spark.sql("SELECT subject,COUNT(*) AS cnt FROM score GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()
3.1.6–数据清理
数据去重
# dropDuplicates 是dataframe的api可以完成数据去重
# 无参数使用,对全部的列联合起来比较进行去重
df.dropDuplicates().show()# 也可指定列来进行去重
df.dropDuplicates(["age","job"]).show()
缺失值处理
# 无参数使用,只要列中有null就删除这一行数据
df.dropna().show()# thresh = 3 至少满足三个有效列
df.dropna(thresh=3).show()# 表示在指定的列中至少有两个不为空
df.dropna(thresh=2,subset=["name","age"]).show()# 缺失值处理也可以给缺失处进行填写
# dataframe的fillna api可以对确实的列进行填充
df.fillna("loss").show()# 指定列进行填充
df.fillna("N/A",subset=['job']).show()# 设定一个字典对所有列进行填充规则
df.fillna({"name":"未知名称","age":1,"job":"worker"}).show()
3.1.7–数据写出
写出到文件
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypefrom pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions",2).\getOrCreate()sc = spark.sparkContext# 读取数据df = spark.read.format("csv").\option("sep",";").\option("header",True).\load("../data/input/sql/people.csv")schema = StructType().add("user_id", StringType(), nullable=True). \add("movie_id", IntegerType(), nullable=True). \add("rank", IntegerType(), nullable=True). \add("times", StringType(), nullable=True)# 读取文件df = spark.read.format("csv"). \option("sep", "\t"). \option("header", False). \option("encoding", "utf-8"). \schema(schema=schema). \load("../data/input/sql/u.data")# Write text 写出 只能写成一个列的数据,需要将df转换为单列dfdf.select(F.concat_ws("---" , "user_id" , "movie_id", "rank","times")).\write.\mode("overwrite").\format("text").\save("../data/output/sql/text")# Write csvdf.write.mode("overwrite").\format("csv").\option("sep",";").\option("header",True).\save("../data/output/sql/csv")# write jsondf.write.mode("overwrite").\format("json").\save("../data/output/sql/json")# write parquetdf.write.mode("overwrite").\format("parquet").\save("../data/output/sql/parquet")
写出到mysql
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
import os# spark入门案例 ---  WordCount
# 1、设置环境变量
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark-local'  # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"if __name__ == '__main__':# 创建spark对象 用于写sparksqlspark = SparkSession \.builder.appName("test") \.master("local[*]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()# 创建sc对象 用于写rddsc = spark.sparkContextdf1 = spark.read.format("csv") \.schema("id int,name string") \.option("sep", ",") \.option("encoding", "utf8") \.load("./data/input/stu_info.txt")# 写入到mysqldf1.write.mode("append") \.format("jdbc") \.option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \.option("dbtable", "pyspark_info.stu_info") \.option("user", "root") \.option("password", "123456") \.save()
写出到hive
    # --------------------向hive写入--------------------------------#test_rs表如果不存在,则创建,如果存在,则覆盖数据dataFrame2.write.mode("overwrite").format("hive").saveAsTable("db_hive.test_rs")# 创建分区表df.write.saveAsTable("db_hive.test_rs", mode='append',partitionBy=[''])# append 是新增数据, overwrite则会将已存在的表删除,新建表
写出到es

(‘es.write.operation’, ‘upsert’) 表示插入模式为upsert 没有就插入 有就更新

.option(‘es.mapping.id’, ‘userId’) 表示插入时按照哪个来表示是否已经存在, 也就是给es索引下的_id赋值

(‘es.mapping.name’, ‘userId:userId,tagsId:tagsId’) 表示插入时, df和es索引的映射关系(df字段:es字段)

def es_write(result_df: DataFrame):result_df.write.format('es') \.option('es.nodes', 'up01:9200') \.option('es.resource', 'tfec_userprofile_index') \.option('es.write.operation', 'upsert') \       .option('es.mapping.id', 'userId') \.option('es.mapping.name', 'userId:userId,tagsId:tagsId') \.mode('append') \.save()

3.2–DataFrameAPI

printSchema

打印DataFrame的表结构

API:

  • df.printSchema()
show

打印df的数据

API:

  • df.show(20,False)
  • 参数1:展示多少条数据,默认不传的话是20
  • 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度,
  • 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是True
createTempView

将df对象注册成临时视图表,可供sql语句查询

API:

  • df.createTempView(“score”)
createOrReplaceTempView

将df对象注册或替换临时表,可供sql语句查询

API:

  • df.createOrReplaceTempView(“people”)
createGlobalTempView

将df注册全网临时视图,使用的时候需要在前面带上global_temp.前缀

API:

  • df.createGlobalTempView(“score_3”)
  • spark.sql(“SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject”).show()
select

选择DataFrame中的指定列(通过传入参数进行指定)

API:

  • select(cols)
  • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列
    List[Column]对象或者List[str]对象, 用来选择多个列
  • df.select(“id”,“subject”).show()
  • df.select(id_column,subject_column).show()
  • df.select([“id”,“subject”]).show()
join

表的连接

df1.join(df2, on条件 , 连接方式 ) 'inner ’ 'left ’ 'right ’ ‘full’

df1.withColumn('rk',F.rank().over(Window.partitionBy('dep_id').orderBy(F.col('salary').desc()))).where('rk<=2').join(df2, df1.dep_id == df2.dep_id_2, 'inner')\.select(['emp_id','emp_name','salary','dep_id','dep_name','dep_addr','rk']).show()
filter

过滤DataFrame内的数据,返回一个过滤后的DataFrame

API:

  • filter(参数)
  • df.filter(“score<99”).show()
  • df.filter(df[“score”]<99).show()
where

过滤DataFrame内的数据,返回一个过滤后的DataFrame

API:

  • where(参数)
  • df.where(“score<99”).show()
  • df.where(df[“score”] < 99).show()
groupBy

按照指定的列进行数据的分组, 返回值是GroupedData对象

API:

  • groupBy(参数)
  • df.groupBy(“subject”).count().show()
  • df.groupBy(df[“subject”]).count().show()
  • groupby API的返回值不是dataframe类型
  • groupdata对象是一个关于分组的数据结构,有统一的API供我们分组做聚合
  • SQL: groupby 后接聚合:sum avg count min max
  • groupdata对象也类似Sql分组的数据结构,同样有上述的5中聚合方式
  • groupdata对象其实只是一个中转的一个对象,最终还是要获取dataframe对象
orderBy

按照某列进行排序

API:

  • orderBy(参数1,参数2)
  • 参数1是被排序的列
  • 参数2是ascending 默认是升序 ascending=False为降序
  • orderBy(“cnt”, ascending=False)
withColumn

withcolum方法对已存在的列进行操作返回一个新的列

API:

  • df1.withColumn(参数1,参数2)
  • 参数1是新的一列的名字,如果名字和老列相同则替换,否则作为新列存在
  • 参数2是对已存在列的操作
  • df1.withColumn(“value”,F.explode(F.split(df1[“value”]," ")))
  • 它还能用来实现窗口函数.
withColumnRenamed

对列名进行修改

API:

  • withColumnRenamed(参数1,参数2)
  • 参数1是要原列名
  • 参数2是要现列名
  • 一次只能修改一列,若要修改多个列,多次调用此方法
first

取第一行

limit

取第n行

dropDuplicates

数据去重

API:

  • df.dropDuplicates()
  • 无参数是对全部的列联合起来比较进行去重
  • df.dropDuplicates([“age”,“job”])
  • 也可指定列来进行去重
dropna

缺失值删除

API:

  • df.dropna()
fillna

缺失值填充

API:

  • df.dropna()
write

数据写出

API:

  • write.mode(“overwrite”).format(写出文件类型).save(写出路径)
  • mode(“overwrite”)表示是否覆盖
    • append: 如果数据存在,继续追加
    • overwrite: 如果目标存在时, 覆写以前数据, 存储当前最新数据
    • error/errorifexists: 如果目标存在就报错, 默认模式
    • ignore: 如果目标存在, 不做任何操作. 忽略
  • 注意写出text时应转换成单列的dataframe
  • 案例

3.3 类似于RDD的API

count

概述: 统计行数

collect

概述: 将DataFrame转换成一个列表

take

概述: 取DataFrame中前N行的数据

first

概述: 取DataFrame中第一行的数据

head

概述: 默认取DataFrame中第一行的数据,可以指定返回前N行的数据

tail

概述: 可以指定返回后N行的数据

foreach

概述: 对DataFrame中每条数据进行处理,没有返回值

foreachPartition

概述: 对DataFrame中每个分区数据进行处理,没有返回值

distinct

概述: 对DataFrame中每条数据进行去重处理

union/unionAll

概述: 实现两个DataFrame的合并

coalesce/repartition

概述: 调整DataFrame的分区数

cache/persist

概述: 对DataFrame进行缓存

unpersist

概述: 取消DataFrame的缓存

columns

概述: 返回DataFrame中的所有列名

schema

概述: 返回DataFrame中Schema的信息

rdd

概述: 返回DataFrame中的数据放入RDD中

printSchema

概述: 打印DataFrame的Schema信息

3.4–GroupDataAPI

count

按照分组进行计数,返回一个新的列默认名称是cnt

avg

按照分组求平均值

sum

求和

min

求最小值

max

求最大值

round

保留几位小数

file_df\.select("movie_id","rank")\.groupby("movie_id")\.avg("rank")\.withColumnRenamed("avg(rank)","avg_rank")\.withColumn("avg_rank",F.round("avg_rank",2))\.show()
agg

作用是在里面可以写多个聚合

df.groupBy("user_id").\agg(F.round(F.avg("rank"),2).alias("avg_rank"),F.min("rank").alias("min_rank"),F.max("rank").alias("max_rank")
).show()

3.5–column的常用API

alias

column对象的API 可以针对一个列改名

col("字段名").alias("别名")
astype

转换数据类型,是cast的别名

between

判断数据在什么和什么之间

df_etl.select(F.col("receivable").between(10,20))
cast

数据类型转化

df_etl.select(F.col("receivable").cast(DecimalType(10,2)))
astype

转换数据类型,是cast的别名

contains

判断是否包含

df_etl.select(F.col("receivable").contains("小"))
endswith(other)

boolen值,以other结尾的字符串

df.filter(F.col(‘对手’).endswith(‘熊’)).show()

eqNullSafe(other)

空值/指定值判断

lit

df增加一列每一行都为固定参数的列

new_result_df = result_df.withColumn('rule', F.lit(','.join(index_id_list)))

3.6–UDF

3.6.1–UDF的定义

方式1:

udf2 = spark.udf.register("udf1",num_ride_10,IntegerType())
  • 参数1 注册udf的名称 这个名称仅可以用于sql风格
  • 参数2 udf的处理逻辑 是一个单独的方法
  • 参数3 udf的返回值类型
  • 注意:udf注册的时候必须声名返回值类型,并且udf的真实返回值必须要和声名的一致
  • 返回值对象 这是一个udf对象,仅可以用于dsl语法
  • 当前这种方式定义的udf,可以通过参数1的名称用于sql风格,通过返回值对象来用于dsl风格

调用:

dsl df.select(udf2(df[‘num’])).show()

sql df.selectExpr(“udf1(num)”).show()

方式2:

udf3 = F.udf(num_ride_10,IntegerType())
  • 仅能用于dsl风格

调用:

df.select(udf3((df[‘num’]))).show()

3.6.2–UDF的返回值

返回array

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions",2).\getOrCreate()sc = spark.sparkContext# 构建一个rddrdd = sc.parallelize([["hadoop spark flink"],["hadoop java python"]])df = rdd.toDF(['line'])df.show()# 注册一个udf函数def split(data):return data.split(" ") #返回值是个array对象# TODO 1:方式1构建udfudf2 = spark.udf.register("udf1",split,ArrayType(StringType()))df.select(udf2(df['line'])).show()df.createTempView("lines")spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)# TODO 2:方式2构建udf3 = F.udf(split,ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

返回dict

import stringfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions",2).\getOrCreate()sc = spark.sparkContext# 假设 有三个数字 1 2 3 我们传入数字,返回数字所在的序号的字母 然后和数字结合成dict返回# 比如传入 1 返回{"num":1,"letters":"a"}rdd = sc.parallelize([[1],[2],[3]])df = rdd.toDF(["num"])# 注册udfdef process(data):return {"num":data,"letters":string.ascii_letters[data]}"""UDF的返回值是字典话,需要用structType来接收"""udf1 = spark.udf.register("udf1",process,StructType().add("num",IntegerType(),nullable=True).\add("letters",StringType(),nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)
3.6.3–UDAF by RDD
import stringfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions",2).\getOrCreate()sc = spark.sparkContextrdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)df = rdd.map(lambda x:[x]).toDF(['num'])# df.show()# 折中的方式 就是使用认定的的mappartitions 算子来完成聚合操作# 如果用mappartition API 完成UDAF聚合 一定要是单分区single_parttion_rdd = df.rdd.repartition(1)# print(single_parttion_rdd.collect())def process(data):sum = 0for row in data:sum += row['num']return [sum] #一定要嵌套list,因为mappartition方法要求的返回值是listprint(single_parttion_rdd.mapPartitions(process).collect())

3.7–窗口函数

DSL风格
"""
@Project :pyspark
@File    :pys_04_windows_func.py
@IDE     :PyCharm
@Author  :wrx
@Date    :2023/7/19 15:44
@describe:
"""from pyspark.sql import SparkSession,Window
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
import os# spark入门案例 ---  WordCount
# 1、设置环境变量
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark-local'  # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"if __name__ == '__main__':# 创建spark对象 用于写sparksqlspark = SparkSession \.builder.appName("test") \.master("local[*]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()# 创建sc对象 用于写rddsc = spark.sparkContext# 定义表结构schema = StructType()\.add("stu_id",StringType(),nullable=True) \.add("project",StringType(),nullable=True) \.add("score",IntegerType(),nullable=True) \df1 = spark.read.format('csv')\.option("header",False)\.option("sep",",")\.option("encoding","utf8")\.schema(schema=schema)\.load("./data/input/stu_score.txt")# DSL实现开窗函数df1.withColumn('rk',F.rank().over(Window.partitionBy("project").orderBy(F.col("score").desc()))).where("rk<=2").show()# 关闭资源sc.stop()spark.stop()
SQL风格
import stringfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions",2).\getOrCreate()sc = spark.sparkContextrdd = sc.parallelize([("张三","class1",99),("网易", "class2", 79),("张六", "class1", 99),("王如", "class2", 88),("张二", "class3", 89),("知晓", "class1", 77),("张力", "class2", 59),("网卡", "class1", 66),("网课", "class2", 100),("回合", "class3", 79)])schema = StructType().add("name",StringType()).add("class",StringType()).add("score",IntegerType())df = rdd.toDF(schema)df.createTempView("stu")# todo :聚合窗口函数的演示spark.sql("""SELECT *,AVG(score) OVER() AS avg_score FROM stu""").show()# todo :排序相关的窗口函数# RAKN over ,DENSE_RANK over,ROW_NUMBER overspark.sql("""SELECT *,ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,RANK() OVER(ORDER BY score) AS rankFROM stu""").show()# todo : NTILEspark.sql("""SELECT * ,NTILE(6) OVER(ORDER BY score DESC) FROM stu""").show()

3–spark优化

  1. 避免创建重复的RDD
    在我们开发一个spark作业时,首先会基于某个数据源(hive或者hdfs文件)来创建一个RDD,然后对这个RDD执行某个算子操作,而得到下一个RDD。以此循环,直到计算出我们需要的结果,在这个过程中,多个RDD会通过不同的算子操作窜起来,这个“RDD串”就是RDD lineage,就是“RDD的血缘关系链”。但是在我们开发过程中,对于同一份数据,只能创建一个RDD,不能创建多个RDD代表同一份数据,从而增加作业的性能开销。

  2. 尽可能复用同一个RDD
    目的是尽可能的减少RDD的数量,从而尽可能减少算子执行的次数

  3. 对多次使用的RDD进行持久化
    当你在Spark代码中多次对一个RDD算子操作时,每调用一次此RDD都会从源头计算一遍,这样性能很差。所以可以对其进行持久化操作。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中,后面再用这个RDD就不用从源头再计算一遍了。

  4. 尽量避免使用shuffle类算子
    如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。
    shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

  5. 使用map-side预聚合的shuffle操作
    如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

  6. 使用高性能算子
    除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。
    a)使用reduceByKey/aggregateByKey替代groupByKey
    b)使用mapPartitions替代普通map
    c)使用foreachPartitions替代foreach
    d)使用filter之后进行coalesce操作
    e)使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    (以上可以自行查博客,找疑点)

  7. 广播大变量
    有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
    示例:
    val list1 = …
    val list1Broadcast = sc.broadcast(list1)
    rdd1.map(list1Broadcast…)

  8. 使用Kryo优化序列化性能
    在Spark中,主要有三个地方涉及到了序列化:

     1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
    

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
    示例:
    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(…).setAppName(…)
    // 设置序列化器为KryoSerializer。
    conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

  9. 优化数据结构
    实用性不强,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/860364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【遇到的问题】集群上查看gpu的使用情况

流程&#xff1a; 查看bme_cpu所有节点的详细情况scontrol show node bme_gpu[12-23] 下面这个看起来分配出去较少 查看bme_cpu空闲节点sinfo -p bme_gpu -o "%n %G %C %m %e NVIDIAA10080GBPCIe 卡 gpu 13看起来最少 在命令中选择这个节点 #!/bin/bash #SBATCH -J rati…

别再盲目生产了!精益KPI管理让你事半功倍!

在竞争日益激烈的制造业领域&#xff0c;如何提升生产效率、降低成本、确保产品质量&#xff0c;是每个企业都需要面对的重要课题。而研华科技作为工业自动化领域的领军企业&#xff0c;凭借其独特的精益生产KPI分析与管理平台&#xff0c;为企业提供了一套行之有效的解决方案。…

OpenAI突然宣布停止向中国提供API服务!

标题 &#x1f31f; OpenAI突然宣布停止向中国提供API服务! &#x1f31f;摘要 &#x1f4dc;引言 &#x1f4e2;正文 &#x1f4dd;1. OpenAI API的重要性2. 停止服务的原因分析3. 对中国市场的影响4. 应对措施代码案例 &#x1f4c2;常见问题解答&#xff08;QA&#xff09;❓…

Java-HashMap和ConcurrentHashMap的区别

Java-HashMap和ConcurrentHashMap的区别 一、关键区别1.数据结构2.线程安全3.性能4.扩容机制 二、源码简析1.并发控制机制2.数据结构转换&#xff1a;链表转红黑树3.扩容机制触发hashMap和concurentHashMap扩容机制的条件 三、putIfAbsent方法computeIfAbsent方法区别 ​ 在 J…

Linux(简单概述)

目录 第一章 初识Linux 第四章 文件管理与常用命令 1.文件基础知识 2.文件显示命令 3.文件内容查询 4. 文件和目录基本操作 5. 文件复制、移动、删除 7. 链接 8. 文件访问权限 9. 文件查找命令 10. 压缩和解压缩 第五章用户与用户组 第六章软件包管理RPM和YUM数据库…

CesiumJS【Basic】- #011天气特效

文章目录 天气特效1 目标2 实现2.1 Weather.ts2.2 main.ts天气特效 1 目标 用着色器实现 - 白天 - 多云 - 雾 - 雨 - 雪 2 实现 在Cesium version 1.118.1中,默认是gles 3.0的语法,以前的gl_FragColor、varying和texture2D无法继续使用 2.1 Weather.ts import * as Ces…

面试-synchronized(java5以前唯一)和ReentrantLock的区别

1.ReentrantLock&#xff08;再入锁&#xff09;&#xff1a; (1).在java.util.concurrent.locks包 (2).和CountDownLatch,FutureTask,Semaphore一样基于AQS实现。 AQS:AbstractQueuedSynchronizer 队列同步器。Java并发用来构建锁或其他同步主键的基础框架&#xff0c;是j.u.c…

【金】04Y? 人脸识别系统 | 前端PyQT

参考-教程bilibil视频&#xff1a;树莓派进阶玩法 | 人脸识别项目教程 界面参考&#xff1a;基于深度学习的人脸识别与管理系统&#xff08;UI界面增强版&#xff0c;Python代码&#xff09;_python管理系统深度学习-CSDN博客 1、 树莓派小项目&#xff1a;人脸识别&#xff…

全面掌握 Jackson 序列化工具:原理、使用与高级配置详解

全面掌握 Jackson 序列化工具:原理、使用与高级配置详解 Jackson 是一个功能强大的 JSON 处理库,广泛应用于 Java 项目中。它提供了丰富的功能和灵活的配置选项,可以轻松地在 Java 对象和 JSON 数据之间进行转换。本文将详细介绍 Jackson 的核心概念、基本用法、高级配置及…

常用的 js 代码片段

常用的 js 代码片段 1. 不使用临时变量交换两个变量2. 浅克隆对象3. 合并对象3. 过滤数组中的假值5. NodeList 转换为数组6. 数组去重7. 两数组的交集8. 两数组的差集9. 两数组的并集10. 数组求和11. 对象数组指定属性求和12. 对象的计算属性13. 检查联网状态14. URL 的查询参数…

如何使用命令提示符查询电脑相关序列号等信息的操作方法

如何使用命令提示符查询硬盘的序列号&#xff1f; 如果出于保修或其他目的&#xff0c;你想知道硬盘驱动器的序列号&#xff0c;你不想使用第三方应用程序&#xff0c;或者如果你更喜欢命令行方法&#xff0c;则可以使用带有命令提示符的命令来显示硬盘驱动器的序列号。 1. 按…

渗透测试之内核安全系列课程:Rootkit技术初探(六)

今天&#xff0c;我们来讲一下内核安全&#xff01; 本文章仅提供学习&#xff0c;切勿将其用于不法手段&#xff01; 目前&#xff0c;在渗透测试领域&#xff0c;主要分为了两个发展方向&#xff0c;分别为Web攻防领域和PWN&#xff08;二进制安全&#xff09;攻防领域。在…

用python写出银行管理系统

1 问题 怎么利用已学的python知识简单写出一个银行管理系统&#xff0c;且编写出开户、查询、取款、存款、转账和管理员登录等功能。 2 方法 使用def定义函数、while循环函数、if函数和import函数并带上一些简单的逻辑思维便可以轻松解决这个看似困难实则简单的程序。 # 1.开…

BAT 利用BAT替换SQL文件中的参数成为可执行SQL文件

1. BAT文件 将下面的代码保存成“01_ExeSqlCre.bat”文件。 echo off SETLOCAL ENABLEDELAYEDEXPANSIONIF EXIST %~dp0\10_Program_Exec.sql (DEL /Q %~dp0\10_Program_Exec.sql )CHCP 65001 FOR /F "EOL. TOKENS* DELIMS" %%a IN (dir /a /b *.sql) DO (FOR /F &q…

ACIS中如何求点在FACE参数域内的坐标

1. 点在 FACE 上 如果点在FACE上&#xff0c;可以采用surface的直接接口&#xff1a;surface::param、surface::test_point和surface::test_point_tol。 virtual SPApar_pos surface::param ( const SPAposition & pos, const SPApar_pos & param_guess SpaAcis::…

【SQL Server数据库】数据的增删改操作

目录 一、用SQL语句完成下列功能。 1、新开设一门课程&#xff0c;名叫网络安全与防火墙&#xff0c;学时40&#xff0c;编号为“0118”&#xff0c;主要介绍网络的安全与主要的防火墙软件。 2、先建立monitor表&#xff0c;其结构与student表大致一样&#xff0e;…

华为仓颉编程语言观感

这里写自定义目录标题 相似点&#xff08;主要与Swift进行对比&#xff09;不同点亮点 花了半天时间&#xff0c;对华为新出的仓颉编程语言做了简单的了解&#xff0c;整体观感如下&#xff1a; 仓颉语言看起来是一门大而全的语言&#xff0c;吸纳了现存的很多中编程语言的范式…

图书管理系统(详解版 附源码)

目录 项目分析 实现页面 功能描述 页面预览 准备工作 数据准备 创建数据库 用户表 创建项目 导入前端页面 测试前端页面 后端代码实现 项目公共模块 实体类 公共层 统一结果返回 统一异常处理 业务实现 持久层 用户登录 用户注册 密码加密验证 添加图书…

Cesium默认bing地图数据,还支持哪些地图的数据源呢?

传统的前端开发增长乏力了&#xff0c;新兴的web3D方向前端开发需求旺盛&#xff0c;这一块在国外很成熟&#xff0c;在国内兴起不久&#xff0c; 甚至很多前端老铁都没听过&#xff0c;没见过&#xff0c;没有意识到&#xff0c;前端除了框架、vue、uniapp这些烂大街的&#x…

黑马苍穹外卖7 用户下单+订单支付(微信小程序支付流程图)

地址簿 数据库表设计 就是基本增删改查&#xff0c;与前面的类似。 用户下单 用户点餐业务流程&#xff1a; 购物车-订单提交-订单支付-下单成功 展示购物车数据&#xff0c;不需要提交到后端 数据库设计&#xff1a;两个表【订单表orders&#xff0c;订单明细表order_d…