Flink中的Time与Window

一、Time

在Flink的流式处理中,会涉及到时间的不同概念

Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳

Ingestion Time:是数据进入Flink的时间

Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如一条日志进入Flink的时间为2017-11-12 10:00:00.123 到达window的系统时间为 2017-11-12 10:00:01.234,日志内容如下:

2017-11-02 18:37:15.624 INFO Fair over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?----- eventTime,因为我们要根据日志的生成时间进行统计。

  

如果要想聚合,不可能对无解数据流进行聚合。

 

二、Window

1、streaming流式计算是一种被设计用于处理处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的"buckets"桶,我们可以在这些桶上做计算操作。

共有两类,五种时间窗口。

2、Window类型(两类)

2.1、CountWindow:按照指定的数据条数生成一个window,与时间无关

2.2、TimeWindow:按照时间生成window。(按照Processing Time来划分Window)

对于TimeWindow和CountWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

(1)滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切分。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

(2)滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。

因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

使用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警。)

(3)会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成。类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session 窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的

时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个Session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃

周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

 

三、Window API

3.1、CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}/*** CountWindow 中的滚动窗口(Tumbling Windows)* 将数据依据固定的窗口长度对数据进行切分。*/
object TimeAndWindow {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost",11111)val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)//注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5).reduce((item1, item2)=>(item1._1,item1._2+item2._2))streamWindow.print()env.execute("TimeAndWindow")}
}

3.2

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}/*** CountWindow 中的滑动窗口(Sliding Windows)* 将数据依据固定的窗口长度对数据进行切分。*/
object TimeAndWindow {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost",11111)val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)//注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。//满足步长,就执行一次,按第一个参数的长度val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2).reduce((item1, item2)=>(item1._1,item1._2+item2._2))streamWindow.print()env.execute("TimeAndWindow")}
}

四、EventTime与Window

1、EventTime的引入

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间戳,引入方式如下所示:

2、Watermark

  概念:我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的

事件戳顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的EventTime顺序排列的。

  Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的Watermark。

  Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。

  数据流中的Watermark用于表示eventTime小于Watermark的数量,都已经到达了,因此,window的执行也是由Watermark触发的。

  Watermark可以理解成一个延迟触发机制。我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime 小于

maxEventTime-t 的所有数据都已经到达。如果有窗口的停止时间等于maxEventTime-t,那么这个窗口被触发执行。

滚动窗口/滑动窗口/会话窗口

 
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
* TimeWindow
*/
object EventTimeAndWindow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//开启watermark
//从调用时刻开始给env创建的每一个stream追加时间特征。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
override def extractTimestamp(element: String): Long = {
// event word eventTime是日志生成时间,我们从日志中解析EventTime
val eventTime = element.split(" ")(0).toLong
println(eventTime)
eventTime
}
}
).map(item => (item.split(" ")(1),1L)).keyBy(0)
//加上滚动窗口,窗口大小是5s,调用window的api
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//滑动窗口
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
//会话窗口
val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))
streamReduce.print()

env.execute("EventTimeAndWindow")
}
}

 

转载于:https://www.cnblogs.com/ssqq5200936/p/11014296.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/462316.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电脑方面的技巧

快速启动程序     很多朋友发现,在“运行”窗口中可以直接运行Ping、Telnet等系统自带的命令,可是运行Winword.exe、QQ.exe等程序时却出错。其实这主要没有定义系统变量。      打开系统属性窗口,切换到“高级”选项卡,单…

在linux下编译boost库【搜集】

http://www.cnblogs.com/flywuya/archive/2010/11/30/1892483.html 编译环境 操作系统:SUSE linux Enterprise Server 10 64-bit 编译工具:gcc 4.1.2 1.下载boost1.36 2.解压boost到/usr/share 3.在命令行运行/usr/share/boost_1_36_0/tools/jam/src/build.sh生成bjam 4.复制/u…

postgresql安装配置

postgresql安装配置 一,什么是postgresql PostgreSQL是以加州大学伯克利分校计算机系开发的 POSTGRES 版本 4.2 为基础的对象关系型数据库管理系统(ORDBMS),简称pgsql,它支持大部分 SQL 标准并且提供了许多其他现代特性:复杂查询 外键 触发器…

Dart中的mixins

