flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念

link 在开窗处理事件时间(Event Time) 数据时,可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性。这两者因都是设置延迟时间所以刚接触时容易混淆。本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别。

水印延迟(WaterMark)

(1) 水印

由于采用了事件时间,脱离了物理挂钟。窗口不知道什么时候需要关闭并进行计算,这个时候需要借助水印来解决该问题。当窗口遇到水位标识时就默认是窗口时间段内的数据都到齐了,可以触发窗口计算。

(2) 水印延迟

设置水印延迟时间的目的是让水印延迟到达,从而可以解决乱序问题。通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中,保证了数据的完整性。当水印到达后就会触发窗口计算,在水印之后到达的迟到数据则会被丢弃。

窗口允许延迟(allowedLateness)

使用 StreamAPI 时,在进行开窗后可设置 allowedLateness 窗口延迟。官网中对其解释如下:

默认情况下,当水印到达窗口末端时,迟到元素将会被删除。但Flink允许为window operators指定允许的最大延迟。允许延迟指定元素在被删除之前延迟的时间,默认值为0。当元素在水印经过窗口末端后到达,且它的到达时间在窗口末端加上运行延迟的时间之内,其仍会被添加到窗口中。根据所使用的触发器,延迟但未被丢弃的元素可能会再次触发窗口计算。EventTimeTrigger就是这种情况。为了做到这一点,Flink保持窗口的状态,直到它们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,正如窗口生命周期部分中所描述的那样。

简单理解:通常在水印到达之后迟到数据将会被删除,而窗口的延迟则是指数据在被删除之前的允许保留时间。也就是说,在水印达到之后迟到数据本该被删除,但是如果设置了窗口延迟,那么在水印之后到窗口延迟时间段内到达的迟到数据还是会被加入到窗口计算中,并再次触发窗口计算。

一个Demo 两个猜想

下面我用一个 Demo 和两个猜想来帮助大家加深理解这两个概念。

例子:接收 Kafka 数据,数据为 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我们开一个 5 秒的 tumbling windows 滚动窗口,以 word 作为 key 在窗口内对 count 值进行累加。同时设置水印延迟 2 秒,窗口延迟 2 秒。代码如下:

public class MyExample {

public static void main(String[] args) throws Exception {

// 创建环境

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 设置时间特性为

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 水印策略,其需要注入Timestamp Assigner(描述了如何访问事件时间戳)和 Watermark Generator (事件流显示的超出正常范围的程度)

WatermarkStrategywatermarkStrategy=WatermarkStrategy

// forBoundedOutOfOrderness 属于(periodic周期性),周期生成器通常通过onEvent()观察传入的事件,然后在框架调用onPeriodicEmit()时发出水印。

.forBoundedOutOfOrderness(Duration.ofSeconds(2))

.withTimestampAssigner(new SerializableTimestampAssigner() {

@Override

public long extractTimestamp(WC wc, long l) {

return wc.getEventTime() * 1000;

}

});

// Kafka 配置

Properties properties=newProperties();

properties.setProperty("bootstrap.servers", "Kafka地址:9092");

properties.setProperty("group.id", "test");

// Flink 需要知道如何转换Kafka消息为Java对象(反序列化),默认提供了 KafkaDeserializationSchema(序列化需要自己编写)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema

env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())

// map 构建 WC 对象

.map(new MapFunction() {

@Override

public WC map(ObjectNode jsonNode) throws Exception {

JsonNode valueNode=jsonNode.get("value");

WC wc=newWC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());

return wc;

}

})

// 设定水印策略

.assignTimestampsAndWatermarks(watermarkStrategy)

.keyBy(WC::getWord)

// 窗口设置,这里设置为滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

// 设置窗口延迟

.allowedLateness(Time.seconds(2))

.reduce(new ReduceFunction() {

@Override

public WC reduce(WC wc, WC t1) throws Exception {

return new WC(wc.getWord(), wc.getCount() + t1.getCount());

}

})

.print();

env.execute();

}

