使用python-Spark使用的场景案例具体代码分析

使用场景

1. 数据批处理

• 日志分析:互联网公司每天会产生海量的服务器日志,如访问日志、应用程序日志等。Spark可以高效地读取这些日志文件,对数据进行清洗(例如去除无效记录、解析日志格式)、转换(例如提取关键信息如用户ID、访问时间、访问页面等)和分析(例如统计页面访问量、用户访问路径等)。

• 数据仓库ETL(Extract,Transform,Load):在构建数据仓库时,需要从各种数据源(如关系型数据库、文件系统等)提取数据,进行清洗、转换和加载到数据仓库中。Spark可以处理大规模的数据,并且通过其丰富的转换操作(如对数据进行聚合、关联等),能够很好地完成ETL流程。

2. 机器学习与数据挖掘

• 推荐系统:基于用户的行为数据(如购买记录、浏览历史等)和物品的属性数据,Spark MLlib(机器学习库)可以用于构建推荐模型。例如,使用协同过滤算法来发现用户的兴趣偏好,为用户推荐可能感兴趣的商品、电影、音乐等。

• 聚类分析:对于大规模的数据集,如客户细分场景下的用户特征数据,Spark可以应用聚类算法(如K - Means)将相似的用户或数据点聚集在一起,帮助企业更好地理解客户群体的结构,进行精准营销等活动。

3. 实时数据处理

• 实时监控与预警:在金融领域,Spark Streaming可以实时处理股票交易数据,计算关键指标(如实时股价波动、成交量变化等),当指标超出设定的阈值时,及时发出预警信号。

• 实时交通数据分析:通过接入交通传感器(如摄像头、测速仪等)的实时数据,Spark可以对交通流量、车速、拥堵情况等进行实时分析,为交通管理部门提供决策支持,如动态调整交通信号灯时间。

Spark使用场景的详细代码案例

1. 数据批处理 - 日志分析

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, count, col# 创建SparkSession
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()# 假设日志数据格式为每行: [IP地址, 时间戳, 请求方法, 请求路径, 协议, 状态码, 用户代理]
# 这里模拟一些日志数据
log_data = [("192.168.1.1", "2024-11-08 10:00:00", "GET", "/index.html", "HTTP/1.1", "200", "Mozilla/5.0"),("192.168.1.2", "2024-11-08 10:05:00", "GET", "/about.html", "HTTP/1.1", "200", "Chrome/100.0"),("192.168.1.1", "2024-11-08 10:10:00", "POST", "/login", "HTTP/1.1", "200", "Mozilla/5.0")
]columns = ["ip", "timestamp", "method", "path", "protocol", "status", "user_agent"]
df = spark.createDataFrame(log_data, columns)# 统计每个页面的访问次数
page_views = df.groupBy("path").agg(count("*").alias("views"))
page_views.show()# 统计每个IP的请求次数
ip_requests = df.groupBy("ip").agg(count("*").alias("requests"))
ip_requests.show()# 找出访问次数最多的前5个页面
top_pages = page_views.orderBy(col("views").desc()).limit(5)
top_pages.show()# 关闭SparkSession
spark.stop()

2. 数据批处理 - 数据仓库 ETL(以从CSV文件提取数据并加载到新表为例)

from pyspark.sql import SparkSession
import os# 创建SparkSession
spark = SparkSession.builder.appName("ETLExample").getOrCreate()# 假设源数据是一个CSV文件,路径为以下(这里可以替换为真实路径)
csv_path = "/path/to/source/csv/file.csv"
if not os.path.exists(csv_path):raise FileNotFoundError(f"CSV file not found at {csv_path}")# 读取CSV文件,假设CSV文件有列名:id, name, age
df = spark.read.csv(csv_path, header=True, inferSchema=True)# 进行一些数据转换,比如将年龄加1(这里只是示例,可以根据实际需求调整)
df = df.withColumn("new_age", df.age + 1)# 假设要将数据加载到一个新的Parquet格式文件中,路径为以下(可替换)
output_path = "/path/to/output/parquet/file.parquet"
df.write.mode("overwrite").parquet(output_path)# 关闭SparkSession
spark.stop()

3. 机器学习与数据挖掘 - 推荐系统(基于用户 - 物品评分的协同过滤)

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator# 创建SparkSession
spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()# 模拟用户 - 物品评分数据
data = [(1, 1, 5.0),(1, 2, 4.0),(2, 1, 3.0),(2, 2, 2.0),(2, 3, 4.0),(3, 1, 2.0),(3, 3, 5.0),(4, 2, 3.0),(4, 3, 4.0),
]columns = ["user_id", "item_id", "rating"]# 创建DataFrame
df = spark.createDataFrame(data, columns)# 划分训练集和测试集
(train_df, test_df) = df.randomSplit([0.8, 0.2])# 创建ALS模型
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")# 训练模型
model = als.fit(train_df)# 对测试集进行预测
predictions = model.transform(test_df)# 评估模型
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error = {rmse}")# 为用户生成推荐
user_recs = model.recommendForAllUsers(5)
user_recs.show(truncate=False)# 关闭SparkSession
spark.stop()

