duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)

e44dbfdf198113d6b9bad5e7e92749be.png

Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释,可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API & SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到:

  • 时间属性的简介
  • 处理时间
  • 事件时间

时间属性简介

Flink TableAPI&SQL中的基于时间的操作(如window),需要指定时间语义,表可以根据指定的时间戳提供一个逻辑时间属性。

时间属性是表schama的一部分,当使用DDL创建表时、DataStream转为表时或者使用TableSource时,会定义时间属性。一旦时间属性被定义完成,该时间属性可以看做是一个字段的引用,从而在基于时间的操作中使用该字段。

时间属性像一个时间戳,可以被访问并参与计算,如果一个时间属性参与计算,那么该时间属性会被雾化成一个常规的时间戳,常规的时间戳不能与Flink的时间与水位线兼容,不能被基于时间的操作所使用。

Flink TableAPI & SQL所需要的时间属性可以通过Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认// 可以选择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

处理时间

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性,使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定义处理时间

处理时间的属性可以在DDL语句中被定义为一个计算列,需要使用PROCTIME()函数,如下所示:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口

DataStream转为Table的过程中定义处理时间

在将DataStream转为表时,在schema定义中可以通过.proctime属性指定时间属性,并将其放在其他schema字段的最后面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外逻辑字段作为处理时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

// 定义个带有处理时间属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 该字段会追加到schema中,作为第三个字段return "user_action_time";}
}// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定事件时间

事件时间属性可以通过 WATERMARK语句进行定义,如下:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明user_action_time作为事件时间属性,并允许5S的延迟  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream转为Table的过程中定义事件时间

当定义Schema时通过.rowtime属性指定事件时间属性,必须在DataStream中指定时间戳与水位线。例如在数据集中,事件时间属性为event_time,此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。

目前Flink支持两种方式定义EventTime字段,如下:

// 方式1:
// 提取timestamp并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外逻辑字段作为事件时间属性
// 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
// 系统会在TableEnvironment中获取事件时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 从第一个字段提取timestamp并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一个字段已经用来提取时间戳,可以直接使用对应的字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

另外也可以在创建TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中需要实现getRowtimeAttributeDescriptors方法,创建基于EventTime的时间属性信息。

// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建流,基于user_action_time属性分配水位线DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 标记user_action_time字段作为事件时间属性// 创建user_action_time描述符,用来标识时间属性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;}
}// register表
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文主要介绍了如何在Flink Table API和SQL中使用时间语义,可以使用两种时间语义:处理时间和事件时间。分别对每种的时间语义的使用方式进行了详细解释。

往期精彩回顾

Flink Table API & SQL编程指南(1)

