详解 Flink Table API 和 Flink SQL 之时间特性

一、介绍

  • Table API 和 SQL 进行基于时间的操作(比如时间窗口)时需要定义相关的时间语义和时间数据来源的信息。因此会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间
  • 时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,就可以作为一个普通字段引用,并且可以在基于时间的操作中使用
  • 时间属性的数据类型为 TIMESTAMP,类似于常规时间戳,可以直接访问并且进行计算。
  • 按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)

二、处理时间定义

/**处理时间既不需要提取时间戳,也不需要生成 watermark
*/
public class TestTableProcessingTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);/*方式一:在 DataStream 转化时直接指定注意:1.使用 .proctime,定义处理时间字段2.proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义*/DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, pt.proctime");/*方式二:在定义 Table Schema时指定*/tableEnv.connect(new FileSystem().path("./sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE()).field("pt", DataTypes.TIMESTAMP(3)).proctime()).createTemporaryTable("sensorTable");/*方式三:在创建表的 DDL 中指定注意:运行这个 DDL,必须使用 Blink Planner 版本依赖*/String sinkDDL= "create table sensorTable (" +" id varchar(20) not null, " +" ts bigint, " +" temperature double, " +" pt AS PROCTIME() " +") with (" +" 'connector.type' = 'filesystem', " +" 'connector.path' = '/sensor.txt', " +" 'format.type' = 'csv')";tableEnv.sqlUpdate(sinkDDL);env.execute();}
}

三、事件时间定义

/**事件时间定义需要从事件数据中,提取时间戳,并设置用于推进事件时间的进展的 watermark
*/
public class TestTableEventTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//开启事件时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);/*方式一:在 DataStream 转化时直接指定注意:1.首先必须在转换的数据流中分配时间戳和 watermark2.使用 .rowtime,定义事件时间字段3.事件时间字段既可以作为新字段追加到 schema,也可以用现有字段替换*/DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})//提取事件时间戳和设置watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature"); //将现有字段替换为事件时间字段Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, rt.rowtime"); //将事件时间字段作为新字段追加/*方式二:在定义 Table Schema时指定*/tableEnv.connect(new FileSystem().path("./sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()).rowtime( //在事件时间戳字段后调用 rowtime 方法new RowTime().timestampsFromField("timestamp")  // 从字段中提取事件时间戳.watermarksPeriodicBounded(1000)   // 设置watermark 延迟 1 秒).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensorTable");/*方式三:在创建表的 DDL 中指定注意:运行这个 DDL,必须使用 Blink Planner 版本依赖说明:FROM_UNIXTIME 是系统内置的时间函数,用来将一个整数(秒数)转换成 “YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp*/String sinkDDL = "create table dataTable (" +" id varchar(20) not null, " + " ts bigint, " +" temperature double, " +" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +" watermark for rt as rt - interval '1' second" +") with (" +" 'connector.type' = 'filesystem', " +" 'connector.path' = '/sensor.txt', " +" 'format.type' = 'csv')";tableEnv.sqlUpdate(sinkDDL);env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26790.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高效处理海量慢SQL日志文件:Java与JSQLParser去重方案详解

在大数据处理环境下&#xff0c;慢SQL日志优化是一个必要的步骤&#xff0c;尤其当日志文件达到数GB时&#xff0c;直接操作日志文件会带来诸多不便。本文将介绍如何通过Java和JSQLParser库来解析和去重慢SQL日志&#xff0c;以提高性能和可维护性。 背景 公司生产环境中&…

Lua与C交互API接口总结

Lua与C交互 1. 常见Lua相关的C API压入元素查询元素获取元素检查元素栈的相关数据操作 2. C调用Lua核心调用函数示例 3. Lua调用C1. C函数注册到Lua&#xff08;lua_register&#xff09;示例2. 批量注册&#xff08;luaL_Reg&#xff09;示例 1. 常见Lua相关的C API 压入元素…

大模型微调出错的解决方案(持续更新)

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

等保测评练习题11

等级保护初级测评师试题11 姓名&#xff1a; 成绩&#xff1a; 判断题&#xff08;10110分&#xff09; 1.国家支持网络运营者之间在网络安全信息收集、分析、通报和应急处置等方面进行合作。&#xff08;T&#xff09; 安全法第二…

【文献阅读】基于高阶矩的波形分类方法

文章目录 基本信息SND及其统计特征分类 基本信息 【2017】rse Moritz, Bruggisser, Andreas, et al. Retrieval of higher order statistical moments from full-waveform LiDAR data for tree species classification[J]. Remote Sensing of Environment, 2017,196: 28-41. …

鲁教版八年级数学下册-笔记

文章目录 第六章 特殊平行四边形1 菱形的性质与判定2 矩形的性质与判定3 正方形的性质与判定 第七章 二次根式1 二次根式2 二次根式的性质3 二次根式的加减二次根式的乘除 第八章 一元二次方程1 一元二次方程2 用配方法解一元二次方程3 用公式法解一元二次方程4 用因式分解法解…

css系列:音频播放效果-波纹律动

