duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)

e44dbfdf198113d6b9bad5e7e92749be.png

Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释,可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API & SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到:

  • 时间属性的简介
  • 处理时间
  • 事件时间

时间属性简介

Flink TableAPI&SQL中的基于时间的操作(如window),需要指定时间语义,表可以根据指定的时间戳提供一个逻辑时间属性。

时间属性是表schama的一部分,当使用DDL创建表时、DataStream转为表时或者使用TableSource时,会定义时间属性。一旦时间属性被定义完成,该时间属性可以看做是一个字段的引用,从而在基于时间的操作中使用该字段。

时间属性像一个时间戳,可以被访问并参与计算,如果一个时间属性参与计算,那么该时间属性会被雾化成一个常规的时间戳,常规的时间戳不能与Flink的时间与水位线兼容,不能被基于时间的操作所使用。

Flink TableAPI & SQL所需要的时间属性可以通过Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认// 可以选择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

处理时间

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性,使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定义处理时间

处理时间的属性可以在DDL语句中被定义为一个计算列,需要使用PROCTIME()函数,如下所示:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口

DataStream转为Table的过程中定义处理时间

在将DataStream转为表时,在schema定义中可以通过.proctime属性指定时间属性,并将其放在其他schema字段的最后面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外逻辑字段作为处理时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

// 定义个带有处理时间属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 该字段会追加到schema中,作为第三个字段return "user_action_time";}
}// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定事件时间

事件时间属性可以通过 WATERMARK语句进行定义,如下:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明user_action_time作为事件时间属性,并允许5S的延迟  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream转为Table的过程中定义事件时间

当定义Schema时通过.rowtime属性指定事件时间属性,必须在DataStream中指定时间戳与水位线。例如在数据集中,事件时间属性为event_time,此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。

目前Flink支持两种方式定义EventTime字段,如下:

// 方式1:
// 提取timestamp并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外逻辑字段作为事件时间属性
// 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
// 系统会在TableEnvironment中获取事件时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 从第一个字段提取timestamp并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一个字段已经用来提取时间戳,可以直接使用对应的字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

另外也可以在创建TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中需要实现getRowtimeAttributeDescriptors方法,创建基于EventTime的时间属性信息。

// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建流,基于user_action_time属性分配水位线DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 标记user_action_time字段作为事件时间属性// 创建user_action_time描述符,用来标识时间属性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;}
}// register表
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文主要介绍了如何在Flink Table API和SQL中使用时间语义,可以使用两种时间语义:处理时间和事件时间。分别对每种的时间语义的使用方式进行了详细解释。

往期精彩回顾

Flink Table API & SQL编程指南(1)

