【大数据面试题】005 谈一谈 Flink Watermark 水印

一步一个脚印,一天一道面试题。

感觉我现在很难把水印描述的很好,但,完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。

在实时处理任务时,由于网络延迟,人工异常,各种问题,数据往往会出现乱序,不按照我们的预期到达处理框架。
WaterMark 水印,就是为了一定程度的解决数据,延迟乱序问题的。

使用 WaterMark 一般有以下几个步骤:

  • 定义时间特性
    (Flink 1.12 已废弃,默认使用 事件时间)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 设置 Watermark 策略,赋值事件时间
        // 分配时间戳和水位线DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> element.f0));

话不多说,直接给个 Watermark 水印样例代码。


public class SimpleWatermarkExample {public static void main(String[] args) throws Exception {// 设置流执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 socket 文本流接收数据DataStream<String> input = env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1));// 解析输入的数据DataStream<Tuple2<Long, Integer>> parsedStream = input.map(new MapFunction<String, Tuple2<Long, Integer>>() {@Overridepublic Tuple2<Long, Integer> map(String value) throws Exception {String[] parts = value.split(",");return new Tuple2<>(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));}});// 分配时间戳和水位线DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> element.f0));// 使用窗口函数统计每10秒内的最大值DataStream<String> maxValues = withTimestampsAndWatermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple2<Long, Integer>, String, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<Tuple2<Long, Integer>> values, Collector<String> out) throws Exception {int maxValue = Integer.MIN_VALUE;for (Tuple2<Long, Integer> value : values) {maxValue = Math.max(maxValue, value.f1);}out.collect("Window: " + window + " Max Value: " + maxValue);}});// 打印结果maxValues.print();// 执行程序env.execute("Simple Flink Watermark Example");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/674793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PyTorch、NCNN、CV::Mat三者张量的shape

目录 一、PyTorch二、NCNN三、CV::Mat 一、PyTorch 在 PyTorch 中&#xff0c;张量&#xff08;Tensor&#xff09;的形状通常按照 (N, C, H, W) 的顺序排列&#xff0c;其中&#xff1a; N 是批量大小&#xff08;batch size&#xff09; C 是通道数&#xff08;channel numb…

【什么是IDE?新手用哪个IDE比较好?——详细讲解】

什么是IDE&#xff1f;新手用哪个IDE比较好&#xff1f; 1. 什么是IDE&#xff1f;2. 新手适用的IDE 1. 什么是IDE&#xff1f; IDE是集成开发环境&#xff08;Integrated Development Environment&#xff09;的缩写&#xff0c;它是集合了程序开发中多种工具的应用软件。IDE…

机器学习 | 深入集成学习的精髓及实战技巧挑战

目录 xgboost算法简介 泰坦尼克号乘客生存预测(实操) lightGBM算法简介 《绝地求生》玩家排名预测(实操) xgboost算法简介 XGBoost全名叫极端梯度提升树&#xff0c;XGBoost是集成学习方法的王牌&#xff0c;在Kaggle数据挖掘比赛中&#xff0c;大部分获胜者用了XGBoost。…

webp是什么格式,怎么转成.jpg

WebP是一种旨在加快图像加载时间的现代图像格式。这种格式由Google开发&#xff0c;支持无损压缩和有损压缩。WebP格式的图像文件通常比同等质量的JPEG或PNG文件小&#xff0c;这使得它们在提高网页加载速度方面特别有用。 要将WebP格式转换成JPEG格式&#xff0c;我们可以使用…

Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类

目录 一、失败方案串口监听工具Controller层MySerialPortEventListenerimpl 二、成功方案串口监听工具Controller层MySerialPortEventListenerimpl前端Api 在之前的文章中&#xff0c;我们讨论了使用单例模式的SerialPortEventListener类。然而&#xff0c;这种模式在某些情况下…

【代码随想录26】332.重新安排行程 51.N皇后 37.解数独

目录 332.重新安排行程题目描述参考代码 51.N皇后题目描述参考代码 37.解数独题目描述参考代码 332.重新安排行程 题目描述 给你一份航线列表 tickets &#xff0c;其中 tickets[i] [fromi, toi] 表示飞机出发和降落的机场地点。请你对该行程进行重新规划排序。 所有这些机…

09-错误处理

上一篇&#xff1a;08-常用集合(容器) 在软件中&#xff0c;错误是一个不争的事实&#xff0c;因此 Rust 提供了许多功能来处理出错的情况。在许多情况下&#xff0c;Rust 要求您在编译代码之前承认出错的可能性并采取一些措施。这一要求可确保您在将代码部署到生产环境之前发现…

JRebel激活-nginx版本

nginx转发流量&#xff08;代替其他网上说的那个工具&#xff09; proxy_pass http://idea.lanyus.com; 工具激活 填写内容说明&#xff1a; 第一行的激活网址是&#xff1a;http://127.0.0.1:8888/ 正确的GUID。GUID 可以通过专门的网站来生成&#xff08;点击打开&#…

kettle控件-复制记录到结果/ 从结果获取记录的使用

在数据采集过程中&#xff0c;遇到对方数据传送不及时的情况&#xff0c;导致数据漏采集&#xff0c;需要手工反复补采。为了解决这一问题&#xff0c;可以利用kettle的复制记录到结果/从结果获取记录控件。 job的整个流程如下&#xff1a; 设置变量&#xff1a; 创建目录: ge…

STM32输出PWM波控制180°舵机

时间记录&#xff1a;2024/2/8 一、PWM介绍 &#xff08;1&#xff09;脉冲宽度调制 &#xff08;2&#xff09;占空比&#xff1a;高电平时间占整个周期时间的比例 &#xff08;3&#xff09;STM32通过定时器实现PWM时具有两种模式 PWM1模式&#xff1a;向上计数模式下&…

软件测试工程师——缺陷(一篇足以)

目录 定义 缺陷的类型 缺陷的严重程度 缺陷的状态 缺陷的根源 ​缺陷的来源 缺陷的起源 缺陷的生命周期 缺陷的识别 缺陷报告模板 编写缺陷报告的目的 缺陷报告编写的准则 缺陷描述的准则 定义 1. 软件未实现产品说明书中所提及的功能 2. 软件实现了产品说明书中…

Python入门知识点分享——(十九)私有属性和方法

上文我们介绍了面向对象的基础知识&#xff0c;了解了类和对象的联系和语法&#xff0c;这次我们就紧接着来介绍面向对象中的私有特点——私有属性和私有方法。 私有属性&#xff0c;顾名思义是指不能在类的外部被使用或直接访问的属性。私有属性严格意义上来说并不能算做第三…

第63讲个人中心用户信息动态显示实现

个人中心页面实现 &#xff08;补充前面的取消按钮逻辑&#xff09; 个人中心用户信息动态显示实现 index.wxml <view class"user_info"><!-- 用户背景信息开始 --><view class"user_info_bg"><view class"user_info_wrap&q…

echarts图表插件

图表组件 ECharts&#xff0c;全称为Enterprise Charts&#xff0c;是一个使用JavaScript实现的开源可视化库。它主要用于数据可视化领域&#xff0c;能够方便地创建出直观、交互性强的图表。ECharts由百度团队开发&#xff0c;目前是Apache的顶级项目之一。ECharts支持的图表…

JVM调优(Window下)

1、编写代码&#xff0c;像下面代码这样&#xff0c;产生OOM&#xff0c; private static final Integer K 1024;/*** 死循环&#xff0c;验证JVM调优* return*/GetMapping(value "/deadLoop")public void deadLoop(){int size K * K * 8;List<byte[]> lis…

Java后端技术助力,党员学习平台更稳定

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

C++奇淫巧计:如何快速观察内存分配

需求 在不严肃的场景下&#xff0c;你想测试、跟踪自己的代码的内存分配&#xff0c;粗略评估有无错误的、意外的行为&#xff0c;怎么做&#xff1f; 代码 很简单&#xff0c;直接重写 new 操作符。 #include <iostream>static int malloc_count 0;void* operator …

38. C++ 引用的本质

1. C 引用的本质 1.1 引用的底层实现方式 引用被称为变量的别名&#xff0c;它不能脱离被引用对象独立存在&#xff0c;这是在高级语言层面的概念和理解&#xff0c;并未揭示引用的实现方式。常见错误说法是“引用“自身不是一个变量&#xff0c;甚至编译器可以不为引用分配空…

Day 41 | 动态规划 343. 整数拆分 、 96.不同的二叉搜索树

343. 整数拆分 题目 文章讲解 视频讲解 思路&#xff1a;不需要考虑正整数为1的情况。 dp[i]表示正整数i拆分后结果的最大乘积&#xff0c;递推公式中 j 表示拆分的正整数&#xff0c;最大不会超过 i-j &#xff0c;否则会轮回。dp[i-j]是正整数 i-j 拆分后结果最大乘积。 c…

堆排及时间复杂度分析

箴言: 初始阶段&#xff0c;不需要去纠结那一种更优美&#xff0c;非要找出那一种是最好的&#xff0c;其实能解决问题的就是好办法。 一&#xff0c;常见排序时间复杂度 冒泡快排归并堆排桶排时间O(n^2)O(nlogn)O(nlogn)O(nlogn)kn空间O(1)O(1)O(nlogn)O(1)kn 二&#xff…