Spark SQL的高级用法

一. 快速生成多行的序列

需求:请生成一列数据, 内容为 1 , 2 , 3 , 4 ,5

-- 快速生成多行的序列
-- 方式一
select explode(split("1,2,3,4,5",","));
--方式二
/*序列函数sequence(start,stop,step):生成指定返回的列表数据[start,stop]必须传入,step步长可传可不传,默认为1,也可以传入负数,传入负数的时候,大数要在前,小数*/
select explode(sequence(1,5));
select explode(sequence(1,5,1));
select explode(sequence(1,5,2));
select explode(sequence(5,1,-1));
select explode(sequence(5,1,-2));

二. 快速生成表数据

需求: 生成一个两行两列的数据, 第一行放置 男 M 第二行放置 女 F

-- 快速生成表数据
/*stack(n,expr1, ..., exprk),n代表要分为n行,expr1, ..., exprk是放入每一行每一列的元素如果不传入列名,则默认使用col0,col1等作为列名*/
select stack(2,"男","M","女","F");
select stack(2,"男","M","女","F") as (n,v);

三. 如何将一个SQL的结果给到另外一个SQL进行使用

3.1 视图

临时视图关键字:temporary

  1. 分为永久视图临时视图
  2. 相同点:都不会真正的存储数据。主要是用来简化SQL语句
  3. 不同点:永久试图会创建元数据,在多个会话(Session)中都有效;临时视图只在当前会话有效

3.2 视图和表的区别

视图不会真正的存储数据,而表会真正的存储数据。
但是视图和表在使用的时候区别不大

-- 如何将一个SQL的结果给到另外一个SQL进行使用
-- 方式一:子查询
select*
from (select stack(2,"男","M","女","F"));-- 方式二:子查询
with tmp as (select stack(2,"男","M","女","F")
) select * from tmp;-- 方式三:永久视图
create view forever_view as
select stack(2,"男","M","女","F");select * from forever_view;-- 方式四:临时视图
create temporary view tmp_view as
select stack(2,"男","M","女","F");select * from tmp_view;-- 方式五:创建表
create table tb as
select stack(2,"男","M","女","F");select * from tb;-- 缓存表:类似Spark Core中的缓存,提高数据分析效率
cache table cache_tb as
select stack(2,"男","M","女","F");-- 查询缓存表
select * from cache_tb;-- 清理指定缓存
uncache table cache_tb;select * from cache_tb;-- 清空所有的缓存
clear cache;

四. 窗口函数

格式:
分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

分析函数的分类:
1- 第一类: 排序函数。row_number() rank() dense_rank() ntile()

1、都是用来编号的
2、如果出现了重复(针对order by中的字段内容)数据
2.1- row_number:不管有没有重复,从1开始依次递增进行编号
2.2- rank():如果数据重复,编号相同,并且会占用后续的编号
2.3- dense_rank():如果数据重复,编号相同,但是不会占用后续的编号
2.4- ntile(n):将数据分为n个桶,不传入参数默认为1

2- 第二类: 聚合函数。sum() avg() count() max() min()…

1、可以通过窗口函数实现级联求各种值的操作。当后续遇到需要在计算的时候,将当前行或者之前之后的数据关联起来计算的情况,可以使用窗口函数。
2、如果没有排序字段,也就是没有order by语句,直接将窗口打开到最大,整个窗口内的数据全部被计算,不管执行到哪一行,都是针对整个窗口内的数据进行计算。
3、如果有排序字段,并且还存在重复数据的情况,默认会将重复范围内的数据放到一个窗口中计算
4、可以通过rows between xxx and xxx来限定窗口的统计数据范围
4.1- unbounded preceding: 从窗口的最开始
4.2- N preceding: 当前行的前N行,例如1 preceding、2 preceding
4.3- current row: 当前行
4.4- unbounded following: 到窗口的最末尾
4.5- N following: 当前行的后N行,例如1 following、2 following

3- 第三类: 取值函数。lead() lag() first_value() last_value()

