玩转数据-大数据-Flink SQL 中的时间属性

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91612.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手动实现BERT

本文重点介绍了如何从零训练一个BERT模型的过程&#xff0c;包括整体上BERT模型架构、数据集如何做预处理、MASK替换策略、训练模型和保存、加载模型和测试等。 一.BERT架构   BERT设计初衷是作为一个通用的backbone&#xff0c;然后在下游接入各种任务&#xff0c;包括翻译…

Minio入门系列【7】Spring Boot集成Minio

1 前言 之前介绍了如何使用Minio提供的JAVA SDK进行上传和下载文件&#xff0c;在此基础上&#xff0c;我们可以使用spring boot集成Minio JAVA SDK&#xff0c;添加自动配置、装配、客户端管理等功能&#xff0c;简化开发 2 Spring Boot集成Minio 2.1 环境搭建 首先我们搭…

设计模式-组合模式

目录 设计模式-组合模式什么是组合模式设计模式&#xff1f;java示例 设计模式-组合模式 什么是组合模式设计模式&#xff1f; 组合模式是一种结构型设计模式&#xff0c;它允许将对象组合成树状结构来表示“部分-整体”的层次结构。组合模式使得用户对单个对象和组合对象的使…

Docker 网桥、docker0 网桥和 --net host:平台差异、使用方式和场景介绍简介:

Docker 是一个流行的容器化平台,它提供了不同的网络配置选项。其中,Docker 网桥、docker0 网桥和 --net host 是常见的网络部署方式。本文将介绍这些网络选项的平台差异、使用方式以及适用的场景。 Docker 网桥 (Bridge Networking):Docker 网桥是 Docker 默认的网络模式。在…

Android 使用Kotlin封装RecyclerView

文章目录 1.概述2.运行效果图3.代码实现3.1 扩展RecyclerView 3.2 扩展Adapter3.3 RecyclerView装饰绘制3.3.1 以图片实现分割线3.3.2 画网格线3.3.3空白的分割线3.3.4 不同方向上的分割线 3.4 使用方法 1.概述 在一个开源项目上看到了一个Android Kotlin版的RecyclerView封装…

JAVA学习(方法的定义和调用)

一、方法的定义和调用 1、关键词&#xff1a;static表示静态方法&#xff0c;如没有返回值使用void&#xff0c;方法名前使用类型&#xff0c;例如int、float等&#xff1b; /*** 测试方法的定义和调用*/public class TestMethod {public static void main(String[] args) {a…

分布式事务-TCC异常-空回滚

1、空回滚问题&#xff1a; 因为是全局事务&#xff0c;A服务调用服务C的try时服务出现异常服务B因为网络或其他原因还没执行try方法&#xff0c;TCC因为C的try出现异常让所有的服务执行cancel方法&#xff0c;比如B的try是扣减积分 cancel是增加积分&#xff0c;还没扣减就增…

Java初始化大量数据到Neo4j中(二)

接Java初始化大量数据到Neo4j中(一)继续探索&#xff0c;之前用create命令导入大量数据发现太过耗时&#xff0c;查阅资料说大量数据初始化到Neo4j需要使用neo4j-admin import 业务数据说明可以参加Java初始化大量数据到Neo4j中(一)&#xff0c;这里主要是将处理好的节点数据和…

竞赛无人机搭积木式编程(四)---2023年TI电赛G题空地协同智能消防系统(无人机部分)

竞赛无人机搭积木式编程&#xff08;四&#xff09; ---2023年TI电赛G题空地协同智能消防系统&#xff08;无人机部分&#xff09; 无名小哥 2023年9月15日 赛题分析与解题思路综述 飞控用户在学习了TI电赛往届真题开源方案以及用户自定义航点自动飞行功能方案讲解后&#x…

排序篇(二)----选择排序

排序篇(二)----选择排序 1.直接选择排序 基本思想&#xff1a; 每一次从待排序的数据元素中选出最小&#xff08;或最大&#xff09;的一个元素&#xff0c;存放在序列的起始位置&#xff0c;直到全部待排序的数据元素排完 。 直接选择排序: ​ 在元素集合array[i]–array[…