static class WC {

public String word;

public int count;

public long eventTime;

public long getEventTime() {

return eventTime;

}

public void setEventTime(long eventTime) {

this.eventTime= eventTime;

}

public String getWord() {

return word;

}

public void setWord(String word) {

this.word= word;

}

public int getCount() {

return count;

}

public void setCount(int count) {

this.count= count;

}

public WC(String word, int count) {

this.word= word;

this.count= count;

}

public WC(String word, int count,long eventTime) {

this.word= word;

this.count= count;

this.eventTime= eventTime;

}

@Override

public String toString() {

return "WC{" +

"word='" + word + '\'' +

", count=" + count +

'}';

}

}

}

猜想1:

水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。

// 往 Kafka 中写入数据

{"word":"a","count":1,"time":1604286560}   //2020-11-02 11:09:20

{"word":"a","count":1,"time":1604286561}   //2020-11-02 11:09:21

{"word":"a","count":1,"time":1604286562}   //2020-11-02 11:09:22

{"word":"a","count":1,"time":1604286566}   //2020-11-02 11:09:26

{"word":"a","count":1,"time":1604286567}   //2020-11-02 11:09:27 (触发了窗口计算)

控制台输出

分析:通过测试发现最后在第 7s 也就是 11:09:27 时触发了窗口计算,这符合了我们的猜想一。水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。计算结果为3,这是因为只有最前面的3条数据属于 [0,5) 窗口计算范围之内。

猜想2:

设置了窗口延迟2秒,那么只要在水印之后到窗口允许延迟的时间范围内达到且属于 [ 0,5) 窗口的迟到数据会被加入到窗口中,且再次触发窗口运算:

// 继续往 Kafka 中写入数据

{"word":"a","count":1,"time":1604286568}   //2020-11-02 11:09:28 时间到达了第 8 秒

{"word":"a","count":1,"time":1604286563}   //2020-11-02 11:09:23 模拟一个在水印之后、在窗口允许延迟范围内、且属于[0,5) 窗口的迟到数据,该数据还是会触发并参与到[0,5) 窗口的计算

控制台输出新增了一行

// 我们再继续往 Kafka 中写入数据

{"word":"a","count":1,"time":1604286569}  //2020-11-02 11:09:29  时间到达第9秒

{"word":"a","count":1,"time":1604286563}  //2020-11-02 11:09:23 模拟一个在水印之后且超出窗口允许延迟范围、且属于[0,5) 窗口的迟到数据,该数据不会参与和触发[0,5)窗口计算

查看控制台并没有发现新的输出打印。

解析:水印因延迟在第 7s 到达之后会触发[0,5) 窗口计算,如果没有设置窗口延迟的情况下,水印之后迟到且属于 [0,5) 窗口的数据会被丢弃。上面我们实验设置窗口延迟 2s,实现的效果就是在水印之后,窗口允许延迟时间之内(7 + 2 = 9s 之间),迟到且属于 [0,5) 窗口的数据还是会触发一次窗口计算,并参与到窗口计算中。而在 9s 之后,也就是超过窗口允许延时时间,那么迟到且属于[0,5)的数据就会被丢弃。

总结

WaterMark 到达之前,窗口在攒数据,不会触发计算。

WaterMark 等于 windowEndTime 时,第一次触发窗口计算。

WaterMark 到达之后,allowlateness之前,如果来了数据,每条数据都会触发窗口计算。

超过了allowlateness之后到达的迟到数据会丢弃。

水印用于解决乱序问题保证数据的完整性。而之所以有allowlateness的出现是因为如果WaterMark 加大会导致窗口计算延迟。WaterMark 设定的时间,是第一次触发窗口计算的时间。allowlateness 表示,WaterMark 触发窗口计算以后,还可以再等多久的迟到数据,每次符合条件的数据到达都会再次触发一次窗口计算。allowlateness 是在 Watermark 基础上再做了一层迟到数据的保证。

【责任编辑:赵宁宁 TEL:(010)68476606】

点赞 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爱特php文件管理器2.8_查找「超级蜘蛛池开发者中心 抠:44564876易」安卓应用 - 豌豆荚...

