day10_Structured Steaming

文章目录

  • Structured Steaming
    • 一、结构化流介绍(了解)
      • 1、有界和无界数据
      • 2、基本介绍
      • 3、使用三大步骤(掌握)
      • 4.回顾sparkSQL的词频统计案例
    • 二、结构化流的编程模型(掌握)
      • 1、数据结构
      • 2、读取数据源
        • 2.1 File Source
        • 2.2 Socket Source
        • 2.3 Rate Source
      • 3、数据处理
      • 4、数据输出
        • 4.1 输出模式
          • 4.1.1 append 模式
          • 4.1.2 complete模式
          • 4.1.3 update模式
        • 4.2 输出终端/位置
      • 5、综合案例(练习)
        • 词频统计_读取文件方式
        • 词频统计_Socket方式
        • 自动生成数据_Rate方式
      • 6、设置触发器Trigger
      • 7、CheckPoint检查点目录设置
  • JSON是什么?
    • 三、Spark 和 Kafka 整合(掌握)
      • 0、整合Kafka准备工作
      • 1.spark和kafka集成
        • 1.1 官网文档链接:
        • 1.2 常见选项:
        • 1.3 常见参数
      • 2、从kafka中读取数据
        • 2.1 流式处理
          • 官方示例:
          • 练习示例
        • 2.2 批处理
          • 官方示例:
          • 演示示例
      • 3、数据写入Kafka中
        • 3.1 流式处理
          • 官方示例:
          • 练习示例
        • 3.2 批处理
          • 官方示例:
          • 演示示例
  • 01_回顾sparkSQL词频统计过程.py
  • 02_结构化流词频统计案例_读取文件方式.py
  • 03_结构化流词频统计案例_socket方式.py
  • 04_结构化流词频统计案例_设置触发器和检查点.py
  • 05_流方式读取kafka数据.py
  • 06_流方式写数据到kafka.py

Structured Steaming

  1. 简单来说:Structured Streaming是Spark提供的一种流处理引擎,就像是“实时数据处理的流水线”,能够以批处理的方式处理实时数据流。

  2. 具体而言

    • 核心概念
      • 流式DataFrame:将实时数据流视为一个无限扩展的DataFrame,支持类似批处理的API。
      • 触发器:控制流处理的时间间隔,如每1秒处理一次数据。
      • 输出模式:支持多种输出模式,如append(追加)、update(更新)和complete(完整输出)。
    • 特点
      • 易用性:提供与Spark SQL一致的API,降低学习成本。
      • 容错性:通过检查点机制(Checkpoint)确保数据处理的可靠性。
      • 扩展性:支持从Kafka、文件系统等多种数据源读取数据,并输出到多种目标系统。
  3. 实际生产场景

    • 在实时监控中,使用Structured Streaming处理传感器数据,实时生成报警。
    • 在用户行为分析中,使用Structured Streaming处理点击流数据,实时更新用户画像。
  4. 总之:Structured Streaming通过易用的API和强大的容错机制,为实时数据处理提供了高效、可靠的解决方案,广泛应用于实时监控、用户行为分析等场景。

在这里插入图片描述

一、结构化流介绍(了解)

1、有界和无界数据

  1. 简单来说:有界数据就像是“有限的书本”,数据量固定且已知;无界数据则像是“无限的河流”,数据持续生成且量未知。

  2. 具体而言

    • 有界数据
      • 定义:数据量固定且已知,处理完成后任务结束。
      • 示例:存储在文件或数据库中的历史数据。
      • 处理方式:适合批处理(Batch Processing),如使用Spark的RDD或DataFrame处理。
    • 无界数据
      • 定义:数据持续生成且量未知,处理任务通常不会结束。
      • 示例:实时日志流、传感器数据、用户点击流。
      • 处理方式:适合流处理(Stream Processing),如使用Spark的Structured Streaming或Flink处理。
  3. 实际生产场景

    • 在历史数据分析中,使用有界数据进行批处理,生成报表和洞察。
    • 在实时监控中,使用无界数据进行流处理,实时生成报警和推荐。
  4. 总之:有界数据和无界数据分别适合批处理和流处理,根据数据特点选择合适的处理方式,能够高效地完成数据分析和处理任务。

  • 有界数据:
