pyspark之Structured Streaming file文件案例1

# generate_file.py 
# 生成数据 生成500个文件,每个文件1000条数据
# 生成数据格式:eventtime name province action ()时间 用户名 省份 动作)
import os 
import time
import shutil
import time

FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']

PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"
    # 初始化环境
    def test_Setup():
        if os.path.exists(DATA_PATH):
            shutil.rmtree(DATA_PATH)
        os.mkdir(DATA_PATH)

    # 清理数据,恢复测试环境
    def test_TearDown():
        shutile.rmtree(DATA_PATH)

    # 数据保存文件
    def writeAndMove(filename,content):
        with open(PATH+filename,'wt',encoding='utf-8') as f:
            f.write(content)
        shutil.move(PATH+filename,DATA_PATH+filename)

if __name__ == '__main__':

    test_Setup()
    
    for i in range(500):
        filename = "user_action_{}.log".format(i)
        """
        验证spark输出模式,complete和update,增加代码,第一个文件i=0时,设置PROVINCE = "TAIWAN"
        """
        if i == 0:
            province= ['TaiWan']
        else:
            province = PROVINCE
        content = ""
        for _ in range(1000):
            content += "{} {} {} {}\n".format(str(int(time.time())),random.choice(FIRST_NAME)+random.choice(SECOND_NAME),random.choice(province),random.choice(ACTION))
        writeAndMove(filename,content)    
        time.sleep(10)

# spark_file_test.py  
# 读取DATA文件夹下面文件,按照省份统计数据,主要考虑window情况,按照window情况测试,同时针对    outputMode和输出console和mysql进行考虑,其中保存到mysql时添加batch字段

from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import split,lit,from_unixtime

DATA_PATH = "/opt/software/tmp/data/"

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    lines = spark.readStream.format("text").option("seq","\n").load(DATA_PATH)
    # 分隔符为空格
    userinfo = lines.select(split(lines.value," ").alias("info"))
    # 第一个为eventtime  第二个为name   第三个为province  第四个为action 
    # userinfo['info'][0]等同于userinfo['info'].getIterm(0)
    user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
        userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
        userinfo['info'][3].alias('action'))
    """
    测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
    user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
    """
    """
    测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
    def insert_into_mysql_batch(df:DataFrame,batch):
        if df.count()>0:
        # 此处将batch添加到df中,采用lit函数
            data = df.withColumn("batch",lit(batch))
            data.write.format("jdbc"). \
        option("driver","com.mysql.jdbc.Driver"). \
        option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
        option("password","root").option("dbtable","user_log").\
        option("batchsize",1000).mode("append").save()    
        else:
            pass
user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
    """
    """
    测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
    userProvinceCounts = user.groupBy("province").count()
    userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
# 测试输出模式complete:complete将总计算结果都进行输出
"""
batch 0
TaiWan 1000
batch 1
TaiWan 1000
其他省份  sl
batch 2
TaiWan 1000
其他省份  sl
"""    userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
# 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
batch 0
TaiWan 1000  
batch 1 中没有TaiWan输出
userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
    """

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/643475.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

概念杂记--到底啥是啥?(数据库篇)

文章目录 1.聚集索引(clustered index)2.非聚集索引(Non-clustered index)3.聚集索引和非聚集索引区别?4.覆盖索引(covering index)5、复合索引 (Composite Index)6.索引…

【MySQL 流浪之旅】 第四讲 MySQL 逻辑备份

系列文章目录 【MySQL 流浪之旅】 第一讲 MySQL 安装【MySQL 流浪之旅】 第二讲 MySQL 基础操作【MySQL 流浪之旅】 第三讲 MySQL 基本工具 文章目录 系列文章目录 文章目录 一、什么是逻辑备份? 二、 mysqldump原理 三、mysqldump常用参数 四、mysqldump常见问题 …

HarmonyOS鸿蒙学习基础篇 - 基本语法概述

书接上文 HarmonyOS鸿蒙学习基础篇 - 运行第一个程序 Hello World 基本语法概述 打开 entry>src>main>ets>pages>index.ets 代码如下代码详细解释如下: Entry //Entry装饰的自定义组件将作为UI页面的入口。在单个UI页面中,最多可以使用…

融资项目——EasyExcel将Excel文件保存至数据库

上一篇博客已经基本介绍了EasyExcel的配置与基本使用方法。现在准备使用EasyExcel将Excel文件保存至数据库。 1.由于我们想每读取Excel中的N条记录后将这些记录全部写入数据库中。所以首先我们在Mybatis文件内先要写一个批量保存Excel文件中的记录的sql语句。 <insert id&q…

Dify学习笔记-应用发布(四)

1、发布为公开 Web 站点 使用 Dify 创建 AI 应用的一个好处在于&#xff0c;你可以在几分钟内就发布一个可供用户使用的 Web 应用&#xff0c;该应用将根据你的 Prompt 编排工作。 如果你使用的是自部署的开源版&#xff0c;该应用将运行在你的服务器上 如果你使用的是云服务&…

2024年跨境电商上半年有哪些营销节日?

2024年伊始&#xff0c;跨境电商开启新一轮的营销竞技&#xff0c;那么首先需要客户需求&#xff0c;节假日与用户需求息息相关&#xff0c;那么接下来小编为大家整理2024上半年海外都有哪些节日和假期&#xff1f;跨境卖家如何见针对营销日历选品&#xff0c;助力卖家把握2024…

Java框架篇面试题