4. 机器学习与数据挖掘 - 聚类分析(K - Means 聚类)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans# 创建SparkSession
spark = SparkSession.builder.appName("KMeansClustering").getOrCreate()# 模拟客户特征数据,这里假设每个客户有两个特征:年龄和收入
data = [(25, 50000),(30, 60000),(35, 70000),(40, 80000),(28, 55000),(32, 65000),
]columns = ["age", "income"]
df = spark.createDataFrame(data, columns)# 将特征列组合成一个向量列
assembler = VectorAssembler(inputCols=columns, outputCol="features")
df = assembler.transform(df)# 创建K - Means模型,设置聚类数为3
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df)# 预测聚类结果
predictions = model.transform(df)
predictions.show()# 关闭SparkSession
spark.stop()

5. 实时数据处理 - 实时监控与预警(以简单的股票价格监控为例,使用Spark Streaming和Socket模拟实时数据)

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext# 创建SparkSession和StreamingContext,设置批处理间隔为5秒
spark = SparkSession.builder.appName("StockMonitoring").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)# 这里使用Socket模拟接收实时股票价格数据,实际中可能是从消息队列等接收
# 假设数据格式为: 股票代码,价格
lines = ssc.socketTextStream("localhost", 9999)# 解析数据
data = lines.map(lambda line: line.split(","))# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("symbol", StringType(), True),StructField("price", DoubleType(), True)
])
df = data.toDF(schema)# 假设要监控某只股票(这里以股票代码为 'AAPL' 为例),当价格超过150时预警
apple_stock = df.filter(df.symbol == "AAPL")
alert = apple_stock.filter(df.price > 150).map(lambda row: f"Alert: {row.symbol} price {row.price} is high!")alert.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()
6. 实时数据处理 - 实时交通数据分析(使用Spark Streaming和Kafka,假设已经有Kafka环境和交通数据主题)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json# 创建SparkSession和StreamingContext,设置批处理间隔为10秒
spark = SparkSession.builder.appName("TrafficAnalysis").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 10)# Kafka参数,这里需要替换为真实的Kafka服务器地址和交通数据主题
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topic = "traffic_data"# 从Kafka读取实时交通数据
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)# 解析JSON格式的交通数据,假设数据包含车速、车流量等信息
def parse_traffic_data(json_str):try:data = json.loads(json_str)return (data["location"], data["speed"], data["volume"])except Exception as e:print(f"Error parsing data: {e}")return Noneparsed_data = kafkaStream.map(lambda x: parse_traffic_data(x[1]))
valid_data = parsed_data.filter(lambda x: x is not None)# 将数据转换为DataFrame格式(这里只是简单示例,可能需要更多处理)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([StructField("location", StringType(), True),StructField("speed", DoubleType(), True),StructField("volume", DoubleType(), True)
])
df = valid_data.toDF(schema)# 计算每个区域的平均车速和车流量
average_speed_volume = df.groupBy("location").agg({"speed": "avg", "volume": "sum"})
average_speed_volume.pprint()# 启动 StreamingContext
ssc.start()
# 等待停止
ssc.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AXI DMA IP BUG踩坑记录

1. 问题描述 在突发的过程中总是一旦使用XAxiDma_SimpleTransfer函数就会出现AXI STREAM信号的READY信号先拉高4个数据(32位)的时钟后会迅速拉低,换句话说就是一旦PS端发起了XAxiDma_SimpleTransfer,AXI总线的READY信号就会拉高四个节拍,这样就会导致传输的数据出现问题。…

Vue2教程001:初识Vue

文章目录 1、初识Vue1.1、Vue2前言1.2、创建Vue实例1.3、插值表达式1.4 Vue响应式特性 1、初识Vue 1.1、Vue2前言 Vue是什么? 概念:Vue是一个用于构建用户界面的渐进式框架。 Vue的两种使用方式: Vue核心包开发 场景:局部模块…

vscode vite+vue3项目启动调试

1、经常我们在普通的项目中,如果算法并不复杂,那么基本上console.log就可以搞定,当然也可以直接alert,打包的时候如果不去掉,还会在发版中上接弹出,给你个惊喜。 2、碰到了有些算法过程比较复杂的情况下&a…

Jdbc学习笔记(三)--PreparedStatement对象、sql攻击(安全问题)

目录 (一)使用PreparedStatement对象的原因: 使用Statement对象编写sql语句会遇到的问题 ​编辑 (二)sql攻击 1.什么是sql攻击 2.演示sql攻击 (三)防止SQL攻击 1.PreparedStatement是什么 …

后端分层解耦

引入 在上篇所举的例子中,我们将所有的代码均放在HelloControl方法之中,这样会导致代码的复用性、可读性较差,难以维护。因此我们需 三层架构 在之前的代码中,代码大体可以分为三部分:数据访问、数据逻辑处理、响应数…

97.【C语言】数据结构之栈

