Flink实时开发添加水印的案例分析

在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:

修改前的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. unitDS 经过一个 map 操作,将每个元素的 deviceCode 转换为系统设备码。
  2. 使用 keyBy(_.getDeviceCode) 对转换后的设备码进行分组。
  3. 定义了一个基于事件时间的滚动窗口,窗口大小为60秒。
  4. 使用 process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。

注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。

修改后的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.assignTimestampsAndWatermarks(WatermarkStrategy.<boundedOutOfOrdernessDaysPower>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower.withIdleness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = {Math.max(element.getEventTime, recordTimestamp)}})
).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. 与修改前相同的部分:mapkeyBy, 和 window 操作。
  2. 添加了 assignTimestampsAndWatermarks 方法来处理事件时间和水印:
    • 使用 WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据(这里是5秒)。
    • .withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒,用于处理不活跃的键。
    • 使用 withTimestampAssigner 自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。

不同点和适用场景:

  • 事件时间和水印处理:修改后的代码显式地处理了事件时间和水印,这对于处理乱序数据、延迟数据以及确保正确的时间窗口计算是非常重要的。如果您的数据流中存在乱序或延迟数据,或者您希望更严格地保证处理时间窗口的正确性,那么应该使用修改后的代码。
  • 空闲超时:通过设置空闲超时,可以处理那些长时间不活跃的键,避免因为某些键长时间没有新数据而导致整个程序挂起。
  • 延迟数据处理:如果数据有可能晚到,但仍然需要被纳入正确的窗口进行计算,水印可以帮助界定数据的“迟到”界限。
    精确的时间窗口分析:对于需要基于事件实际发生时间而非数据处理时间进行分析的场景,如实时监控、金融交易分析等,事件时间模型是必须的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/45709.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Macos 远程登录 Ubuntu22.04 桌面

这里使用的桌面程序为 xfce, 而 gnome 桌面则测试失败。 1,安装 在ubuntu上&#xff0c;安装 vnc server与桌面程序xfce sudo apt install xfce4 xfce4-goodies tightvncserver 2&#xff0c;第一次启动和配置 $ tightvncserver :1 设置密码。 然后修改配置&#xff1a…

JVM 之对象的结构与创建

1.对象的创建 1.1类加载 当Java 虚拟机遇到一条字节码 new 指令时&#xff0c;首先将去检查这个指令的参数是否能在常量池中定位到 一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已被加载、解析和初始化过。如果没有&#xff0c;那 必须先执行相应的类加载过…

Python MySQL 教程

Python MySQL 教程 引言 Python 是一种广泛使用的高级编程语言,以其简洁明了的语法和强大的功能而闻名。MySQL 是一种流行的开源关系数据库管理系统 (RDBMS),广泛用于各种应用程序,包括数据密集型 Web 应用程序。Python MySQL 教程将指导您如何使用 Python 语言与 MySQL 数…

C# .net6使用Hangfire

首先我们先来了解什么是Hangfire&#xff1f; Hangfire 是一个用于 .NET 的任务调度库&#xff0c;允许你在后台运行任务&#xff0c;而不需要依赖外部的任务队列服务或复杂的基础设施。它简化了后台任务的创建、调度和管理过程&#xff0c;使得在 .NET 应用程序中处理长期运行…

flutter Android端权限

flutter 中权限请求path_provider Android 6.0 - 10.0 (API level 23 - 29)Android 11 (API level 30)具体实现示例注意事项 在 Flutter 中使用 path_provider 插件获取除本应用外所有的 PDF 文件&#xff0c;对于不同的 Android 版本&#xff08;从 Android 6.0 到 Android 14…

我不是一个人在战斗

这也是一个老生常谈的话题&#xff0c;但很多人还是出不出单兵作战的怪圈&#xff0c;不断让单兵作战的孤独感、团队协作的无助感困惑自己&#xff0c;困惑整个团队&#xff0c;造成心气一直不高&#xff0c;严重影响工作效率。对此我还是坚持去年的观点&#xff1a;坚持宽松、…

【系统架构设计】操作系统(一)

操作系统&#xff08;一&#xff09; 操作系统的类型和结构操作系统基本原理进程管理进程三态模型挂起状态进程互斥 / 进程同步前趋图进程调度死锁 存储管理设备管理文件管理作业管理 操作系统原理的关键在于“一个观点、两条线索”&#xff1a;一个观点是以资源管理的观点来定…

简谈设计模式之代理模式

代理模式是一种结构型设计模式, 主要用于为其他对象提供一种代理, 以控制对这个对象的访问. 代理模式可以在不修改目标对象的前提下, 通过代理对象在访问目标对象之前或之后增加额外的操作 代理模式结构 抽象主题: 定义了真实主题和代理的共同接口, 这样客户端就可以通过接口…