迭代器,可迭代对象,生成器

目录 结论&#xff1a; 1&#xff1a;可迭代对象&#xff1a; 2&#xff1a;生成器&#xff1a;概念如下&#xff1a; 3&#xff1a;迭代器的定义&#xff1a;要同时满足以下三点 一&#xff1a;可迭代对象的分类 二&#xff1a;迭代器的意义和应用场景 1&#xff1a;迭代…

红米手机 导出 通讯录 到电脑保存

不要搞什么 云服务 不要安装什么 手机助手 不要安装 什么app 用 usb 线 连接 手机 和 电脑 手机上会跳出 提示 选择 仅传输文件 会出现下面的 一个 盘 进入 MIUI目录 然后进入 此电脑\Redmi Note 5\内部存储设备\MIUI\backup\AllBackup\20230927_043337 如何没有上面的文件&a…

基于Matlab实现全局优化算法

Matlab是一种非常强大的数学建模和计算工具&#xff0c;它提供了许多优化算法的实现。全局优化算法是一种能够找到全局最优解的优化算法&#xff0c;相对于局部优化算法来说&#xff0c;具有更强的全局搜索能力。在本文中&#xff0c;我们将介绍如何使用Matlab实现全局优化算法…

如何在PIL图像和PyTorch Tensor之间进行相互转换,使用pytorch进行PIL和tensor之间的数据转换

目录 引言PIL简介PyTorch和Torchvision简介PIL转换为TensorTensor转换为PIL实例代码和解释结论参考文献 &#x1f4dd; 引言 在计算机视觉领域&#xff0c;使用图像处理库对图像进行预处理是非常常见的。其中&#xff0c;Python Imaging Library&#xff08;PIL&#xff09;以…

记一次springboot的@RequestBody json值注入失败的问题(字段大小写的问题)

有时候做后端开发时&#xff0c;难免会与算法联调接口&#xff0c;很多算法的变量命名时全部大写&#xff0c;在实际springmvc开发中会遇到无法赋值的问题。 先粘贴问题代码 entity类 Data NoArgsConstructor EqualsAndHashCode(callSuper true) ToString(callSuper true) …

应用在手机触摸屏中的电容式触摸芯片

触控屏&#xff08;Touch panel&#xff09;又称为触控面板&#xff0c;是个可接收触头等输入讯号的感应式液晶显示装置&#xff0c;当接触了屏幕上的图形按钮时&#xff0c;屏幕上的触觉反馈系统可根据预先编程的程式驱动各种连结装置&#xff0c;可用以取代机械式的按钮面板&…

在nodejs常见的不良做法及其优化解决方案

在nodejs常见的不良做法及其优化解决方案 当涉及到在express和nodejs中开发应用程序时。遵循最佳实践对于确保项目的健壮性、可维护性和安全性至关重要。 在本文中&#xff0c;我们将探索开发人员经常遇到的几种常见的错误做法&#xff0c;并通过代码示例研究优化的最佳做法&…

Git分支管理

前言 文本将会向您介绍创建、查看、切换、合并、删除、合并、临时保存等分支管理操作 创建/查看/切换分支 [Fan_558VM-12-13-centos gitcode]$ git branch dev //创建分支 [Fan_558VM-12-13-centos gitcode]$ git branch //查看分支dev * master [Fan_558VM-12-13-centos…

ElasticSearch - 基于 JavaRestClient 操作索引库和文档

目录 一、RestClient操作索引库 1.1、RestClient是什么&#xff1f; 1.2、JavaRestClient 实现创建、删除索引库 1.2.1、前言 1.2.1、初始化 JavaRestClient 1.2.2、创建索引库 1.2.3、判断索引库是否存在 1.2.4、删除索引库 1.3、JavaRestClient 实现文档的 CRUD 1.3…

数据结构复习重点代码——连载中

前言 本文中使用数据类型时&#xff0c;若无特殊需求&#xff0c;默认都采用int类型。 线性表 顺序表 定义 //静态定义 #define MaxSize 50 typedef struct{int data[MaxSize];//采用数组的形式定义int length; //表示顺序表的长度 }seqList;//动态定义 #define InitSize …