/* mixins的中文意思是混入,就是在类中混入其他功能。在Dart中可以使用mixins实现类似多继承的功能,with关键字因为mixins使用的条件,随着Dart版本一直在变,这里讲的是Dart2.x中使用mixins的条件:1、作为mixins的类只能…

[转]Messenger:使用消息的跨进程通信

本文转自:http://xwangly.iteye.com/blog/1109424 Messenger:信使 官方文档解释:它引用了一个Handler对象,以便others能够向它发送消息(使用mMessenger.send(Message msg)方法)。该类允许跨进程间基于Message的通信(即两个进程间可以通过Mess…

常用WebService一览表

天气预报Web服务,数据来源于中国气象局 Endpoint :http://www.webxml.com.cn/WebServices/WeatherWebService.asmx Disco :http://www.webxml.com.cn/WebServices/WeatherWebService.asmx?disco WSDL :http://www.webxml.com.cn/WebServices/WeatherWeb…

Ubuntu 10.10 下配置Telnet服务器

首先说明我的系统环境: Ubuntu10.10 x86 Desktop 在这个系统上默认只安装了telnet(也就是client) rootwww.linuxidc.com:~# dpkg -s telnet Package: telnet Status: install ok installed Priority: standard Section: net Installed-Size: …

python 编程模型

数据模型(译) image.png1 对象(object)、类型(type)和值(value) python中所有的数据都是通过对象(object)或者对象之间的关系来表示 每个对象(obj…

MySQL实战练习

在测试Django的数据库同步时,经常会出现需要Drop掉MySql数据库表情况。单独开发了一个MySQL删除脚本。 其中涉及到动态语法与MySQL的数据字典概念。 CREATE DEFINER Productlocalhost PROCEDURE ClearTables( IN Confirm BOOLEAN ) DETERMINISTIC MODIFIES SQL DAT…

R中统计假设检验总结(一)

先PS一个:考虑到这次的题目本身的特点 尝试下把说明性内容都直接作为备注写在语句中 另外用于说明的部分例子参考了我的教授Guy Yollin在Financial Data Analysis and Modeling with R这门课课件上的例子 部分参考了相关package的帮助文档中的例子 下面正题- 戌 >…

linux中如何快速进入某个目录

http://blog.csdn.net/qinglu000/article/details/17247283 1. 如果偶尔进的话可用tab键加速目录输入速度。 2. 如果经常进某个目录可设置环境变量例如$DIRPATH/usr/bin,通过cd $DIRPATH的方式进入。 3. 更简单可以通过alias 命令注册一个快捷命令如alias uucd /us…

事务的四大特性和隔离级别

1.事务的四大特性(ACID):指数据库事务正确执行的四个基本要素的缩写。包含:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability&#xff09…

改造MUC实现Openfire群

我的Openfire群实现思路: 1、群和群成员,要保存到表中。 2、拉取群列表和群成员列表,均从DB中查询返回。 3、抛弃老外的“进房间,要发Presence ”。只要此人一上线,就模似一个Presence进行joinRoom,进入他的…

如何在Windows环境下的VS中安装使用Google Protobuf完成SOCKET通信

http://blog.csdn.net/whuancai/article/details/11994341 如何在Windows环境下的VS中安装使用Google Protobuf完成SOCKET通信 原文出自:http://blog.csdn.net/monkey_d_meng/article/details/5894910 尊重作者:MONKEY_D_MENG 最近一段时间,由…

14 Scroll 滚动搜索

Scroll的用法:第一次搜的时候,要指定 快照保留时间1min,分页的大小:2条/页;对于第一次搜索,ES会返回一个这个scroll的id;下次再搜的时候,就带着这个scrollid去搜就行了,不…

解决IE6透明PNG图片的代码

使用方法1.下载DD_belatedPNG.js文件&#xff08;官方网站&#xff1a;http://www.dillerdesign.com/experiment/DD_belatedPNG/&#xff09;. 2.在网页中head区引用,如下:<!--[if IE 6]><script src"DD_belatedPNG.js" mce_src"DD_belatedPNG.js"…

LVS+Keepalived负载均衡方式总结

1、负载均衡器、服务器集群直连方式&#xff08;lb_kind DR&#xff09;http://network.51cto.com/art/201006/206831.htmhttp://www.myhack58.com/Article/sort099/sort0102/2012/35640_4.htm负载均衡器最简洁安装可以直接#yum ipvsadm keepalived根据51cto.com网站知道要分别…

VS2008中使用JSONCPP方法小结

http://sourceforge.net/projects/jsoncpp/?sourcetyp_redirect C要使用JSON来解析数据&#xff0c;一般采用jsoncpp. 下载jsoncpp后&#xff0c;按ReadMe文档的说法是要先安装的&#xff0c;但是安装比较麻烦。然而事实上&#xff0c;我们并不需要安装&#xff0c;就可以直接…

SQL条件语句(IF, CASE WHEN, IF NULL)

1.IF 表达式&#xff1a;IF( expr1 , expr2 , expr3 ) expr1条件&#xff0c;条件为true&#xff0c;则值是expr2 &#xff0c;false&#xff0c;值就是expr3 SELECT o.id,u.account,catagory.name,orderTime,detail.amount,periodtime,if(direction0,看涨,看跌) directionNam…

Forensic Challenge 9 - Mobile Malware

使用智能手机的人越来越多&#xff0c;对智能手机的安全研究也慢慢开始成为重点。The honey project有一个Mobile malware的挑战赛&#xff0c; 带你走进Mobile Malwares的世界。参见&#xff1a; Forensic Challenge 9 - "Mobile Malware" 样本&#xff1a; fc9fil…