flink学习之水位线

什么是水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟
的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标
记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以
更新自己的时钟了。在 Flink 中,数据流中用来做时间标记的记号就叫做水位线。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,
主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个
数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线的分类

有序的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;而在实际应用中,
如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间
戳、插入水位线就做了大量的无用功。所以为了提高效率,一般会每隔一段时间生成一个水位
线,这个水位线的时间戳,就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
在这里插入图片描述

无序的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改
变,这就是所谓的“乱序数据”。
在这里插入图片描述
对于连续数据流,我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就
不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需
要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新
的水位线,。所以我们可以试着多等几秒,也就是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之后不可能再有迟到数据来了(延迟设的足够长)。
在这里插入图片描述

如何生成水位线

1.水位线的生成时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

2.水位线生成策略

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指
示事件时间。

val stream: DataStream[ClickEvent] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[ClickEvent] = stream.assignTimestampsAndWatermarks(watermarkStrategy)

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是
所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator。

trait WatermarkStrategy[T] extends TimestampAssignerSupplier[T] with WatermarkGeneratorSupplier[T] {  def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[T]  def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T]  
}

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给
元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在
WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,
以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间
为处理时间,可以调用环境配置的 setAutoWatermarkInterval()方法来设置,默认为
200ms。

env.getConfig.setAutoWatermarkInterval(60 * 1000L)
3. flink内置水位线生成器
  1. 有序流
val stream: DataStream[Event] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[Event] = stream.assignTimestampsAndWatermarks(  WatermarkStrategy  .forMonotonousTimestamps[Event]()  .withTimestampAssigner { (event, timestamp) => event.timestamp }  
)
  1. 无序流