有界数据: 指的数据有固定的开始和固定的结束,数据大小是固定。我们称之为有界数据。对于有界数据,一般采用批处理方案(离线计算)特点:1-数据大小是固定2-程序处理有界数据,程序最终一定会停止
  • 无界数据:
无界数据: 指的数据有固定的开始,但是没有固定的结束。我们称之为无界数据
注意: 对于无界数据,我们一般采用流式处理方案(实时计算)特点:1-数据没有明确的结束,也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据,程序会一直运行不会结束

2、基本介绍

结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL …

​ Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

​ 真正的流处理引擎: Storm(早期流式处理引擎)、Flink、Flume(流式数据采集)

3、使用三大步骤(掌握)

StructuredStreaming在进行数据流开发时的三个步骤

  • 1、读取数据流数据 : 指定数据源模式
    • sparksession对象.readStream.format(指定读取的数据源).option(指定读取的参数).load()
  • 2、数据处理: 使用dsl或者sql方式计算数据和SparkSQL操作一样
  • 3、将计算的结果保存 : 指定输出模式,指定位置
    • writeStream.outputMode(输出模式).option(输出的参数配置).format(指定输出位置).start().awaitTermination()

4.回顾sparkSQL的词频统计案例

# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/w1.txt')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出sql_df.show()dsl_df.show()# 5.关闭资源spark.stop()

二、结构化流的编程模型(掌握)

1、数据结构

在这里插入图片描述

在结构化流中,我们可以将DataFrame称为无界的DataFrame或者无界的二维表

2、读取数据源

对应官网文档内容:

https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources

结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作。目前提供了如下数据源:

  • File Source:文件数据源。读取文件系统,一般用于测试。如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行

  • Socket Source:网络套接字数据源,一般用于测试。也就是从网络上消费/读取数据

  • Rate Source:速率数据源。了解即可,一般用于基准测试。通过配置参数,由结构化流自动生成测试数据。

  • Kafka Source:Kafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。

2.1 File Source

在这里插入图片描述

相关的参数:

option参数描述说明
maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)
latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)
fileNameOnly是否检查新文件只有文件名而不是完整路径(默认值:false)将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”

将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet。。。。

文件数据源特点:
1- 只能监听目录,不能监听具体的文件
2- 可以通过*通配符的形式监听目录中满足条件的文件 
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况

读取代码通用格式:

# 原生API
sparksession.readStream.format('CSV|JSON|Text|Parquet|ORC...').option('参数名1','参数值1').option('参数名2','参数值2').option('参数名N','参数值N').schema(元数据信息).load('需要监听的目录地址')# 简化API	
针对具体数据格式,还有对应的简写API格式,例如:sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)

可能遇到的错误一:

在这里插入图片描述

原因: 如果是文件数据源,需要手动指定schema信息

可能遇到的错误二:

在这里插入图片描述

原因: File source只能监听目录,不能监听具体文件
2.2 Socket Source

在这里插入图片描述

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 端口号查看端口号是否被使用命令: netstat -nlp | grep 要查询的端口

注意: 要先启动nc,再启动我们的程序

代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
2.3 Rate Source

在这里插入图片描述

此数据源的提供, 主要是用于进行基准测试

option参数描述说明
rowsPerSecond每秒应该生成多少行 : (例如 100,默认值:1)
rampUpTime在生成速度变为rowsPerSecond之前应该经过多久的加速时间(例如5 s,默认0)
numPartitions生成行的分区: (例如 10,默认值:Spark 的默认并行度)

3、数据处理

​ 指的是数据处理部分,该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理,也可以使用DSL方式进行处理。

4、数据输出

​ 在结构化流中定义好DataFrame或者处理好DataFrame之后,调用**writeStream()**方法完成数据的输出操作。在输出的过程中,我们可以设置一些相关的属性,然后启动结构化流程序运行。

在这里插入图片描述

4.1 输出模式

可能遇到的错误:

在这里插入图片描述

原因: 在结构化流中不能调用show()方法
解决办法: 需要使用writeStream().start()进行结果数据的输出

在进行数据输出的时候,必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式:

append模式
- 定义:只输出新增的数据,适用于不需要更新历史结果的场景。
- 示例:实时日志处理中,只输出新产生的日志记录。
update模式
- 定义:输出新增或更新的数据,适用于需要更新历史结果的场景。
- 示例:实时用户行为分析中,输出用户的最新行为数据。
complete模式
- 定义:输出完整的结果集,适用于需要全局统计结果的场景。
- 示例:实时销售统计中,输出所有销售数据的汇总结果。

