【Flink SQL】Flink SQL 基础概念:SQL 的时间属性

Flink SQL 基础概念:SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/744884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务器将动态IP设置成静态IP(内部网络)

话不多说,直接上干货 打开终端,输入命令行:ifconfig,查看你的网卡配置,此次设置的第一个,如下: 打开配置文件&#xff0c;一般在/etc/sysconfig/network-scripts/文件夹下&#xff1a; 编辑配置文件&#xff1a;vi ifcfg-eno1 修改IP地址分配方式&#xff1a; &#xff08;1&a…

Excel小技巧 (3) - 如何取整

1. 四舍五入 Round&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果 123.1 2.向上取整 Roundup&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果&#xff1a;123.2 3.向下取整 Round&#xff08;对象&#xff0c;小数点后位数&#xff09; 结果123.…

【string一些函数用法的补充】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 string类对象的修改操作 我们来看 c_str 返回c格式的字符串的操作&#xff1a; 我们来看 rfind 和 substr 的操作&#xff1a; string类非成员函数 我们来看 r…

REDHAWK——组件

文章目录 前言一、REDHAWK 核心资产1、REDHAWK 基本组件2、REDHAWK 基本设备3、REDHAWK 基本波形4、REDHAWK 共享库5、REDHAWK 设备依赖性 二、创建组件项目1、组件向导2、组件描述符3、端口4、属性5、记录6、为组件生成代码 三、创建并运行 Hello World 组件 前言 组件是模块…

【DAY09 软考中级备考笔记】机组:信息加密,系统可靠性

机组&#xff1a;信息加密&#xff0c;系统可靠性 3月8日 – 天气&#xff1a;晴 1. 信息加密 信息加密分为了对称加密和非对称加密&#xff1a; 对称加密&#xff1a;加密和解密的密钥相同且不公开 优点是加密速度快缺点是加密的强度不高&#xff0c;密钥分发困难常见算法&…

es 聚合操作(二)

书接上文&#xff0c;示例数据在上一篇&#xff0c;这里就不展示了 一、Pipeline Aggregation 支持对聚合分析的结果&#xff0c;再次进行聚合分析。 Pipeline 的分析结果会输出到原结果中&#xff0c;根据位置的不同&#xff0c;分为两类&#xff1a; Sibling - 结果和现有…

“禁止互撕”新规第二天,热搜把#章子怡“怒怼”网友#推上了榜一

3月12日&#xff0c;微博热搜发布公告&#xff0c;对热搜词条处置规则进行了更新。 针对热搜词条长期以来存在的引战互撕、挑唆对立等不良现象&#xff0c;热搜生态秩序亟待改善&#xff0c;微博给出了两大解决方案&#xff1a; 一是更新热搜词条处置规则&#xff0c;当热搜词…

1456.定长子串中元音的最大数目

题目&#xff1a;给你字符串 s 和整数 k 。 请返回字符串 s 中长度为 k 的单个子字符串中可能包含的最大元音字母数。 英文中的 元音字母 为&#xff08;a, e, i, o, u&#xff09;。 解题思路&#xff1a; 1.右侧新进入窗口的字母为元音字母&#xff0c;左侧移出窗口的字母…

qiankun:vite/webpack项目配置

相关博文&#xff1a; https://juejin.cn/post/7216536069285429285?searchId202403091501088BACFF113F980BA3B5F3 https://www.bilibili.com/video/BV12T411q7dq/?spm_id_from333.337.search-card.all.click qiankun结构&#xff1a; 主应用base&#xff1a;vue3historyv…

3.1_8 两级页表

文章目录 3.1_8 两级页表&#xff08;一&#xff09;单级页表存在的问题&#xff08;二&#xff09;如何解决单级页表的问题&#xff1f;&#xff08;三&#xff09;两级页表的原理、地址结构&#xff08;四&#xff09;如何实现地址变换&#xff08;五&#xff09;需要注意的几…

基于51单片机的数控直流可调电源设计[proteus仿真]

181基于51单片机的数控直流可调电源设计[proteus仿真] 电源系统这个题目算是课程设计和毕业设计中常见的题目了&#xff0c;本期是一个基于51单片机的数控直流可调电源设计 需要的源文件和程序的小伙伴可以关注公众号【阿目分享嵌入式】&#xff0c;赞赏任意文章 2&#xffe…

动态内存管理(下)

1.C/C程序的内存开辟 C/C程序内存分配的几个区域&#xff1a; 1. 栈区&#xff08;stack&#xff09;&#xff1a;在执行函数时&#xff0c;函数内局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指令集中…

谈谈你对Java平台的理解?

从你接触 Java 开发到现在&#xff0c;你对 Java 最直观的印象是什么呢&#xff1f;是它宣传的 “Write once, run anywhere”&#xff0c;还是目前看已经有些过于形式主义的语法呢&#xff1f;你对于 Java 平台到底了解到什么程度&#xff1f;请你先停下来总结思考一下。 今天…

TikTok新手如何起号?环境因素与内容创新技巧

相信很多刚入行的TikTok玩家都遇到过一个难题&#xff0c;那就是账号权重低&#xff0c;播放量在个位数徘徊&#xff0c;其实都是因为还没起号&#xff01;那么具体如何起号呢&#xff1f;下面小编也给大家分享一下技巧。 一、如何起号 1、明确注册 TikTok 账号的目的 无论是…

怎么把mp4转换成amv格式?如何下载amv格式视频?

MP4&#xff08;MPEG-4 Part 14&#xff09;是一种通用的视频文件格式&#xff0c;广泛用于多媒体应用。作为MPEG-4标准的一部分&#xff0c;MP4以其卓越的压缩性能、出色的视频质量和广泛的兼容性成为当前最流行的视频格式之一。 AMV文件格式的介绍 AMV文件格式起源于中国公司…

深入理解JAVA异常(自定义异常)

目录 异常的概念与体系结构 异常的概念&#xff1a; 异常的体系结构&#xff1a; 异常的分类&#xff1a; 异常的处理 防御式编程 LBYL: EAFP: 异常的抛出 异常的捕获 异常声明throws try-catch捕获并处理 finally 面试题&#xff1a; 异常的处理流程 异常处…

计算机网络——OSI网络层次模型

计算机网络——OSI网络层次模型 应用层表示层会话层传输层TCP和UDP协议复用分用 网络层数据链路层物理层OSI网络层次模型中的硬件设备MAC地址和IP地址MAC地址IP地址MAC地址和IP地址区别 OSI网络层次模型通信过程解释端到端点到点端到端和点到点的区别 我们之前简单介绍了一下网…

sqllab第十三关通关笔记

知识点&#xff1a; 登录框处常见的语句(一般都是查询语句) where usernamewhere username""where usernam()where username("")错误注入知识回顾这里使用错误注入 通过admin admin登录发现没有任何回显信息&#xff1b;但是有成功登录的提示 通过bp抓包…

CSPM有必要换吗?目前持有PMP?

从事项目管理岗位的&#xff0c;尤其是国企工作的&#xff0c;建议换一个&#xff0c;但是有进一步发展打算的&#xff0c;可以直接考CSPM-3级更好&#xff0c;PMP持证增持的是CSPM-2级证书&#xff0c;相对来说是个初级证书&#xff0c;CSPM-3级含金量更高。 一、什么是 CSPM…

美众议院通过强制要求 TikTok 剥离的法案; 首个 AI 软件工程师上线丨 RTE 开发者日报 Vol.165

开发者朋友们大家好&#xff1a; 这里是**「RTE 开发者日报」**&#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 …