【flink sql table api】时间属性的指定与使用注意事项

文章目录

  • 一. 时间属性介绍
  • 二. Table api指定时间属性
  • 三. 处理时间的指定
    • 1. 在创建表的 DDL 中定义
    • 2. 在 DataStream 到 Table 转换时定义
    • 3. 使用 TableSource 定义
  • 四. 事件时间的指定
    • 1. 在 DDL 中定义
    • 2. 在 DataStream 到 Table 转换时定义
    • 3. 使用 TableSource 定义
  • 五. 小结

Flink 可以基于几种不同的 时间 概念来处理数据。

  • 处理时间 指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的 System.currentTimeMillis()) )
  • 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
  • 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

本页面说明了如何在 Flink Table API & SQL 里面定义时间以及相关的操作。
 

一. 时间属性介绍

像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。

时间属性声明

  • 在CREATE TABLE DDL创建表的时候指定
  • 在 DataStream 中指定
  • 在定义 TableSource 时指定

一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。

 
时间属性的传递和物化

  • 只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。
  • 时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。

注意:

普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。

 

二. Table api指定时间属性

Table API 程序需要在 streaming environment 中指定时间属性:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

三. 处理时间的指定

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性它既不需要从数据里获取时间,也不需要生成 watermark。

共有三种方法可以定义处理时间。

1. 在创建表的 DDL 中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

 

2. 在 DataStream 到 Table 转换时定义

ing
 

3. 使用 TableSource 定义

ing
 

四. 事件时间的指定

事件时间允许程序按照数据中包含的时间来处理,这样可以在数据乱序或者晚到情况下产生一致的处理结果。
它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

 
同样事件时间的指定也有三种方式

1. 在 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。

WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段

Flink 支持和在 TIMESTAMP(不带时区) 列和 TIMESTAMP_LTZ(带有本地时区) 列上定义事件时间。

如果源数据中的时间戳数据表示为年-月-日-时-分-秒,则通常为不带时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件时间属性定义在 TIMESTAMP(不带时区) 列上:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermarkWATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

 

当源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,例如 1618989564564,此时建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:

CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategyWATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

Epoch Time 是一种计算机系统中常用的时间表示方法,它以秒为单位从一个特定时间点(通常是1970年1月1日午夜UTC)开始计算时间,用于在计算机系统中跟踪和比较时间戳。

 

2. 在 DataStream 到 Table 转换时定义

ing

3. 使用 TableSource 定义

ing
 

五. 小结

本文讨论了flink sql中时间属性的指定方法,其中有几点细节:

  1. 普通的时间戳无法用在时间相关的操作中,需要进行时间属性的定义
  2. 通过PROCTIME()或WATERMARK关键字可以在create语句中分别定义处理时间和事件时间类型的时间属性
  3. 时间属性定义好后,就可以像普通列一样使用,也可以在时间相关的操作中使用
  4. 一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。也就无法进行时间相关操作。

 
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/concepts/time_attributes/#%E5%9C%A8-ddl-%E4%B8%AD%E5%AE%9A%E4%B9%89

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣每日一题70:爬楼梯

题目描述: 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢? 示例 1: 输入:n 2 输出:2 解释:有两种方法可以爬到楼顶。 1. 1 阶 1 阶 …

redis 配置主从复制,哨兵模式案例

哨兵(Sentinel)模式 1 . 什么是哨兵模式? 反客为主的自动版,能够自动监控master是否发生故障,如果故障了会根据投票数从slave中挑选一个 作为master,其他的slave会自动转向同步新的master,实现故障自动转义 2 . 原理…

简析新能源汽车充电桩设计与应用

叶根胜 安科瑞电气股份有限公司 上海嘉定 201801 摘要:本文针对新能源汽车充电桩建设工作进行探究,采用案例分析法、文献查阅法,指出了新能源汽车充电桩建设存在的问题,阐述了充电桩建设与优化的对策。研究表明:目前…

php实现防止Struts2框架漏洞中的远程命令执行攻击的代码

使用PHP过滤器 PHP提供了一些内置的过滤器,可以过滤掉一些恶意代码。可以使用以下代码将输入数据进行过滤: ​$input $_POST[input]; $safe_input filter_var($input, FILTER_SANITIZE_STRING); 使用escapeshellcmd()和escapeshellarg() 可以使用PHP…

189. 轮转数组 --力扣 --JAVA