实际生产场景

  • 在实时日志处理中,使用append模式输出新日志记录。
  • 在实时用户行为分析中,使用update模式输出用户的最新行为数据。
  • 在实时销售统计中,使用complete模式输出所有销售数据的汇总结果。
  • 1- append模式:增量模式 (默认)

    特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

  • 2- complete模式:完全(全量)模式

    特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

  • 3- update模式:更新模式

    特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

4.1.1 append 模式

1- append模式:增量模式

特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

如果有了聚合操作,会报如下错误:

在这里插入图片描述

如果有了排序操作,会报如下错误:

在这里插入图片描述

4.1.2 complete模式

2- complete模式:完全(全量)模式

特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

如果没有聚合操作,会报如下错误:

在这里插入图片描述

4.1.3 update模式

3- update模式:更新模式

特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

如果有了排序操作,会报如下错误:
在这里插入图片描述

4.2 输出终端/位置

默认情况下,Spark的结构化流支持多种输出方案:

1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

5、综合案例(练习)

需求: 已知文件中存储了多个单词,要求计算统计出现的次数

词频统计_读取文件方式
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
词频统计_Socket方式
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc# 注意: 端口号: 范围0-65535   但是0-1024都是知名端口号查看端口号是否被使用命令: netstat -nlp | grep 55555执行nc命令, 开启端口号(选择没有被占用), 写入数据: nc -lk 55555

注意: 要先启动nc,再启动我们的程序

代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('socket')\.option('host',"192.168.88.161")\.option('port',"55555")\.load()# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
自动生成数据_Rate方式
from pyspark.sql import SparkSession
import osos.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder \.appName("StructuredStream_rate") \.master('local[*]') \.getOrCreate()# 2。读取数据df = spark.readStream \.format('rate') \.option("rowsPerSecond", "5") \.option('numPartitions', 1) \.load()# 3.数据处理# 略# 4.数据输出:df.writeStream \.format('console') \.outputMode('update') \.option('truncate', 'false') \.start() \.awaitTermination()# 5.关闭资源spark.stop()

6、设置触发器Trigger

触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据

主要提供如下几种触发器:

  • 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短

    result_df.writeStream\.outputMode('append')\.start()\.awaitTermination()
    
  • 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**

    result_df.writeStream\.outputMode('append')\.trigger(processingTime='5 seconds')\.start()\.awaitTermination()情形说明:
    1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
    2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
    3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
    
  • 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等

    result_df.writeStream.foreachBatch(func)\.outputMode('append')\.trigger(once=True)\.start()\.awaitTermination()
    

7、CheckPoint检查点目录设置

设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题

默认位置: hdfs的/tmp/xxx

如何设置检查点:

1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径

检查点目录主要包含以下几个目录位置:
在这里插入图片描述

1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id4-数据源sources: 是数据源(Source)各个批次的读取的详情5-数据接收端sinks: 是数据接收端各个批次的写出的详情6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态

JSON是什么?

  1. 简单来说:JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,就像是“数据的通用语言”,易于人阅读和编写,也易于机器解析和生成。

  2. 具体而言

    • 结构
      • 对象:用花括号{}表示,包含键值对,键和值之间用冒号:分隔,键值对之间用逗号,分隔。
      • 数组:用方括号[]表示,包含多个值,值之间用逗号,分隔。
      • :可以是字符串、数字、布尔值、对象、数组或null
    • 示例
      {"name": "Alice","age": 30,"isStudent": false,"courses": ["Math", "Science"],"address": {"city": "Beijing","zip": "100000"}
      }
      
    • 特点
      • 轻量级:相比于XML,JSON格式更简洁,数据量更小。
      • 易读性:结构清晰,易于人阅读和编写。
      • 跨平台:支持多种编程语言,如JavaScript、Python、Java等。
  3. 实际生产场景

    • 在Web开发中,使用JSON作为前后端数据交换的格式。
    • 在API设计中,使用JSON作为请求和响应的数据格式。
    • 在配置文件中,使用JSON存储配置信息。
  4. 总之:JSON是一种轻量级、易读、跨平台的数据交换格式,广泛应用于Web开发、API设计和配置文件等领域。

