【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows

是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。【事件驱动,没有数据到达永远都不会创建窗口】

1)窗口分类

(1)按照驱动类型分

(1)时间窗口

时间窗口以时间点来定义窗口的开始(start)和结束(end),截取出的就是某一时间段的数据。

(2)计数窗口

计数窗口基于元素的个数截取数据,到达固定的个数时就触发计算并关闭窗口。

(2)按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是
“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口应用非常广泛,可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

(2)滑动窗口(Sliding Windows)

滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率

滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size=slide)
滑动窗口适合计算结果更新频率非常高的场景。

(3)会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到
来的时间间隔(gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,
那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session)

在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

(4)全局窗口(Global Windows)

“全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯, 默认是不会做触发计算的,如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

2)窗口 API

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

(2)窗口分配器(Window Assigners)和窗口函数(WindowFunctions)

stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

窗口分配器

(1)时间窗口

滚动处理时间窗口

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

滑动处理时间窗口

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

处理时间会话窗口

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

(2)计数窗口

滚动计数窗口

stream.keyBy(...)
.countWindow(10)

滑动计数窗口

stream.keyBy(...)
.countWindow(10, 3)

全局窗口

stream.keyBy(...)
.window(GlobalWindows.create());

注意:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

窗口函数

(1)增量聚合函数(ReduceFunction / AggregateFunction)

归约函数(ReduceFunction)

类似Reduce算子,只不过固定时间才会输出

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);stream.map(new WaterSensorMapFunction()).keyBy(WaterSensor::getId)// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce 方法,之前的结果:" + value1 + ",现在来的数据:" + value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() + value2.getVc());}}).print();env.execute();

聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

image-20231109192227819

有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型IN 就是输入流中元素的数据类型;累加器类型 ACC 是进行聚合的中间状态类型;而输出类型OUT是最终计算结果的类型。

接口中有四个方法:

  • createAccumulator():创建一个累加器,为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