-- 准备数据
create temporary view t1 (cookie,datestr,pv) as
values('cookie1','2018-04-10',1),('cookie1','2018-04-11',5),('cookie1','2018-04-12',7),('cookie1','2018-04-13',3),('cookie1','2018-04-14',2),('cookie1','2018-04-15',4),('cookie1','2018-04-16',4),('cookie2','2018-04-10',2),('cookie2','2018-04-11',3),('cookie2','2018-04-12',5),('cookie2','2018-04-13',6),('cookie2','2018-04-14',3),('cookie2','2018-04-15',9),('cookie2','2018-04-16',7);select * from t1;
-- 1- 第一类: 排序函数。row_number() rank() dense_rank() ntile()
selectcookie,pv,row_number() over (partition by cookie order by pv desc) as rs1,rank() over (partition by cookie order by pv desc) as rs2,dense_rank() over (partition by cookie order by pv desc) as rs3,ntile() over (partition by cookie order by pv desc) as rs4
from t1;
-- 2- 第二类: 聚合函数。sum() avg() count() max() min()...
selectcookie,pv,-- 一次性直接将窗口打开到最大sum(pv) over(partition by cookie) as rs1,-- 依次慢慢打开窗口,如果数据相同,直接放到同一个窗口中sum(pv) over(partition by cookie order by pv) as rs2,-- 依次慢慢打开窗口,限定窗口的统计范围从窗口的最开始到当前行sum(pv) over(partition by cookie order by pv rows between unbounded preceding and current row) as rs3,-- 以当前行为中心,往前推一行。也就是从上一行计算到当前行sum(pv) over(partition by cookie order by pv rows between 1 preceding and current row ) as rs4,-- 从窗口的最开始一直统计到窗口的最终结尾sum(pv) over(partition by cookie order by pv rows between unbounded preceding and unbounded following) as rs5,-- 从当前行统计到窗口的结尾sum(pv) over(partition by cookie order by pv rows between current row and unbounded following) as rs6,-- 以当前行为中心,统计上一行、当前行、下一行总共3行的数据sum(pv) over(partition by cookie order by pv rows between 1 preceding and 1 following) as rs7,sum(pv) over(partition by cookie order by pv rows between 2 preceding and 3 following) as rs8
from t1;-- 3- 第三类: 取值函数。lead() lag() first_value() last_value()
selectcookie,pv,-- 默认取下一行数据lead(pv) over(partition by cookie order by pv) as rs1,-- 默认取上一行数据lag(pv) over(partition by cookie order by pv) as rs2,-- 默认取窗口内的第一条数据first_value(pv) over(partition by cookie order by pv) as rs3,-- 默认取窗口内的最后一条数据last_value(pv) over(partition by cookie order by pv) as rs4
from t1;

五. 横向迭代

/*
需求: 已知 c1列数据, 计算出 c2 和 c3列数据
c2 = c1+2
c3=c1*(c2+3)*/
-- 数据准备
select explode(sequence(1,3));
select stack(3,1,2,3);-- 方式一:子查询
-- 计算c2
with t1 as (select explode(sequence(1,3)) as c1
)select c1,(c1+2) as c2 from t1;
-- 计算c3
with t1 as (select explode(sequence(1,3)) as c1
)
select c1,c2,c1*(c2+3) as c3 from
(select c1,(c1+2) as c2 from t1);-- 方式二:视图方式
-- 准备数据
create temporary view view_t1 as
select explode(sequence(1,3)) as c1;select * from view_t1;
-- 计算c2并创建视图
create temporary view view_t2 as
select c1,(c1+2) as c2 from view_t1;select * from view_t2;
-- 计算c3并创建视图
create temporary view view_t3 as
select c1,c2,c1*(c2+3) as c3 from view_t2;select * from view_t3;

六. 纵向迭代

需求: 计算 c4:

计算逻辑: 当c2=1 , 则 c4=1 ; 否则 c4 = (上一个c4 + 当前的c3)/2
在这里插入图片描述

-- 数据准备
create temporary view view_data (c1,c2,c3)
as values
(1,1,6),
(1,2,23),
(1,3,8),
(1,4,4),
(1,5,10),
(2,1,23),
(2,2,14),
(2,3,17),
(2,4,20);select * from view_data;

方式一:创建临时视图继续计算c4的值,对于练习阶段数据量小还行,即使是数量小,也有很多重复代码,所以对于以后海量数据的计算,这种方法显然是不合理的。