三、Spark 和 Kafka 整合(掌握)

​ Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表

0、整合Kafka准备工作

说明: Jar包上传的位置说明

如何放置相关的Jar包?  1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars jar包路径jar包下载地址: https://mvnrepository.com/

1.spark和kafka集成

1.1 官网文档链接:

https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

1.2 常见选项:
选项解释
kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地址
subscribe以逗号分隔的Topic主题列表订阅一个主题topic1或者多个主题topic1,topic2
subscribePattern正则表达式字符串订阅主题的模式。可以用 topic.* 代表多个主题
assign通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]}要使用的特定TopicPartitions
includeHeaders默认false是否在行中包含Kafka headers。
startingOffsets流或者批的查询开始时的起始点: “earliest”(批默认), “latest” (流默认), or json string json串格式如下 { “topicA”: {“0”:23,“1”:-1}, “topicB”:{“0”:-2} }“earliest”表示最早的偏移量, “latest”表示最近的偏移量, 或每个TopicPartition起始偏移量的json字符串。在json中,-2作为偏移量表示最早,-1表示最晚。注意: 对于批量查询:不允许使用latest(无论是隐式查询还是在json中使用-1)。 对于流查询: 这只适用于新查询开始时,恢复总是从查询结束的地方继续。在查询期间新发现的分区将最早开始。
endingOffsets批量查询结束时的结束点: latest(默认) , or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}“latest”,指的是最新的, 或每个TopicPartition结束偏移量的json字符串。在json中,-1可以用来表示最近的偏移量,-2(最早的)是不允许的!
1.3 常见参数
参数类型解释
topicstring表示消息是从哪个Topic中消费出来
valuebinary最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
keybinary发送数据的key值。如果没有,就为null
partitionint分区编号。表示消费到的该条数据来源于Topic的哪个分区
offsetlong表示消息偏移量
timestamptimestamp接收的时间戳

2、从kafka中读取数据

2.1 流式处理
官方示例:
# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1,topic2") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅一个Topic,并且指定header信息
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.option("includeHeaders", "true") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
练习示例

