Spark 7:Spark SQL 函数定义

SparkSQL 定义UDF函数

8e9d7ddcc2f04a4fa3d0b7e78921b4f8.png

21cc4f7f05aa4c4d8e9db15c1d271cb1.png

方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名

8dee25a8da254eec809eb0c8a5d95baf.png  

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])df = rdd.toDF(["num"])# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格# 参数2: UDF的处理逻辑, 是一个单独的方法# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算df.selectExpr("udf1(num)").show()# DSL 风格中使用# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象df.select(udf2(df['num'])).show()# TODO 2: 方式2注册, 仅能用于DSL风格udf3 = F.udf(num_ride_10, IntegerType())df.select(udf3(df['num'])).show()df.selectExpr("udf3(num)").show()

2562d46c19ee47c3b20f47fea545fb36.png

af9b0504fc474fd8896cb20f93094922.png 

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])df = rdd.toDF(["line"])# 注册UDF, UDF的执行函数定义def split_line(data):return data.split(" ")  # 返回值是一个Array对象# TODO1 方式1 构建UDFudf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))# DLS风格df.select(udf2(df['line'])).show()# SQL风格df.createTempView("lines")spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)# TODO 2 方式2的形式构建UDFudf3 = F.udf(split_line, ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

0e2c268c421848cfbc639a80fc067838.png

fe0861021c4d4be5a8b0566697bf5905.png 

# coding:utf8
import string
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回# 比如传入1 我们返回 {"num":1, "letters": "a"}rdd = sc.parallelize([[1], [2], [3]])df = rdd.toDF(["num"])# 注册UDFdef process(data):return {"num": data, "letters": string.ascii_letters[data]}"""UDF的返回值是字典的话, 需要用StructType来接收"""udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\add("letters", StringType(), nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)

2034e0d567664867a1c76c153620c99e.png  

c5a05c06e0c24009abe4b20cdc81ed7e.png

 SparkSQL 使用窗口函数

f3cf9802e8084fc4bf2518765b500397.png