--方式一:
-- 步骤一:当c2=1 , 则 c4=1
create temporary view col_tmp1 as
select c1,c2,c3,if(c2=1,1,null)as c4 from view_data;select * from col_tmp1;-- 步骤二:否则 c4 = (上一个c4 +  当前的c3)/2
create temporary view col_tmp2 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp1;select * from col_tmp2;create temporary view col_tmp3 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp2;select * from col_tmp3;create temporary view col_tmp4 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp3;select * from col_tmp4;create temporary view col_tmp5 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp4;select * from col_tmp5;

方式二:基于pandas进行自定义聚合函数(UDAF)操作

#!/usr/bin/env python
# @desc : 
__coding__ = "utf-8"
__author__ = "bytedance"import pyspark.sql.functions as F
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('sparksql_udaf')\.master('local[*]')\.getOrCreate()# 2- 数据输入spark.sql("""create temporary view view_data (c1,c2,c3)as values(1,1,6),(1,2,23),(1,3,8),(1,4,4),(1,5,10),(2,1,23),(2,2,14),(2,3,17),(2,4,20)""")# 3- 数据处理# 3.1- 当c2=1 , 则 c4=1spark.sql("""create temporary view heng_tmp_1 asselectc1,c2,c3,if(c2=1,1,null) as c4from view_data""")spark.sql("""select * from heng_tmp_1""").show()# 3.2- 否则 c4 = (上一个c4 +  当前的c3)/2# 3.2.1- 基于Pandas实现UDAF函数,创建自定义的Python函数# 3.2.2- 注册进SparkSQL中# @F.pandas_udf(returnType=FloatType())@F.pandas_udf(returnType="float")def c4_udaf_func(c3:pd.Series, c4:pd.Series) -> float:print(f"{c3}")print(f"{c4}")tmp_c4 = Nonefor i in range(0,len(c3)):if i==0:tmp_c4 = c4[i] # c4[0]else:tmp_c4 = (tmp_c4 + c3[i]) / 2return tmp_c4spark.udf.register("c4_udaf",c4_udaf_func)spark.sql("""select c1,c2,c3,c4_udaf(c3,c4) over(partition by c1 order by c2) as c4from heng_tmp_1""").show()# 4- 数据输出# 5- 释放资源spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658932.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

付费使用GitHub Copilot半年,我残了吗?

有些程序员担心使用AI编程助手来写程序,自己会残掉 代码都不写了,你还是程序员吗? 手生脚慢,你怕是只会写hello word了吧? 接下来我用自己的亲身经验,告诉你答案。 ---- 购买记录 ---- 2023年7月17号,我付费100美金购买了一年Copilot,下一次续费的日期是今年的7月1…

防火墙详解

一、基本定义 所谓“防火墙”是指一种将内部网和公众访问网(如Internet)分开的方法,它实际上是一种建立在现代通信网络技术和信息安全技术基础上的应用性安全技术,隔离技术。越来越多地应用于专用网络与公用网络的互联环境之中&a…

PYTHON蓝桥杯——每日一练(简单题)

题目 利用字母可以组成一些美丽的图形,下面给出了一个例子: ABCDEFG BABCDEF CBABCDE DCBABCD EDCBABC 这是一个5行7列的图形,请找出这个图形的规律,并输出一个n行m列的图形。 输入格式 输入一行,包含两个整数…

AttributeError: module ‘cv2.gapi.wip.draw‘ has no attribute ‘Text‘

解决方法: pip install opencv-python4.6.0.66问题原因,安装的版本不对(或是安装了cv2)

快速美化上百张图片,就是这么简单

你是否曾经遇到过需要批量处理大量图片的情况?比如调整图片大小、裁剪、美化等等。如果你还在一张张地处理,那么你一定需要一款强大的工具来帮助你完成这个任务。今天,我将向你介绍一款名为"固乔智创助手软件"的工具,它…

使用Thumbnailator遇到异常:No suitable ImageReader found for source data

使用Thumbnailator工具完成对图片进行裁剪、缩放的处理: Thumbnails.of(file.getInputStream()).size(width, height).keepAspectRatio(false).toOutputStream(outputStream); 如果前端传入的是webp格式或者二进制文件,该文件中存储内容为将图片转成ba…

浅谈Elastic Search V8版本的一些重大改进

首先说明下本文只阐述一些对我们日常使用影响比较大的更改,比如学的时候是Elastic Search v7.x及其以下的版本,但是用的时候却是Elastic Search v8.x,还有一种情况就是从低版本迁移到高版本,这两种情况的话我们都需要关注下&#…