目录 栈 1.基本概念 2.提炼要点 3.概念选择题 4.栈的实现 栈初始化函数 入栈函数 出栈函数和栈顶函数 栈顶函数 栈销毁函数 栈 基本概念参见王爽老师的《汇编语言 第四版》第56和57页 节选一部分 1.基本概念 注意:这里提到的数据结构中的栈有别于操作系统的栈,后者是…

初识算法 · 模拟(1)

目录 前言: 替换所有的问号 题目解析 算法原理 算法编写 提莫攻击 题目解析 算法原理 算法编写 外观数列 题目解析 算法原理 算法编写 前言: ​本文的主题是模拟,通过三道题目讲解,一道是提莫攻击,一道是…

【数值分析】高斯-赛德尔方法、规范化幂法、原点移位法

【数值分析】高斯-赛德尔方法、规范化幂法、原点移位法 题目 要求 代码实现过程不能调用任何库函数自带的“线性 方程组求解、特征值求解库函数” 利用高斯-赛德尔方法求解上述线性方程组 使用Python编程求解矩阵A与列向量b import numpy as np import sympy as spdef crea…

【CUDA】了解GPU架构

目录 一、初步认识 二、Fermi架构 三、Kepler 架构 3.1 动态并行 3.2 Hyper-Q 一、初步认识 SM(Streaming Multiprocessors)是GPU架构中非常重要的部分,GPU硬件的并行性就是由SM决定的。以Fermi架构为例,其包含以下主要组成…

64位程序调用32位dll解决方案

最近在做64位代码移植,发现很多老代码使用到了第三方的32位dll;而且这些第三方32位dll库已经年代久远,原开发商已不再了;所以急切的需要在64位主程序 中使用老的32位dll;查询很多解决方案 发现目前只有使用com 进程外组件的方法可以解决此问题…

【HOT100第五天】搜索二维矩阵 II,相交链表,反转链表,回文链表

240.搜索二维矩阵 II 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 先动手写写最简单方法,二重循环。 class Solution { public:bool searchMa…

模板元函数应用:输出字符串。

看下面三个字符串,s1,s2,s3 : string s1 "逆天邪神";wstring s2 _t("焚星妖莲");_string s3 "焚绝尘"; 在控制台输出字符串,可能的一个方案是: void print_test(const wstring& s) {std::…

pytest | 框架的简单使用

这里写目录标题 单个文件测试方法执行测试套件的子集测试名称的子字符串根据应用的标记进行选择 其他常见的测试命令 pytest框架的使用示例 pytest将运行当前目录及其子目录中test_*.py或 *_test.py 形式的所有 文件 文件内的函数名称可以test* 或者test_* 开头 单个文件测试…

【C++】类和对象-深度剖析默认成员函数-上

> 🍃 本系列为初阶C的内容,如果感兴趣,欢迎订阅🚩 > 🎊个人主页:[小编的个人主页])小编的个人主页 > 🎀 🎉欢迎大家点赞👍收藏⭐文章 > ✌️ 🤞 &#x1…

Web性能优化:从基础到高级

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 Web性能优化:从基础到高级 Web性能优化:从基础到高级 Web性能优化:从基础到高级 引言 基础优…

当 docker-compose.yaml 文件部署时,Dify 线上版本升级过程

如果线上 Dify 是通过 docker-compose.yaml 文件部署的,那么当 Dify 版本升级时该如何操作呢?官方已经给出了 Docker compose 和 Source Code 两种方式。相对而言,前者更简单些,至少不需要安装依赖包和迁移数据库文件。为了更加具…

如何让手机ip变成动态

在数字化浪潮中,手机已成为我们日常生活中不可或缺的一部分。无论是浏览网页、使用社交媒体还是进行在线购物,手机都扮演着举足轻重的角色。然而,在享受网络带来的便利时,我们也需要关注网络安全和隐私保护。静态IP地址可能让手机…

vue3 如何调用第三方npm包内部的 pinia 状态管理库方法

抛砖引玉: 如果在开发vue3项目是, 引用了npm第三方包 ,而且这个包内使用了Pinia 状态管理库,那我们如何去调用 npm内部的 Pinia 状态管理库呢? 实际遇到的问题: 今天在制作npm包时遇到的问题,之前Vue2版本的时候状态管理库用的Vuex ,当时调用npm包内的状态管理库很简单,直接引…

Linux笔记---调试工具GDB(gdb)

1. gdb的概念 GDB,全称GNU Debugger,是一个功能强大的开源调试工具,广泛用于Unix和类Unix系统,以及Microsoft Windows和macOS平台。GDB允许开发者在程序执行过程中查看内部运行情况,帮助定位和修复程序中的错误。 gd…

编译器gcc/g++

gcc 只用来编译C g 编译C/C 1.预处理(进行宏替换/去注释/条件编译/头文件展开等) 先创建 code.c 文件 -E --> 从现在开始,进行程序的翻译,一旦预处理做完,就停下来 -o --> 表明 -o 后面的文件名称 code…