Flink Learning Series, Part 13: Advanced FlinkSQL

Event Time

Test data:

{"username":"zs","price":20,"event_time":"2023-07-17 10:10:10"}
{"username":"zs","price":15,"event_time":"2023-07-17 10:10:30"}
{"username":"zs","price":20,"event_time":"2023-07-17 10:10:40"}
{"username":"zs","price":20,"event_time":"2023-07-17 10:11:03"}
{"username":"zs","price":20,"event_time":"2023-07-17 10:11:04"}
{"username":"zs","price":20,"event_time":"2023-07-17 10:12:04"}
{"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
{"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
{"username":"zs","price":20,"event_time":"2023-07-17 12:12:04"}
{"username":"zs","price":20,"event_time":"2023-07-18 12:12:04"}

Requirement: every minute, compute each user's total spend and number of purchases within that minute.

This calls for a tumbling window.
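As a quick aside, a tumbling window assigns each event to exactly one fixed, non-overlapping window. A minimal sketch of that assignment in plain Java (independent of Flink; the class name is invented for illustration):

```java
// Hypothetical helper (not part of Flink): shows how a tumbling window
// assigns an event timestamp to exactly one [start, end) window.
public class TumbleWindowCalc {
    // All values are epoch milliseconds.
    public static long windowStart(long timestamp, long windowSizeMs) {
        // Floor the timestamp to the nearest window boundary.
        return timestamp - (timestamp % windowSizeMs);
    }

    public static long windowEnd(long timestamp, long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = 90_500L; // 00:01:30.5
        System.out.println(windowStart(ts, oneMinute)); // 60000 -> window [00:01:00, 00:02:00)
        System.out.println(windowEnd(ts, oneMinute));   // 120000
    }
}
```

Every event with `event_time` between 10:10:00 and 10:11:00 in the test data above therefore lands in the same one-minute window.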

The SQL:

CREATE TABLE table1 (
  `username` STRING,
  `price` INT,
  `event_time` TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'bigdata01:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

Then the windowed query:

SELECT window_start, window_end, username,
       COUNT(1) AS zongNum,
       SUM(price) AS totalMoney
FROM TABLE(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' SECOND))
GROUP BY window_start, window_end, username;

A common error worth sharing:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval) requires the timecol is a time attribute type, but is VARCHAR(2147483647).

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)

This error means the column passed to the window function is not a time attribute. It must be declared as a time type such as TIMESTAMP(3) rather than STRING, and since event time is being used, a watermark must also be declared on the column; otherwise the query fails.
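The WATERMARK clause `event_time - INTERVAL '3' SECOND` declares a bounded-out-of-orderness watermark: the watermark trails the largest event time seen so far by 3 seconds, and a window fires once the watermark passes the window end. A minimal sketch of that bookkeeping in plain Java (not Flink's actual class; the name is invented for illustration):

```java
// Sketch of bounded-out-of-orderness watermarking:
// watermark = max event time seen so far - allowed out-of-orderness.
public class BoundedWatermark {
    private long maxSeenTs = Long.MIN_VALUE;
    private final long maxOutOfOrdernessMs;

    public BoundedWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called per event; the watermark never moves backwards.
    public long onEvent(long eventTsMs) {
        maxSeenTs = Math.max(maxSeenTs, eventTsMs);
        return maxSeenTs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermark wm = new BoundedWatermark(3_000L); // matches INTERVAL '3' SECOND
        System.out.println(wm.onEvent(10_000L)); // 7000
        System.out.println(wm.onEvent(8_000L));  // still 7000: a late event cannot pull it back
    }
}
```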

Requirement: using a tumbling window on event time, compute each user's total spend every minute.

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-28 14:12:28
 **/
public class _03EventTimeGunDongWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the source table
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` TIMESTAMP(3),\n" +
                "   watermark for event_time as event_time - interval '3' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Aggregate with a 1-minute tumbling window
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // 4. sink: results are printed by .print() above
        // 5. execute
        env.execute();
    }
}

The aggregated results are as follows:

Now test a sliding (HOP) window: every 10 seconds, compute the last 1 minute of data:

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-28 14:12:28
 **/
public class _03EventTimeGunDongWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the source table
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` TIMESTAMP(3),\n" +
                "   watermark for event_time as event_time - interval '3' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Aggregate with a HOP window: slide 10 seconds, size 60 seconds
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second, INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // 4. sink: results are printed by .print() above
        // 5. execute
        env.execute();
    }
}
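Unlike a tumbling window, a HOP window of size 60 s sliding every 10 s places each event into size / slide = 6 overlapping windows. A rough sketch of that assignment in plain Java (not Flink API; the names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which HOP (sliding) windows does an event fall into?
// With slide = 10s and size = 60s, every event belongs to size/slide = 6 windows.
public class HopWindowCalc {
    public static List<long[]> assignedWindows(long ts, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains ts.
        long lastStart = ts - (ts % slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[]{start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // slide 10s, size 60s -> 6 overlapping windows per event
        System.out.println(assignedWindows(65_000L, 10_000L, 60_000L).size()); // 6
    }
}
```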

The result is shown in the figure. Next, a cumulative (CUMULATE) window: step 1 hour, maximum size 1 day:

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-28 14:12:28
 **/
public class _03EventTimeGunDongWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the source table
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` TIMESTAMP(3),\n" +
                "   watermark for event_time as event_time - interval '3' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Aggregate with a CUMULATE window: step 1 hour, max size 1 day
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '1' hours, INTERVAL '1' days))\n" +
                "group by window_start,window_end,username").print();

        // 4. sink: results are printed by .print() above
        // 5. execute
        env.execute();
    }
}

Cumulative window demo output:
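A CUMULATE window fires a growing window at every step (1 hour here); all firings share the same window_start, and window_end grows step by step until the maximum size (1 day) is reached, at which point the window resets. A minimal sketch of which cumulative window an event currently belongs to (plain Java, illustrative names):

```java
// Sketch: CUMULATE with step and maxSize. All fired windows share window_start;
// window_end grows by one step per firing until it reaches windowStart + maxSize.
public class CumulateWindowCalc {
    public static long windowStart(long ts, long maxSizeMs) {
        return ts - (ts % maxSizeMs); // e.g. the start of the day for a 1-day max size
    }

    // End of the smallest cumulative window containing ts.
    public static long currentWindowEnd(long ts, long stepMs, long maxSizeMs) {
        long start = windowStart(ts, maxSizeMs);
        return start + ((ts - start) / stepMs + 1) * stepMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L, day = 24 * hour;
        // An event 3.5 hours into the day first appears in the [0h, 4h) firing.
        System.out.println(currentWindowEnd(3 * hour + 1_800_000L, hour, day)); // 14400000
    }
}
```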

Processing Time

Test data:

{"username":"zs","price":20}
{"username":"lisi","price":15}
{"username":"lisi","price":20}
{"username":"zs","price":20}
{"username":"zs","price":20}
{"username":"zs","price":20}
{"username":"zs","price":20}
/**
 * Tumbling window size: 1 minute (processing time).
 *
 * Test data:
 * {"username":"zs","price":20}
 * {"username":"lisi","price":15}
 * {"username":"lisi","price":20}
 * {"username":"zs","price":20}
 * {"username":"zs","price":20}
 * {"username":"zs","price":20}
 * {"username":"zs","price":20}
 */
package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-28 14:12:28
 **/
public class _04ProcessingTimeGunDongWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the source table; event_time is a processing-time computed column
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` as proctime()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Aggregate with a 1-minute tumbling window on processing time
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // 4. sink: results are printed by .print() above
        // 5. execute
        env.execute();
    }
}

Result:

The results take a full minute to appear, so don't worry if nothing shows up right away.

Windows come in tumbling and sliding varieties, and time comes in event time and processing time; combined pairwise, that gives four cases.

Below is the sliding window + processing time case:

package com.bigdata.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2024-11-29 14:28:19
 **/
public class _04_FlinkSQLProcessTime_HOP {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` string,\n" +
                "  `price` int,\n" +
                "  `event_time` as proctime() \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'g1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Note: do not append a ';' inside the statement string
        tEnv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second, INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // execute
        env.execute();
    }
}

If no data appears in your console during testing and windows never fire, try the following:

1. Recreate the topic with a single partition.

2. List the full set of Kafka brokers in bootstrap.servers: bigdata01:9092,bigdata02:9092,bigdata03:9092.
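A likely reason tip 1 helps in the event-time cases: a Kafka source's overall watermark is the minimum of its per-partition watermarks, so a partition that receives no data holds the watermark back and windows never close. Sketch (plain Java, illustrative names, not Flink internals):

```java
import java.util.Arrays;

// Sketch: with multiple Kafka partitions, the source's watermark is the
// minimum of the per-partition watermarks, so one idle partition stalls all windows.
public class MultiPartitionWatermark {
    public static long sourceWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Partition 1 is idle (watermark stuck at MIN_VALUE), so the
        // overall watermark never advances past it.
        System.out.println(sourceWatermark(new long[]{120_000L, Long.MIN_VALUE}));
    }
}
```

With a single partition there is nothing to lag behind, which is why recreating the topic with one partition often makes the windows fire.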

II. Window Top-N (not a new technique)

Requirement: find the top 3 most-clicked pages within each hour.

Test data:
{"ts": "2023-09-05 12:00:00", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:01:00", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:10:00", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:20:00", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:30:00", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 13:10:00", "page_id": 5, "clicks": 456}
If there were no hourly requirement and we only wanted the overall top 3 most-clicked pages, the query would be:
select * from (
    select page_id, totalSum,
           row_number() over (order by totalSum desc) px
    from (
        select page_id, sum(clicks) totalSum
        from kafka_page_clicks
        group by page_id
    )
) where px <= 3;
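The ROW_NUMBER() pattern above is just "rank by the aggregated value, keep rank <= 3". The same logic in plain Java (not Flink; the class and method names are invented for illustration):

```java
import java.util.*;
import java.util.stream.Collectors;

// Sketch of what the ROW_NUMBER() Top-N query computes:
// sum clicks per page, rank descending, keep rank <= 3.
public class Top3Pages {
    public static List<Integer> top3(Map<Integer, Integer> clicksByPage) {
        return clicksByPage.entrySet().stream()
                .sorted(Map.Entry.<Integer, Integer>comparingByValue().reversed())
                .limit(3)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Totals from the test data (page 5 appears twice: 456 + 456 = 912).
        Map<Integer, Integer> clicks = new HashMap<>();
        clicks.put(1, 100); clicks.put(2, 90); clicks.put(3, 110);
        clicks.put(4, 23);  clicks.put(5, 912);
        System.out.println(top3(clicks)); // [5, 3, 1]
    }
}
```

The windowed version below only adds `partition by window_start, window_end`, i.e. it runs this ranking once per hourly window.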

Building on that, add a tumbling window:

select window_start, window_end, page_id, sum(clicks) totalSum
from table(tumble(table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR))
group by window_start, window_end, page_id;

On top of that, add the ranking:

select window_start, window_end, page_id, pm
from (
    select window_start, window_end, page_id,
           row_number() over (partition by window_start, window_end order by totalSum desc) pm
    from (
        select window_start, window_end, page_id, sum(clicks) totalSum
        from table(tumble(table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR))
        group by window_start, window_end, page_id
    ) t2
) t1
where pm <= 3;

The DDL:

CREATE TABLE kafka_page_clicks (
  `ts` TIMESTAMP(3),
  `page_id` INT,
  `clicks` INT,
  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'bigdata01:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-28 15:23:46
 **/
public class _05TopNDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. source: load the data
        // Note: ts carries a full date-time value, so it must be declared TIMESTAMP,
        // otherwise the WATERMARK declaration fails.
        // Event time is used, so a WATERMARK must be declared.
        tenv.executeSql("CREATE TABLE kafka_page_clicks (" +
                "    `ts` TIMESTAMP(3),\n" +
                "    page_id INT,\n" +
                "    clicks INT,\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'topic1',\n" +
                "    'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                // the 'group-offsets' startup mode requires a consumer group id
                "    'properties.group.id' = 'g1',\n" +
                "    'scan.startup.mode' = 'group-offsets',\n" +
                "    'format' = 'json'\n" +
                ")");

        // 3. Hourly tumbling window, then Top-3 per window via row_number()
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   page_id,\n" +
                "   pm\n" +
                "  from (\n" +
                "select \n" +
                "    window_start,\n" +
                "    window_end,\n" +
                "    page_id,\n" +
                "    row_number() over(partition by window_start,window_end order by totalSum desc) pm\n" +
                "  from (\n" +
                "select \n" +
                "    window_start,\n" +
                "    window_end,\n" +
                "    page_id,\n" +
                "    sum(clicks) totalSum \n" +
                "    from table(\n" +
                "      tumble(table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR)\n" +
                "    )\n" +
                "    group by window_start,window_end,page_id ) t2 ) t1 where pm <= 3").print();

        // 4. sink: results are printed by .print() above
        // 5. execute
        env.execute();
    }
}

The final run output is as follows:
