Flink之Watermark策略代码模板

方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/98588.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【重拾C语言】七、指针(一)指针与变量、指针操作、指向指针的指针

目录 前言 七、指针 7.1 指针与变量 7.1.1 指针类型和指针变量 7.1.2 指针所指变量 7.1.3 空指针、无效指针 7.2 指针操作 7.2.1 指针的算术运算 7.2.2 指针的比较 7.2.3 指针的递增和递减 7.3 指向指针的指针 前言 指针是C语言中一个重要的概念正确灵活运用指针 可…

机器学习笔记 - 两个静态手势识别的简单示例

一、关于手势识别 手势识别方法通常分为两类:静态或动态。 静态手势是那些只需要在分类器的输入处处理单个图像的手势,这种方法的优点是计算成本较低。动态手势需要处理图像序列和更复杂的手势识别方法。 进一步了解可以参考下面链接。 静态手势识别和动态手势识别的区别和技…

【MATLAB源码-第43期】基于matlab的turbo码误码率仿真比较不同迭代次数,采用logmap/sova算法。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 Turbo码是一种前向纠错码 (Forward Error Correction, FEC)&#xff0c;在 1993 年由法国的两位研究员 Claude Berrou 和 Alain Glavieux 提出。这种编码技术以其接近 Shannon 极限的高性能而受到广泛关注。以下是关于 Turbo…

手摸手系列之批量修改MySQL数据库所有表中某些字段的类型

在迁移老项目的数据库时&#xff0c;使用Navicat Premium的数据传输功能同步了表结构和数据。但是&#xff0c;发现某些字段的数据类型出现了错误&#xff0c;例如&#xff0c;租户ID从Oracle的NUMBER类型变成了MySQL的decimal(10)&#xff0c;正确的应该是bigInt(20)。此外&am…

这道面试题工作中经常碰到,但 99% 的程序员都答不上来

小时候都被问过一个脑筋急转弯&#xff0c;把大象放进冰箱有几个步骤&#xff1f;我们一开始都会抓耳挠腮&#xff0c;去想着该如何把大象塞进冰箱。最终揭晓的答案却根本不关心具体的操作方法&#xff0c;只是提供了 3 个步骤组成的流程&#xff0c;「把冰箱打开&#xff0c;把…

【力扣每日一题】2023.10.7 股票价格跨度

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 给我们一个数组表示不同时间的股票的价格&#xff0c;要我们按照顺序返回每天的股票价格跨度&#xff0c;价格跨度就是股票价格小于或等于…

[python 刷题] 76 Minimum Window Substring

[python 刷题] 76 Minimum Window Substring 题目&#xff1a; Given two strings s and t of lengths m and n respectively, return the minimum window substring of s such that every character in t (including duplicates) is included in the window. If there is no …

voc数据集格式与yolo数据集格式的区别及相互转化

Pascal VOC数据集是目标检测领域最常用的标准数据集之一&#xff0c;几乎所有检测方向的论文都会给出其在VOC数据集上训练并评测的效果。VOC数据集包含的信息非常全&#xff0c;它不仅被拿来做目标检测&#xff0c;也可以拿来做分割等任务&#xff0c;因此除了目标检测所需的文…

2023高交会“创新驱动发展·智慧赋能未来”招商工作已接近尾声

第二十五届中国国际高新技术成果交易会&#xff08;简称“高交会”&#xff09;将于2023年11月15日至19日在深圳会展中心举行。本届高交会以“创新驱动发展智慧赋能未来”为主题&#xff0c;聚焦战略性新兴产业和未来产业&#xff0c;集中展示中国高新技术成果和创新实力。 作为…

androidStudio第一次运行报错无法运行

安卓第一次运行失败 大家好&#xff0c;我使用androidStudio新建了一个测试demo第一次运行&#xff0c;结果失败了&#xff0c;显示如下图&#xff1a; 然后查了各种方法&#xff0c;都是没有用&#xff0c;最后 历经困难&#xff0c;还是找到了&#xff0c;原来是 gradle的依…

flink redis connector需要防止包冲突

1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 <dependency><groupId>org.apache.bahir</groupId><artifactId

Python 无废话-办公自动化Excel格式美化

设置字体 在使用openpyxl 处理excel 设置格式&#xff0c;需要导入Font类&#xff0c;设置Font初始化参数&#xff0c;常见参数如下&#xff1a; 关键字参数 数据类型 描述 name 字符串 字体名称&#xff0c;如Calibri或Times New Roman size 整型 大小点数 bold …

分类预测 | MATLAB实现POA-CNN鹈鹕算法优化卷积神经网络多特征分类预测

分类预测 | MATLAB实现POA-CNN鹈鹕算法优化卷积神经网络多特征分类预测 目录 分类预测 | MATLAB实现POA-CNN鹈鹕算法优化卷积神经网络多特征分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现POA-CNN鹈鹕算法优化卷积神经网络多特征分类预测&#xff0…

.Net 对象生命周期由浅入深2

大家都知道.net程序创建对象后没法写代码销毁对象&#xff0c;它有它自己的处理机制&#xff0c;今天来大概说说它的原理&#xff0c;探讨下它是如何管理对象即管理内存的 在程序里使用new 关键字实例化一个对象 如果这个对象类型是引用类型则在堆上分配然后由GC管理 new 操作…

如何开发移动应用:iOS和Android的比较

如何开发移动应用&#xff1a;iOS和Android的比较 移动应用开发领域一直以来都备受关注&#xff0c;而iOS和Android作为两大主要的移动操作系统&#xff0c;各自拥有强大的生态系统和开发工具。在本文中&#xff0c;我们将比较iOS和Android移动应用开发的关键方面&#xff0c;…

C# OpenCvSharp 实现迷宫解密

效果 项目 代码 using OpenCvSharp; using System; using System.Drawing; using System.Windows.Forms;namespace OpenCvSharp_实现迷宫解密 {public partial class Form1 : Form{public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e…

掌握 BERT:自然语言处理 (NLP) 从初级到高级的综合指南(1)

简介 BERT&#xff08;来自 Transformers 的双向编码器表示&#xff09;是 Google 开发的革命性自然语言处理 (NLP) 模型。它改变了语言理解任务的格局&#xff0c;使机器能够理解语言的上下文和细微差别。在本文[1]中&#xff0c;我们将带您踏上从 BERT 基础知识到高级概念的旅…

Jmeter常用参数化技巧总结!

说起接口测试&#xff0c;相信大家在工作中用的最多的还是Jmeter。 JMeter是一个100&#xff05;的纯Java桌面应用&#xff0c;由Apache组织的开放源代码项目&#xff0c;它是功能和性能测试的工具。具有高可扩展性、支持Web(HTTP/HTTPS)、SOAP、FTP、JAVA 等多种协议。 在做…

【c#】线程Monitor.Wait和Monitor.Pulse使用

介绍 以一个简易版的数据库连接池的实现来说明一下 连接池的connection以队列来管理 getConnection的时候&#xff0c;如果队列中connection个数小于50&#xff0c;且暂时无可用的connection&#xff08;个数为0或者peek看下头部需要先出那个元素还处于不可用状态&#xff09;…

Dockerfile自定义容器

1、Dockerfile Dockerfile 是用于构建 Docker 镜像的文本文件&#xff0c;其中包含一系列的指令和配置&#xff0c;用于定义镜像的构建过程。通过 Dockerfile&#xff0c;你可以定义镜像的基础操作系统、依赖、环境设置、应用程序等信息&#xff0c;从而实现可复制、自动化的镜…