PySpark——Python与大数据

一、Spark 与 PySpark

Apache Spark 是用于大规模数据( large-scala data )处理的统一( unified )分析引擎。简单来说, Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算 TB 、 PB 乃至 EB 级别的海量数据。

Spark 作为全球顶级的分布式计算框架,支持众多的编程语言进行开发,Python 语言是 Spark 重点支持的方向,PySpark 是由 Spark 官方开发的 Python 语言第三方库。

1.1 PySpark的安装

使用快捷键Win+R打开运行窗口,然后输入"cmd"并按下回车键,调出CMD命令窗口,在命令窗口中输入“ pip install pyspark ”,按下回车键,等待安装成功即可。(若无法直接顺利下载,可使用国内代理镜像网站(清华大学源)pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark)

1.2构建 PySpark 执行环境入口对象

SparkContext 类对象,是 PySpark 编程中一切功能的入口。想要使用 PySpark 库完成数据处理,首先需要构建一个执行环境入口对象,PySpark 的执行环境入口对象是类 SparkContext 的类对象。

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#上述写法是一种链式调用,等同于下面的写法
# conf=SparkConf()
# conf.setMaster('local[*]')
# conf.setAppName('test_spark_app')#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#打印PySpark的运行版本
print(sc.version)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/09 15:34:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/09 15:34:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

3.5.3

1.3 PySpark 的编程模型

PySpark 的编程,主要分为如下三大步骤:

PySpark 的编程模型 图1
PySpark 的编程模型 图2

通过 SparkContext 对象,完成数据输入,输入数据后得到 RDD 对象,对 RDD 对象进行迭代计算,最终通过 RDD 对象的成员方法,完成数据输出工作。

二、数据输入

2.1 RDD对象

PySpark 支持多种数据的输入,在输入完成后,都会得到一个 RDD 类的对象,RDD 全称为弹性分布式数据集( Resilient Distributed Datasets )。
为什么要使用RDD对象呢?因为PySpark 针对数据的处理,都是以 RDD 对象作为载体,即:

  • 数据存储在 RDD 内
  • 各类数据的计算方法也都是 RDD 的成员方法
  • RDD 的数据计算方法,返回值依旧是 RDD 对象

可以结合 -PySpark 的编程模型 图2- 理解。

2.2数据转换为RDD对象

2.2.1 Python 数据容器转换为 RDD 对象

PySpark 支持通过 SparkContext 对象的 parallelize 成员方法,将Python 数据容器( list、tuple、 set、 dict、str)转换为 PySpark 的 RDD 对象。

语法:rdd=SparkContext类对象.parallelize(Python 数据容器)

代码示例如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——Python 数据容器转 RDD 对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize('my heart go on')
rdd4=sc.parallelize({'a',2,4,'abc'})
rdd5=sc.parallelize({1:'a',2:'b',3:'c'})
#输出RDD的内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:01:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:01:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
['m', 'y', ' ', 'h', 'e', 'a', 'r', 't', ' ', 'g', 'o', ' ', 'o', 'n']
[2, 'a', 4, 'abc']
[1, 2, 3]

由输出结果可知:

  • 字符串会被拆分成 一个一个的字符,存入 RDD 对象
  • 字典中仅有 key 会被存入 RDD 对象

2.2.2文件转换为 RDD 对象

PySpark 支持通过 SparkContext 的 textFile 成员方法,读取文本文件,转换成 RDD 对象。

语法:rdd=SparkContext类对象.textFile(文件路径)

我们新建一个文件“hello.txt”,在其中写入一句“Say goodbye to all your troubles”,然后保存。

