flinkSql中累计窗口CUMULATE

eventTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 + eventTime* 1 分钟 每十秒计算一次 3秒水印* 数据格式* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:43.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:53.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:13.000"}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` TIMESTAMP(3),\n" +"   watermark for event_time as event_time - interval '3' second\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

processTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _06_flinkSql_Cumulate_processTime {/*** 累积窗口 + processTime* 1 分钟 每十秒计算一次* 数据格式* {"username":"zs","price":20}* {"username":"lisi","price":15}* {"username":"lisi","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` as proctime()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

topN案例

需求:在每个分钟内找出点击量最多的Top 3网页。 滚动窗口(1分钟)+eventTime+3秒水印hive sqlwith t1 as (select page_id,sum(clicks)  totalSum  from  table1group by page_id
), t2 as(select page_id,totalSum,row_number() over ( order by totalSum desc) px from t1 
) select  * from t2 where px <=3flink sqlwith t1 as (select window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(event_time), INTERVAL '60' second )) group by window_start,window_end,page_id
), t2 as(select window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 
) select  * from t2 where px <=3* 数据格式
{"ts": "2023-09-05 12:00:10", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:00:20", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:00:30", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:00:40", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:00:50", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 12:00:55", "page_id": 5, "clicks": 456}
// 触发数据
{"ts": "2023-09-05 12:01:03", "page_id": 5, "clicks": 456}
package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _07_flinkSql_topN {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表//3. 通过sql语句统计结果tenv.executeSql("CREATE TABLE table1 (\n" +"    `page_id` INT,\n" +"    `clicks` INT,\n" +"  `ts` TIMESTAMP(3) ,\n" +"   watermark for ts as ts - interval '3' second \n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");tenv.executeSql("with t1 as (\n" +"\tselect window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(ts), INTERVAL '60' second )) group by window_start,window_end,page_id\n" +"), t2 as(\n" +"\tselect window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 \n" +") select  * from t2 where px <=3").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/63250.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第二篇:k8s工作流程

我们来看通过deployment部署pod的常规流程&#xff1a; kubectl向apiserver发送部署请求&#xff08;例如使用 kubectl create -f deployment.yml&#xff09;apiserver将 Deployment 持久化到etcd&#xff1b;etcd与apiserver进行一次http通信。controller manager通过watch a…

SPC三种判定准则的算法

1.连续6个点递增或递减 //传入数据列表 //返回连续X个及以上递增或递减的数组下标int n = array.Length; int X = X_in; List<int> regions_start = new List<int>(); List<int> regions_end = new List<int>();if(Open){for (int i = 0; i < n - (…

工业—使用Flink处理Kafka中的数据_ProduceRecord1

1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为

Advanced Macro Techniques in C/C++: `#`, `##`, and Variadic Macros

Advanced Macro Techniques in C/C: #, ##, and Variadic Macros 文章目录 Advanced Macro Techniques in C/C: #, ##, and Variadic MacrosIllustrative Examples of Macros Using # and ##Stringification ExampleToken Concatenation ExampleNested Macros Example Key Conc…

python爬虫01

前言 之前的学习不是很努力就间断更新了&#xff0c;现在所有的内容是在具有python基础和web基础上继续更新的。接下来是爬虫和Flask框架共同更新&#xff0c;一起加油吧。 接v&#xff1a;13053025350&#xff08;毕设&#xff0c;小程序&#xff09; 看不懂python基础的可以…

使用CIFS挂载nas到centos

要将 NFS 挂载改为 CIFS 挂载方式&#xff0c;你需要确保以下条件满足&#xff1a; NAS 支持 SMB/CIFS 协议&#xff1a; 大多数 NAS 设备同时支持 NFS 和 SMB/CIFS 协议。在 NAS 配置中&#xff0c;确保 CIFS 服务已启用&#xff0c;并且你有访问共享路径的用户名和密码。 安…

开发基础(3):开发应用沉浸式效果 组件安全区方案

什么是沉浸式效果 典型应用全屏窗口UI元素包括状态栏、应用界面和底部导航条,其中状态栏和导航条,通常在沉浸式布局下称为避让区;避让区之外的区域称为安全区。 开发应用沉浸式效果主要指通过调整状态栏、应用界面和导航条的显示效果来减少状态栏导航条等系统界面的突兀感…

第四篇:k8s 理解Service工作原理

什么是service&#xff1f; Service是将运行在一组 Pods 上的应用程序公开为网络服务的抽象方法。 简单来说K8s提供了service对象来访问pod。我们在《k8s网络模型与集群通信》中也说过k8s集群中的每一个Pod&#xff08;最小调度单位&#xff09;都有自己的IP地址&#xff0c;都…

Spring Boot 3.4.0 发布:功能概览与示例

Spring Boot 3.4.0 带来了许多增强功能&#xff0c;使现代应用开发更加高效、便捷和强大。以下是最新功能的完整概述&#xff0c;以及一些帮助您快速入门的代码示例。 1. 应用程序版本管理 Spring Boot 引入了 spring.application.version 属性&#xff0c;方便开发者设置和访…

hhdb数据库介绍(10-43)

安全 密码安全管理 密码安全管理为用户提供了对计算节点数据库用户与存储节点的连接用户、备份用户的密码有效期监控提醒。到期后自动提示用户修改密码以提升系统的安全性。 数据库用户密码 &#xff08;一&#xff09;密码修改 用户可以在“安全->密码安全管理->数据…

基于DFA算法实现敏感词过滤

1、什么是DFA&#xff1f; DFA&#xff08;Deterministic Finite Automaton&#xff09;&#xff0c;即确定有穷自动机。其特征为&#xff1a;有一个有限状 态集合和一些从一个状态通向另一个状态的边&#xff0c;每条边上标记有一个符号&#xff0c;其中一个状态是 初态&#…

隐私安全大考,Facebook 如何应对?

随着数字时代的到来和全球互联网用户的快速增长&#xff0c;隐私安全问题已上升为网络世界的重要议题。社交媒体巨头Facebook因其庞大的用户群体和大量的数据处理活动&#xff0c;成为隐私问题的聚焦点。面对隐私安全的大考&#xff0c;Facebook采取了一系列策略来应对这些挑战…

基于深度学习的甲状腺结节影像自动化诊断系统(PyQt5界面+数据集+训练代码)

随着医学影像技术的发展&#xff0c;计算机辅助诊断在甲状腺结节的早期筛查中发挥着重要作用。甲状腺结节的良恶性鉴别对临床治疗具有重要意义&#xff0c;但传统的诊断方法依赖于医生的经验和影像学特征&#xff0c;存在一定的主观性和局限性。为了解决这一问题&#xff0c;本…

更换 Git 项目的远程仓库地址(五种方法)

更换 Git 项目的远程仓库地址有几种不同的方法&#xff0c;下面是详细的步骤和一些额外的方法来完成这个任务。 方法1&#xff1a;使用 git remote set-url 这是最直接的方法。假设你想要更改名为 origin 的远程仓库地址到新的 URL。 查看当前的远程仓库配置&#xff1a; git…

秒懂:使用js验证hash, content hash , chunk hash的区别

一、使用js验证hash, content hash , chunk hash的区别 1、计算一般的 Hash&#xff08;以简单字符串为例&#xff09; 使用crypto-js库来进行哈希计算&#xff0c;需提前引入npm install crypto-js库。 crypto-js&#xff1a; 是一个JavaScript加密算法库&#xff0c;用于实…

深入浅出:PHP中的字符串处理函数全解析

文章目录 引言理解字符串创建字符串使用单引号使用双引号 访问和修改字符串访问字符修改字符 字符串连接 常用字符串处理函数获取字符串长度查找子字符串查找首次出现的位置查找最后一次出现的位置 替换子字符串简单替换多次替换 分割字符串按分隔符分割 合并字符串转换大小写全…

Telnet不安全?如何配置使用更安全的STelnet远程登录华为AR1000V路由器?

在上一篇文章中&#xff0c;我们介绍了如何配置一台全新的AR1000V&#xff0c;来实现通过Telnet远程登录设备&#xff08;如何配置使用Telnet远程登录华为AR1000V路由器&#xff1f;&#xff09;。其实&#xff0c;在之前的文章中&#xff0c;我们已经介绍过Telnet是一种不安全…

kafka 和 rocketmq 的区别

Kafka 和 RocketMQ 是两种高性能的分布式消息队列系统&#xff0c;广泛用于实时数据处理、事件流处理和分布式系统的解耦。以下是两者的主要区别&#xff1a; 起源和生态 Kafka 起源于 LinkedIn&#xff0c;后贡献给 Apache 社区。拥有强大的开源生态和广泛的社区支持。广泛应…

CV(2)-插值和卷积

前言 仅记录学习过程&#xff0c;有问题欢迎讨论 看看年前可以学到哪。 频率&#xff1a; 灰度值变化程度的指标&#xff0c;是灰度再平面上的梯度幅值: 幅值&#xff1a; 是在一个周期内&#xff0c;交流电瞬时出现的最大绝对值&#xff0c;也是一个正弦波&#xff0c;波…

python数据分析之爬虫基础:解析

目录 1、xpath 1.1、xpath的安装以及lxml的安装 1.2、xpath的基本使用 1.3、xpath基本语法 2、JsonPath 2.1、jsonpath的安装 2.2、jsonpath的使用 2.3、jsonpath的基础语法 3、BeautifulSoup 3.1、bs4安装及创建 3.2、beautifulsoup的使用 3.3、beautifulsoup基本语…