大数据-玩转数据-Flink Sql 窗口

一、说明

时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。Table API和SQL中,主要有两种窗口:分组窗口(Group Windows)和 含Over字句窗口(Over Windows)。

二、Group Windows

分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。

2.1、分组窗口中的滚动窗口

代码示例:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;public class Sql_Group_Windows_List {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 100),new WaterSensor("sensor_2", 1000L, 100)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table table = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));table.window(Tumble.over(lit(10).second()).on($("ts")).as("w"))  // 定义滚动窗口并给窗口起一个别名.groupBy($("id"), $("w")) // 窗口必须出现的分组字段中.select($("id"), $("w").start(), $("w").end(), $("vc").sum()).execute().print();env.execute();}
}

运行结果:
在这里插入图片描述

2.2、分组窗口中的滑动窗口

.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))

运行结果:
在这里插入图片描述

2.3、分组窗口中的会话窗口

.window(Session.withGap(lit(6).second()).on($("ts")).as("w"))

运行结果:
在这里插入图片描述

三、Over windows

Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。

3.1、Unbounded Over Windows

代码示例:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.*;public class Sql_Over_windows {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 4000L, 200),new WaterSensor("sensor_2", 2000L, 200),new WaterSensor("sensor_2", 3000L, 200),new WaterSensor("sensor_2", 5000L, 200),new WaterSensor("sensor_2", 6000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, stamptime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table table = tableEnv.fromDataStream(waterSource, $("id"), $("ts").rowtime(), $("vc"));table.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w")).select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc")).execute().print();env.execute();}
}

运行结果:
在这里插入图片描述

# 使用UNBOUNDED_RANGE
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))

在这里插入图片描述

3.2、Bounded Over Windows

当事件时间向前算3s得到一个窗口

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))

运行结果:
在这里插入图片描述

当行向前推算2行算一个窗口

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))

四、总结

作为大数据工程师,我们最为熟悉的数据统计方式,当然就是写 SQL 了。SQL 是结构化查询语言(Structured Query Language)的缩写,是我们对关系型数据库进行查询和修改的通用编程语言。在关系型数据库中,数据是以表(table)的形式组织起来的,所以也可以认为 SQL 是用来对表进行处理的工具语言。无论是传统架构中进行数据存储的MySQL、PostgreSQL,还是大数据应用中的 Hive,都少不了 SQL 的身影;而 Spark 作为大数据处理引擎,为了更好地支持在 Hive 中的 SQL 查询,也提供了 Spark SQL 作为入口。Flink 同样提供了对于“表”处理的支持,这就是更高层级的应用 API,在 Flink 中被称为Table API 和 SQL。Table API 顾名思义,就是基于“表”(Table)的一套 API,它是内嵌在 Java、Scala 等语言中的一种声明式领域特定语言(DSL),也就是专门为处理表而设计的;在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92119.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

卤制品配送经营商城小程序的用处是什么

卤制品也是食品领域重要的分支&#xff0c;尤其对年轻人来说&#xff0c;只要干净卫生好吃价格合理&#xff0c;那复购率宣传性自是不用说&#xff0c;而随着互联网发展&#xff0c;传统线下门店也须要通过线上破解难题或进一步扩大生意。 而商城小程序无疑是商家通过线上私域…

字符串函数与内存函数讲解

文章目录 前言一、字符串函数1.求字符串长度strlen 2.长度不受限制的字符串函数(1)strcpy(2)strcat(3)strcmp 3.长度受限制的字符串函数(1)strncpy(2)strncat(3)strncmp 4.字符串查找(1)strstr(2)strtok 5.错误信息报告(1)strerror(2)perror 二、内存函数1.memcpy2.memmove3.me…

Neural Networks for Fingerprint Recognition

Neural Computation ( IF 3.278 ) 摘要&#xff1a; 在采集指纹图像数据库后&#xff0c;设计了一种用于指纹识别的神经网络算法。当给出一对指纹图像时&#xff0c;算法输出两个图像来自同一手指的概率估计值。在一个实验中&#xff0c;神经网络使用几百对图像进行训练&…

第 365 场 LeetCode 周赛题解