介绍 语音播放的律动效果&#xff0c;通俗来说就是一个带动画的特殊样式的进度条&#xff0c;播放的部分带有上下律动的动画&#xff0c;未播放的部分是普通的灰色竖状条。 实现中夹带了less变量、继承和循环遍历&#xff0c;可以顺带学习一下。 结果展示 大致效果如图所示…

防火墙安全管理

大多数企业通过互联网传输关键数据&#xff0c;因此部署适当的网络安全措施是必要的&#xff0c;拥有足够的网络安全措施可以为网络基础设施提供大量的保护&#xff0c;防止黑客、恶意用户、病毒攻击和数据盗窃。 网络安全结合了多层保护来限制恶意用户&#xff0c;并仅允许授…

使用QT制作QQ登录界面

mywidget.cpp #include "mywidget.h"Mywidget::Mywidget(QWidget *parent): QWidget(parent) {/********制作一个QQ登录界面*********************/this->resize(535,415);//设置登录窗口大小this->setFixedSize(535,415);//固定窗口大小this->setWindowTi…

Spring Boot 的启动原理、Spring Boot 自动配置原理

Spring Boot启动原理包含自动装配原理。 Spring Boot 的启动原理&#xff1a; 1. 入口类与 SpringApplication 初始化&#xff1a; 应用程序通常从一个带有 SpringBootApplication 注解的主类开始&#xff0c;这个注解是一个组合注解&#xff0c;包含了 SpringBootConfigurat…

【学习笔记8】阅读StyleID论文源码

论文【链接】 源码【链接】 一、DDIM eta ddim_step表示执行几轮去噪迭代&#xff0c;eta表示DDPM和DDIM的插值系数。当eta0时&#xff0c;为DDPM&#xff1b;当eta≠0时&#xff0c;为DDIM。 参考 DDIM 简明讲解与 PyTorch 实现&#xff1a;加速扩散模型采样的通用方法 【s…

2024.06.13

这两天一直在准备面试和进行面试啊&#xff0c; 从昨天面七牛云&#xff0c;到今天面百度和蔚来&#xff0c;学到了很多不只是知识上的内容&#xff0c;详情可看&#xff1a;我的牛客

【ARMv8/ARMv9 硬件加速系列 3 -- SVE 硬件加速向量运算 1】

文章目录 SVE 使用介绍SVE 特点SVE2 特点 SVE 寄存器扩展的向量寄存器可扩展的谓词寄存器.d 与 .b 后缀的区别举例介绍使用 .d 后缀进行64位元素操作使用 .b 后缀进行8位元素操作 ptrue 指令小结 FFR 寄存器 SVE 使用介绍 前面文章:【ARMv8/ARMv9 硬件加速系列 1 – SVE | NEO…

git下载项目登录账号或密码填写错误不弹出登录框

错误描述 登录账号或密码填写错误不弹出登录框 二、解决办法 控制面板\用户帐户\凭据管理器 找到对应的登录地址进行更新或者删除 再次拉取或者更新就会提示输入登录信息

影响数字本振信噪比的因素

2048 点 -66 4096 点-72 8192 点-77 16384 点-84

FineBI开发中的一些数据处理方法

在这里记录在FineBI开发中的遇到的一些数据处理方法。 1、获取一星期中的首日日期 假设电商数据分析场景中有张订单表&#xff0c;其中有一列为订单日期(order_create_dt)&#xff0c;如果需要统计订单金额周同比&#xff0c;一般我们都需要构建一张日期维度表&#xff08;如…

SAP PP学习笔记21 - 计划策略的Customize:策略组 > 策略 > 需求类型 > 需求类(消费区分,计划区分)

上面几章讲了MTS&#xff0c;MTO&#xff0c;ATO的计划策略。 本章来讲一下它的后台 Customize。 1&#xff0c;Customizeing&#xff1a;Planned Indep.Reqmts Management 这是配置计划策略的整个过程&#xff1a; - Requirements Type / Class 需求类型 / 需求类 - Plann…

VUE之重定向redirect

VUE之路由和重定向redirect 这个小知识点是在学习做项目的时候遇到的一个问题&#xff0c;借鉴了一个他人的项目&#xff0c;是一个酒店管理系统&#xff0c;拿到源码之后导到我的vscode里。 参考链接 导的过程比较顺利&#xff0c;正常安装&#xff0c;加依赖&#xff0c;没有…

SIM卡 移动、联通、电信对比

中国移动、联通、电信优势劣势分析 移动和联通采用GSM终端&#xff0c;电信采用CDMA终端(码分多址)&#xff0c;上网速度快&#xff0c;保密性好联通也有CDMA关于GSM、CDMA、TDMA、 TD-SCDMA、WCDMA之间的各种纠结 中国联通&#xff1a;网络安全的“攻”与“防” 联通保密性…

java操作数据库语法

1 新建数据库 1.1 新建数据库 1 启动mysql数据库 2 新建数据库 1.2 mysql数据库语法 1 选择数据库 use java_demo1 2 移除数据库 drop database java_web1 3 创建表 CREATE TABLE user (id int(11) PRIMARY KEY AUTO_INCREMENT,name varchar(255) NOT NULL,age int(11)…