题目 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 解题思路 通过位移后位置对数组长度的取余来判断元素变换后的位置 代码展示 class Solution {public void rotate(int[] nums, int k) {int size nums.length;int[]…

python 使用json包在json格式字符串和python对象之间的变化

起因:使用python json包时,将键值对均为数字的字典存入txt文件后重新加载进字典后出现“字典key值不唯一”的神奇现象。 相关代码: 字典添加数据部分 def xuhao_chuti(self):rand random.randint(1, 908)if rand in self.memery.keys() an…

element-ui的日历组件el-calendar高度咋调小

最近项目首页有个空余 不知道放啥 打算放个日历card 充充位置, el-calendar日历组件的整体宽度可以用el-row el-col :gutter :span来控制自适应 但是官网文档没说高度咋缩小 细长一条好难看 自己尝试改了改element的样式没整出来 最后照着这位博主的方法改是好使滴…

软考系列(系统架构师)- 2014年系统架构师软考案例分析考点

试题一 软件架构(MYC 架构、扩展接口模式) MVC架构风格最初是Smalltalk-80中用来构建用户界面时采用的架构设计风格。其中M代表模型(Model),V代表视图(View),C代表控制器(Controller)。在该风格…

windows/mac/linux 用C++搭建一个刷题模拟器

一个刷题模拟器,一定要有题,题目从这里下载 欢迎大家提供新题目(请把题目文件夹压缩成zip后发给我chengyixuan130812163.com),我会定期更新的。 题目格式: 题目编号 nums.txt(评测点个数&#x…

动态规划(记忆化搜索)

AcWing 901. 滑雪 给定一个 R行 C 列的矩阵,表示一个矩形网格滑雪场。 矩阵中第 i 行第 j 列的点表示滑雪场的第 i 行第 j 列区域的高度。 一个人从滑雪场中的某个区域内出发,每次可以向上下左右任意一个方向滑动一个单位距离。 当然,一个人能…

【前端设计模式】之桥接模式

桥接模式是一种常用的设计模式,它可以将抽象部分与实现部分分离,使它们可以独立地变化。在前端开发中,桥接模式可以帮助我们更好地组织和管理代码,提高代码的可维护性和可扩展性。 桥接模式特性 分离抽象和实现:桥接…

liunx Centos-7.5上 rabbitmq安装

在安装rabbitmq中需要注意: 1、rabbitmq依赖于erlang,需要先安装erlang 2、erlang和rabbitmq版本有对应关系 可参考网页:https://www.rabbitmq.com/which-erlang.html 第一步,安装编译工具及库文件,如果服务器上已经有了&…

长连接的原理

Apollo的长连接实现是 Spring的DeferredResult来实现的,先看怎么用 import ...RestController RequestMapping("deferredResult") public class DeferredResultController {private Map<String, Consumer<DeferredResultResponse>> taskMap new HashMa…

SpringBoot_minio sdk使用自签名https证书错误处理

minio sdk使用自签名https证书错误处理 1.问题描述1.1 报错日志1.2 maven 依赖配置1.3 当前spring MinioClient配置 2.问题分析3.问题解决3.1 使用受信任的证书3.2 忽略证书验证3.2.1 minio客户端3.2.2 minio sdk 忽略证书验证3.2.2.1 拓展: 补充minioclient请求日志 4. 问题总…

树形数据增删改查

功能描述&#xff1a; 默认展示所有项目点击项目展示当前项目下的所有区域点击区域展示当前区域下的所有工位以上以树形图格式展示项目&#xff0c;区域&#xff0c;和工位都可进行增加 修改 和删除&#xff0c;每个图标hover时显示对应提示信息项目&#xff0c;区域&#xff…

illuminate/database 使用 二

上一篇文章写怎么单独使用illuminate/database&#xff0c;这回讲下怎么整合到项目里使用。为此特意看了下laravel对其使用。本篇文章&#xff0c;参照laravel的使用&#xff0c;简单实现。 一 原理 laravel 里使用illuminate/config。 illuminate/config composer 地址&…

Spring Boot整合OAuth2实现GitHub第三方登录

Spring Boot整合OAuth2&#xff0c;实现GitHub第三方登录 1、第三方登录原理 第三方登录的原理是借助OAuth授权来实现&#xff0c;首先用户先向客户端提供第三方网站的数据证明自己的身份获取授权码&#xff0c;然后客户端拿着授权码与授权服务器建立连接获得一个Access Token…

vuejs实现点击导出按钮把数据加密后传到json/txt格式文件中并下载,以及上传json文件解密获得json内容

vuejs实现点击导出按钮把数据加密后传到json/txt格式文件中并下载&#xff0c;以及上传json文件解密获得json内容 &#xff08;1&#xff09;在Vue.js中使用crypto-js进行加密和解密&#xff0c;首先安装crypto-js库 npm install crypto-js&#xff08;2&#xff09;在需要使…

研发效能认证学员作品:快速进行持续集成应用实践丨IDCF

作者&#xff1a;赖嘉明 研发效能&#xff08;DevOps&#xff09;工程师认证学员 随着数字化转型的推进及市场竞争的加剧&#xff0c;越来越多的企业也意识到持续集成的重要性。 而持续集成作为一种先进的软件开发实践和工具链&#xff0c;可以帮助企业实现自动化构建、集成和…

systemctl 自启软件闪屏桌面

一、问题分析 systemctl 服务启动在桌面系统之前&#xff0c;启动界面加载到 100% 时桌面系统开始加载&#xff0c;会强制隐藏我们的界面并显示桌面&#xff0c;待桌面彻底加载完毕&#xff0c;才能显示我们的软件界面。这期间就是闪屏并显示桌面的原因。 不过正常情况桌面系…