7d43ea9fba3c479094ff69bb62cd99da.png

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder. \appName("create df"). \master("local[*]"). \config("spark.sql.shuffle.partitions", "2"). \getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([('张三', 'class_1', 99),('王五', 'class_2', 35),('王三', 'class_3', 57),('王久', 'class_4', 12),('王丽', 'class_5', 99),('王娟', 'class_1', 90),('王军', 'class_2', 91),('王俊', 'class_3', 33),('王君', 'class_4', 55),('王珺', 'class_5', 66),('郑颖', 'class_1', 11),('郑辉', 'class_2', 33),('张丽', 'class_3', 36),('张张', 'class_4', 79),('黄凯', 'class_5', 90),('黄开', 'class_1', 90),('黄恺', 'class_2', 90),('王凯', 'class_3', 11),('王凯杰', 'class_1', 11),('王开杰', 'class_2', 3),('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \add("class", StringType()). \add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58226.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聊聊中南亚跨境电商的机遇与前景

随着工业转移,中南亚地区作为一个充满潜力的区域,正逐渐成为跨境电商领域的热点。这一地区包括印度、孟加拉国、巴基斯坦、斯里兰卡等国家,拥有庞大的人口、不断增长的中产阶级,以及逐步完善的数字基础设施,为跨境电商…

电子仓库预测水浸事件,他怎么做到的?

仓库环境中水浸事件可能导致严重的损失,不仅对货物造成损害,还可能影响设备的正常运行甚至威胁安全。 因此,为了应对这一挑战,引入一套完善的仓库水浸监控系统成为了不可或缺的措施。 客户案例 广东某电子公司是一家领先的电子设…

前端学习之轮播图

前端学习之轮播图 该案例涉及到定时器的使用&#xff0c;元素的活获取&#xff0c;函数的调用等知识的运用 显示图如下&#xff1a;可以点击图标跳转图片&#xff0c;也可以自动轮播 源码如下 <!DOCTYPE html> <html><head><meta charset"UTF-8&q…

C语言网络编程:实现自己的高性能网络框架

一般生产环境中最耗时的其实是业务逻辑处理。所以&#xff0c;是不是可以将处理业务逻辑的代码给拆出来丢到线程池中去执行。 比如像下面这样&#xff1a; ​我们事先创建好一堆worker线程&#xff0c;主线程accepter拿到一个连接上来的套接字&#xff0c;就从线程池中取出一个…

hive lateral view 实践记录(Array和Map数据类型)

目录 一、Array 1.建表并插入数据 2.lateral view explode 二、Map 1、建表并插入数据 2、lateral view explode() 3、查询数据 一、Array 1.建表并插入数据 正确插入数据&#xff1a; create table tmp.test_lateral_view_movie_230829(movie string,category array&…

[halcon] 局部图片保存 gen_circle 和 gen_rectangle2 对比 这怕不是bug吧

背景 我想实现一个功能&#xff0c;获取图片中瑕疵的位置&#xff0c;将瑕疵周边的一块区域抠图并保存。 上代码 一开始我代码这么写的&#xff1a; gen_circle (Rectangle, Row[i], Column[i], 256) reduce_domain(Image,Rectangle,GrayEllipse) crop_domain(GrayEllipse,…

购买服务器以及宝塔部署

1.买完服务器之后该做什么 服务器就是一个远程Linux。 1、在阿里云购买的&#xff0c;需要开通安全组设置&#xff1b;端口映射&#xff08;必须要在安全组映射&#xff09; 2.获取服务器的公网ip地址,修改实例名称和密码&#xff0c;第一次修改需要重启 成功连上 3.连接到服…

MR混合现实石油化工课堂情景实训教学演示

MR&#xff08;混合现实&#xff09;技术是一种结合了虚拟现实&#xff08;VR&#xff09;和增强现实&#xff08;AR&#xff09;优势的新型技术&#xff0c;在教育领域具有广阔的应用前景。在石油化工课堂中&#xff0c;MR混合现实情景实训教学的应用可以大大提高学生的学习效…

ELK之LogStash介绍及安装配置

一、logstash简介 集中、转换和存储数据 Logstash 是免费且开放的服务器端数据处理管道&#xff0c;能够从多个来源采集数据&#xff0c;转换数据&#xff0c;然后将数据发送到您最喜欢的“存储库”中。 Logstash 能够动态地采集、转换和传输数据&#xff0c;不受格式或复杂度的…

【MySQL系列】MySQL复合查询的学习 _ 多表查询 | 自连接 | 子查询 | 合并查询

「前言」文章内容大致是对MySQL复合查询的学习。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、基本查询回顾二、多表查询三、自连接四、子查询4.1 单行子查询4.2 多行子查询4.3 多列子查询4.4 在from子句中使用子查询 五、合并查询 一、基本查询回顾…

数据结构:八种数据结构大全

数据结构 1.1 数据结构概述 数据结构是计算机存储、组织数据的方式&#xff1b;通常情况下&#xff0c;精心选择的数据结构可以带来更高的运行或者存储效率。数据结构的优良将直接影响着我们程序的性能&#xff1b;常用的数据结构有&#xff1a;数组&#xff08;Array&#xff…

基于Qt5开发图形界面——WiringPi调用Linux单板电脑IO

Qt5——WiringPi Qt5WiringPi示例教程 Qt5 Qt是一种跨平台的应用程序开发框架。它被广泛应用于图形用户界面&#xff08;GUI&#xff09;开发&#xff0c;可以用于构建桌面应用程序、移动应用程序和嵌入式应用程序。Qt提供了丰富的功能和工具&#xff0c;使开发人员可以快速、高…

上滑动导航栏手势桌面最近任务可见解密-千里马手把手带你搞定framework车载车机系统开发

建议先看另一篇blog&#xff1a; https://blog.csdn.net/learnframework/article/details/123032419 系统如何让桌面执行对应的onStart方法呢&#xff1f; 具体的堆栈显示如下&#xff1a; makeActiveIfNeeded:5788, ActivityRecord (com.android.server.wm) makeVisibleIfNe…

回归预测 | MATLAB实现GWO-ELM灰狼算法优化极限学习机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现GWO-ELM灰狼算法优化极限学习机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GWO-ELM灰狼算法优化极限学习机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程…

uniapp接入广告的问题总结

Uniapp官方解决方案 uni-app 支持接入uni-ad广告联盟&#xff0c;开发者可实现一次开发&#xff0c;多端变现。 uni-ad 支持开屏、信息流、激励视频、视频流、悬浮红包、推送等丰富的广告形式&#xff1b; uni-ad 聚合了全网所有主流广告源&#xff0c;包括腾讯优量汇、字节…

自动化测试(三):接口自动化pytest测试框架

文章目录 1. 接口自动化的实现2. 知识要点及实践2.1 requests.post传递的参数本质2.2 pytest单元测试框架2.2.1 pytest框架简介2.2.2 pytest装饰器2.2.3 断言、allure测试报告2.2.4 接口关联、封装改进YAML动态传参&#xff08;热加载&#xff09; 2.3 pytest接口封装&#xff…

【Vue3】transition 组件

1. 基础用法 <template><div class"content"><button click"flag !flag">switch</button><transition name"fade"><div v-if"flag" class"box"></div></transition><…

学习c++的第6天

#include <iostream> using namespace std; class Animal { public: virtual void perform()0; virtual ~Animal() { cout<<"Animal的析构函数"<<endl; } }; class Lion :public Animal { public : void perform() { cout<<"狮子…

政务大厅人员睡岗离岗玩手机识别算法

人员睡岗离岗玩手机识别算法通过pythonyolo系列网络框架算法模型&#xff0c;人员睡岗离岗玩手机识别算法利用图像识别和行为分析&#xff0c;识别出睡岗、离岗和玩手机等不符合规定的行为&#xff0c;并发出告警信号以提醒相关人员。Python是一种由Guido van Rossum开发的通用…

Unity引擎修改模型顶点色的工具

大家好&#xff0c;我是阿赵。   之前分享过怎样通过MaxScript在3DsMax里面修改模型的顶点色。不过由于很多时候顶点色的编辑需要根据在游戏引擎里面的实际情况和shader的情况来动态调整&#xff0c;所以如果能在引擎里面直接修改模型的顶点色&#xff0c;将会方便很多。于是…