Flink的Window

1 Window概述

    streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

    Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作

    注意:window一般在keyBy(KeyedStram)后。如果实在DataStream后的话是windowAll(不建议使用,会将所有数据汇总到一个分区计算)

    window assigner确定了数据属于哪个窗口丢到正确的桶里面,还没有做计算。真正做计算是在window assigner后面的window function。下面两步和起来才是一个完整的窗口操作

    .window(<window assigner>).aggregate(new AverageAggregate)

2 Window的类型

   Window可以分成两类:TimeWindow(按照时间生成Window)和CountWindow(按照指定的数据条数生成一个Window,与时间无关)。

2.1 TimeWindow

   对于TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

2.1.1 Tumbling Window

   将数据依据固定的窗口长度对数据进行切片,滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。特点:时间对齐,窗口长度固定,没有重叠。适合做BI统计等(做每个时间段的聚合计算)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DzbJcNJd-1595865245961)(C:\资料\flink\笔记\4 Flink的Window\assets\1595692592803.png)]

2.1.2 Sliding Window

   滑动窗口由固定的窗口长度和滑动间隔组成。滑动窗口分配器将元素分配到固定长度的窗口中,如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。特点:时间对齐,窗口长度固定,可以有重叠。

在这里插入图片描述

2.1.3 Session Window

   由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。特点:时间无对齐。

在这里插入图片描述

3 WindowAPI

3.0 窗口分配器

  窗口分配器 即 window() 方法,我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口。

  window() 方法接收的输入参数是一个 WindowAssigner(窗口分配器),WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:①滚动时间窗口( .timeWindow(Time.seconds(5)))②滑动时间窗口(.timeWindow(Time.seconds(15), Time.seconds(5)))③会话窗口( .window(EventTimeSessionWindows.withGap(Time.minutes(1)))④滚动计数窗口(.countWindow(5))⑤滑动计数窗口(.countWindow(20,10))

3.1 TimeWindow

   TimeWindow将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

   (1)滚动窗口

   时间间隔参数可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等指定。

val timeWindowStream = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

   (2)滑动窗口

   在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。每5s就计算输出结果一次,每一次计算的window范围是1分钟内的所有元素如下:

val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15), Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

   或者

val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).window(SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

3.2 CountWindow

   CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

   注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。

   (1)滚动窗口

   指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

val countWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).countWindow(5).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

   (2)滑动窗口

   在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。每当某一个key的个数达到10的时候,触发计算,计算最近该key最近20个元素的内容如下

val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0).countWindow(20,10).sum(1)

3.3 window function

   window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:①增量聚合函数:每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。②全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction就是一个全窗口函数

   (1)ReduceFunction

   下面的示例的展示了如何将递增的ReduceFunction与ProcessWindowFunction结合使用,以返回窗口中的最小事件以及窗口的开始时间。

val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).reduce((r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },( key: String,context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,minReadings: Iterable[SensorReading],out: Collector[(Long, SensorReading)] ) =>{val min = minReadings.iterator.next()out.collect((context.window.getStart, min))})

   (2)AggregateFunction

   下面的示例展示了如何将递增的AggregateFunction与ProcessWindowFunction结合起来计算平均值,并同时发出键和窗口以及平均值。

val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).timeWindow(<duration>).aggregate(new AverageAggregate(), new MyProcessWindowFunction())// Function definitions/*** The accumulator is used to keep a running sum and a count. The [getResult] method* computes the average.*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {override def createAccumulator() = (0L, 0L)override def add(value: (String, Long), accumulator: (Long, Long)) =(accumulator._1 + value._2, accumulator._2 + 1L)override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2override def merge(a: (Long, Long), b: (Long, Long)) =(a._1 + b._1, a._2 + b._2)
}class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {val average = averages.iterator.next()out.collect((key, average))}
}

   (3)FoldFunction

   下面的示例显示如何将递增的FoldFunction与ProcessWindowFunction结合使用,以提取窗口中的事件数量,并返回窗口的键和结束时间。

val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).fold (("", 0L, 0),(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },( key: String,window: TimeWindow,counts: Iterable[(String, Long, Int)],out: Collector[(String, Long, Int)] ) =>{val count = counts.iterator.next()out.collect((key, window.getEnd, count._3))})

   (4)ProcessWindowFunction

   除了访问键控状态(任何富函数都可以),ProcessWindowFunction还可以使用范围限定在函数当前处理的窗口的键控状态。

