【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性

Flink SQL 基础概念》系列,共包含以下 5 篇文章:

  • Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
  • Flink SQL 基础概念(二):数据类型
  • Flink SQL 基础概念(三):SQL 动态表 & 连续查询
  • Flink SQL 基础概念(四):SQL 的时间属性
  • Flink SQL 基础概念(五):SQL 时区问题

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 基础概念(四):SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/752806.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈C++的模板—— 这一篇就够了

今天我们来谈谈C中有关于模板的知识&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;对于C模板来说&#xff0c;我们首先得了解以下几个术语 函数模板模板函数模板实例化模板特例化模板的实参推演模板的非类型参数非模板函数类模板模板类选择性实例化 下面&#xff0c;我…

在Visual Studio中调试 .NET源代码

前言 在我们日常开发过程中常常会使用到很多其他封装好的第三方类库&#xff08;NuGet依赖项&#xff09;或者是.NET框架中自带的库。如果可以设置断点并在NuGet依赖项或框架本身上使用调试器的所有功能&#xff0c;那么我们的源码调试体验和生产效率会得到大大的提升。今天我…

数据分析 | Matplotlib

Matplotlib 是 Python 中常用的 2D 绘图库&#xff0c;它能轻松地将数据进行可视化&#xff0c;作出精美的图表。 绘制折线图&#xff1a; import matplotlib.pyplot as plt #时间 x[周一,周二,周三,周四,周五,周六,周日] #能量值 y[61,72,66,79,80,88,85] # 用来设置字体样式…

vulnhub ---- Dr4g0n b4ll

文章目录 网段扫描隐藏目录隐写尝试通过ssh连接提权路径劫持 网段扫描 nmap -sn 命令用于执行主机存活扫描&#xff0c;仅检测目标网络中的活动主机&#xff0c;而不进行端口扫描。 ┌──(root㉿kali)-[~/Downloads] └─# nmap -sn 10.10.10.0/24 …

计算机网络——物理层(宽带接入技术)

计算机网络——物理层&#xff08;宽带接入技术&#xff09; 什么是宽带有线带宽接入xDSLADSL 技术ADSL 的大部分组成 光纤同轴混合网&#xff08;HFC 网&#xff09;机顶盒与电缆调制解调器&#xff08;set-top box&#xff09; FTTx 技术光配线网 ODN (Optical Distribution …

nginx 基本使用、借助 nginx 和 mkcert 实现本地 https://localhost 测试。

CSDN 如何将资源设置为免费&#xff1f; 安装和基本使用 进入 nginx 官网 下载 Mainline version 版本 解压到一个你喜欢的位置&#xff0c;比如解压到 C: 目录 不管配不配置环境变量&#xff0c;对 nginx 的所有操作都应该在对应文件夹中 基本命令的使用&#xff1a; cd …

性能测试 事务 -- HPE Virtual User Generator -Web -HTTP/HTML

软件介绍 Virtual User Generator &#xff0c;记录用户流程并创建一个自动化性能测试脚本 Controller&#xff0c;单一控制点&#xff0c;轻松、有效地控制所有Vuser&#xff0c;执行期间监控场景性能 Analysis&#xff0c;生成性能测试报告&#xff0c;以图表形式呈现。 操…

2024图表分析网页模版大数据可视化大屏电子沙盘合集包含金融行业智慧大厅智慧交通智慧门店智慧物流智慧小区

2024图表分析网页模版大数据可视化大屏电子沙盘合集包含金融行业智慧大厅智慧交通智慧门店智慧物流智慧小区 项目介绍&#xff1a; 图表分析网页模版 大数据可视化大屏电子沙盘合集&#xff0c;项目基于html/css/js&#xff0c;包含行业&#xff1a; 智慧政务 智慧社区 金融行…

mysql 更新时,旧值与新值相同会怎么做?

文章目录 1 问题描述2 验证2.1 验证猜想12.2 验证猜想2 3 结论4 mysql 为什么这么设计呢&#xff1f; 1 问题描述 创建一张表t&#xff0c;插入一行数据 mysql> CREATE TABLE t ( id int(11) NOT NULL primary key auto_increment, a int(11) DEFAULT NULL ) ENGINEInnoDB…

第111讲:Mycat实践指南:固定Hash算法分片下的水平分表详解