AggregateFunction 的工作原理:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(" 调用add方法,value=" + value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();

(2)全窗口函数(full window functions)

基于全部的数据计算

全窗口函数有两种:WindowFunction ProcessWindowFunction

窗口函数(WindowFunction)

基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

该类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用

处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。

时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor,String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();env.execute();

增量聚合和全窗口函数结合使用

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<TRKW>function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction 与 WindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,
ProcessWindowFunction<VRKW>     

结合使用

public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:/*增量聚合 Aggregate + 全窗口 process1、增量聚合函数处理数据: 来一条计算一条2、窗口触发时, 增量聚合的结果(只有一条)传递给全窗口函数3、经过全窗口函数的处理包装后,输出结合两者的优点:1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少2、全窗口函数: 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用 add 方法,value=" + value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用 getResult 方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用 merge 方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyyMM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}
}

Flink-Time

  • Event Time:事件时间,一个是数据产生的时间(时间戳Timestamp)
  • Processing time:处理时间,数据真正被处理的时间

image-20231108081425604

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/141245.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【云栖2023】王峰:开源大数据平台3.0技术解读

本文根据2023云栖大会演讲实录整理而成&#xff0c;演讲信息如下&#xff1a; 演讲人&#xff1a;王峰 | 阿里云研究员&#xff0c;阿里云计算平台事业部开源大数据平台负责人 演讲主题&#xff1a;开源大数据平台3.0技术解读 实时化与Serverless是开源大数据3.0时代的必然选…

网络的相关概念介绍

客户端和服务器通常运行在不同的主机上&#xff0c;通过计算机网络的硬件和软件资源来通信。网络是个复杂的系统&#xff0c;这里我们从程序员的角度来介绍一下网络相关的概念。 对主机而言&#xff0c;网络只是一种I/O设备&#xff0c;是数据源和数据接收方。 一个插到I/O总线…

C语言--假设共有鸡、兔30只,脚90只,求鸡、兔各有多少只​

一.题目描述 假设共有鸡、兔30只&#xff0c;脚90只&#xff0c;求鸡、兔各有多少只&#xff1f; 二.思路分析 本题是一个典型的穷举法例题&#xff0c;而穷举法&#xff0c;最重要的就是条件判断。⭐⭐ 本题中的条件很容易发现&#xff1a; 假设鸡有x只&#xff0c;兔有y只…

Leetcode154. Find Minimum in Rotated Sorted Array II

旋转数组找最小&#xff0c;这次值可以重复 不妨假设你已经做了上一题&#xff0c;题解 上一题的方法1肯定是用不了了&#xff0c;因为不再能完全分成2个不同的部分 所以我们沿着方法2走 如果 > n u m s [ r ] >nums[r] >nums[r]&#xff0c;我们依然可以找右半边 …

Clickhouse学习笔记(10)—— 查询优化

单表查询 Prewhere 替代 where prewhere与where相比&#xff0c;在过滤数据的时候会首先读取指定的列数据&#xff0c;来判断数据过滤&#xff0c;等待数据过滤之后再读取 select 声明的列字段来补全其余属性 简单来说就是先过滤再查询&#xff0c;而where过滤是先查询出对应…

matlab 小波自适应阈值去噪

1、内容简介 略 12-可以交流、咨询、答疑 小波自适应阈值去噪 2、内容说明 小波自适应阈值一维信号去噪&#xff0c;也包含软阈值和硬阈值 硬阈值、软阈值、自适应阈值 3、仿真分析 略 4、参考论文 略 链接&#xff1a;https://pan.baidu.com/s/1yQ1yDfk-_Qnq7tGpa23L…

JOSEF约瑟 反时限过流继电器JGL-115板前接线5A速断保护

系列型号 JGL-111反时限过流继电器&#xff1b;JGL-112反时限过流继电器&#xff1b; JGL-113反时限过流继电器&#xff1b;JGL-114反时限过流继电器&#xff1b; JGL-115反时限过流继电器&#xff1b;JGL-116反时限过流继电器&#xff1b; JGL-117反时限过流继电器&#xff1b…

Leetcode—69.x的平方根【简单】

2023每日刷题&#xff08;二十七&#xff09; Leetcode—69.x的平方根 直接法实现代码 int mySqrt(int x) {long long i 0;while(i * i < x) {i;}if(i * i > x) {return i - 1;}return i; }运行结果 二分法实现代码 int mySqrt(int x) {long long left 0, right (l…

Apache DolphinScheduler如何完全设置东八区?

默认情况 为了兼容全世界不同时区&#xff0c;Apache DolphinScheduler 使用的是 UTC 0 时区&#xff0c;包括保存到数据库表中的数据时区&#xff0c;以及展示到页面上的时区。 如果我们想在页面上看到东八区时间&#xff0c;则需要在页面上手动选择上海时区&#xff0c;如下…

[Hive] INSERT OVERWRITE DIRECTORY要注意的问题

在使用Hive的INSERT OVERWRITE语句时&#xff0c;需要注意以下问题&#xff1a; 数据覆盖&#xff1a;INSERT OVERWRITE语句会覆盖目标目录中的数据。因此&#xff0c;在执行该语句之前&#xff0c;请确保目标目录为空或者你希望覆盖的数据已经不再需要。数据格式&#xff1a;…

Android Glide transform圆形图CircleCrop动态代码描边绘制外框线并rotateImage旋转,Kotlin

Android Glide transform圆形图CircleCrop动态代码描边绘制外框线并rotateImage旋转&#xff0c;Kotlin <?xml version"1.0" encoding"utf-8"?> <FrameLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app&q…

JVM及其垃圾回收机制(GC)

目录 一.JVM内存区域划分 二.JVM类加载机制 类加载过程 类加载的时机 双亲委派模型 三.JVM垃圾回收机制&#xff08;GC) GC工作过程 1.找到垃圾/判断垃圾 &#xff08;1&#xff09;引用计数【python/PHP】 &#xff08;2&#xff09;可达性分析【Java】 2.对象释放…

Juniper PPPOE双线路冗余RPM配置

------------------ 浮动静态路由 set routing-options static route 0.0.0.0/0 next-hop pp0.0 qualified-next-hop pp0.1 preference 10 ----------------- RPM测试的内容,包括从哪个接口发起测试,测试ping等等 #指定探针类型用ICMP请求 #探测的目标地址 #探测间隔 #探测阈…

ElasticSearch中常见的分词器介绍

文章目录 ElasticSearch中常见的分词器介绍前言分词器的作用如何指定分词器分词器的组成分词器的类型标准分词器空格分词器简单分词器关键词分词器停用词分词器IK分词器NGram分词器正则匹配分词器语言分词器自定义分词器 ElasticSearch中常见的分词器介绍 前言 ElasticSearch是…

如何利用黑群晖虚拟机和内网穿透实现公网远程访问

文章目录 前言本教程解决的问题是&#xff1a;按照本教程方法操作后&#xff0c;达到的效果是前排提醒&#xff1a; 1. 搭建群晖虚拟机1.1 下载黑群晖文件vmvare虚拟机安装包1.2 安装VMware虚拟机&#xff1a;1.3 解压黑群晖虚拟机文件1.4 虚拟机初始化1.5 没有搜索到黑群晖的解…

Linux系统上搭建高可用Kafka集群(使用自带的zookeeper)

本次在CentOS7.6上搭建Kafka集群 Apache Kafka 是一个高吞吐量的分布式消息系统&#xff0c;被广泛应用于大规模数据处理和实时数据管道中。本文将介绍在CentOS操作系统上搭建Kafka集群的过程&#xff0c;以便于构建可靠的消息处理平台。 文件分享&#xff08;KafkaUI、kafka…

No193.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

ctfshow sql171-179

mysql 先打开我们本地的mysql&#xff0c;可以看到这些数据库 information_schema information_schema 库: 是信息数据库&#xff0c;其中保存着关于MySQL服务器所维护的所有其他数据库的信息比如数据库名&#xff0c;数据库表&#xff0c; SCHEMATA表: 提供了当前MySQL实例…

Golang 字符串处理汇总

1. 统计字符串长度&#xff1a;len(str) len(str) 函数用于统计字符串的长度&#xff0c;按字节进行统计&#xff0c;且该函数属于内置函数也不用导包&#xff0c;直接用就行&#xff0c;示例如下&#xff1a; //统计字符串的长度,按字节进行统计: str : "golang你好&qu…

​软考-高级-系统架构设计师教程(清华第2版)【第4章 信息安全技术基础知识(P160~189)-思维导图】​

软考-高级-系统架构设计师教程&#xff08;清华第2版&#xff09;【第4章 信息安全技术基础知识&#xff08;P160~189&#xff09;-思维导图】 课本里章节里所有蓝色字体的思维导图