【Flink SQL】Flink SQL 基础概念:SQL 的时间属性

Flink SQL 基础概念:SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/744884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习Unity到什么程度可以找工作?

游戏开发是一个充满无限可能的行业&#xff0c;Unity作为最流行的游戏开发引擎&#xff0c;吸引着无数游戏开发者的目光。在这个行业发展迅速、竞争激烈的背景下&#xff0c;许多同学都梦想着通过系统学习获得游戏开发技能&#xff0c;从而在游戏行业找到属于自己的一席之地。 …

服务器将动态IP设置成静态IP(内部网络)

话不多说,直接上干货 打开终端,输入命令行:ifconfig,查看你的网卡配置,此次设置的第一个,如下: 打开配置文件&#xff0c;一般在/etc/sysconfig/network-scripts/文件夹下&#xff1a; 编辑配置文件&#xff1a;vi ifcfg-eno1 修改IP地址分配方式&#xff1a; &#xff08;1&a…

Excel小技巧 (3) - 如何取整

1. 四舍五入 Round&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果 123.1 2.向上取整 Roundup&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果&#xff1a;123.2 3.向下取整 Round&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果123.…

【string一些函数用法的补充】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 string类对象的修改操作 我们来看 c_str 返回c格式的字符串的操作&#xff1a; 我们来看 rfind 和 substr 的操作&#xff1a; string类非成员函数 我们来看 r…

REDHAWK——组件

文章目录 前言一、REDHAWK 核心资产1、REDHAWK 基本组件2、REDHAWK 基本设备3、REDHAWK 基本波形4、REDHAWK 共享库5、REDHAWK 设备依赖性 二、创建组件项目1、组件向导2、组件描述符3、端口4、属性5、记录6、为组件生成代码 三、创建并运行 Hello World 组件 前言 组件是模块…

LeetCode hot100-10

560. 和为 K 的子数组 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。子数组是数组中元素的连续非空序列。我的解法&#xff0c;就是很简单的那种两层循环&#xff0c;没啥好说的。这题是不会超时&#xff0c;能通过。 cla…

【DAY09 软考中级备考笔记】机组:信息加密,系统可靠性

机组&#xff1a;信息加密&#xff0c;系统可靠性 3月8日 – 天气&#xff1a;晴 1. 信息加密 信息加密分为了对称加密和非对称加密&#xff1a; 对称加密&#xff1a;加密和解密的密钥相同且不公开 优点是加密速度快缺点是加密的强度不高&#xff0c;密钥分发困难常见算法&…

es 聚合操作(二)

书接上文&#xff0c;示例数据在上一篇&#xff0c;这里就不展示了 一、Pipeline Aggregation 支持对聚合分析的结果&#xff0c;再次进行聚合分析。 Pipeline 的分析结果会输出到原结果中&#xff0c;根据位置的不同&#xff0c;分为两类&#xff1a; Sibling - 结果和现有…

C语言—求最大公约数(4种算法思路)

1.穷举法 如果大数可以整除小数&#xff0c;那么最大公约数为小数。如果不能整除小数&#xff0c;那么这两个数就按大到小依次对比小数小的数求余&#xff0c;遇到都能够整除的&#xff0c;就是最大公约数。 int gcd(int a, int b) {int i;int min a < b ? a : b;for (i …

“禁止互撕”新规第二天,热搜把#章子怡“怒怼”网友#推上了榜一

3月12日&#xff0c;微博热搜发布公告&#xff0c;对热搜词条处置规则进行了更新。 针对热搜词条长期以来存在的引战互撕、挑唆对立等不良现象&#xff0c;热搜生态秩序亟待改善&#xff0c;微博给出了两大解决方案&#xff1a; 一是更新热搜词条处置规则&#xff0c;当热搜词…

1456.定长子串中元音的最大数目

题目&#xff1a;给你字符串 s 和整数 k 。 请返回字符串 s 中长度为 k 的单个子字符串中可能包含的最大元音字母数。 英文中的 元音字母 为&#xff08;a, e, i, o, u&#xff09;。 解题思路&#xff1a; 1.右侧新进入窗口的字母为元音字母&#xff0c;左侧移出窗口的字母…

无源性指数

无源性指数&#xff08;Passivity Index&#xff09;是控制系统理论中的一个概念&#xff0c;用于量化系统的无源性特性。无源性是系统的一个重要属性&#xff0c;它描述了系统从外部环境中吸收能量并消耗这些能量的能力。具体来说&#xff0c;一个无源系统不能从外部环境中无限…

qiankun:vite/webpack项目配置

相关博文&#xff1a; https://juejin.cn/post/7216536069285429285?searchId202403091501088BACFF113F980BA3B5F3 https://www.bilibili.com/video/BV12T411q7dq/?spm_id_from333.337.search-card.all.click qiankun结构&#xff1a; 主应用base&#xff1a;vue3historyv…

3.1_8 两级页表

文章目录 3.1_8 两级页表&#xff08;一&#xff09;单级页表存在的问题&#xff08;二&#xff09;如何解决单级页表的问题&#xff1f;&#xff08;三&#xff09;两级页表的原理、地址结构&#xff08;四&#xff09;如何实现地址变换&#xff08;五&#xff09;需要注意的几…

QT5.14.2 探索QT的神秘力量:轻松获取MD5值的秘诀

在当今这个信息爆炸的时代&#xff0c;数据安全已经成为了我们每个人都需要关注的话题。MD5作为一种广泛使用的哈希算法&#xff0c;它的应用场景非常广泛&#xff0c;从密码存储到文件完整性校验&#xff0c;MD5都扮演着重要的角色。而在QT的世界里&#xff0c;获取MD5值就像吃…

第三方 cookie 就快被废弃了,再不玩就没机会了!

还记得初学前端时&#xff0c;一直听过 cookie&#xff0c;没碰它之前觉得特别简单&#xff0c;一听就懂。但当自己和后台成员对接时&#xff0c;那叫一个惨烈&#xff0c;搞了搞几天才好&#xff0c;最后弄好的时候还是懵懵的。最近突然看到第三方 cookie 快被废弃了&#xff…

洛谷 B3625 迷宫寻路

本道题需要注意&#xff1a;如果孩子的起始位置就是‘#’&#xff0c;那孩子就无路可走&#xff0c;出不来了。 所以需要特判一下&#xff0c;代码如下&#xff1a;&#xff08;这是废话&#xff0c;不需要特判&#xff0c;注意题目要求&#xff09; if(ch[1][1]#){printf(&q…

基于51单片机的数控直流可调电源设计[proteus仿真]

181基于51单片机的数控直流可调电源设计[proteus仿真] 电源系统这个题目算是课程设计和毕业设计中常见的题目了&#xff0c;本期是一个基于51单片机的数控直流可调电源设计 需要的源文件和程序的小伙伴可以关注公众号【阿目分享嵌入式】&#xff0c;赞赏任意文章 2&#xffe…

动态内存管理(下)

1.C/C程序的内存开辟 C/C程序内存分配的几个区域&#xff1a; 1. 栈区&#xff08;stack&#xff09;&#xff1a;在执行函数时&#xff0c;函数内局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指令集中…

谈谈你对Java平台的理解?

从你接触 Java 开发到现在&#xff0c;你对 Java 最直观的印象是什么呢&#xff1f;是它宣传的 “Write once, run anywhere”&#xff0c;还是目前看已经有些过于形式主义的语法呢&#xff1f;你对于 Java 平台到底了解到什么程度&#xff1f;请你先停下来总结思考一下。 今天…