读取文件“hello.txt”,将其转换成 RDD 对象,代码如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——文件数据转 RDD 对象
rdd=sc.textFile('E:/可视化案例数据/hello.txt')
#输出RDD的内容
print(rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:31:40 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:31:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['Say goodbye to all your troubles']

三、数据计算

PySpark 的数据计算,都是基于 RDD 对象来进行的,依赖 RDD 对象内置丰富的成员方法,也称为算子,本节主要介绍以下6种算子。

3.1 map算子

功能:对 RDD内的数据逐个处理,处理的具体步骤基于其接收的处理函数,处理完成后返回一个新的RDD。

语法:rdd.map ( func )

f : (T)  ->  U 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型不限。

f : (T)  ->  T 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型和传入参数类型一致。

写好代码后直接运行会报错“SparkException: Python worker failed to connect back.”,简单来说就是spark找不到python在哪里,我们需要通过OS模块设置一个环境变量来告诉spark python在哪里,代码如下所示,environ是一个字典,key为“PYSPARK_PYTHON”,value为python.exe的文件路径(因人而异,安装在哪里就写哪里)。

import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'

利用map算子将RDD中的数据扩大10倍,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func(x):return x*10
rdd2=rdd1.map(func)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

也可以使用 lambda匿名函数,让代码更加简洁高效:

 lambda匿名函数语法:lambda  传入参数:函数体(只能写一行代码)

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
rdd2=rdd1.map(lambda data:data*10)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

两种方法输出结果相同:

24/11/10 17:20:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[10, 20, 30, 40, 50]

如果函数体复杂,用直接定义方式写比较方便,如果函数体一行就能写完,用lambda匿名函数写比较方便。

如果需要经历数次处理,可以通过链式调用的方式多次调用算子,更为简便:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func1(x):return x*10
def func2(x):return x+10
def func3(x):return x-2
rdd2=rdd1.map(func1).map(func2).map(func3) #链式调用map算子
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 17:29:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:29:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[18, 28, 38, 48, 58]

注意:此处的函数func1,func2,func3只是为了演示链式调用,其中的操作很简单,可以合并。遇到无法合并的函数又需要调用多次map算子时,可以使用链式调用。

3.2 flatMap算子

功能:对RDD执行map操作,然后进行解除嵌套操作。

语法:rdd.flatMap ( func )

将数据 ['my heart','go on'] 转换成 ['my' , 'heart' , 'go' , 'on'] 这种格式,使用字符串分割函数split,按照空格切分数据,然后解除嵌套。

字符串分割函数split语法:字符串.split(分隔符字符串)    

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd3=sc.parallelize(['my heart','go on'])
print(rdd3.collect())
#按照空格切分数据,然后解除嵌套
rdd4=rdd3.flatMap(lambda x:x.split(' '))
print(rdd4.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 18:32:16 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 18:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['my heart','go on']
['my', 'heart', 'go', 'on']

flatMap 算子计算逻辑和map算子 一样,只是比map算子多出一层解除嵌套的功能。

3.3 reduceByKey算子

功能:针对KV型RDD,自动按照Key分组,然后根据提供的聚合逻辑(函数),完成对组内数据(value)的聚合操作。

语法:rdd.reduceByKey ( func )  #func为聚合函数

reduceByKey的聚合逻辑

统计数据[('a',1),('a',1),('b',1),('b',1),('b',1)]中a和b的出现次数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd5=sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
rdd6=rdd5.reduceByKey(lambda a,b:a+b)
print(rdd6.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 19:20:56 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 19:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('a', 2), ('b', 3)]

注意:rdd.reduceByKey ( func )中,func只负责聚合,分组是自动ByKey来分组的。

图解

3.4 filter算子

功能:过滤数据,即对 RDD 数据逐个进行处理,返回值为True的数据被保留,返回值为False的数据被丢弃。

语法:rdd.filter ( func )

过滤掉数据[1,2,3,4,5,6]中的偶数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5,6])
print(rdd1.collect())
#过滤掉数据中的偶数
rdd2=rdd1.filter(lambda x:x%2==1)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:20:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:20:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1,2,3,4,5,6]

[1, 3, 5]

3.5 distinct算子

功能:对 RDD 内数据去重。

语法:rdd.distinct() #无需传参

对数据[1,2,1,4,6,6]进行去重,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,1,4,6,6])
print(rdd1.collect())
#去重
rdd2=rdd1.distinct()
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:29:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:29:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 1, 4, 6, 6]
[1, 2, 4, 6]

3.6 sortBy算子

功能:对 RDD 数据进行排序,基于指定的排序依据(由函数规定)。

语法:rdd.sortBy(func,ascending,numPartitions) 