【算法笔记自学】第 9 章 提高篇(3)——数据结构专题(2)

9.1树与二叉树 #include <cstdio>int main() {int n, m;scanf("%d%d", &n, &m);printf(n m 1 ? "Yes" : "No");return 0; } 9.2二叉树的遍历 #include <cstdio> #include <vector> using namespace std;const int…

Java毕业设计 基于SSM vue电影订票系统小程序 微信小程序

Java毕业设计 基于SSM vue电影订票系统小程序 微信小程序 SSM 电影订票系统小程序 功能介绍 用户 登录 注册 忘记密码 首页 图片轮播 电影信息 电影详情 评论 收藏 预订 电影资讯 资讯详情 用户信息修改 电影评价 我的收藏管理 用户充值 在线客服 我的订单 管理员 登录 个人…

不遵守全局主键配置【PGSQL】

<selectKey keyProperty"id" resultType"java.lang.Long" order"BEFORE"> SELECT nextval(ft.id2_seq::regclass) </selectKey> 这段 XML 配置中的 <selectKey> 标签是 MyBatis 中用来生成主键的关键部分。让我解释一下每个…

SpringCloud集成kafka集群

目录 1.引入kafka依赖 2.在yml文件配置配置kafka连接 3.注入KafkaTemplate模版 4.创建kafka消息监听和消费端 5.搭建kafka集群 5.1 下载 kafka Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/downloads.html 5.2 在config目录下做…

java导出word实现

参考&#xff1a;Poi-tl Documentation

NaiveUI与ElementUI 比较分析

前言 在前端开发的广阔领域中&#xff0c;Vue.js作为最流行的前端框架之一&#xff0c;为开发者提供了丰富的组件库&#xff0c;其中NaiveUI和ElementUI是两个备受瞩目的选择。本文将深入分析这两个组件库的特点、优劣势以及适用场景&#xff0c;帮助开发者在项目中做出更合适…

全渠道AI智能商品管理软件平台 助力零售品牌占领技术高地

关于7thonline第七在线 1999年创立于纽约&#xff0c;7thonline第七在线全渠道AI智能商品管理平台&#xff0c;以先进的数学算法模型、人工智能和机器学习技术为核心驱动力&#xff0c;融合了众多零售商品管理的卓越实践经验&#xff0c;精心打造出一套深度适配零售业务场景的自…

[每周一更]-(第105期):SSL证书过期后引发的DNS缓存问题

问题回顾&#xff1a; ​ 上班路上收到ZeroSSL邮件通知我们清点项目的SSL证书到期了&#xff0c;到公司还是登录网址查看信息&#xff0c;一看果然是7.10也就是今天到期&#xff0c;开始看下acme.sh的定制任务为何没生效&#xff0c;一看crontab脚本&#xff0c;日志任务丢垃圾…

tomcat和nginx实现动静分离

访问nginx就是静态页面&#xff0c;nginx代理index.jsp可以访问tomcat的动态页面。 实验 1、设备以及IP地址 nginx1 192.168.10.41 tomcat1 192.168.10.51 tomcat2 192.168.10.52 2、tomcat1 的配置 创建动态页面 cd /usr/local/tomcat/webapps 创建一个目录作为一个ser…

复现ORB3-YOLO8项目记录

文章目录 1.编译错误1.1 错误11.2 错误21.3 错误31.4 错误4 1.编译错误 首先ORB-SLAM相关项目已经写过很多篇博客了&#xff0c;从ORB-SLAM2怎么运行&#xff0c;再到现在的项目。关于环境已经不想多说了 1.1 错误1 – DEPENDENCY_LIBS : /home/lvslam/ORB3-YOLO8/Thirdparty…

2024年C#优秀实用的类库推荐

简介 在软件开发领域&#xff0c;随着技术的不断进步和项目的复杂化&#xff0c;使用高质量的类库来加速开发过程、减少错误并提高代码质量变得至关重要。C# 作为一款功能强大的编程语言&#xff0c;拥有众多优秀的类库供开发者选择。本文旨在为您推荐一些在2024年备受推崇的C…

Feature Alignment and Uniformity for Test Time Adaptation--论文笔记

论文笔记 资料 1.代码地址 https://github.com/SakurajimaMaiii/TSD 2.论文地址 https://arxiv.org/abs/2303.10902 3.数据集地址 论文摘要的翻译 TTA在接收训练分布外的测试域样本时对深度神经网络进行自适应。在这样设置下&#xff0c;模型只能访问在线未标记的测试样…