Flink学习(二)Flink中的时间

 摘自Apache Flink官网

 

最早的streaming 架构是storm的lambda架构

分为三个layer

  • batch layer
  • serving layer
  • speed layer

 

 

 

 

一、在streaming中Flink支持的通知时间

Flink官网写了个了解streaming和各种时间的博客

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101#F2

 

1、Processing time:执行时候的机器系统时间。

  • 如果使用时间窗口的话,如果一个应用在9:15开始,那么第一次的结束时间在10:00. 然后是10:00~11:00, 之后都是整点。就第一个点比较特殊

 

2、Event time:每一个Event在其设备上产生的时间,是在进入Flink之前的时间。

  • 可以从data里面提取出来
  • Event time的程序必须声明怎么产生Event Time Watermarks。
  • Event time处理会发生延时,因为有可能有的Event没有到达
  • 如果所有的events都到达了,那么event time operations会按照预期的执行

 

3、Ingestion time:events进入Flink的时间

  • 在source算子,每一个记录得到当前算子的时间,基于时间的操作根据这个时间。
  • 记录时间有点开销,因为是在source上,但是非常可靠。因为如果是processing time的话,有可能机器的local time不一样
  • Ingestion time和event time不一样,这个不能处理过期时间

4、watermark:在Flink中Event time程序衡量执行的是watermarks

  • watermark携带了时间戳
  • watermark在source function之后产生
  • 每一个并行的子任务独立的产生watermarks
  • 可以设置迟到时间,来容忍迟到的watermak

注册watermark的代码:

 1 public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {
 2         private final long maxOutOfOrderness = 3500; // 3.5 seconds
 3 
 4         private long currentMaxTimestamp;
 5 
 6         @Override
 7         public long extractTimestamp(OrderRecord record, long previousElementTimestamp) {            // 将数据中的时间戳字段(long 类型,精确到毫秒)赋给 timestamp 变量,此处是 OrderRecord 的 timestamp 字段
 8             long timestamp = record.timestamp;
 9             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
10             return timestamp;
11         }
12 
13         @Override
14         public Watermark getCurrentWatermark() {            // return the watermark as current highest timestamp minus the out-of-orderness bound
15             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
16         }
17     }

 

5、Late Elements:迟到元素。即使在watermark(k)已经产生了之后,仍然有迟到元素

  • 设置很长的延迟时间不太实际
  • 默认上Late Elements是drop掉的
  • Flink支持allowedLateness,在被drop前可以容忍的最大延迟时间
  • 如果设置了allowedLateness,当迟到元素到达的时候,会再计算一遍窗口
  • 也可以设置side output将废弃的数据当成side output

6、idling sources: 在一段时间内,watermark没有到来,窗口内的元素就不执行,这就是idling sources

 

二、生成TimeStamps / Watermarks

1、指派timestamps

这部分通常在实例中的一些filed内进行accessing/extracting the timestamp。

2、生成timestamps 和 watermark的方法

  • Directly in the data source.
  • 通过watermark 和 timestamp generator

3、在source下生成timestamps和watermark

  • 需要使用collectWithTimestamp方法在SourceContext下面
  • watermark需要使用emitWatermark

如果使用了generator那么source生成的watermark和timestamp会被复写

Java Code:

 1 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 2 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 3 
 4 DataStream<MyEvent> stream = env.readFile(
 5         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
 6         FilePathFilter.createDefaultFilter(), typeInfo);
 7 
 8 DataStream<MyEvent> withTimestampsAndWatermarks = stream
 9         .filter( event -> event.severity() == WARNING )
10         .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
11 
12 withTimestampsAndWatermarks
13         .keyBy( (event) -> event.getGroup() )
14         .timeWindow(Time.seconds(10))
15         .reduce( (a, b) -> a.add(b) )
16         .addSink(...);

 

三、预定义的TimeStamp Extractor和Watermark Emmiter

1、最简单的watermark generator

如果并行数据是升序的,那么最简单的方法是使用 AscendingTimestampExtractor。即便是kafka消息源,如果每个partition的消息是升序的,那么在shuffle阶段,会把每个partition的watermark正确的进行shuffle。

 1 DataStream<MyEvent> stream = ...
 2 
 3 DataStream<MyEvent> withTimestampsAndWatermarks =
 4     stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
 5 
 6         @Override
 7         public long extractAscendingTimestamp(MyEvent element) {
 8             return element.getCreationTime();
 9         }