8.6万人安装开发者头条 - 程序员分享平台 2015 年获「最美应用」官方推荐&#xff0c;程序员必装的应用。 开发者头条是由一群程序员创建的&#xff0c;我们运营了 developerWorks 的微博、微信&#xff0c;创建了码农周刊&#xff0c;已覆盖百万程序员&#xff1b; 我们更懂程…

谈华为鸿蒙内核和操作系统,谈华为鸿蒙内核和操作系统

作者 | 陆首群谈到华为自研鸿蒙内核和操作系统&#xff0c;从华为透漏出来的信息来看&#xff0c;有点自相矛盾、扑朔迷离&#xff01;我曾说过&#xff1a;真真假假&#xff0c;虚虚实实&#xff01;这里有技术原因&#xff0c;也有外部原因。一开始(大概是 2016 年左右)&…

弹跳机器人 桌游_MIT机器人轻松搞定桌游叠叠乐:你能玩过它算我输 |《科学》子刊...

乾明 发自 凹非寺 量子位 报道 | 公众号 QbitAI江湖上&#xff0c;一直流传着一种叠叠乐的试炼。规则很简单&#xff0c;从下方的积木中&#xff0c;抽一根往上搭。你能往上搭几层&#xff1f;对MIT团队研发的机器人来说&#xff0c;玩这个游戏基本上不费吹灰之力。而且&#x…

华为鸿蒙无人驾驶,特斯拉最大的对手竟是华为?Hicar+鸿蒙OS无人驾驶技术不再一家独大!...

原标题&#xff1a;特斯拉最大的对手竟是华为&#xff1f;Hicar鸿蒙OS无人驾驶技术不再一家独大&#xff01;短短几个月的时间&#xff0c;特斯拉的市值翻了近4倍&#xff0c;对于一个超级企业来说一切都显得那么不可思议&#xff0c;如果把它单纯的看成一家车企&#xff0c;恐…

new_picview_一款漂亮的图片查看器PictureViewer

前段时间写了一款查看妹子图片的客户端宅男福利妹子客户端SuperGank&#xff0c;于是后来就把其中的一个图片查看的功能封装成了一个library&#xff0c;使用简单&#xff0c;可以进行多项设置。先来看一眼效果图吧&#xff01;下面来看看如何使用它&#xff1a;首先把图片url的…

鸿蒙系统会不会影响游戏,令人担心,鸿蒙系统会不会让人失望?未来难说

如今&#xff0c;一直被炒的沸沸扬扬的鸿蒙系统&#xff0c;终于在2019年8月9日发布了&#xff0c;次日&#xff0c;也就是8月10日&#xff0c;荣耀的智慧屏又带着鸿蒙系统出现了一次&#xff0c;荣耀智慧屏也成为了首次搭载鸿蒙系统的终端&#xff0c;见证了中国操作系统的历史…

buck电路上下管_推荐 | 学好电路设计与仿真?你不能错过这两本书籍 ~

网 友小编&#xff0c;有没有 Saber 相关书籍可以推荐一下&#xff1f;还有&#xff0c;Saber 软件下载那个版本比较好&#xff1f;当然有啦&#xff01;小 编《Saber 电路仿真及开关电源设计》柯福波 等编著本书以 Saber 开关电源为基础&#xff0c;以具体工程电路为范例&am…

html5画电池状态,HTML5的一个显示电池状态的API简介

这篇文章主要介绍了HTML5的一个显示电池状态的API简介,由Mozilla设计,具体的设备和浏览器支持情况还要通过检测才能确定,需要的朋友可以参考下移动设备的份额在网络流量中在大量增长&#xff0c;其所贡献的网络流量非常庞大&#xff0c;以至于为了移动设备&#xff0c;我们单独…

redux异步action_react-redux--异步Action

上两篇文章叙述的都是同步操作&#xff0c;每当 dispatch action 时&#xff0c;state 会被立即更新。但是实际应用中&#xff0c;我们有很多操作执行后&#xff0c;过一段时间&#xff0c;才会得到结果。那么怎么处理这种情况呢&#xff1f;先熟悉一个概念中间件本质就是一个通…

怎么批量修改html文件后缀,如何批量修改文件后缀名