Flink Table API & SQL编程指南之动态表(2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/550482.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

旅游系统_旅游标识系统,必须真的“旅游化”

标识是为游客传递路线&#xff0c;指明景点位置、起安全警示作用以及传达公园发展理念的标识(牌)或标识物&#xff0c;是公园的重要组成部分&#xff0c;有助于旅游者顺利完成游览过程&#xff0c;获得满意的旅游体验。好的完善的标识系统&#xff0c;可以起到画龙点睛的作用&a…

如何在linux下启动和关闭oracle服务

1.前言 确保我们能够访问oracle数据库包含两部分&#xff0c;一个是oracle实例&#xff0c;一个是监听&#xff0c;两个同时开启&#xff0c;我们才能正常的使用数据库&#xff0c;因此我们在关闭和启动oracle服务时&#xff0c;也需要同时操作实例和监听。能够操作linux的工具…

exfat为什么不适合机械硬盘_为什么有人说小排量车不适合跑高速,多少排量的车适合?...

阅读本文前&#xff0c;请您先点击上面的蓝色字体“梅赛德斯丶Benz”&#xff0c;再点击“关注”&#xff0c;这样您就可以继续免费收到祝福了。每天都有分享&#xff0c;完全是免费订阅&#xff0c;请放心关注。 哈喽&#xff0c;小伙伴们关注“梅塞德斯丶Benz”每…

调用第三方接口的几种请求方式

第一种方式&#xff1a; String url4"https://www.showmebug.com/open_api/v1/interviews"; jsonnew JSONObject(); json.put("candidate_name", "张三");//传递的参数 MediaType mediaType MediaType.parse("application/json;charsetut…

rust石头墙几个c4_石头在景观中的运用

石材的运用横跨几个世纪&#xff0c;经久不衰。在景观设计中仍然是一个受欢迎的材料。运用好了可以很好的彰显景观的特性。石头的优点持续一生&#xff1b;非常耐用&#xff1b;容易使用&#xff1b;可以用在墙壁装修&#xff0c;铺路&#xff0c;以及重复使用&#xff1b;有不…

java通过POI技术将html转成word

private static void inputStreamToWord() throws IOException {String content "<html>" "<head>你好</head>" "<body>" "<table>" "<tr>" "<td>信息1</td>" …

h5禁用浏览器下载视频_Flash正式被全球禁用,只有中国版还活着

这个弹窗常用 Chrome 或 Edge 浏览器的用户应该很熟悉&#xff0c;基本上每月都能看到几次。说起来 Adobe Flash Player 也是老朋友&#xff0c;这个 F 红标从 4399 小游戏到视频网站默认播放器&#xff0c;二十年来几乎伴随互联网一代人成长。图源自小众软件但技术总在进步&am…

java通过POI技术将HTML文件转成Word

public void htmlToWord2() throws Exception {InputStream bodyIs new FileInputStream("f:\\1.html");InputStream cssIs new FileInputStream("f:\\1.css");String body this.getContent(bodyIs);String css this.getContent(cssIs);//拼一个标准的…

alxctools索引超出了数组界限_[译]V8中的数组类型

译者&#xff1a;蒋海涛JavaScript 对象可以和任何属性有关联。对象属性的名称可以包含任何字符。有趣的是 JavaScript 引擎可以选择名称为纯数字的属性来进行优化&#xff0c;而这个属性其实就是数组 index。在 V8 中&#xff0c;会特殊处理整数名称的属性&#xff08;最常见的…

IntelliJ IDEA下git版本回退,版本还原

1、选中要回退的文件或者项目 2、复制要回退版本的版本号&#xff1a; 3、然后在branches里check out你想要回退的版本 选择Checkout Tag or Revision… 在弹出的窗口粘贴刚刚复制的版本号&#xff0c;然后点击OK&#xff0c;版本回退成功。

vue选中点击的元素_vue中v-for循环选中点击的元素并对该元素添加样式操作

相信大家都会遇到这种情况&#xff1a;v-for循环时&#xff0c;我只需要点击到的元素做出相应反应&#xff0c;其他的元素不变&#xff1b;但是往往所有v-for循环出的元素都会变化。如下面的代码&#xff1a;我需要点击到的元素添加一个类样式&#xff0c;其他元素不变&#xf…

【IDEA】IDEA中使用git将项目上传到码云上

前言 该篇文章记录了使用IDEA上传项目到码云上。 前提是你在 IDEA中集成了git &#xff0c;并且会 git的简单使用 。 一、IDEA上传项目到码云上 1.将项目变成Git能管理的仓库 选中菜单栏 VCS ➡ Import into Version Control ➡ Create GIt Repository…选中当前项目目录&…

activiti 多部门审批_Activiti 基本介绍

简介这两天工作中要用到Activiti&#xff0c;就简单学习了下&#xff0c;做一个记录&#xff0c;好脑子不如烂笔头&#xff0c;记下来牢靠些&#xff0c;来吧&#xff0c;话不多说&#xff0c;一个字&#xff1a;干。Activiti是什么&#xff0c;为什么要用它Activiti项目是一项…

拼接大屏数据展示_可视化大屏的UI设计是根据哪几个方面来进行?

随着大数据产业的发展&#xff0c;越来越多的公司开始意识到数据资源的管理和运用&#xff0c;特别是一些中、大型企业&#xff0c;在日常中会经常用到可视化大屏&#xff0c;这个时候就需要UI设计师能呈现出相应的视觉效果。下面&#xff0c;就给大家介绍一下可视化大屏的UI设…

00600 ora 关闭oracle_Oracle集群高可用故障切换

原文链接[WK-T]ORACLE 10G 配置故障转移(Failover)​blog.itpub.net文章参考&#xff1a;《大话 Oracle RAC 集群 高可用性 备份与恢复》 张晓明 编著Oracle RAC 同时具备HA(High Availiablity) 和LB(LoadBalance). 而其高可用性的基础就是Failover(故障转移). 它指集群中任何一…

如何关闭window10自动更新

如何关闭Windows10的自动更新&#xff1f; 相信很多同学在用Windows10系统的时候&#xff0c;经常跳出更新系统的提示。 有时自动更新的时间&#xff0c;恰好是我们需要急用电脑的时候&#xff0c;而且系统更新比较慢&#xff0c;等待的时间长。 甚至经常会更新失败&#xf…

python字典返回键值对列表_返回列表Python dict dictionaries Python 数据结构——字典 返回列表...

最近研究返回列表&#xff0c;稍微总结一下&#xff0c;以后继续补充&#xff1a;字典是比列表更先进的一种内置数据结构。“字典”就像现实中的字典一样&#xff0c;每一个单词对应好几个意思。在Python面里就是每一个键对应一个关联值。在Python中&#xff0c;我们可以很便利…

分割文本_PSENet、PANNet、DBNet三个文本检测算法异同

点击蓝字关注我们这三个文本检测算法都是segment base算法&#xff0c;通过由下而上的方式&#xff0c;先对text进行segment&#xff0c;然后再根据segment text&#xff0c;计算出text的instancePSENet近年来&#xff0c;自然场景文本检测在场景理解、产品识别、自动驾驶和目标…

maven安装过程以及手动添加jar包到本地仓库

Maven安装过程及手动添加JAR包到本地仓库详解 https://blog.csdn.net/niityzu/article/details/50997544 分类&#xff1a; Maven&#xff08;1&#xff09; 版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 一、Maven介绍 Maven是一个项目构建…