FLINK SQL时间属性

Flink三种时间属性简介

在Flink SQL中,时间属性是一个核心概念,它主要用于处理与时间相关的数据流。Flink支持三种时间属性:事件时间(event time)、处理时间(processing time)和摄入时间(ingestion time)。以下是对这三种时间属性的详细解释:

一、事件时间(Event Time)

  • 定义:事件时间指的是数据本身携带的时间,即数据在产生时的时间戳。
  • 特点:
    • 反映了数据实际发生的时间。
    • 需要从数据中提取时间戳,并可能需要生成watermark来处理乱序数据。
    • 在Flink SQL触发计算时,使用数据本身携带的时间。
  • 应用场景:适用于需要基于数据实际发生时间进行计算的场景,如实时日志分析、交易系统等。

二、处理时间(Processing Time)

  • 定义:处理时间指的是具体算子计算数据执行时的机器时间,即在Flink集群中处理数据的节点所在机器的本地时间。
  • 特点:
    • 是最简单的一种时间概念,不需要从数据里获取时间,也不需要生成watermark。
    • 提供了较低的时间精度和确定性,因为不同节点的处理时间可能不同。
  • 应用场景:适用于对时间精度要求不高,或者数据不需要基于事件时间进行处理的场景。
  • 定义方式:
    • 在DataStream转换时直接指定。
    • 在定义Table Schema时指定,使用.proctime后缀。
    • 在创建表的DDL中指定,使用PROCTIME()函数。

三、摄入时间(Ingestion Time)

  • 定义:摄入时间指的是数据从数据源进入Flink的时间。
  • 特点:
    • 反映了数据被Flink系统接收的时间。
    • 适用于数据源与Flink集群之间存在较大时间差的场景。
  • 应用场景:在Flink SQL中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。

四、时间属性的应用

在Flink SQL中,时间属性主要用于时间窗口的计算、自定义时间语义的计算等。通过定义时间属性,可以方便地实现基于时间的聚合、过滤、连接等操作。

注意事项

  • 在一个Flink任务中,通常只会选择一个时间属性作为全局时间属性。
  • 时间属性的定义方式取决于具体的应用场景和需求。
  • 在使用事件时间时,需要注意处理乱序数据的问题,并合理设置watermark的生成策略。

Flink三种时间属性应用场景

一、事件时间(Event Time)应用场景:

  • 实时日志分析:在实时日志分析中,通常使用事件时间作为分析的基础。例如,需要统计某个时间段内的日志数量或类型,使用事件时间可以确保统计结果基于日志实际发生的时间。
  • 交易系统:在交易系统中,事件时间用于处理交易数据的实时分析。例如,计算某支股票在特定时间段内的价格波动,需要确保时间戳与交易发生的时间一致。
  • 实时推荐系统:在实时推荐系统中,用户行为数据的时间戳是事件时间。通过基于事件时间的分析,可以了解用户在不同时间段的行为模式,从而提供更加个性化的推荐。

二、处理时间(Processing Time)应用场景:

  • 非实时数据分析:对于不需要严格基于事件时间进行分析的场景,可以使用处理时间。例如,进行批处理任务时,不关心数据实际发生的时间,只关注任务开始和结束的时间。
  • 本地开发和测试:在本地开发和测试环境中,由于无法模拟真实的事件时间,可以使用处理时间进行简化处理。

三、摄入时间(Ingestion Time)应用场景:

  • 数据源与Flink集群时间差较大:当数据源与Flink集群之间存在较大的时间差时,使用摄入时间可以确保数据在Flink集群中处理的一致性。然而,在实际应用中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。

SQL指定时间属性两种方式

在Flink SQL中,指定时间属性主要有两种方式,以下是对这两种方式的详细解释:

一、在创建表的DDL中指定时间属性

  1. 事件时间(Event Time):
    • 在创建表的DDL语句中,可以通过增加一个时间戳字段并使用WATERMARK语句来定义事件时间属性。
    • 事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。
    • WATERMARK语句用于生成水印(watermark),以处理乱序数据。水印是一个延迟阈值,表示在该时间戳之前的所有数据都已经到达。
      示例代码:
CREATE TABLE user_actions (  user_name STRING,  data STRING,  user_action_time TIMESTAMP(3),  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND  
) WITH (...);

在这个例子中,user_action_time被声明为事件时间,并且设置了5秒的水印延迟。
2. 处理时间(Processing Time):
- 在创建表的DDL语句中,可以通过增加一个字段并使用PROCTIME()函数来定义处理时间属性。
- PROCTIME()函数是Flink SQL内置的函数,用于获取当前处理时间。
示例代码:

CREATE TABLE EventTable (  user STRING,  url STRING,  ts AS PROCTIME()  
) WITH (...);

在这个例子中,ts字段被定义为处理时间属性。

二、在DataStream转换时指定时间属性

  1. 事件时间(Event Time):
    • 在DataStream API中,可以通过assignTimestampsAndWatermarks方法来为数据流分配时间戳和水印。
    • 这种方法通常用于从外部数据源(如Kafka)读取数据时,为数据分配事件时间。
      示例代码(伪代码):
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(...))  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {  @Override  public long extractTimestamp(MyEvent event) {  return event.getTimestamp(); // 从事件中提取时间戳  }  });

在这个例子中,使用BoundedOutOfOrdernessTimestampExtractor为数据流分配了事件时间,并设置了5秒的最大乱序时间。
2. 处理时间(Processing Time):
- 在DataStream API中,处理时间是默认的时间属性,不需要显式指定。
- 但是,如果需要在后续操作中引用处理时间,可以通过在Table API中使用.proctime后缀来访问。
示例代码(伪代码):

Table table = tableEnv.fromDataStream(stream, "user, temperature, timestamp, pt.proctime as processingTime");

在这个例子中,pt.proctime被用作处理时间属性,并在Table API中进行了访问。

需要注意的是,在实际应用中,选择哪种方式指定时间属性取决于具体的应用场景和需求。在Flink SQL中,通常更倾向于在创建表的DDL中指定时间属性,因为这样可以更直观地定义表的模式结构(schema),并且方便后续的时间相关操作。而在DataStream API中指定时间属性则更灵活,适用于需要从外部数据源读取数据并为其分配时间戳的场景。

SQL事件时间案例

以下是一个关于Flink SQL事件时间的案例,用于展示如何在Flink SQL中使用事件时间属性进行窗口聚合操作。

案例背景

假设有一个数据流,其中包含了用户的点击事件。每个事件都有一个事件时间戳,表示用户点击的时间。任务是计算每个用户在每10分钟窗口内的点击次数。

步骤一:创建数据源表

首先,需要创建一个数据源表,并声明事件时间属性。在这个例子中,假设数据源是一个Kafka主题,并且事件时间戳存储在名为eventTime的字段中。

CREATE TABLE clicks (  userId STRING,  eventTime TIMESTAMP(3),  -- 事件时间戳  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 声明水印,用于处理乱序数据  
) WITH (  'connector' = 'kafka',  'topic' = 'clicks_topic',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'json',  'scan.startup.mode' = 'earliest-offset'  
);

步骤二:使用窗口聚合操作

接下来,可以使用Flink SQL的窗口聚合操作来计算每个用户在每10分钟窗口内的点击次数。在这个例子中,将使用滚动窗口(TUMBLE)进行聚合。

SELECT  userId,  TUMBLE_START(eventTime, INTERVAL '10' MINUTE) AS windowStart,  -- 窗口开始时间  TUMBLE_END(eventTime, INTERVAL '10' MINUTE) AS windowEnd,      -- 窗口结束时间  COUNT(*) AS clickCount                                         -- 点击次数  
FROM  clicks  
GROUP BY  userId,  TUMBLE(eventTime, INTERVAL '10' MINUTE);

解释

  1. 创建数据源表:在创建表时,指定了eventTime字段为事件时间属性,并设置了5秒的水印延迟。这意味着Flink将等待最多5秒以处理可能到达的乱序数据。
  2. 窗口聚合操作:使用TUMBLE函数定义了一个滚动窗口,窗口大小为10分钟。然后,按用户ID和窗口进行分组,并计算每个分组中的点击次数。
  3. 结果:查询结果将包含用户ID、窗口开始时间、窗口结束时间和点击次数。

注意事项

  1. 水印:在处理事件时间时,水印是非常重要的。它们允许Flink处理乱序数据,并确保在窗口聚合时不会遗漏任何数据。
  2. 时间属性类型:事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。如果数据源中的时间戳是BIGINT类型(表示毫秒或秒),则需要在创建表时将其转换为TIMESTAMP类型。
  3. 窗口类型:Flink SQL支持多种类型的窗口,如滚动窗口(TUMBLE)、滑动窗口(SLIDE)和会话窗口(SESSION)等。根据具体需求选择合适的窗口类型。

SQL处理时间案例

在Flink SQL中,处理时间(Processing Time)是指数据被具体算子处理时的系统时间。以下是一个基于处理时间的Flink SQL案例,用于展示如何使用处理时间属性进行窗口聚合操作。

案例背景

假设有一个数据流,其中包含了传感器读取的数据。每个数据都有一个读取时间戳,但这个时间戳不是事件发生时的时间,而是数据被读取到系统的时间。任务是计算每5分钟内读取的数据量。

步骤一:创建数据源表

首先,需要创建一个数据源表,并声明处理时间属性(在Flink SQL中,处理时间属性是隐式的,不需要显式声明,但可以通过特定的函数来引用)。在这个例子中,假设数据源是一个Socket流。

-- 假设有一个Socket数据源,数据格式为:id,value,timestamp(这里的timestamp是读取时间戳)  
CREATE TABLE sensor_data (  id STRING,  value DOUBLE,  timestamp BIGINT  -- 读取时间戳,单位为毫秒  
) WITH (  'connector' = 'socket',  'hostname' = 'localhost',  'port' = '9999',  'format' = 'csv'  
);

步骤二:转换时间戳并创建处理时间窗口

由于Flink SQL中的处理时间属性是隐式的,不能直接对其进行操作。但是,可以通过将读取时间戳转换为TIMESTAMP类型(尽管这不是必要的,因为处理时间窗口不需要显式的时间戳字段),然后使用Flink SQL提供的窗口函数来创建处理时间窗口。不过,在这个例子中,将直接使用处理时间窗口函数,而不进行显式的转换。

-- 使用处理时间滚动窗口计算每5分钟内的数据量  
SELECT  TUMBLE_START(PROCTIME()) AS window_start,  -- 窗口开始时间(处理时间)  TUMBLE_END(PROCTIME()) AS window_end,      -- 窗口结束时间(处理时间)  COUNT(*) AS data_count                     -- 数据量  
FROM  sensor_data  
GROUP BY  TUMBLE(PROCTIME(), INTERVAL '5' MINUTE);   -- 使用处理时间滚动窗口,窗口大小为5分钟

解释

  1. 创建数据源表:创建了一个名为sensor_data的数据源表,它接收来自Socket流的数据。数据包含id、value和timestamp字段,其中timestamp是数据被读取到系统的时间戳(以毫秒为单位)。
  2. 转换时间戳并创建窗口:在这个例子中,实际上没有显式地将timestamp字段转换为TIMESTAMP类型,因为处理时间窗口不需要这样做。相反,直接使用了PROCTIME()函数来获取处理时间,并使用TUMBLE函数创建了一个滚动窗口。窗口大小为5分钟,意味着每5分钟将计算一次窗口内的数据量。
  3. 结果:查询结果将包含窗口开始时间、窗口结束时间和窗口内的数据量。

注意事项

  1. 处理时间属性:在Flink SQL中,处理时间属性是隐式的,不需要显式声明。它表示数据被具体算子处理时的系统时间。
  2. 窗口函数:Flink SQL提供了多种窗口函数,如TUMBLE(滚动窗口)、SLIDE(滑动窗口)和SESSION(会话窗口)等。根据具体需求选择合适的窗口函数。
  3. 数据源:在这个例子中,使用了Socket作为数据源。在实际应用中,数据源可能是Kafka、文件系统或其他数据源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/54948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025 - 中医学基础 - 考研 - 职称

2025 - 中医学基础 - 考研 - 职称 第1章 中医学导论 1.中医学的指导思想是&#xff08;&#xff09;( ) [单选] A&#xff0e;阴阳学说 B&#xff0e;五行学说 C&#xff0e;精气学说 D&#xff0e;整体观念 E&#xff0e;辨证论治 正确答案: D 2.中医学的理论核心是&…

[单master节点k8s部署]37.微服务(一)springCloud 微服务

微服务架构的一个重要特点是&#xff0c;它与开发中使用的具体编程语言或技术栈无关。每个微服务都可以使用最适合其功能需求的语言或技术来实现。例如&#xff0c;一个微服务可以用Java编写&#xff0c;另一个微服务可以用Python、Go、Node.js等编写。微服务架构允许这种灵活性…

gin如何具体利用Server-Send-Events(SSE)实时推送技术实现消息推送

目录 业务场景 解决方案 1. 轮询 2. WebSocket 3. SSE(Server-Send-Events) 代码实现 总结 业务场景 在抖音、美团等APP中&#xff0c;我们经常会遇到APP内部的消息推送&#xff0c;如关注的人的动态消息推送、点赞评论互动消息推送以及算法推荐消息推送。这些场景都是…

数据结构-C语言顺序栈功能实现

栈 栈&#xff1a;类似于一个容器&#xff0c;如我们生活中的箱子&#xff0c;我们向箱子里放东西&#xff0c;那么最先放的东西是最后才能拿出来的 代码实现 #include <stdio.h> #include <stdlib.h>#define MAX_SIZE 100typedef struct {int* base; // 栈底指针…

STM32与ESP32串口数据发送以及网页端数据实时显示和远程遥控

目标&#xff1a;实现网页端速度实时显示以及可以通过点击页面按键达到对小车的位移方位控制。 一、ESP32代码 首先&#xff0c;需要让ESP32连接到WiFi&#xff0c;这样才能为后续的操作做准备。 ssid"xxxxxx" password"xxxxxx"#WIFI连接 def wifi_conn…

题解:牛客小白月赛102(A - C)

A 序列中的排列 题意&#xff1a; 每次给你两个正整数 n , k n,k n,k &#xff0c;并给你一段长度为 n n n 的序列。&#xff08;所有输入均为小于等于100的正整数&#xff09; 问&#xff1a;原序列中是否存在子序列&#xff0c;使得这个子序列是 k k k 的排列 子序列&am…

数据仓库中的维度建模:深入理解与案例分析

数据仓库中的维度建模&#xff1a;深入理解与案例分析 维度建模是数据仓库设计中最常用的一种方法&#xff0c;旨在简化数据访问、提高查询效率&#xff0c;特别适用于需要对数据进行多维分析的场景。本文将深入探讨维度建模的核心概念、设计步骤以及如何将其应用于实际案例中…

通过PyTorch 手写数字识别 入门神经网络 详细讲解

通过PyTorch 手写数字识别 入门神经网络 数据集 MNIST数据集中有手写数字图片7万张&#xff0c;划分训练集6万张&#xff0c;划分测试集1万张。 每张图片都会有一张标签&#xff0c;也就是代表着图片的真实值&#xff08;真实含义&#xff09;。 概念 计算机是如何读取图片的…

基于Android11简单分析audio_policy_configuration.xml

开篇先贴上一个高通的例子&#xff0c;后续基于此文件做具体分析。 1 <?xml version"1.0" encoding"UTF-8" standalone"yes"?> 2 <!-- Copyright (c) 2016-2019, The Linux Foundation. All rights reserved 3 Not a Contribut…

Python保存CSV文件,Excel打开后中文乱码

情况描述 在做多语言文件处理时&#xff0c; 使用 pandas&#xff0c; 并且指定了encoding为 UTF-8&#xff0c; 在 IDE&#xff0c; Sublime等编辑器上查看都显示正常&#xff0c;使用Excel打开非英文字符&#xff0c; 例如汉字&#xff0c; 阿拉伯文&#xff0c; 希伯来文等显…

多态常见面试问题

1、什么是多态&#xff1f; 多态&#xff08;Polymorphism&#xff09;是面向对象编程中的一个重要概念&#xff0c;它允许同一个接口表现出不同的行为。在C中&#xff0c;多态性主要通过虚函数来实现&#xff0c;分为编译时多态&#xff08;静态多态&#xff09;和运行时多态…

【Spring AI】Java实现类似langchain的第三方函数调用_原理与详细示例

Spring AI 介绍 &#xff1a;简化Java AI开发的统一接口解决方案 在过去&#xff0c;使用Java开发AI应用时面临的主要困境是没有统一且标准的封装库&#xff0c;导致开发者需要针对不同的AI服务提供商分别学习和对接各自的API&#xff0c;这增加了开发难度与迁移成本。而Sprin…

【数据结构】邻接表

一、概念 邻接表是一个顺序存储与链式存储相结合的数据结构&#xff0c;用于描述一个图中所有节点之间的关系。 若是一个稠密图&#xff0c;我们可以选择使用邻接矩阵&#xff1b;但当图较稀疏时&#xff0c;邻接矩阵就显得比较浪费空间了&#xff0c;此时我们就可以换成邻接…

机器人的应用 基于5G的变电站智慧管控系统

背景概述 一、电力行业面临的挑战与变革 随着全球工业化和信息化的快速发展&#xff0c;电力行业作为国民经济的基础性行业&#xff0c;其重要性日益凸显。然而&#xff0c;随着电力网络的不断扩展和复杂化&#xff0c;变电站和开关站作为电力传输与分配的关键节点&#xff0…

6、Spring Boot 3.x集成RabbitMQ动态交换机、队列

一、前言 本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成&#xff0c;比如动态新增 RabbitMQ 交换机、队列等操作。二、默认RabbitMQ中的exchange、queue动态新增及监听 1、新增RabbitMQ配置 RabbitMQConfig.java import org.springframework.amqp.rabbit.a…

Excel中Ctrl+e的用法

重点&#xff1a;想要使用ctrle&#xff0c;前提是整合或拆分后的结果放置的单元格必须和被提取信息的单元格相邻&#xff0c;且被提取信息的单元格也必须相连。 下图为错误示例 这样则可以使用ctrle 1、信息整合 2、提取信息 3、添加符号 4、信息顺序调换 5、数字提取 crtle还…

HarmonyOS NEXT 应用开发实战(三、ArkUI页面底部导航TabBar的实现)

在开发HarmonyOS NEXT应用时&#xff0c;TabBar是用户界面设计中不可或缺的一部分。本文将通过代码示例&#xff0c;带领大家一同实现一个常用的TabBar&#xff0c;涵盖三个主要的内容页&#xff1a;首页、知乎日报和我的页面。以模仿知乎日报的项目为背景驱动&#xff0c;设定…

解决ubuntu 下 VS code 无法打开点击没反应问题

从Ubuntu 22.04 升级到ubuntu 24.04 后&#xff0c;发现Vsode无法打开&#xff0c;不论是点击图标&#xff0c;还是terminator里面运行code 可执行程序&#xff0c;均没有反应。debug如下: 提示权限不够。 解决方案&#xff1a; sudo sysctl -w kernel.apparmor_restrict_unp…

C语言题目练习2

前面我们知道了单链表的结构及其一些数据操作&#xff0c;今天我们来看看有关于单链表的题目~ 移除链表元素 移除链表元素&#xff1a; https://leetcode.cn/problems/remove-linked-list-elements/description/ 这个题目要求我们删除链表中是指定数据的结点&#xff0c;最终返…

C语言 | Leetcode C语言题解之第460题LFU缓存

题目&#xff1a; 题解&#xff1a; /* 数值链表的节点定义。 */ typedef struct ValueListNode_s {int key;int value;int counter;struct ValueListNode_s *prev;struct ValueListNode_s *next; } ValueListNode;/* 计数链表的节点定义。 其中&#xff0c;head是数值链表的头…