一.实验题目
实验所需要求: centos7虚拟机 pyspark spark python3 hadoop分布式
统计历届春晚的节目数目
统计各个类型节目的数量,显示前10名
统计相声类节目历年的数目。
查询每个演员在春晚上表演节目的数量。
统计每年各类节目的数量,打印(节目类型、年份、数量),按照节目类型升序排序,节目类型相同时其次按照年份降序排序
二 实验代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, asc, split, explode
# 创建 SparkSession
spark = SparkSession.builder \
.appName("SpringFestivalGalaAnalysis") \
.getOrCreate()
# 读取 CSV 文件
df = spark.read.csv("party.csv", sep="\t", header=False, inferSchema=True)
# 为 DataFrame 设置列名
columns = ["year", "program_type", "program_name", "performers"]
df = df.toDF(*columns)
# 统计历届春晚的节目数目
total_programs = df.count()
print(f"Total programs: {total_programs}")
# 统计各个类型节目的数量,显示前10名
program_type_counts = df.groupBy("program_type").count().orderBy(col("count").desc()).limit(10)
program_type_counts.show()
# 统计相声类节目历年的数目
xiangsheng_counts = df.filter(col("program_type") == "相声").groupBy("year").count().orderBy("year")
xiangsheng_counts.show()
# 查询每个演员在春晚上表演节目的数量
# 将 performers 字段拆分并展开
performers_df = df.withColumn("performer", explode(split(col("performers"), "、")))
performer_counts = performers_df.groupBy("performer").count().orderBy(col("count").desc())
performer_counts.show()
# 统计每年各类节目的数量,打印(节目类型、年份、数量)
# 按照节目类型升序排序,节目类型相同时其次按照年份降序排序
yearly_program_counts = df.groupBy("program_type", "year").count().orderBy([col("program_type").asc(), col("year").desc()])
yearly_program_counts.show()
# 停止 SparkSession
spark.stop()