val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction())

3.4 trigger

   触发器trigger定义 window 什么时候关闭,触发计算并输出结果。触发器确定窗口(由窗口分配者形成)何时准备好由窗口函数处理。每个WindowAssigner都有一个默认的触发器。如果默认触发器不符合需要,可以自定义触发器。

   触发器接口有五个方法,允许触发器对不同的事件作出反应:

   ①onElement()方法对添加到窗口的每个元素调用。

   ②当一个已注册的事件时间计时器触发时,将调用onEventTime()方法。

   ③当触发注册的处理时间计时器时,将调用onProcessingTime()方法。

   ④onMerge()方法与有状态触发器相关,当两个触发器对应的窗口合并时,将它们的状态合并起来,例如在使用会话窗口时。

   ⑤clear()方法在删除相应窗口时执行所需的任何操作。

   关于上述方法,有两点需要注意:

   ①前三个函数通过返回一个TriggerResult来决定如何处理它们的调用事件。动作可以是下列动作之一:CONTINUE:什么也不做;FIRE:触发计算;PURGE:清除窗口中的元素;FIRE_AND_PURGE:触发计算并随后清除窗口中的元素。

   ②这些方法中的任何一个都可以用于为将来的操作注册处理或事件时间计时器。

3.5 evitor

   移除器evitor可以窗口触发前或触发后,定义移除某些数据的逻辑。一般和global window一起用,要自定义trigger和evitor,因为把所有的数据都存下来了,不用的数据丢弃。evitor接口有2个方法如下

/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

  evictBefore()包含要在窗口函数之前应用的驱逐逻辑,而evictAfter()包含要在窗口函数之后应用的驱逐逻辑。

   Flink提供了三个预先实现的驱逐器:①CountEvictor:保持窗口中元素的用户指定数量,并丢弃从窗口缓冲区开始的剩余元素。②DeltaEvictor:获取delta函数和阈值,计算窗口缓冲区中最后一个元素与每个剩余元素之间的增量,并删除增量大于或等于阈值的元素。③TimeEvictor:以毫秒为单位的时间间隔作为参数,对于给定的窗口,它在元素中查找最大时间戳max_ts,并删除所有时间戳小于max_ts - interval的元素。

3.6 allowedLateness

   允许处理迟到的数据,分布式计算数据可能是乱序的,开了时间窗口之后,可能属于他的数据姗姗来迟。假设正在是10点关闭窗口,允许1分钟的迟到数据,到10点不关但是要触发一次计算输出一个计算结果,后面一分钟再来的数据可以在这个基础上在做叠加触发一次计算再输出一个结果。也就是先输出结果后面更新

   注意:这些处理迟到数据的必须在数据自带的时间处理才有意义

val input: DataStream[T] = ...input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).<windowed transformation>(<window function>)

3.7 sideOutputLateData和getSideOutput

   sideOutputLateData将迟到的数据放入侧输出流,getSideOutput获取侧输出流

val lateOutputTag = OutputTag[T]("late-data")val input: DataStream[T] = ...val result = input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowed transformation>(<window function>)val lateStream = result.getSideOutput(lateOutputTag)

3.8 window API 总览

