水位线和窗口

水位线特点

  1. 插入到数据流中的一个标记,可以认为是一个特殊的数据
  2. 主要内容是一个时间戳
  3. 水位线是基于数据的时间戳生成的,即事件时间
  4. 水位线必须单调递增
  5. 水位线可以通过设置延迟,来保证正确处理乱序数据
  6. 一个水位线,表示事件时间已经达到了时间戳t
  7. 水位线是Flink流处理中保证结果正确性的核心机制

窗口

错误理解:窗口是一个固定位置的框,数据流源源不断地流过来,到某个时间窗口该关闭了,就停止收集数据,触发计算并窗口关闭输出结果。

Flink中窗口是动态创建的,当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。事实上,触发计算和窗口关闭两个行为可以分开。

总体原则

水位线出现表示这个时间之前的数据已经全部到齐,之后再也不会出现了,不过要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成方案

水位线的生成位置:越靠近数据源越好

WatermarkStrategy:水位线策略对象
1. 水位线生成器 WatermarkGenerator
- onEvent() 给每条数据生成水位线
- onPeriodicEmit():周期性生成水位线
2. 时间戳分配器 TimestampAssigner
- extractTimestamp()

有序流水位线生成的代码如下:

public class Flink01_UserDefineWaterMarkStrategy {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//设置生成水位线的周期env.getConfig().setAutoWatermarkInterval(1000);//tom,/home,1000SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(",");return new Event(words[0].trim(),words[1].trim(),Long.valueOf(words[2].trim()));});ds.print("input");ds.assignTimestampsAndWatermarks(new MyWatermarkStrategy());ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyWatermarkStrategy implements WatermarkStrategy<Event>{/*** 创建水位线生成器* @param context* @return*/@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}public static class MyWatermarkGenerator implements WatermarkGenerator<Event>{private Long maxTs = Long.MIN_VALUE;/*** 每一条数据调用一次,用于生成一次水位线* @param event* @param eventTimestamp* @param output*/@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {//有序流,每条数据生成水位线
//                System.out.println("有序流每条数据生成水位线===》"+eventTimestamp);
//                output.emitWatermark(new Watermark(eventTimestamp));maxTs = Math.max(maxTs, eventTimestamp);}/*** 周期性生成水位线* 默认周期是200ms* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//有序流,周期性生成水位线System.out.println("有序流周期性生成水位线===》"+maxTs);output.emitWatermark(new Watermark(maxTs));}}/*** 创建时间戳分配器,用于从数据中提取时间戳* @param context* @return*/@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimestampAssigner();}}public static class MyTimestampAssigner implements TimestampAssigner<Event>{/*** 从数据中提取时间戳* @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if*     no timestamp has been assigned yet.* @return*/@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTs();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207469.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[FPGA 学习记录] 数码管动态显示

数码管动态显示 文章目录 1 理论学习1.1 数码管动态扫描显示原理 2 实战演练2.1 实验目标2.2 程序设计2.2.1 框图绘制2.2.2 数据生成模块 data_gen2.2.2.1 波形绘制2.2.2.2 代码编写2.2.2.3 代码编译2.2.2.4 逻辑仿真2.2.2.4.1 仿真代码编写2.2.2.4.2 仿真代码编译2.2.2.4.3 波…

如何解决el-table中动态添加固定列时出现的行错位

问题描述 在使用el-table组件时&#xff0c;我们有时需要根据用户的操作动态地添加或删除一些固定列&#xff0c;例如操作列或选择列。但是&#xff0c;当我们使用v-if指令来控制固定列的显示或隐藏时&#xff0c;可能会出现表格的行错位的问题&#xff0c;即固定列和非固定列…

el-tree数据量过大,造成浏览器卡死、崩溃

el-tree数据量过大&#xff0c;造成浏览器卡死、崩溃 场景&#xff1a;树形结构展示&#xff0c;数据超级多&#xff0c;超过万条&#xff0c;每次打开都会崩溃 我这里采用的是引入新的插件虚拟树&#xff0c;它是参照element-plus 中TreeV2改造vue2.x版本虚拟化树形控件&…

2024年强烈推荐mac 读写NTFS工具Tuxera NTFS for Mac2023中文破解版

大家好啊&#xff5e;今天要给大家推荐的是 Tuxera NTFS for Mac2023中文破解版&#xff01; 小可爱们肯定知道&#xff0c;Mac系统一直以来都有一个小小的痛点&#xff0c;就是无法直接读写NTFS格式的移动硬盘和U盘。但是&#xff0c;有了Tuxera NTFS for Mac2023&#xff0c;…

正则表达式:字符串处理的瑞士军刀

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

记一次xss通杀挖掘历程

前言 前端时间&#xff0c;要开放一个端口&#xff0c;让我进行一次安全检测&#xff0c;发现的一个漏洞。 经过 访问之后发现是类似一个目录索引的端口。(这里上厚码了哈) 错误案例测试 乱输内容asdasffda之后看了一眼Burp的抓包&#xff0c;抓到的内容是可以发现这是一个…

MuJoCo机器人动力学仿真平台安装与教程

MuJoCo是一个机器人动力学仿真平台&#xff0c;它包括一系列的物理引擎、可视化工具和机器人模拟器等工具&#xff0c;用于研究和模拟机器人的运动和动力学特性。以下是MuJoCo的安装教程&#xff1a; 下载和安装MuJoCo Pro。可以从MuJoCo的官方网站上下载最新版本的安装包。根…

【Python机器学习系列】一文彻底搞懂机器学习中表格数据的输入形式(理论+源码)

一、问题 机器学习或者深度学习在处理表格数据&#xff08;Tabular data&#xff09;、图像数据&#xff08;Image data&#xff09;、文本数据&#xff08;Text data&#xff09;、时间序列数据&#xff08;Time series data&#xff09;上得到了广泛的应用。 其中&#xff0c…

微信小程序 - 创建 ZIP 压缩包

微信小程序 - 创建 ZIP 压缩包 场景分享代码片段导入 JSZip创建ZIP文件追加写入文件测试方法参考资料 场景 微信小程序只提供了解压ZIP的API&#xff0c;并没有提供创建ZIP的方法。 当我们想把自己处理好的保存&#xff0c;打包ZIP保存下来时就需要自己实现了。 分享代码片段…

无重复字符的最长子串(LeetCode 3)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;暴力法方法二&#xff1a;滑动窗口 参考文献 1.问题描述 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的最长子串的长度。 s 由英文字母、数字、符号和空格组成。 示例 1&#xff1a; 输…

基于Java商品销售管理系统

基于Java商品销售管理系统 功能需求 1、商品管理&#xff1a;系统需要提供商品信息的管理功能&#xff0c;包括商品的录入、编辑、查询和删除。每个商品应包含基本信息如名称、编码、类别、价格、库存量等。 2、客户管理&#xff1a;系统需要能够记录客户的基本信息&#xf…

算法:常见的哈希表算法

文章目录 两数之和判断是否互为字符重排存在重复元素存在重复元素字母异位词分组 本文总结的是关于哈希表常见的算法 哈希表其实就是一个存储数据的容器&#xff0c;所以其实它本身的算法难度并不高&#xff0c;只是利用哈希表可以对于一些场景进行优化 两数之和 class Solut…

Michael.W基于Foundry精读Openzeppelin第41期——ERC20Capped.sol

Michael.W基于Foundry精读Openzeppelin第41期——ERC20Capped.sol 0. 版本0.1 ERC20Capped.sol 1. 目标合约2. 代码精读2.1 constructor() && cap()2.2 _mint(address account, uint256 amount) 0. 版本 [openzeppelin]&#xff1a;v4.8.3&#xff0c;[forge-std]&…

AI智能降重软件大全,免费最新AI智能降重软件

在当今信息爆炸的时代&#xff0c;内容创作者们面临着巨大的写作压力&#xff0c;如何在保持高质量的前提下提高效率成为摆在许多人面前的难题。AI智能降重软件因其独特的算法和功能逐渐成为提升文案质量的得力助手。本文将专心分享一些优秀的AI智能降重软件。 147SEO改写软件 …

云贝教育 |【技术文章】PostgreSQL中误删除数据怎么办(一)

原文链接&#xff1a;【PostgreSQL】PostgreSQL中误删除数据怎么办&#xff08;一&#xff09; - 课程体系 - 云贝教育 (yunbee.net) 在我们学习完PG的MVCC机制之后&#xff0c;对于DML操作&#xff0c;被操作的行其实并未被删除&#xff0c;只能手工vacuum或自动vacuum触发才会…

【分享】我想上手机器学习

目录 前言 一、理解机器学习 1.1 机器学习的目的 1.2 机器学习的模型 1.3 机器学习的数据 二、学习机器学习要学什么 2.1 学习机器学习的核心内容 2.2 怎么选择模型 2.3 怎么获取训练数据 2.4 怎么训练模型 三、机器学习的门槛 3.1 机器学习的第一道门槛 3.2 机器…

最新版IDEA专业版大学生申请免费许可证教学(无需学校教育邮箱+官方途径+非破解手段)

文章目录 前言1. 申请学籍在线验证报告2. 进入IDEA官网进行认证3. 申请 JB (IDEA) 账号4. 打开 IDEA 专业版总结 前言 当你进入本篇文章时, 你应该是已经遇到了 IDEA 社区版无法解决的问题, 或是想进一步体验 IDEA 专业版的强大. 本文是一篇学生申请IDEA免费许可证的教学, 在学…

unity 2d 入门 飞翔小鸟 小鸟碰撞 及死亡(九)

1、给地面&#xff0c;柱体这种添加2d盒装碰撞器&#xff0c;小鸟移动碰到就不会动了 2、修改小鸟的脚本&#xff08;脚本命名不规范&#xff0c;不要在意&#xff09; using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : Mo…

kafka高吞吐、低延时、高性能的实现原理

作者&#xff1a;源码时代-Raymon老师 Kafka的高吞吐、低延时、高性能的实现原理 Kafka是大数据领域无处不在的消息中间件&#xff0c;目前广泛使用在企业内部的实时数据管道&#xff0c;并帮助企业构建自己的流计算应用程序。Kafka虽然是基于磁盘做的数据存储&#xff0c;但…

可信固件-M (TF-M)

概述&#xff1a; 参考: Trusted Firmware-M Documentation — Trusted Firmware-M v2.0.0 documentation 开源代码托管&#xff1a; trusted-firmware-m.git - Trusted Firmware for M profile Arm CPUs STM32 U5支持TF-M : STM32U5 — Trusted Firmware-M v2.0.0 document…