对数据[('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)]进行排序,按照value值进行排序(降序),代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)])
print(rdd1.collect())
#按照value值进行排序(降序)
rdd2=rdd1.sortBy(lambda x:x[1],ascending=False,numPartitions=1) #此处设置分区数量为1,非要点,无需理解
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 22:05:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 22:05:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('苹果', 6), ('香蕉', 3), ('橙子', 4), ('西瓜', 9), ('火龙果', 10)]
[('火龙果', 10), ('西瓜', 9), ('苹果', 6), ('橙子', 4), ('香蕉', 3)]

四、数据输出

4.1输出为 Python 对象

数据输出为 Python 对象的方法是很多的,本小节简单介绍了以下 4 个。

4.1.1 collect算子

功能:将RDD各个分区内的数据统一收集到Driver中,返回一个list对象。

语法:rdd.collect()#无须传参

代码验证:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.collect())
print(type(rdd.collect()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:33:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:33:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
<class 'list'>

4.1.2 reduce算子

功能:对RDD数据集按照传入的逻辑进行聚合

语法:rdd.reduce(func)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.reduce(lambda a,b:a*b))
print(type(rdd.reduce(lambda a,b:a*b)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:52:33 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:52:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

120
<class 'int'>

图解
​​​​

4.1.3 take算子

功能:取RDD的前N个元素,组合成列表返回。

语法:rdd.take(N)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.take(3))
print(type(rdd.take(3)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:41:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3]
<class 'list'>

4.1.4 count算子

功能:统计RDD内有多少条数据,返回一个数字

语法:rdd.count() #无须传参

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.count())
print(type(rdd.count()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:44:34 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:44:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

5
<class 'int'>

4.2输出到文件中

将数据输出到文件中的方法本小节主要介绍了saveAsTextFile算子。

saveAsTextFile算子功能:将RDD中的数据写入文本文件中。

语法:rdd.saveAsTextFile(文件路径)

注意:调用saveAsTextFile这类保存文件的算子,需要配置 Hadoop 依赖,步骤如下:

  1. 下载 Hadoop 安装包,解压到电脑任意位置。(文件链接: https://pan.baidu.com/s/1HHEE2oIp2kYohTpObgVN8g?pwd=mwdy 提取码: mwdy 复制这段内容后打开百度网盘手机App,操作更方便哦)
  2. 在 Python 代码中使用 os 模块配置: os.environ[‘HADOOP_HOME’] = ‘HADOOP 解压文件夹路径’
  3. 下载 winutils.exe ,并放入 Hadoop 解压文件夹的 bin 目录内(文件链接: https://pan.baidu.com/s/1z67mUP3yrci93uLOXAuOxg?pwd=txv2 提取码: txv2 复制这段内容后打开百度网盘手机App,操作更方便哦)
  4. 下载 hadoop.dll ,并放入 :C:/Windows/System32 文件夹内(文件链接: https://pan.baidu.com/s/15vA0o6j8W9f5kNaEYRopDA?pwd=pbww 提取码: pbww 复制这段内容后打开百度网盘手机App,操作更方便哦)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
os.environ[‘HADOOP_HOME’] = ‘E:/HADOOP’#具体看个人的HADOOP保存路径
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5],numSlices=1)#修改 rdd 分区为 1 个,便于文件输出演示
#输出到文件file_test.txt中
rdd.saveAsTextFile('E:/可视化案例数据/file_test.txt')
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行代码后,打开该文件路径,发现文件“file_test.txt”已被建立,且通过右侧的文件预览可见文件内容与代码内容一致,如下图所示:

五、案例

4.1统计出现的单词数量

我们新建一个文件“test.txt”,在其中写入如下图所示的内容,然后保存。

案例要求:统计文件内所出现的单词的数量。

思路:

  1. 利用flatMap算子,将单词依次独立地取出。
  2. 利用map算子,给所有单词加上value(值为1),单词本身作为key。
  3. 利用reduceByKey算子,分组聚合,从而得出每个单词的出现总数。

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取数据文件
file=sc.textFile('E:/可视化案例数据/test.txt')
#将单词依次独立地取出
words_rdd=file.flatMap(lambda line:line.split(' '))
print(words_rdd.collect())
#给所有单词加上value(值为1),单词本身作为key
words_with_one_rdd=words_rdd.map(lambda x:(x,1))
print(words_with_one_rdd.collect())
#分组聚合
result_rdd=words_with_one_rdd.reduceByKey(lambda a,b:a+b)
print(result_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:02:57 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:02:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['goodbye', 'to', 'troubles', 'comes', 'to', 'comes', 'to', 'ease', 'troubles', 'comes', 'to', 'ease', 'goodbye']
[('goodbye', 1), ('to', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('comes', 1), ('to', 1), ('ease', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('ease', 1), ('goodbye', 1)]
[('goodbye', 2), ('troubles', 2), ('to', 4), ('comes', 3), ('ease', 2)]

4.2销售数据计算

读取文件“orders.txt”中的数据,并按要求进行计算。

文件链接: https://pan.baidu.com/s/1rEiJm3-BNZeo5MrzXiSZnw?pwd=g3a6 提取码: g3a6 复制这段内容后打开百度网盘手机App,操作更方便哦

案例要求:

  1. 各个城市销售额排名,从大到小
  2. 全部城市,有哪些商品类别在售卖
  3. 北京市有哪些商品类别在售卖

1.各个城市销售额排名,从大到小

如上图所示,文件数据是json格式,但并不是每个json数据都独占一行,其中有些json数据之间用“|”隔开了,我们要先利用flatMap算子,接收字符串分割函数,将json数据独立地分隔开。然后利用map算子将json数据转换成python字典,数据初步处理完成,

文件内同一城市的不同商品类别的销售数据是分散的,我们需要先取出城市和销售额数据,然后利用reduceByKey算子分组聚合,聚合同一城市不同商品的销售额,最后利用sortBy算子排序。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出城市和销售额数据
city_with_money_rdd=dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #注意,文件内money数据是字符串格式,为了满足排序的需求,必须将其强制转换为int类型
#聚合同一城市不同商品的销售额
city_result_money_rdd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
#按销售额排序,从大到小
result1_rdd=city_result_money_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(result1_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 20:54:15 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 20:54:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]

2.全部城市,有哪些商品类别在售卖

从文件中可以看出,同一商品可能多个城市都在售卖,那取出全部商品类别数据后,可能存在重复情况,就需要利用distinct算子去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出全部商品类别数据
category_rdd=dict_rdd.map(lambda x:x['category'])
#去重
result2_rdd=category_rdd.distinct()
print(result2_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:11:54 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:11:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

3.北京市有哪些商品类别在售卖

首先利用filter算子过滤掉areaName不是北京的数据,因为北京同一商品类别不同时间的数据也而不同,需要去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出北京市的数据
BeiJing_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
#取出北京市的商品类别数据并去重
result3_rdd=BeiJing_rdd.map(lambda x:x['category']).distinct() #链式调用
print(result3_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:23:00 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:23:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

4.3搜索引擎日志分析

文件链接: https://pan.baidu.com/s/1y8Ydf2JErNWfIUU5jWMJ7Q?pwd=m81n 提取码: m81n 复制这段内容后打开百度网盘手机App,操作更方便哦

文件“search_log.txt”数据格式如下图所示:

案例要求:

  1. 打印输出:热门搜索时间段(小时精度) Top3
  2. 打印输出:热门搜索词 Top3
  3. 打印输出:统计黑马程序员关键字在哪个时段被搜索最多

1.打印输出:热门搜索时间段(小时精度) Top3

​​​​​​​思路如下:

  • ​​​​​​​从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第一列时间数据。
  • 案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 将数据转换成(小时,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个小时时间段即为“热门搜索时间段(小时精度) Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:x.split('\t')).map(lambda x:x[0][:2]).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/14 14:25:49 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 14:25:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('20', 3479), ('23', 3087), ('21', 2989)]

我们得到了搜索次数最多的时间段:晚上8点、晚上11点、晚上9点。

代码可以进一步改进:

改进后的代码:

​
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:(x.split('\t')[0][:2],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()​

2.打印输出:热门搜索词 Top3

​​​​​​​思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第三列热搜词数据。
  • 将数据转换成(热搜词,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个数据即为“热门搜索词 Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result2=file_rdd.map(lambda x:x.split('\t')).\map(lambda x:x[2]).\map(lambda x:(x,1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)
print(result2)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:02:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:02:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]

我们得到了被检索次数最多的3个热搜词及其具体被检索次数。

此处依然可以简化3个map算子,但为了代码易读性,此处不再进行简化,简化可看下图:

3.打印输出:统计黑马程序员关键字在哪个时段(小时精度)被搜索最多

思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐利用字符串分割函数取出第一列时间数据和第三列关键字数据。案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 过滤出关键词=黑马程序员的数据,过滤完成后不再需要关键字数据,将数据转换成(时间,1)的二元元组形式,然后分组聚合,统计不同时间段内关键词黑马程序员被搜索的次数。
  • 最后按降序从高到低排序,取第一个数据即为“黑马程序员关键字被搜索次数最多的时段”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result3=file_rdd.map(lambda x:x.split('\t')).\map(lambda x:(x[0][:2],x[2])).\filter(lambda x:x[1]=='黑马程序员').\map(lambda x:(x[0],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(1)
print(result3)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:21:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:21:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('22', 245)]

黑马程序员关键字在晚上10点这个时间段被检索次数最多,被检索次数为245次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法日记 26-27day 贪心算法

接下来的题目有些地方比较相似。需要注意多个条件。 题目&#xff1a;分发糖果 135. 分发糖果 - 力扣&#xff08;LeetCode&#xff09; n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每…

Python绘制雪花

文章目录 系列目录写在前面技术需求完整代码代码分析1. 代码初始化部分分析2. 雪花绘制核心逻辑分析3. 窗口保持部分分析4. 美学与几何特点总结 写在后面 系列目录 序号直达链接爱心系列1Python制作一个无法拒绝的表白界面2Python满屏飘字表白代码3Python无限弹窗满屏表白代码4…

【一键整合包及教程】AI照片数字人工具EchoMimic技术解析

在数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;正以前所未有的速度改变着我们的生活。EchoMimic&#xff0c;作为蚂蚁集团旗下支付宝推出的开源项目&#xff0c;不仅为数字人技术的发展掀开了新的一页&#xff0c;更为娱乐、教育、虚拟现实、在线会议等多个领域带…

STM32中断系统

目录 一、中断的基本概念 二、NVIC 1.NVIC的概念 2、NVIC的组成 3、NVIC的应用 4.NVIC的结构 三、外部中断EXTI 1.外部中断的概念 2.EXTI基本结构 四、EXTI外部中断的配置流程 1.开启APB2中的GPIO口/AFIO时钟 2.GPIO配置成输入模式 3.AFIO选择中断引脚 4.EXTI初始…

解锁远程AI工作流:Flowise搭配cpolar跨地域管理AI项目

文章目录 前言1. Docker安装Flowise2. Ubuntu安装Cpolar3. 配置Flowise公网地址4. 远程访问Flowise5. 固定Cpolar公网地址6. 固定地址访问 前言 如今&#xff0c;工作流自动化与人工智能的结合已成为提升生产力的重要手段。Flowise正是这样一个工具&#xff0c;通过直观的拖拽…

Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件

本文将介绍一种手动的轻量级的方式&#xff0c;还原HTTP/TLS协议中传输的文件&#xff0c;为流量数据包中的文件分析提供帮助。 如果捕获的数据包中存在非文本类文件&#xff0c;例如png,jpg等图片文件&#xff0c;或者word&#xff0c;Excel等office文件异或是其他类型的二进…

MATLAB用CNN-LSTM神经网络的语音情感分类深度学习研究

全文链接&#xff1a;https://tecdat.cn/?p38258 在语音处理领域&#xff0c;对语音情感的分类是一个重要的研究方向。本文将介绍如何通过结合二维卷积神经网络&#xff08;2 - D CNN&#xff09;和长短期记忆网络&#xff08;LSTM&#xff09;构建一个用于语音分类任务的网络…

Android笔记(三十七):封装一个RecyclerView Item曝光工具——用于埋点上报

背景 项目中首页列表页需要统计每个item的曝光情况&#xff0c;给产品运营提供数据报表分析用户行为&#xff0c;于是封装了一个通用的列表Item曝光工具&#xff0c;方便曝光埋点上报 源码分析 核心就是监听RecyclerView的滚动&#xff0c;在滚动状态为SCROLL_STATE_IDLE的时…

Minikube 上安装 Argo Workflow

文章目录 步骤 1&#xff1a;启动 Minikube 集群步骤 2&#xff1a;安装Argo Workflow步骤 3&#xff1a;访问UI创建流水线任务参考 前提条件&#xff1a; Minikube&#xff1a;确保你已经安装并启动了 Minikube。 kubectl&#xff1a;确保你已经安装并配置了 kubectl&#xff…

GCP Cloud Storage 的lock retention policy是什么

简介 Google Cloud Storage 的锁定保留策略&#xff08;Lock Retention Policy&#xff09;是一种用于保护存储桶中对象数据的功能。它允许用户设置一个保留期&#xff0c;在此期间对象不能被删除或覆盖。这对于确保数据的长期保留和合规性非常重要&#xff0c;尤其是在需要满…

STM32设计防丢防摔智能行李箱

目录 目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 1.电路图采用Altium Designer进行设计&#xff1a; 2.实物展示图片 三、程序源代码设计 四、获取资料内容 前言 随着科技的不断发展&#xff0c;嵌入式系统、物联网技术、智能设备…

CSP/信奥赛C++语法基础刷题训练(11):洛谷P5743:猴子吃桃

CSP/信奥赛C语法基础刷题训练&#xff08;11&#xff09;&#xff1a;洛谷P5743&#xff1a;猴子吃桃 题目描述 一只小猴买了若干个桃子。第一天他刚好吃了这些桃子的一半&#xff0c;又贪嘴多吃了一个&#xff1b;接下来的每一天它都会吃剩余的桃子的一半外加一个。第 n n n…

控制器ThinkPHP6

五、控制器中对数组值的返回 在做接口服务时&#xff0c;很多时候回使用数组作为返回值&#xff0c;那么数组如何返回成 json呢&#xff1f; 在 tp6 中返回json 很简单&#xff0c;直接使用 json 进行返回即可&#xff0c;例如&#xff1a; public function index(){$resarra…

洛谷刷题日记||基础篇8

#include <iostream> #include <vector> using namespace std;int N, M; // N为行数&#xff0c;M为列数 vector<vector<char>> field; // 表示田地的网格&#xff0c;每个元素是W或. vector<vector<bool>> visited; // 用来记录网格是否访…

Qwen2.5-3B-Instruct-GGUF部署

注册账号&#xff1a; 魔搭社区 等一会&#xff1a; 部署好了&#xff1a; 立即使用&#xff1a; 您部署的服务提供OpenAI API接口&#xff0c;可通过OpenAI SDK进行调用。请确保您的服务处于正常运行状态&#xff0c;并预先安装OpenAI SDK: pip install openai 在本地新建…

恶意PDF文档分析记录

0x1 PDF是什么 PDF&#xff08;便携式文件格式&#xff0c;Portable Document Format&#xff09;是由Adobe Systems在1993年用於文件交换所发展出的文件格式。 因为PDF的文件格式性质广泛用于商业办公&#xff0c;引起众多攻击者对其开展技术研究&#xff0c;在一些APT&#…

Spring-事务学习

spring事务 1. 什么是事务? 事务其实是一个并发控制单位&#xff0c;是用户定义的一个操作序列&#xff0c;这些操作要么全部完成&#xff0c;要不全部不完成&#xff0c;是一个不可分割的工作单位。事务有 ACID 四个特性&#xff0c;即&#xff1a; 原子性&#xff08;Atom…

CVE-2024-2961漏洞的简单学习

简单介绍 PHP利用glibc iconv()中的一个缓冲区溢出漏洞&#xff0c;实现将文件读取提升为任意命令执行漏洞 在php读取文件的时候可以使用 php://filter伪协议利用 iconv 函数, 从而可以利用该漏洞进行 RCE 漏洞的利用场景 PHP的所有标准文件读取操作都受到了影响&#xff1…

段探测的研究

在介绍今天的内容之前&#xff0c;我们先要知道一些前置的知识 跳过繁琐的介绍&#xff0c;我们单刀直入&#xff0c;介绍一个划时代的CPU 8086 8086 从8086开始&#xff0c;CPU扩展到了16位&#xff0c;地址的位宽扩展到了20位&#xff0c;自此之后我们现在所熟知的计算机结…

Linux:进程的优先级 进程切换

文章目录 前言一、进程优先级1.1 基本概念1.2 查看系统进程1.3 PRI和NI1.4 调整优先级1.4.1 top命令1.4.2 nice命令1.4.3 renice命令 二、进程切换2.1 补充概念2.2 进程的运行和切换步骤&#xff08;重要&#xff09; 二、Linux2.6内核进程O(1)调度队列&#xff08;重要&#x…