import java.time.Duration  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.util.Collector  object OutOfOrdernessTest {  def main(args: Array[String]): Unit = {  val env = StreamExecutionEnvironment.getExecutionEnvironment  val clickSource = new ClickSource()  val stream = env.addSource(clickSource)  // 插入水位线的逻辑  val watermarkedStream = stream  .assignTimestampsAndWatermarks(  WatermarkStrategy  .forBoundedOutOfOrderness(Time.seconds(5))  .withTimestampAssigner(new SerializableTimestampAssigner[Event] {  override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp  })  )  watermarkedStream.print()  env.execute("OutOfOrdernessTest")  }  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/635483.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab多元线性回归

1.matlab多元回归示例如下: 解决问题:油价预测 方法:多元线性回归 实现:matlab regress()函数 技巧:通过增加X1^2,X2^2,或者X1*X2等构造的特征项,可以提高回归模型的拟合准确度&…

基于SpringBoot Vue自习室管理系统

大家好✌!我是Dwzun。很高兴你能来阅读我,我会陆续更新Java后端、前端、数据库、项目案例等相关知识点总结,还为大家分享优质的实战项目,本人在Java项目开发领域有多年的经验,陆续会更新更多优质的Java实战项目&#x…

Mybatis面试题(四)

MyBatis 面试题 26、Mapper 编写有哪几种方式&#xff1f; 第一种&#xff1a;接口实现类继承 SqlSessionDaoSupport&#xff1a;使用此种方法需要编写mapper 接口&#xff0c;mapper 接口实现类、mapper.xml 文件。 1、在 sqlMapConfig.xml 中配置 mapper.xml 的位置 <m…

Bit.Store 加密卡集成主流 BRC20 ,助力 BTC 生态 Token 的流动性与消费

“Bit.Store 首创性的将包括 ORDI、SATS、以及 RATS 在内的主流 BRC20 资产集成到其加密卡支付中&#xff0c;通过以其推出的加密银行卡为媒介&#xff0c;助力 BTC 生态 Token 的流动性与消费。” 比特币网络在被设计之初&#xff0c;就是以一种去中心化、点对点的现金系统为定…

R2DBC-响应式数据库

简单查询 基于全异步,响应式,消息驱动 用法: 1.导入驱动:导入连接池(r2dbc-pool),导入驱动(r2dbc-mysql) 2. 使用驱动提供的api操作 pom.xml <properties><r2dbc-mysql.version>1.0.5</r2dbc-mysql.version> </properties><dependencies><d…

黑马 Javaweb - MySQL 精华篇

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 知…

市场监管总局发布区块链和分布式记账技术6项标准,中创积极推动区块链产业发展!

近日&#xff0c;市场监管总局&#xff08;国家标准委&#xff09;批准发布一批重要国家标准&#xff0c;涉及生产生活、绿色可持续等多个领域&#xff0c;这些标准将在引领产业发展、促进绿色转型、助力对外贸易、推动城乡建设、提升生活品质等方面发挥重要作用。 其中一项标…

02-编程猜谜游戏

本章通过演示如何在实际程序中使用 Rust&#xff0c;你将了解 let 、 match 、方法、关联函数、外部crate等基础知识。 本章将实现一个经典的初学者编程问题&#xff1a;猜谜游戏。 工作原理如下&#xff1a;程序将随机生成一个介于 1 和 100 之间的整数。然后&#xff0c;程序…

C# Cad2016二次开发选择csv导入信息(七)

//选择csv导入信息 [CommandMethod("setdata")] //本程序在AutoCAD的快捷命令是"DLLLOAD" public void setdata() {Microsoft.Win32.OpenFileDialog dlg new Microsoft.Win32.OpenFileDialog();dlg.DefaultExt ".csv";// Display OpenFileDial…

DNS寻址过程

用一张图详细的描述DNS寻址的过程&#xff0c;是高级前端进阶的网络篇&#xff1a; 主要是第三步要记仔细就行啦&#xff0c;每一步都要详细的记录下来&#xff0c;总结的脉络如下&#xff1a; 本地DNS缓存本地DNS服务器根域名服务器 顶级域名服务器再次顶级域名服务器权威域名…

YOLOv5改进 | 主干篇 | 华为移动端模型GhostnetV2一种移动端的专用特征提取网络

一、本文介绍 本文给大家带来的改进机制是华为移动端模型Ghostnetv2,华为GhostNetV2是为移动应用设计的轻量级卷积神经网络(CNN),旨在提供更快的推理速度,其引入了一种硬件友好的注意力机制,称为DFC注意力。这个注意力机制是基于全连接层构建的,它的设计目的是在通用硬…

Vue-21、Vue监测数组改变

1、数组调用以下方法Vue可以监测到。 arr.push(); 向数组的末尾追加元素 const array [1,2,3] const result array.push(4) // array [1,2,3,4] // result 4arr.pop(); 删除末尾的元素 const array [a, b] array.pop() // b array.pop() // a array.pop() // undefi…

Elasticsearch各种高级文档操作3

本文来记录几种Elasticsearch的文档操作 文章目录 初始化文档数据聚合查询文档概述对某个字段取最大值 max 示例对某个字段取最小值 min 示例对某个字段求和 sum 示例对某个字段取平均值 avg 示例对某个字段的值进行去重之后再取总数 示例 本文小结 初始化文档数据 在进行各种文…

flutter获取地理定位:geolocator依赖详细用法

本文使用geolocator插件实现app物理定位功能。 该插件的主要功能有&#xff1a; 获取最后已知位置&#xff1b;获取设备当前位置&#xff1b;获取连续的位置更新&#xff1b;检查设备是否启用了定位服务&#xff1b;计算两个地理坐标之间的距离&#xff08;米&#xff09;&am…

AI时代—ChatGPT-4.5的正确打开方式

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言4.5key价格泄漏ChatGPT4.0使用地址ChatGPT正确打开方式最新功能语音助手存档…

微信小程序(七)navigator点击效果

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.默认效果 2.无效果 3.激活效果 源码&#xff1a; index.wxml //如果 <navigator url"/pages/logs/logs">跳转到log页面&#xff08;默认&#xff09; </navigator><navigator url&q…

从零开始,自己搭建一个autonomous mobile robot做gazebo仿真(1):mobile robot建模与添加差速控制器

这样一个简单的mobile robot模型 首先写xacro文件&#xff0c;创建 link joint transmission <?xml version"1.0"?> <robot xmlns:xacro"http://www.ros.org/wiki/xacro" name"whill_modelc" ><xacro:property name"PI&q…

【点云、图像】学习中 常见的数学知识及其中的关系与python实操[更新中]

文章目录 前言一、平均值、方差、协方差平均值&#xff08;mean&#xff09;np.mean()方差&#xff08;variance&#xff09;np.var()总体方差 np.var(a, ddof0)无偏样本方差np.var(a, ddof1)有偏样本方差标准差&#xff08;standard deviation&#xff09;np.std(a, ddof1)默认…

Docker 部署考核

Docker安装 安装必要的系统工具 yum install -y yum-utils device-mapper-persistent-data lvm2 添加docker-ce安装源&#xff1a; yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo 配置阿里云Docker Yum源: yum-config-manager --ad…

2024.1.19 网络编程 作业

思维导图 练习题 1> UDP传输实现聊天室 服务器端 #include <myhead.h> #define SER_IP "192.168.125.151" #define SER_PORT 9999 typedef struct Msg {char user[32]; //用户名int type; //执行操作1.登录、2.发消息、0.退出char text[1024]; …