Flink Table API & SQL编程指南之动态表(2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/550482.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

旅游系统_旅游标识系统,必须真的“旅游化”

标识是为游客传递路线&#xff0c;指明景点位置、起安全警示作用以及传达公园发展理念的标识(牌)或标识物&#xff0c;是公园的重要组成部分&#xff0c;有助于旅游者顺利完成游览过程&#xff0c;获得满意的旅游体验。好的完善的标识系统&#xff0c;可以起到画龙点睛的作用&a…

如何在linux下启动和关闭oracle服务

1.前言 确保我们能够访问oracle数据库包含两部分&#xff0c;一个是oracle实例&#xff0c;一个是监听&#xff0c;两个同时开启&#xff0c;我们才能正常的使用数据库&#xff0c;因此我们在关闭和启动oracle服务时&#xff0c;也需要同时操作实例和监听。能够操作linux的工具…

exfat为什么不适合机械硬盘_为什么有人说小排量车不适合跑高速,多少排量的车适合?...

阅读本文前&#xff0c;请您先点击上面的蓝色字体“梅赛德斯丶Benz”&#xff0c;再点击“关注”&#xff0c;这样您就可以继续免费收到祝福了。每天都有分享&#xff0c;完全是免费订阅&#xff0c;请放心关注。 哈喽&#xff0c;小伙伴们关注“梅塞德斯丶Benz”每…

rust石头墙几个c4_石头在景观中的运用

石材的运用横跨几个世纪&#xff0c;经久不衰。在景观设计中仍然是一个受欢迎的材料。运用好了可以很好的彰显景观的特性。石头的优点持续一生&#xff1b;非常耐用&#xff1b;容易使用&#xff1b;可以用在墙壁装修&#xff0c;铺路&#xff0c;以及重复使用&#xff1b;有不…

h5禁用浏览器下载视频_Flash正式被全球禁用,只有中国版还活着

这个弹窗常用 Chrome 或 Edge 浏览器的用户应该很熟悉&#xff0c;基本上每月都能看到几次。说起来 Adobe Flash Player 也是老朋友&#xff0c;这个 F 红标从 4399 小游戏到视频网站默认播放器&#xff0c;二十年来几乎伴随互联网一代人成长。图源自小众软件但技术总在进步&am…

alxctools索引超出了数组界限_[译]V8中的数组类型

译者&#xff1a;蒋海涛JavaScript 对象可以和任何属性有关联。对象属性的名称可以包含任何字符。有趣的是 JavaScript 引擎可以选择名称为纯数字的属性来进行优化&#xff0c;而这个属性其实就是数组 index。在 V8 中&#xff0c;会特殊处理整数名称的属性&#xff08;最常见的…

IntelliJ IDEA下git版本回退,版本还原

1、选中要回退的文件或者项目 2、复制要回退版本的版本号&#xff1a; 3、然后在branches里check out你想要回退的版本 选择Checkout Tag or Revision… 在弹出的窗口粘贴刚刚复制的版本号&#xff0c;然后点击OK&#xff0c;版本回退成功。

【IDEA】IDEA中使用git将项目上传到码云上

前言 该篇文章记录了使用IDEA上传项目到码云上。 前提是你在 IDEA中集成了git &#xff0c;并且会 git的简单使用 。 一、IDEA上传项目到码云上 1.将项目变成Git能管理的仓库 选中菜单栏 VCS ➡ Import into Version Control ➡ Create GIt Repository…选中当前项目目录&…

activiti 多部门审批_Activiti 基本介绍

简介这两天工作中要用到Activiti&#xff0c;就简单学习了下&#xff0c;做一个记录&#xff0c;好脑子不如烂笔头&#xff0c;记下来牢靠些&#xff0c;来吧&#xff0c;话不多说&#xff0c;一个字&#xff1a;干。Activiti是什么&#xff0c;为什么要用它Activiti项目是一项…

拼接大屏数据展示_可视化大屏的UI设计是根据哪几个方面来进行?

随着大数据产业的发展&#xff0c;越来越多的公司开始意识到数据资源的管理和运用&#xff0c;特别是一些中、大型企业&#xff0c;在日常中会经常用到可视化大屏&#xff0c;这个时候就需要UI设计师能呈现出相应的视觉效果。下面&#xff0c;就给大家介绍一下可视化大屏的UI设…

00600 ora 关闭oracle_Oracle集群高可用故障切换

原文链接[WK-T]ORACLE 10G 配置故障转移(Failover)​blog.itpub.net文章参考&#xff1a;《大话 Oracle RAC 集群 高可用性 备份与恢复》 张晓明 编著Oracle RAC 同时具备HA(High Availiablity) 和LB(LoadBalance). 而其高可用性的基础就是Failover(故障转移). 它指集群中任何一…

如何关闭window10自动更新

如何关闭Windows10的自动更新&#xff1f; 相信很多同学在用Windows10系统的时候&#xff0c;经常跳出更新系统的提示。 有时自动更新的时间&#xff0c;恰好是我们需要急用电脑的时候&#xff0c;而且系统更新比较慢&#xff0c;等待的时间长。 甚至经常会更新失败&#xf…

分割文本_PSENet、PANNet、DBNet三个文本检测算法异同

点击蓝字关注我们这三个文本检测算法都是segment base算法&#xff0c;通过由下而上的方式&#xff0c;先对text进行segment&#xff0c;然后再根据segment text&#xff0c;计算出text的instancePSENet近年来&#xff0c;自然场景文本检测在场景理解、产品识别、自动驾驶和目标…

maven安装过程以及手动添加jar包到本地仓库

Maven安装过程及手动添加JAR包到本地仓库详解 https://blog.csdn.net/niityzu/article/details/50997544 分类&#xff1a; Maven&#xff08;1&#xff09; 版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 一、Maven介绍 Maven是一个项目构建…

怎么把文件放到docker容器里

1、查找所有容器id&#xff1a;docker ps -a 2、查找容器长ID&#xff1a;docker inspect -f {{.ID}} tomcat-container-id 3、拷贝本地文件到容器&#xff1a; 命令&#xff1a;docker cp 本地路径 容器长ID:容器路径 例子&#xff1a;docker cp /home/work/FDFS/1.jpg dfba3…

lisp医院化验系统_医院智能导视系统

众所周知&#xff0c;“看病难”已经成为了全民关注的社会问题&#xff0c;这一问题也不是一朝一夕能解决的。我司研发的医院智能导视系统&#xff0c;避免就医过程中不必要的时间浪费&#xff0c;大大有效的提高就医效率。医院智能导视系统为了有序推进医院信息化工作&#xf…

的环境下 qt 运行在_Ubuntu16.04环境下运行vins mono(环境配置及编译)之ROS kinetic的安装...

所需环境&#xff1a;ubuntu16.04ROS kineticopencv 3.3.1eigen3.3.3ceres solver 1.141.ROS Kinetic 的安装&#xff08;1&#xff09;设置sources.listsudo sh -c echo "deb http://packages.ros.org/ros/ubuntu $(lsb_release -sc) main" > /etc/apt/sources.l…

db2 jdbc驱动参数_JDBC详细整理(一)

一.什么是JDBCJDBC(Java DataBase Connectivity)就是Java数据库连接&#xff0c;说白了就是用Java语言来操作数据库。原来我们操作数据库是在控制台使用SQL语句来操作数据库&#xff0c;JDBC是用Java语言向数据库发送SQL语句。二.JDBC原理早期SUN公司的天才们想编写一套可以连接…

生物学专业_江南大学微生物学(发酵)20002008历年考研专业课真题汇编

说明 1. 海量考研真题免费发布&#xff0c;欢迎关注公众号『守望考研』&#xff1b;2. 想获取本文对应的PDF文档以便打印使用&#xff0c;欢迎关注公众号了解领取方法&#xff1b;PS: PDF版文档清晰度更高、水印更小南开大学861微生物学1997-2001、2003-2011历年考研专业课真题…