&#x1f4d5;作者简介&#xff1a; 过去日记&#xff0c;致力于Java、GoLang,Rust等多种编程语言&#xff0c;热爱技术&#xff0c;喜欢游戏的博主。 &#x1f4d7;本文收录于java面试题系列&#xff0c;大家有兴趣的可以看一看 &#x1f4d8;相关专栏Rust初阶教程、go语言基…

flutter底层架构初探

本文出处&#xff1a;​​​​​​​​​​​​​Flutter 中文开发者网站 架构 embedder嵌入层 提供程序入口&#xff08;其他原生应用也采用此方式&#xff09;&#xff0c;程序由此和底层操作系统协调&#xff08;surface渲染、辅助功能和输入服务&#xff0c;管理事件循环…

书生·浦语大模型--第四节课笔记--XTuner大模型单卡低成本微调

文章目录 Finetune简介指令跟随微调增量预训练微调LoRA QLoRA XTuner介绍快速上手 8GB显卡玩转LLM动手实战环节 Finetune简介 增量预训练和指令跟随 通过指令微调获得instructed LLM 指令跟随微调 一问一答的方式进行 对话模板 计算损失 增量预训练微调 不需要问题只…

PostgreSQL 17新特性:PL/pgSQL支持数组%TYPE以及%ROWTYPE

正在开发中的 PostgreSQL 17 即将增加一个新功能&#xff1a;PL/pgSQL 支持定义伪类型 %TYPE以及%ROWTYPE 数组。 当我们使用 PL/pgSQL 编写存储过程或者函数时&#xff0c;可以定义不同类型的变量。例如&#xff1a; user_id integer; quantity numeric(5); url varchar; my…

微签电子印章系统赋能国泰基金办公自动化升级

近期&#xff0c;国泰基金引入微签电子印章系统&#xff0c;通过印章服务器自动化印章方案&#xff0c;成功搭建起电子印章自动化平台&#xff0c;主要解决了其账单数量过多、极度耗费人力的难题&#xff0c;缩短了印章发送流程和时间&#xff0c;提升了国泰基金的整体办公效率…

Prometheus 架构全面解析

在本指南中&#xff0c;我们将详细介绍 Prometheus 架构。 Prometheus 是一个用 Golang 编写的开源监控和告警系统&#xff0c;能够收集和处理来自各种目标的指标。您还可以查询、查看、分析指标&#xff0c;并根据阈值收到警报。 此外&#xff0c;在当今世界&#xff0c;可观…

Redis面试题26

Redis 的数据类型有哪些&#xff1f;它们分别适用于什么场景&#xff1f; 答&#xff1a;Redis 提供了多种数据类型&#xff0c;每种类型都有不同的特点和适用场景。以下是 Redis 支持的主要数据类型&#xff1a; 字符串&#xff08;String&#xff09;&#xff1a;最基本的数…

nestjs之策略模式的应用

策略模式&#xff08;Strategy Pattern&#xff09;是一种软件设计模式&#xff0c;它定义了算法族&#xff0c;分别封装起来&#xff0c;使它们可以互相替换。策略模式让算法的变化独立于使用算法的客户。这种模式涉及到三个角色&#xff1a; 上下文&#xff08;Context&…

YOLOv8改进 | 检测头篇 | 辅助特征融合检测头FASFFHead (增加额外目标检测层,独家创新)

一、本文介绍 本文给大家带来的改进机制是由我独家创新的FASFFHead检测头,我根据ASFFHead检测头(只能用于三头检测)的基础上进行二次创新,解决由于跨尺度融合的特征丢失情况,同时本文的内容全网无第二份,非常适合大家拿来发表论文,该检测头为四头版本,增加小目标检测层或…

通过代理服务器的方式解决跨域问题

学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/frontlearningNotes 觉得有帮助的同学&#xff0c;可以点心心支持一下哈 这里以本地访问https://heimahr.itheima.net/api/sys/permission接口为列子 Node.js 代理服务器 (server.js) 本次考虑使用JSONP或CORS代理来…

助力医疗数字化转型,贝锐x医百科技案例解析

在医疗数字化这个历史进程的大浪潮中&#xff0c;医药企业扮演着重要的角色&#xff0c;其重要程度恐怕仅次于医疗机构本身。同时&#xff0c;数字化转型对于医药企业的赋能作用也是十分明显的&#xff0c;尤其在营销端&#xff0c;一系列的数字化管理、数字化推广方案已经成为…

安装IIS及搭建asp.net程序遇到的问题

一、安装IIS 在服务器管理中选择IIS&#xff0c;右键选择添加角色和功能 在服务器角色中&#xff0c;应用程序开发中要选择ASP.NET3.5、ASP.NET4.6功能。 单击下一步执行安装即可。 二、常见问题 问题1. HTTP 错误404.17 - Not Found 解决方法&#xff1a; 出现以上问题没有…

幻兽帕鲁Docker服务端搭建

幻兽帕鲁Docker服务端搭建 各种命令 https://bbs.saraba1st.com/2b/thread-2168983-1-1.html 存档恢复 这里直接看这个工程的readme就行&#xff1a;https://github.com/yoko-murasame/palworld-host-save-fix 其他参考&#xff1a;https://forum.gamer.com.tw/C.php?bsn7…

SpringBoot使用Swagger2生成接口文档

一、导入依赖 <!-- knife4j--><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>2.0.7</version></dependency> 二、配置类 通过一下配置&am…