PySpark Functions Overview
Created: Sep 14, 2020 10:28 AM Tags: Big Data, PySpark, Python, Spark
pyspark.sql.functions
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType, MapType, StringType, StructField, StructType
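The examples below also assume an active SparkSession bound to a variable named spark (the variable and app name here are just placeholders):
from pyspark.sql import SparkSession

# Reuse an existing session or start a local one; the examples below call it `spark`.
spark = SparkSession.builder.appName("pyspark-functions-notes").getOrCreate()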
Function usage notes
Basic math functions
abs
sin, cos, tan, asin, acos, atan, sinh, cosh, tanh
ceil, round, floor
exp, log, log2, pow, sqrt, cbrt
factorial
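A minimal sketch of a few of these applied to a toy column (the column and alias names are made up for illustration):
df = spark.createDataFrame([(-1.5,), (4.0,), (9.0,)], ['x'])
df.select(
    F.abs(df.x).alias('abs'),          # absolute value
    F.round(df.x, 0).alias('rounded'), # round to 0 decimal places
    F.sqrt(F.abs(df.x)).alias('sqrt'), # square root of the absolute value
    F.pow(df.x, 2).alias('squared'),   # x to the power of 2
).show()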
Type-specific functions
Dates
current_date, current_timestamp, add_months, unix_timestamp
df = spark.createDataFrame([('2015-04-08',)], ['dt'])
df.select(F.add_months(df.dt, 1).alias('next_month')).show()"""
+----------+
|next_month|
+----------+
|2015-05-08|
+----------+
"""
add_months, date_add, date_format, date_sub, date_trunc, datediff
dayofmonth, dayofweek, dayofyear, weekofyear
hour, last_day, minute, month, months_between, next_day, year
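A small sketch of some common date helpers (the toy columns d1/d2 are assumptions for illustration):
df = spark.createDataFrame([('2015-04-08', '2015-05-10')], ['d1', 'd2'])
df.select(
    F.date_format(df.d1, 'yyyy/MM/dd').alias('formatted'),
    F.datediff(df.d2, df.d1).alias('days_between'),        # d2 - d1 in days
    F.months_between(df.d2, df.d1).alias('months_between'),
    F.dayofweek(df.d1).alias('day_of_week'),
).show()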
Strings
ascii, substring, substring_index
base64, unbase64
decode, encode
expr, conv
format_string
length
lower, upper
reverse
size
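A quick sketch of a few string helpers on a toy column (names are illustrative):
df = spark.createDataFrame([('Hello Spark',)], ['s'])
df.select(
    F.lower(df.s).alias('lower'),
    F.upper(df.s).alias('upper'),
    F.length(df.s).alias('len'),
    F.substring(df.s, 1, 5).alias('first5'),  # substring uses 1-based positions
    F.reverse(df.s).alias('reversed'),
).show()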
Binary
bin, bitwiseNOT, hash, md5, sha1, sha2
hex, unhex
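A hashing sketch on a toy string column (the column name is an assumption):
df = spark.createDataFrame([('spark',)], ['s'])
df.select(
    F.md5(df.s).alias('md5'),
    F.sha2(df.s, 256).alias('sha256'),  # second argument: 224, 256, 384 or 512
    F.hex(df.s).alias('hex'),
).show(truncate=False)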
Angles
degrees, radians (toDegrees and toRadians are deprecated aliases)
Numbers
format_number
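format_number rounds to the given number of decimal places and adds thousands separators, e.g.:
df = spark.createDataFrame([(1234567.891,)], ['x'])
df.select(F.format_number(df.x, 2).alias('formatted')).show()"""
+------------+
|   formatted|
+------------+
|1,234,567.89|
+------------+
"""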
Checks
isnan, isnull
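isnan checks numeric columns for NaN, isnull checks any column for null; a toy sketch:
df = spark.createDataFrame([(1.0, None), (float('nan'), 2.0)], ('a', 'b'))
df.select(
    F.isnan(df.a).alias('a_is_nan'),
    F.isnull(df.b).alias('b_is_null'),
).show()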
Statistics and aggregation
avg, corr, count, countDistinct, cume_dist
greatest, kurtosis, variance
max, min, mean, rand, randn, rank
skewness, sum, sumDistinct
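Most of these are aggregates and are typically used inside agg; a minimal sketch on toy key/val columns:
df = spark.createDataFrame([('a', 1), ('a', 2), ('b', 5)], ['key', 'val'])
df.groupBy('key').agg(
    F.count('val').alias('n'),
    F.mean('val').alias('mean'),
    F.max('val').alias('max'),
    F.countDistinct('val').alias('n_distinct'),
).show()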
Array functions
flatten, slice, element_at, array_contains, array_distinct, array_except, array_intersect, array_join, array_max, array_min, array_position, array_remove, array_repeat, array_sort, array_union, arrays_overlap, arrays_zip
# Check whether an array column contains a given element
df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.withColumn("array_contains", F.array_contains(df.data, "a")).show()"""
+---------+--------------+
| data|array_contains|
+---------+--------------+
|[a, b, c]| true|
| []| false|
+---------+--------------+
"""
arrays_zip example
df = spark.createDataFrame([([1, 2, 3], [2, 3, 4])], ['vals1', 'vals2'])
df.show()
df_new = df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped'))
df_new.show()
row = df_new.collect()[0]
row['zipped'][0]['vals1'] == row['zipped'][0][0] == 1"""
+---------+---------+
|    vals1|    vals2|
+---------+---------+
|[1, 2, 3]|[2, 3, 4]|
+---------+---------+

+--------------------+
|              zipped|
+--------------------+
|[[1, 2], [2, 3], ...|
+--------------------+

True
"""
Column operations
coalesce
df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
df.show()
df.withColumn('coalesce', F.coalesce(df.a, df.b)).show()"""
+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+

+----+----+--------+
|   a|   b|coalesce|
+----+----+--------+
|null|null|    null|
|   1|null|       1|
|null|   2|       2|
+----+----+--------+
"""
array
# Combine multiple columns into a single array column
df = spark.createDataFrame([('2015-04-08', 1, )], ['dt', 'int'])
df.select(F.array([df.dt, df.int]).alias("arr")).show()"""
+---------------+
| arr|
+---------------+
|[2015-04-08, 1]|
+---------------+
"""
concat, concat_ws
df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
df.withColumn('concat', F.concat_ws('-', df.s, df.d)).show()
"""
+----+---+--------+
| s| d| concat|
+----+---+--------+
|abcd|123|abcd-123|
+----+---+--------+
"""
col, column, lit
df = spark.createDataFrame([(11, 12), (21, 22), (31, 32)], ("a", "b"))
df.withColumn('a+100', F.col('a') + F.lit(100)).show()"""
+---+---+-----+
| a| b|a+100|
+---+---+-----+
| 11| 12| 111|
| 21| 22| 121|
| 31| 32| 131|
+---+---+-----+
"""
explode, explode_outer, posexplode, posexplode_outer
# Expand an array/map column into one row per element
df = spark.createDataFrame([(1, 2, 3)], ArrayType(IntegerType()))
df.select(F.explode(df.value).alias("int")).show()"""
+---+
|int|
+---+
| 1|
| 2|
| 3|
+---+
"""df = spark.createDataFrame([({'a': 1, 'b': 2})], MapType(StringType(), IntegerType()))
df.select(F.explode(df.value).alias('key', 'value')).show()"""
+---+-----+
|key|value|
+---+-----+
| a| 1|
| b| 2|
+---+-----+
"""
from_csv, from_json, get_json_object
# Parse a JSON string column and expand its fields into columns
import json
data = {'a': 1, 'b': [1, 2, 3]}
data_s = json.dumps(data)
schema = StructType([
    StructField('a', IntegerType(), True),
    StructField('b', ArrayType(IntegerType()), True),
])
df = spark.createDataFrame([data_s], schema=StringType())
df.show()"""
+--------------------+
| value|
+--------------------+
|{"a": 1, "b": [1,...|
+--------------------+
"""df_from_json = df.withColumn('json', F.from_json(df.value, schema=schema))df_from_json.select(df_from_json.value, df_from_json.json.a.alias('value.a'), df_from_json.json.b.alias('value.b')
).show()"""
+--------------------+-------+---------+
| value|value.a| value.b|
+--------------------+-------+---------+
|{"a": 1, "b": [1,...| 1|[1, 2, 3]|
+--------------------+-------+---------+
"""
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark.createDataFrame(data, ("key", "jstring"))
df.select(df.key, F.get_json_object(df.jstring, '$.f1').alias("c0"), F.get_json_object(df.jstring, '$.f2').alias("c1") ).show()"""
+---+-------+------+
|key| c0| c1|
+---+-------+------+
| 1| value1|value2|
| 2|value12| null|
+---+-------+------+
"""
create_map, map_from_arrays, map_from_entries, map_concat, map_keys, map_values, map_entries
df = spark.createDataFrame([([2, 5], ['a', 'b'])], ['k', 'v'])
df.select(F.map_from_arrays(df.k, df.v).alias("map")).show()"""
+----------------+
| map|
+----------------+
|[2 -> a, 5 -> b]|
+----------------+
"""
regexp_extract, regexp_replace
# Regex extraction and regex replacement
df = spark.createDataFrame([('100-200',)], ['str'])
df.select(
    'str',
    F.regexp_extract('str', r'(\d+)-(\d+)', 1).alias('first'),
    F.regexp_replace('str', r'(\d+)-(\d+)', '$2-$1').alias('swap'),
).show()"""
+-------+-----+-------+
| str|first| swap|
+-------+-----+-------+
|100-200| 100|200-100|
+-------+-----+-------+
"""
udf
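udf wraps an ordinary Python function so it can be applied column-wise; you declare the return type yourself. A minimal sketch (the function name is made up):
from pyspark.sql.types import IntegerType

@F.udf(returnType=IntegerType())
def str_len(s):
    # plain Python runs per row; handle null explicitly
    return len(s) if s is not None else None

df = spark.createDataFrame([('abc',), (None,)], ['s'])
df.withColumn('len', str_len(df.s)).show()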
pyspark.sql.types
Base type
DataType
Primitive types
NullType
StringType
BinaryType
BooleanType
DateType
TimestampType
DecimalType
DoubleType
FloatType
ByteType
IntegerType
LongType
ShortType
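These types are mostly used when building an explicit schema; a small sketch combining a few of them (field names are made up, other types imported at the top of this note):
from datetime import date
from pyspark.sql.types import DateType, DoubleType

schema = StructType([
    StructField('name', StringType(), True),   # nullable string
    StructField('score', DoubleType(), True),
    StructField('day', DateType(), True),
])
df = spark.createDataFrame([('alice', 9.5, date(2020, 9, 14))], schema=schema)
df.printSchema()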
Composite types
ArrayType
df = spark.createDataFrame([[1, 2, 3]], schema=ArrayType(IntegerType()))
df.show()
# +---------+
# |    value|
# +---------+
# |[1, 2, 3]|
# +---------+
df.collect()[0].value[0]
# 1
# The default column name is value
MapType
df = spark.createDataFrame([{'a': 1, 'b': 2}], schema=MapType(StringType(), IntegerType()))
df.show()
# +----------------+
# |           value|
# +----------------+
# |[a -> 1, b -> 2]|
# +----------------+
df.collect()[0]['value']['a']
# 1
StructField
# Used together with StructType
StructField('column_name', DataType, is_nullable)
StructType
df = spark.createDataFrame([(1,)], schema=StructType([StructField('col', IntegerType())]))
df.show()
"""
+---+
|col|
+---+
| 1|
+---+
"""# 复杂一些的情况
df = spark.createDataFrame([({'a': [2,3,4]},)], schema=StructType([StructField('col', MapType(StringType(), ArrayType(IntegerType())))]))
df.show()
"""
+----------------+
| col|
+----------------+
|[a -> [2, 3, 4]]|
+----------------+
"""df.collect()[0]['col']['a'][0]
# 2