我们都知道电脑文件都有一个格式&#xff0c;比如JPG、MP3等等格式&#xff0c;每个格式都代表不一样文件类型&#xff0c;那么我们该如何批量更改文件类型的后缀呢?比如把JPG更改为MP3&#xff0c;只要在电脑里设置不隐藏文件扩展名&#xff0c;然后建立统一的文件夹&#xf…

python 怎么调用 矩阵 第几行_第58集 python机器学习:混淆矩阵精度指标

混淆矩阵的精度计算公式为&#xff1a;精度(TPTN)/(TPTNFPFN)&#xff0c;也就是说&#xff0c;精度就是指正确的预测数目除以所有样本的数量。准确率、召回率与f-分数&#xff1a;总结混淆矩阵还有几种方法&#xff0c;其中最常见的就是准确率和召回率。准确率度量的是被预测为…

android seekbar闪退,android seekbar 踩坑之路

最近项目中有用到seekbar&#xff0c;之前对这东西不太了解&#xff0c;趁机来踩坑。seekbar样式按我觉得Material 中的还不算难看了。但是美工给了自己的样式&#xff0c;还是得改。主要有这2个属性:android:thumb"drawable/thumb"android:progressDrawable"dr…

tensorflow分类的loss函数_tensorflow 分类损失函数使用小记

多分类损失函数label.shape:[batch_size]; pred.shape: [batch_size, num_classes]使用 tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logitsFalse, axis-1)- y_true 真实值&#xff0c; y_pred 预测值- from_logits,我的理解是&#xff0c;如果预测…

华为鸿蒙系统还没发布吗,华为没有孤军奋战,合作伙伴“雪中送炭”,鸿蒙系统正式发布!...

6月2日晚上&#xff0c;期待许久的鸿蒙0S 2终于正式登场了&#xff0c;这意味着鸿蒙手机已经变成了面向市场的正式产品&#xff0c;这是华为迈出的一小步&#xff0c;却是国产系统与安卓、iOS竞争的开始。在鸿蒙OS正式推出后&#xff0c;不少华为手机用户已经收到了系统更新的推…

freemark循环map_java与freemarker遍历map

一、java遍历MAP/*** 1.把key放到一个集合里&#xff0c;遍历key值同时根据key得到值 (推荐)*/Set set map.keySet();Iterator itset.iterator();while(it.hasNext()){String s (String) it.next();System.out.println(map.get(s));}/*** 2.把一个map对象放到放到entry里&#…

.net 开发 html框架,Asp.net的开发框架

Asp.net的开发首先要选择开发框架&#xff0c;选择怎样框架要看看开发什么养的网站用的&#xff0c;选择一个适合的开发框架能节约很多的时间。20个优秀的前端框架&#xff1a;1. Twitter BootStrap (Apache v2.0&#xff1b;响应式)时髦、直观并且强大的前端框架&#xff0c;让…

baseresponse响应类_内部类、响应类Response、序列化基类、反序列化、全局局部钩子...

一、内部类1、概念&#xff1a;将类定义在一个类的内部&#xff0c;被定义的类就是内部类2、特点&#xff1a;内部类及内部类的所以名称空间&#xff0c;可以直接被外部类访问的3、 应用&#xff1a;通过内部类的名称空间&#xff0c;给外部类额外拓展一些特殊的属性(配置)&…

python模块名限定_python 正则表达式 匹配 ?的使用 限定符 sys.re模块

特殊字符&#xff1a;注意\b是匹配单词而非字符串的开始和结束&#xff1b; \w不能匹配汉字限定符放在匹配符的后边2 括号分组&#xff1a;(\d\d\d){2} #有括号匹配6位数字\d\d\d{2} #没有括号匹配4位数字3 &#xff1f;的使用4 sys.re模块的常用函数表&#xff1a;查找、分隔、…

计算机不能进入桌面,电脑开机无法进入桌面,请高手解决。

该故障是Windows XP操作系统关机故障中最容易出现的故障。造成该故障的原因可能有以下几方面原因&#xff1a;系统设置在添乱Windows XP默认情况下&#xff0c;当系统出现错误时会自动重新启动&#xff0c;这样当用户关机时&#xff0c;如果关机过程中系统出现错误就会重新启动…