文章目录 1.固定Hash算法分片的概念1.1.固定Hash算法的概念1.2.固定Hash算法是如何将数据路由到分片节点的 2.使用固定Hash算法分片对某张表进行水平拆分2.1.在所有的分片节点中创建表结构2.2.配置Mycat实现固定Hash算法分片的水平分表2.2.1.配置Schema配置文件2.2.2.配置Rule分…

unityprotobuf自动生成C#

Release Protocol Buffers v3.19.4 protocolbuffers/protobuf GitHub 导入Source code 里面的 csharp/src/Google.Protobuf 进入Unity 拷贝其他版本的 System.Runtime.CompilerServices.Unsafe进入工程 使用protoc-3.19.4-win32 里面的exe去编译proto文件为C# using Sys…

MybatisPlus-Generator自定义模版生成CRUD、DTO、VO、Convert等

个人博客&#xff1a;无奈何杨&#xff08;wnhyang&#xff09; 个人语雀&#xff1a;wnhyang 共享语雀&#xff1a;在线知识共享 Github&#xff1a;wnhyang - Overview 简介 如标题所言&#xff0c;本篇文章介绍如何使用MybatisPlus-Generator自定义模版生成CRUD、DTO、V…

Redis实战——查询缓存缓存穿透、雪崩、击穿

目录 为什么要使用缓存缓存的作用缓存的成本如何使用缓存缓存模型和思路缓存更新策略数据库和缓存不一致解决方案数据库和缓存不一致采用什么方案对比删除缓存与更新缓存如何保证缓存与数据库的操作同时成功/同时失败先操作缓存还是先操作数据库&#xff1f; 实现商铺缓存与数据…

Tensorflow笔记(二):激活函数、优化器等、神经网络模型实现(商品销量预测)

import tensorflow as tf import numpy as np from tqdm import tqdm# ----------------------------- tensor常用函数2 ----------------------------------- a tf.constant([1, 2, 3, 1, 2]) b tf.constant([0, 1, 3, 4, 5]) c tf.where(tf.greater(a, b), a, b) # 若a&g…

Linux下的多线程编程:原理、工具及应用(4)

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;Flower of Life—陽花 0:34━━━━━━️&#x1f49f;──────── 4:46 &#x1f504; ◀️ ⏸ ▶️ ☰ …

RuoYi-Vue开源项目2-前端登录验证码生成过程分析

前端登录验证码实现过程 生成过程分析 生成过程分析 验证码的生成过程简单概括为&#xff1a;前端登录页面加载时&#xff0c;向后端发送一个请求&#xff0c;返回验证码图片给前端页面展示 前端页面加载触发代码&#xff1a; import { getCodeImg } from "/api/login&q…

Spring6--基础概念

1. 概述 1.1. Spring是什么 Spring 是一套广泛应用于 Java 企业级应用开发领域的轻量级开源框架&#xff0c;由 Rod Johnson 创立&#xff0c;旨在显著降低 Java 企业应用的复杂性&#xff0c;缩短开发周期&#xff0c;并提升开发效率。Spring 不仅适用于服务器端开发&#x…

三个案例,带你看懂智能时代支撑降本增效的底层逻辑

2003年&#xff0c;“神舟五号”成功登上太空&#xff0c;2007年&#xff0c;乔布斯初代苹果发布会&#xff0c;2016年“天宫二号”与“神州十一号”自动交会对接成功&#xff0c;2022年ChatGPT横空出市。 科技发展速度令人惊叹&#xff0c;一不留神就步入了下一个科技时代&am…

【vue elementUI】el-select和弹出框el-option样式调整,::v-deep失效

组件自带样式&#xff1a; 修改后样式&#xff1a; 注意修改弹出框样式需要修改一个属性&#xff1a; 此属性默认值为true&#xff0c;此时可以看到弹出框是放在外面的&#xff0c;没有在el-select里面。此时设置弹窗样式会不生效&#xff0c;::v-deep无效。 需要将此属性改为f…

JavaScript练手小技巧:数字反转时钟

样式基于博主的这篇文章&#xff1a; CSS3技巧38&#xff1a;3D 翻转数字效果-CSDN博客 既然可以实现翻转数字了&#xff0c;肯定就可以跟 JS 相结合去完成一些数字展示效果。 比如&#xff0c;数字反转时钟。 为了方便&#xff0c;所有 HTML 数字根据时间动态生成。因此&a…