10 });

 

2、允许延迟的watermark

可以设定固定的延时时间,延迟=迟到时间戳   -  上一个元素的watermark。如果延迟 > lateness,会被忽略。

 1 DataStream<MyEvent> stream = ...
 2 
 3 DataStream<MyEvent> withTimestampsAndWatermarks =
 4     stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
 5 
 6         @Override
 7         public long extractTimestamp(MyEvent element) {
 8             return element.getCreationTime();
 9         }
10 });

 

转载于:https://www.cnblogs.com/ylxn/p/10617357.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RSS阅读器使用:ROME,Spring MVC,嵌入式Jetty

在这篇文章中&#xff0c;我将展示一些创建Spring Web应用程序的准则&#xff0c;使用Jetty以及使用名为ROME的外部库运行RSS来运行它。 一般 我最近创建了一个示例Web应用程序&#xff0c;充当RSS阅读器。 我想检查ROME以阅读RSS。 我还想使用Spring容器和MVC创建最简单的视图…

HZOJ string

正解炸了…… 考试的时候想到了正解&#xff0c;非常高兴的打出来了线段树&#xff0c;又调了好长时间&#xff0c;对拍了一下发现除了非常大的点跑的有点慢外其他还行。因为复杂度算着有点高…… 最后正解死于常数太大……旁边的lyl用同样的算法拿了90分我却拿了个暴力的分40……

Unity3D入门其实很简单

在上次发布拙作后&#xff0c;有不少童鞋询问本人如何学习Unity3D。本人自知作为一名刚入门的菜鸟&#xff0c;实在没有资格谈论这么高大上的话题&#xff0c;生怕误导了各位。不过思来想去&#xff0c;决定还是写一些自己的经验&#xff0c;如果能给想要入门U3D的您一些启发&a…

4. HTML表单标签

表单是网页中最常见的元素&#xff0c;也是用户和我们交互的重要手段&#xff0c;在网站中的登录、注册、信息更新这些功能都是依赖表单实现的。在HTML中对于表单提供了一系列的标签&#xff0c;即输入框、下拉框、按钮、文本域&#xff0c;如下是一个最常见的表单结构内容&…

Java 8默认方法可能会破坏您的(用户)代码

乍一看&#xff0c; 默认方法为Java虚拟机的指令集带来了一个很棒的新功能。 最后&#xff0c;库开发人员能够开发已建立的API&#xff0c;而不会对其用户代码造成不兼容性。 使用默认方法&#xff0c;当将新方法引入该接口时&#xff0c;任何实现库接口的用户类都会自动采用默…

ADO.NET之使用DataSet类更新数据库

1.首先从数据库获得数据填充到DataSet类&#xff0c;该类中的表和数据库中的表相互映射。 2.对DataSet类中的表进行修改&#xff08;插入&#xff0c;更新&#xff0c;删除等&#xff09; 3.同步到数据库中&#xff1a;使用SqlDataAdapter实例名.Update(DataSet实例名&#xff…

CSS完美兼容IE6/IE7/FF的通用方法