ECMAScript新特新

1.兼容性 当低版本IE浏览器不支持ES6语法是需要引用browser.js 2.ES6新特性介绍 序号ES6ES71块级作用域变量声明关键字let和constArray.prototype.includes:新增了 includes 方法,用于判断数组中是否包含指定的元素。返回布尔值,表示是否包…

如何开通GitHub Copilot

GitHub Copilot 是由GitHub 和OpenAI共同开发的人工智能代码辅助工具,可以自动地生成高质量代码片段、上下文信息等。 通过自然语言处理和机器学习技术,能够通过分析程序员编写的代码、注释和上下文信息,自动生成代码,减轻程序员的…

基于ssm的法律咨询系统(有报告)。Javaee项目,ssm项目。

演示视频: 基于ssm的法律咨询系统(有报告)。Javaee项目,ssm项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构,通过Sp…

代码随想录训练营第三十六天打卡| 435. 无重叠区间 763.划分字母区间 56. 合并区间

435. 无重叠区间 1.模仿射气球那一题的思路,把区间按左边界从小到大排序,遇到重叠区间就删除,同时更新区间右边界,保留最小的那个。当前删除最少的区间,从而全局达到删除区间最少。个人是AC了这道题,但是发…

基于node.js和Vue3的医院挂号就诊住院信息管理系统

摘要: 随着信息技术的快速发展,医院挂号就诊住院信息管理系统的构建变得尤为重要。该系统旨在提供一个高效、便捷的医疗服务平台,以改善患者就医体验和提高医院工作效率。本系统基于Node.js后端技术和Vue3前端框架进行开发,利用其…

Django模型(七)

一、聚合与分组查询 1.1、准备数据 class Cook(models.Model):"""厨师"""name = models.CharField(max_length=32,verbose_name=厨师名)level = models.IntegerField(verbose_name=厨艺等级)age = models.IntegerField(verbose_name=年龄)sect …

图像旋转_题解

【题解提供者】吴立强 解法 思路 设旋转后的图像为 B B B&#xff0c;那么有 A i , j B j , n − i A_{i,j} B_{j,n-i} Ai,j​Bj,n−i​ 成立&#xff0c;故 i , j i,j i,j 互换且 i i i 倒序输出即可。 代码展示 #include <iostream> using namespace std;co…

《二叉树》——3(层序遍历)

目录 前言&#xff1a; 层序遍历: 解析&#xff1a; 前言&#xff1a; 本文主讲链式二叉树的层序遍历&#xff0c;在前面的张篇blog我们初步实现了链式二叉树递归部分的内容&#xff0c;对于递归算法的学习和思维方式我们仍然需要不断加强&#xff0c;所以将对链式二叉树进行…

Python与ArcGIS系列(二十)GDAL之合并shp和geojson要素图层

目录 0 简述1 代码实现2 结果展示0 简述 Shp格式是GIS中非常重要的数据格式,主要在Arcgis中使用,但在进行很多基于网页的空间数据可视化时,通常只接受GeoJSON格式的数据,众所周知JSON是利用键值对+嵌套来表示数据的一种格式,以其轻量、易解析的优点,被广泛使用与各种领域…

Flink与Redis集成:自定义连接器实现维表创建与数据汇入

目录 一、问题引入 二、Redis创建维表 2.1 预期效果展示 2.2 设计要点

python爬虫学习之解析_BeautifulSoup

目录 一、bs4的基本使用 &#xff08;1&#xff09;导入 &#xff08;2&#xff09;创建对象 二、节点定位 1、根据标签名查找节点 2、基本函数使用 &#xff08;1&#xff09;find &#xff08;2&#xff09;find_all &#xff08;3&#xff09;select 三、节点信息 1、获取节…

微搭低代码从入门到精通03用户注册

文章目录 1 搭建数据源2 开发API3 搭建页面4 数据入库5 页面跳转总结 小程序开发中&#xff0c;如果定位是面向内部人员使用的应用&#xff0c;那么我们就需要仔细考虑用户鉴权的问题。首先需要提供用户注册的通道&#xff0c;让用户可以自主完成注册。其次要提供角色分配的功能…

力扣349两个数的交集

题目连接&#xff1a;349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 示例 1&#xff1a; 输入&#xff1a; nums1 [1,2,2…