从某一个Topic中读取消息数据

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092")\.option("subscribe","itheima")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.value, 'utf8').alias('key'),F.decode(df.key,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 获取数据etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()# 3- 数据处理# result_df1 = df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = df.selectExpr("cast(value as string) as value")# result_df3 = df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出# 5- 启动流式任务"""如果有多个输出,那么只能在最后一个start的后面写awaitTermination()"""# result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()# result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()# result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
2.2 批处理
官方示例:
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.option("startingOffsets", "earliest") \.option("endingOffsets", "latest") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅多个主题,明确指定Kafka偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
演示示例

订阅一个Topic

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.expr("cast(key as string) as key"),F.decode(df.key,'utf8'),F.expr("cast(value as string) as value"),F.decode(df.value, 'utf8'),df.topic,df.partition,df.offset)# 获取数据etl_df.show()# # 3- 数据处理# result_df1 = init_df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = init_df.selectExpr("cast(value as string) as value")# result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# # 4- 数据输出# print("result_df1")# result_df1.show()# print("result_df2")# result_df2.show()# print("result_df3")# result_df3.show()# # 5- 释放资源# spark.stop()

3、数据写入Kafka中

3.1 流式处理
官方示例:
# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
# 的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
练习示例

写出到指定Topic

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))# 4- 数据输出# 注意: 咱们修改完直接保存到kafka的itcast主题中,所以控制台没有数据,这是正常的哦!!!# 5- 启动流式任务result_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.start()\.awaitTermination()
3.2 批处理
官方示例:
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "topic1") \.save()# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.save()
演示示例
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_666') as value"))# 4- 数据输出# 5- 启动流式任务result_df.write.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.save()

01_回顾sparkSQL词频统计过程.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取文件生成dfdf = spark.read.text("file:///export/data/spark_project/09_结构化流/data3.txt")# df.show()# 2.数据处理etl_df = df.dropDuplicates().fillna('未知')# 3.数据分析# 需求: 统计每个单词出现的次数# 方式1: sql方式etl_df.createTempView("word_tb")sql_result_df = spark.sql("""with t as (select explode(split(value," ")) as wordfrom word_tb)select word,count(*) as cnt from t group by word""")# 方式2: dsl方式dsl_result_df = etl_df.select(F.explode(F.split("value", " ")).alias("word")).groupby("word").agg(F.count("word").alias("cnt"))# 4.数据展示/导出sql_result_df.show()dsl_result_df.show()# 注意: 最后一定释放资源spark.stop()

02_结构化流词频统计案例_读取文件方式.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1) \.appName('pyspark_demo') \.master('local[*]') \.getOrCreate()# 2.TODO 数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream \.format('text') \.load('file:///export/data/spark_project/09_结构化流/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# 注意: 输出不能使用原来sparksql的show(),否则报错# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

03_结构化流词频统计案例_socket方式.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

04_结构化流词频统计案例_设置触发器和检查点.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 设置检查点路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/ckpt2")# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# TODO: .trigger(processingTime='5 seconds')添加触发器sql_df.writeStream.format('console').outputMode('complete').trigger(processingTime='5 seconds').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

05_流方式读取kafka数据.py

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.key, 'utf8').alias('key'),F.decode(df.value,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 展示数据# 直接展示到控制台etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()

06_流方式写数据到kafka.py

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.decode(df.value,'utf8').alias('value'))# TODO: 原来默认展示到控制台,接下来演示如何把数据存储到kafka中etl_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("topic","kafka_spark2")\.option("checkpointLocation", "hdfs://node1:8020/ckpt3")\.start()\.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式】 单例模式(单例模式哪几种实现,如何保证线程安全,反射破坏单例模式)

单例模式 作用:单例模式的核心是保证一个类只有一个实例,并且提供一个访问实例的全局访问点。 实现方式优缺点饿汉式线程安全,调用效率高 ,但是不能延迟加载懒汉式线程安全,调用效率不高,能延迟加载双重检…

无公网IP 实现外网访问本地 Docker 部署 Navidrome

Navidrome 是一款可以在 macOS、Linux、Windows以及 Docker 等平台上运行的跨平台开源音乐服务器应用,它支持传输常见的 MP3、FLAC、WAV等音频格式。允许用户通过 Web 界面或 API 进行音乐库的管理和访问。本文就介绍如何快速在 Linux 系统使用 Docker 进行本地部署…

解决conda create速度过慢的问题

问题 构建了docker容器 想在容器中创建conda环境,但是conda create的时候速度一直很慢 解决办法 宿主机安装的是anaconda 能正常conda create,容器里安装的是miniforge conda create的时候速度一直很慢,因为容器和宿主机共享网络了,宿主机…

【Hive】新增字段(column)后,旧分区无法更新数据问题

TOC 【一】问题描述 Hive修改数据表结构的需求,比如:增加一个新字段。 如果使用如下语句新增列,可以成功添加列col1。但如果数据表tb已经有旧的分区(例如:dt20190101),则该旧分区中的col1将为…

【Python】Selenium根据网页页面长度,模拟向下滚动鼠标,直到网页底部的操作

最近在弄selenium的爬取的过程中,我发现一些网站上的表格,是需要手动拉到底部才能加载完成的。 如果没有拉到底部,那么在获取网页表格的时候,表格就会只有显示的一部分,页面就不完整。 所以我就整理了一些模拟滚动鼠…

openharmony电源管理子系统

电源管理子系统 简介目录使用说明相关仓 简介 电源管理子系统提供如下功能: 重启服务:系统重启和下电。系统电源管理服务:系统电源状态管理和休眠运行锁管理。显示相关的能耗调节:包括根据环境光调节背光亮度,和根…

麒麟操作系统服务架构保姆级教程(十一)https配置

如果你想拥有你从未拥有过的东西,那么你必须去做你从未做过的事情 在运维工作中,加密和安全的作用是十分重要的,如果仅仅用http协议来对外展示我们的网站,过一段时间就会发现网站首页被人奇奇怪怪的篡改了,本来好好的博…

RabbitMQ---消息确认和持久化

(一)消息确认 1.概念 生产者发送消息后,到达消费端会有以下情况: 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时…

Linux:System V - 共享内存

1.System V共享内存的原理 通过为用户提供系统调用接口,让用户可以申请一块空间,进程A/B也可以通过系统调用接口将创建好的内存通过页表映射进进程的地址空间。完成让不同的两个进程看见同一份资源的目的。如果未来不想继续通信,取消进程和内…

SpringBoot错误码国际化

先看测试效果: 1. 设置中文 2.设置英文 文件结构 1.中文和英文的错误消息配置 package com.ldj.mybatisflex.common;import lombok.Getter;/*** User: ldj* Date: 2025/1/12* Time: 17:50* Description: 异常消息枚举*/ Getter public enum ExceptionEnum {//…

道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展

作者:寒空、横槊、娜米、公仪 道旅科技:科技驱动,引领全球旅游分销服务 道旅科技 (https://www.didatravel.com/home) 成立于 2012 年,总部位于中国深圳,是一家以科技驱动的全球酒店资源批发商…

Solidity01 Solidity极简入门

一、Solidity 简介 Solidity 是一种用于编写以太坊虚拟机(EVM)智能合约的编程语言。我认为掌握 Solidity 是参与链上项目的必备技能:区块链项目大部分是开源的,如果你能读懂代码,就可以规避很多亏钱项目。 Solidity …

如何使用WPS的JS宏实现Word表格的自动编号

如何使用WPS的JS宏实现Word表格的自动编号&#xff1f;如下图&#xff0c;想要给表格的编号列中添加序号。 使用WPS的JS宏可以实现自动编号&#xff0c;代码如下&#xff1a; n Selection.Tables.Item(1).Rows.Count;for(i 2;i<n;i){Selection.Tables.Item(1).Cell(i,1).…

Python在多个Excel文件中找出缺失数据行数多的文件

本文介绍基于Python语言&#xff0c;针对一个文件夹下大量的Excel表格文件&#xff0c;基于其中每一个文件内、某一列数据的特征&#xff0c;对其加以筛选&#xff0c;并将符合要求与不符合要求的文件分别复制到另外两个新的文件夹中的方法。 首先&#xff0c;我们来明确一下本…

【Linux】应用层自定义协议与序列化

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux 目录 一&#xff1a;&#x1f525; 应用层 &#x1f98b; 再谈 "协议"&#x1f98b; 网络版计算器&#x1f98b; 序列化 和 反序列化 二&#xff1a;&#x1f525; 重新理解 read、…

数字化贷款管理:助贷系统软件为贷款中介带来的五大改变

随着金融行业的数字化转型&#xff0c;贷款中介的业务模式也在不断创新。助贷系统作为数字化管理的核心工具&#xff0c;正在为贷款中介带来深刻的变革。本文将从五个方面探讨助贷系统软件如何改变贷款中介行业的管理模式&#xff0c;提升业务效率&#xff0c;降低运营成本。乐…

MIAOYUN信创云原生项目亮相西部“中试”生态对接活动

近日&#xff0c;以“构建‘中试’生态&#xff0c;赋能科技成果转化”为主题的“科创天府智汇蓉城”西部“中试”生态对接活动在成都高新区菁蓉汇隆重开幕。活动分为成果展览、“中试”生态主场以及成果路演洽谈对接三大板块。在成果展览环节&#xff0c;成都元来云志科技有限…

一文简要了解为什么需要RAG、核心原理与应用场景

欢迎来到AI应用探索&#xff0c;这里专注于探索AI应用。 一、为什么需要RAG&#xff0c;它解决了哪些问题 在自然语言处理领域&#xff0c;生成式预训练模型&#xff08;如GPT&#xff09;已经展示了强大的文本生成能力。然而&#xff0c;这些模型有以下局限性&#xff1a; 知…

gametime

gametime 一、查壳 无壳&#xff0c;32位 二、IDA分析 先看看main 妈呀&#xff0c;好多函数&#xff0c;脑子有点乱 先运行下EXE看看有什么突破口没 可以看出是游戏&#xff0c;明显是看你的输入对不对&#xff0c;来通关的&#xff0c;所以有关判定的条件或者函数是解题…

基于机器学习随机森林算法的个人职业预测研究

1.背景调研 随着信息技术的飞速发展&#xff0c;特别是大数据和云计算技术的广泛应用&#xff0c;各行各业都积累了大量的数据。这些数据中蕴含着丰富的信息和模式&#xff0c;为利用机器学习进行职业预测提供了可能。机器学习算法的不断进步&#xff0c;如深度学习、强化学习等…