A 有序三元组中的最大值 I 参考 B B B 题做法… class Solution { public:using ll long long;long long maximumTripletValue(vector<int> &nums) {int n nums.size();vector<int> suf(n);partial_sum(nums.rbegin(), nums.rend(), suf.rbegin(), [](int x…

golang工程——protobuf使用及原理

相关文档 源码&#xff1a;https://github.com/grpc/grpc-go 官方文档&#xff1a;https://www.grpc.io/docs/what-is-grpc/introduction/ protobuf编译器源码&#xff1a;https://github.com/protocolbuffers/protobuf proto3文档&#xff1a;https://protobuf.dev/programmin…

加入PreAuthorize注解鉴权之后NullPointerException报错

记录一次很坑的bug&#xff0c;加入PreAuthorize注解鉴权之后NullPointerException报错&#xff0c;按理来说没有权限应该403报错&#xff0c;但是这个是500报错&#xff0c;原因是因为controller层的service注入失败&#xff0c;然而我去掉注解后service注入成功&#xff0c;并…

使用VSCODE 调试ros2具体设置

vscode 调试 ROS2 张得帅&#xff01; 于 2023-09-09 15:39:39 发布 456 收藏 1 文章标签&#xff1a; vscode ros2 版权 1、在下列目录同层级找到.vscode文件夹 . ├── build ├── install ├── log └── src 2、 安装ros插件 3、创建tasks.json文件&#xff0c;添…

二十七、[进阶]MySQL默认存储引擎InnoDB的简单介绍

1、MySQL体系结构 MySQL大致可以分为连接层、服务层、引擎层、存储层四个层&#xff0c;这里需要注意&#xff0c;索引的结构操作是在存储引擎层完成的&#xff0c;所以不同的存储引擎&#xff0c;索引的结构是不一样的。 &#xff08;1&#xff09;体系结构示意图 &#xff0…

国庆10.01

TCPselect 代码 服务器 #include<myhead.h> #include<sqlite3.h> #define PORT 6666 //端口号 #define IP "192.168.0.104" //IP地址//键盘事件 int jp(fd_set tempfds,int maxfd) {char buf[128] ""; //用来接收数据char buf1[128] …

【算法|贪心算法系列No.2】leetcode2208. 将数组和减半的最少操作次数

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

Spring注册Bean系列--方法1:@Component

原文网址&#xff1a;Spring注册Bean系列--方法1&#xff1a;Component_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Spring注册Bean的方法&#xff1a;Component。 注册Bean的方法我写了一个系列&#xff0c;见&#xff1a;Spring注册Bean(提供Bean)系列--方法大全_IT利刃出鞘…

开绕组电机零序Bakc EMF-based无感控制以及正交锁相环inverse Park-based

前言 最近看论文遇到了基于反Park变换的锁相环&#xff0c;用于从开绕组永磁同步电机零序电压信号中提取转子速度与位置信息&#xff0c;实现无感控制。在此记录 基于零序Back EMF的转子估算 开绕组电机的零序反电动势 e 0 − 3 ω e ψ 0 s i n 3 θ e e_0-3\omega_e\psi_…

​68条萝卜刀《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书

​68条萝卜刀《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书 ​68条萝卜刀《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书

借助 ControlNet 生成艺术二维码 – 基于 Stable Diffusion 的 AI 绘画方案

背景介绍 在过去的数月中&#xff0c;亚马逊云科技已经推出了多篇博文&#xff0c;来介绍如何在亚马逊云科技上部署 Stable Diffusion&#xff0c;或是如何结合 Amazon SageMaker 与 Stable Diffusion 进行模型训练和推理任务。 为了帮助客户快速、安全地在亚马逊云科技上构建、…

【QT开发(6)】0926-QT 中加入 fastDDS 通信库的程序使用说明

在智能驾驶中&#xff0c;DDS有可能被广泛使用&#xff0c;因此推出这篇说明教程。 1、基于【QT开发&#xff08;5&#xff09;】教程的项目文档进行开发 2、安装DDS 查看《【eProsima Fast DDS&#xff08;1&#xff09;】安装eProsima Fast DDS》 至少安装: foonathan_m…

LeetCode每日一题:2136. 全部开花的最早一天(2023.9.30 C++)

目录 2136. 全部开花的最早一天 题目描述&#xff1a; 实现代码与解析&#xff1a; 贪心 原理思路&#xff1a; 2136. 全部开花的最早一天 题目描述&#xff1a; 你有 n 枚花的种子。每枚种子必须先种下&#xff0c;才能开始生长、开花。播种需要时间&#xff0c;种子的生…

Java八股文

JAVA八股文 这里写目录标题 **JAVA八股文**面向对象三大特征接口与抽象类的区别重载与重写与equals异常处理机制HashMap原理红黑树乐观锁和悲观锁HashTable与HashMap的区别ArrayList和LinkedList的区别如何保证ArrayList的线程安全什么是线程上下文切换sleep()和wait()的区别yi…

Nginx简介与Docker Compose部署指南

Nginx是一款高性能的开源Web服务器和反向代理服务器&#xff0c;以其卓越的性能、可伸缩性和灵活性而闻名。它在全球范围内广泛用于托管Web应用程序、负载均衡、反向代理和更多场景中。在本文中&#xff0c;我们将首先介绍Nginx的基本概念&#xff0c;然后演示如何使用Docker C…

stm32 - GPIO

stm32 - GPIO GPIO结构图GPIO原理图输入上拉/下拉/浮空施密特触发器片上外设 输出推挽/开漏/关闭输出方式 GPIO88种模式复用输出 GPIO寄存器端口配置寄存器_CRL端口输入数据寄存器_IDR端口输出数据寄存器_ODR端口位设置/清除寄存器_BSRR端口位清除寄存器_BRR端口配置锁定寄存器…

Window 安装多个版本的 java 并按需切换

1、按需下载对应版本的 java 官网链接&#xff1a;Java Downloads | Oracle 2、执行安装程序&#xff0c;根据安装向导一步一步走就行&#xff0c;每个版本安装在不同的目录下。 3、配置环境变量 a&#xff09;为每个版本 java 新建不同名称的 JAVA_HOME 系统变量&#xff0…