关于CSS对各个浏览器兼容已经是老生常谈的问题了, 网络上的教程遍地都是.以下内容没有太多新颖, 希望能对初学者有一定的帮助.一、CSS HACK 以下两种方法几乎能解决现今所有HACK. 1, !important 随着IE7对!important的支持, !important 方法现在只针对IE6的HACK.(注意写法.记得…

学习进度

这一周学习了第四章登录&#xff0c;第五章系统业务逻辑没有完成上周规定的四五六章&#xff0c;因为发现有些以前看的东西现在没印象了&#xff0c;又看了一些之前的。 在代码上完成了老师布置的代码&#xff0c;第一个是判断AB和C的关系 &#xff0c;第二个是成绩排名&#x…

为Lucene选择快速唯一标识符(UUID)

大多数使用Apache Lucene的搜索应用程序都会为每个索引文档分配唯一的ID&#xff08;即主键&#xff09;。 尽管Lucene本身不需要这样做&#xff08;它可能不太在乎&#xff01;&#xff09;&#xff0c;但应用程序通常需要它以后通过其外部ID替换&#xff0c;删除或检索该文档…

ubuntu16.04设置静态ip

最近在课堂上&#xff0c;有很多同学反映在搭建环境的时候&#xff0c;虚拟机ip经常变&#xff0c;那么我们配置好的web服务可能就不能用了。下面讲一下如何在ubuntu上面设置静态ip 1&#xff1a;首先我们确认一下ubuntu的版本 cat /etc/issue 或者sudo lsb_release -a或者unam…

css布局:块级元素的居中

一.定宽&#xff1a; 1.定位居中(absolute) 方法一&#xff1a; html:<div class"main"></main>css:.main{width:400px;height:200px;background:#eee;position:absolute;left:50%;top:50%;margin-left:-200px;margin-top:-100px;} 方法二&#xff1a;…

精准客户数据服务

详情请点击&#xff1a;http://www.rulingtech.com 客户数据&#xff08;名录&#xff09;是市场营销活动不可或缺的重要资料。目前企业采用的客户名录数据主要通过订购黄页、购买第三方名录、自行搜集等途径。这些方式面临的问题是&#xff1a;数据重复度高、有效性差、准确率…

Maven常用的构建命令

Maven常用命令&#xff1a; Maven库&#xff1a; http://repo2.maven.org/maven2/ Maven依赖查询&#xff1a; http://mvnrepository.com/ 一&#xff0c;Maven常用命令&#xff1a; 1. 创建Maven的普通Java项目&#xff1a; mvn archetype:create-DgroupIdpackageName-Dartifa…

JPA 2.1实体图–第1部分:命名实体图

延迟加载通常是JPA 2.0的问题。 如果要使用FetchType.LAZY&#xff08;默认&#xff09;或FetchType.EAGER来加载关系&#xff0c;则必须在实体上进行定义&#xff0c;并且始终使用此模式。 仅当我们要始终加载关系时才使用FetchType.EAGER。 FetchType.LAZY几乎在所有情况下都…

课时85.层叠性(掌握)

1.什么是层叠性&#xff1f; 层叠性就是CSS处理冲突的一种能力。 这个字体最终会变为红色 注意点&#xff1a; 层叠性只有在多个选择器选中“同一个标签”,然后又设置了“相同的属性”&#xff0c;才会发生层叠性。 CSS全称&#xff1a;Cascading StyleSheet 层叠样式表&am…

ABZ职业规划

关于职业规划&#xff0c;LinkedIn&#xff08;领英&#xff09;创始人里德霍夫曼&#xff08;Reid Hoffman&#xff09;提出著名的“ABZ理论”&#xff0c;教我们应对变化莫测的职场人生。 德霍父曼建议每个职场人应有A计划&#xff0c;B计划&#xff0c;C计划。 A计划&#x…

SetProcessWorkingSetSize减少内存占用

系统启动起来以后&#xff0c;内存占用越来越大&#xff0c;使用析构函数、GC.Collect什么的也不见效果&#xff0c;后来查了好久&#xff0c;找到了个办法&#xff0c;就是使用 SetProcessWorkingSetSize函数。这个函数是Windows API 函数。下面是使用的方法&#xff1a;[Syst…

Spring Boot 与消息 (JMS、AMQP、RabbitMQ)

RabbitMQ教程 - 鸟哥的专栏 - CSDN博客 一、概述 大多应用中&#xff0c;可通过消息服务中间件来提升系统异步通信、扩展解耦能力消息服务中两个重要概念&#xff1a;消息代理&#xff08;message broker)和目的地&#xff08;destination) 当消息发送者发送消息以后&#xff0…

使用Apache Camel通过soap添加WS-Security

WS-Security&#xff08;Web服务安全性&#xff09;是一种协议&#xff0c;可让您保护自己的soap Web服务。 发出Soap请求的客户端必须在Soap标头中提供登录名和密码。 服务器接收到肥皂请求&#xff0c;检查凭据并验证请求是否正确。 使用Apache Camel&#xff0c;可以很容易…

课时3.浏览器访问网页原理(理解)

浏览器访问网页原理&#xff08;理解&#xff09; 第一次打开IE6&#xff0c;发现系统自动生成了一个文件夹&#xff0c;所以我们可以得出这个文件夹必然和IE6有一定的关系先删除Internet Cache下的所有文件夹&#xff0c;然后通过IE6打开百度的首页&#xff0c;打开百度首页以…