Keyed Windowsstream.keyBy(...)               <-  keyed versus non-keyed windows.window(...)              <-  required: "assigner"[.trigger(...)]            <-  optional: "trigger" (else default trigger)[.evictor(...)]            <-  optional: "evictor" (else no evictor)[.allowedLateness(...)]    <-  optional: "lateness" (else zero)[.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply()      <-  required: "function"[.getSideOutput(...)]      <-  optional: "output tag"
Non-Keyed Windowsstream.windowAll(...)           <-  required: "assigner"[.trigger(...)]            <-  optional: "trigger" (else default trigger)[.evictor(...)]            <-  optional: "evictor" (else no evictor)[.allowedLateness(...)]    <-  optional: "lateness" (else zero)[.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply()      <-  required: "function"[.getSideOutput(...)]      <-  optional: "output tag"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473566.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

标记语言Markdown介绍以及日常使用

Markdown介绍 Markdown是一种文本标记语言&#xff0c;用于快速文档排版Markdown文件为纯文本文件&#xff0c;后缀名为 .mdMarkdown介于Word和HTML之间 比起Word&#xff0c;Markdown是纯文本&#xff0c;排版文档轻量、方便、快速。比起HTML&#xff0c;Markdown简单直观&…

天池 在线编程 有效的字符串

文章目录1. 题目2. 解题1. 题目 描述 如果字符串的所有字符出现的次数相同&#xff0c;则认为该字符串是有效的。 如果我们可以在字符串的某1个索引处删除1个字符&#xff0c;并且其余字符出现的次数相同&#xff0c;那么它也是有效的。 给定一个字符串s&#xff0c;判断它是否…

Flink的时间语义和Watermark

1 时间语义 数据迟到的概念是&#xff1a;数据先产生&#xff0c;但是处理的时候滞后了 在Flink的流式处理中&#xff0c;会涉及到时间的不同概念&#xff0c;如下图所示&#xff1a; Event Time&#xff1a;是事件创建的时间。它通常由事件中的时间戳描述&#xff0c;例如采集…

数据分析案例:亚洲国家人口数据计算

数据截图: 数据下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1dGHwAC5 密码&#xff1a;nfd2 该数据包含了2006年-2015年10年间亚洲地区人口数量数据&#xff0c;共10行50列数据。我们需要使用Numpy完成如下数据任务: 计算2015年各个国家人口数据计算朝鲜历…

LeetCode 1646. 获取生成数组中的最大值

文章目录1. 题目2. 解题1. 题目 给你一个整数 n 。按下述规则生成一个长度为 n 1 的数组 nums &#xff1a; nums[0] 0nums[1] 1当 2 < 2 * i < n 时&#xff0c;nums[2 * i] nums[i]当 2 < 2 * i 1 < n 时&#xff0c;nums[2 * i 1] nums[i] nums[i 1]…

TotoiseSVN的基本使用方法

一、签入源代码到SVN服务器 假如我们使用Visual Studio在文件夹StartKit中创建了一个项目&#xff0c;我们要把这个项目的源代码签入到SVN Server上的代码库中里&#xff0c;首先右键点击StartKit文件夹&#xff0c;这时候的右键菜单如下图所示&#xff1a; 图2-2-1 点击Import…

LeetCode 1647. 字符频次唯一的最小删除次数(贪心)

文章目录1. 题目2. 解题1. 题目 如果字符串 s 中 不存在 两个不同字符 频次 相同的情况&#xff0c;就称 s 是 优质字符串 。 给你一个字符串 s&#xff0c;返回使 s 成为 优质字符串 需要删除的 最小 字符数。 字符串中字符的 频次 是该字符在字符串中的出现次数。 例如&am…

Flink中的状态管理

1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器)&#xff0c;一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护&#xff0c;并且用来计算某个结果的所有数据&#xff0c;都属于这个任务的状态。可以简单的任务状态就是…

Python之日志处理(logging模块)

主要内容 日志相关概念logging模块简介使用logging提供的模块级别的函数记录日志logging模块日志流处理流程使用logging四大组件记录日志配置logging的几种方式向日志输出中添加上下文信息参考文档 一、日志相关概念 日志是一种可以追踪某些软件运行时所发生事件的方法。软件开…

LeetCode 514. 自由之路(记忆化递归 / DP)

文章目录1. 题目2. 解题1. 题目 电子游戏“辐射4”中&#xff0c;任务“通向自由”要求玩家到达名为“Freedom Trail Ring”的金属表盘&#xff0c;并使用表盘拼写特定关键词才能开门。 给定一个字符串 ring&#xff0c;表示刻在外环上的编码&#xff1b;给定另一个字符串 ke…

Flink中的容错机制

1 checkpoint Flink 故障恢复机制的核心&#xff0c;就是应用状态的一致性检查点checkpoint。 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint&#xff0c;处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下…

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k&#xff0c;找出是否有可能把这个数组分成 k 个非空子集&#xff0c;其总和都相等。 示例 1&#xff1a; 输入&#xff1a; nums [4, 3, 2, 3, 5, 2, 1], k 4 输出&#xff1a; True 说明&#xff1a;…

MySQL中的表中增加删除字段

1增加两个字段&#xff1a; mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具&#xff0c;在Window下我们使用TortoiseSVN(小乌龟)&#xff0c;可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

天池 在线编程 音乐组合

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744274 2. 解题 对60求余后&#xff0c;0, 30的为 Cn2C_n^2Cn2​&#xff0c;其余的相加等于60的&#xff0c;种类相乘 class Solution { public:/*** param …