flink 最后一个窗口一直没有新数据,窗口不关闭问题

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

窗口类型:滚动窗口
代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);@Overridepublic WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<JSONObject>() {private long maxWatermark;@Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));state.f0 = System.currentTimeMillis();System.out.println("maxWatermark is " + maxWatermark);state.f1 = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime = 3000L;if (maxWatermark - outOfTime <=0){} else {// 10s内没有数据则关闭当前窗口System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));System.out.println("state.f1:" + state.f1);if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));state.f1 = true;System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));} else {System.out.println("正常发送水印");watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
watermark 周期生成()的疑问:
1、默认200ms,会连续生成4次后,不会继续生成了
2、设置了周期生成间隔,env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次

参考:https://blog.csdn.net/lr131425/article/details/127422833

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632103.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#设计模式教程(3):抽象工厂模式

抽象工厂模式是一种创建型设计模式,它提供了一个接口,用于创建一系列相关或相互依赖对象的家族,而无需指定它们具体的类。这种模式是工厂方法模式的一种扩展,它用于创建的不是一个产品,而是多个产品的家族。 C# 代码实现 以下是C#中实现抽象工厂模式的一个简单示例: 首…

10种较流行的网络安全框架及特点分析

网络安全框架主要包括安全控制框架&#xff08;SCF&#xff09;、安全管理框架&#xff08;SMP&#xff09;和安全治理框架&#xff08;SGF&#xff09;等类型。对于那些希望按照行业最佳实践来开展网络安全能力建设的企业来说&#xff0c;理解并实施强大的网络安全框架至关重要…

Demystifying CXL Memory with Genuine CXL-Ready Systems and Devices——论文阅读

MICRO 2023 Paper CXL论文阅读汇总 问题 对更大容量和更高带宽的内存的不断增长的需求推动了基于Compute eXpress Link&#xff08;CXL&#xff09;的内存扩展和分离技术的创新。特别是&#xff0c;基于CXL的内存扩展技术不仅能够经济地扩展内存容量和带宽&#xff0c;还能够…

第3章 语句与函数

插入汇编语句 asm语句可在C或C程序中插入汇编代码。 VS2019编译器使用“_asm”插入汇编代码。 static_assert用于提供静态断言服务&#xff0c;即在编译时判定执行条件是否满足。 使用格式为 static_assert(条件表达式, ”输出信息“)。 不满足则编译报错。 C函数 函数说明不定…

【数据结构】二叉树(遍历,递归)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​​​ 目录 二叉树遍历规则 前序遍历 ​…

flutter 客户端日志上传定位错误信息

背景 flutter 开发的app 安装到真机上 无法定位报错信息&#xff0c;只能使用usb连接电脑 使用adb logcat来查看日志效率低下。 想法 如果将flutter 开发的app 运行的时候 将日志写进一个日志文件里面去&#xff0c;然后给flutter app搭建一个http服务器&#xff0c;后端知道对…

前端环境安装【mac/window,nvm,node,npm,yarn,react】

目录 nvm&#xff1a;node版本管理器 安装 window mac 常见命令 Node、npm yarn React a.全局方式 b.临时方式 运行 nvm&#xff1a;node版本管理器 nvm 主要是用来管理 nodejs 和 npm 版本的工具&#xff0c;可以用来切换不同版本的 nodejs。安装nvm之前先卸载nod…

如何使用VNC实现Win系统远程桌面Ubuntu图形化界面【内网穿透】

文章目录 推荐前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 …

vue:处理base64格式文件pdf、图片预览

一、需求&#xff1a;后端返回是base64数据&#xff0c;需要前端处理来展示文件。 二、实现方法&#xff1a; 解释一下这段代码的功能&#xff1a; &#xff09;preview(item) 是一个函数&#xff0c;接受一个参数 item&#xff0c;其中包含了文件的相关信息。 &#xff09;首…

HTML5+CSS3+JS小实例:实时给中文添加拼音

实例:实时给中文添加拼音 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"&…

使用freessl为网站获取https证书及配置详细步骤

文章目录 一、进入freessl网站二、修改域名解析记录三、创建证书四、配置证书五、服务启动 一、进入freessl网站 首先进入freessl网站&#xff0c;需要注册一个账号 freessl网站 进入网站后填写自己的域名 接下来要求进行DCV配置 二、修改域名解析记录 到域名管理处编辑域名…

项目开发中安全问题以及解决办法——客户传进来的数据不可信

用户传进来的数据是不可信的&#xff0c;比如下面这种情况下&#xff1a; PostMapping("/order") public void wrong(RequestBody Order order) { this.createOrder(order); } Data public class Order { private long itemId; //商品ID private BigDecimal ite…

智慧水务管理的发展历史有哪些阶段呢

随着科技的飞速发展&#xff0c;智慧水务管理已经成为了城市基础设施的重要组成部分。从传统的人工管理到现代的智能化管理&#xff0c;智慧水务经历了多个阶段的发展历程。本文将带您了解智慧水务管理的历史演变。 一、初级阶段&#xff1a;人工管理 在智慧水务管理发展的初期…

MS2358——96KHz、24bit 音频 ADC

产品简述 MS2358 是带有采样速率 8kHz-96kHz 的立体声音频模数 转换器&#xff0c;适合于面向消费者的专业音频系统。 MS2358 通过使用增强型双位 Δ - ∑ 技术来实现其高精度 的特点。 MS2358 支持单端的模拟输入&#xff0c;所以不需要外部器 件&#xff0c;非常适…

maven环境搭建(打包项目)

Maven:直观来讲就是打包写好的代码封装 Apahche 软件基金会&#xff08;非营业的组织&#xff0c;把一些开源软件维护管理起来&#xff09; maven apahce的一个开宇拿项目&#xff0c;是一个优秀的项目构建&#xff08;管理工具&#xff09; maven 管理项目的jar 以及jar与j…

[C++]:12:模拟实现list

[C]:12:模拟实现list 一.看一看SGI的stl_list的源码&#xff1a;1.基础结构构造函数1.节点结构&#xff1a;2.节点构造函数&#xff1a;3.链表结构&#xff1a;4.链表的构造函数&#xff1a; 2.析构1.节点析构&#xff1a;2.链表的析构&#xff1a; 3.迭代器 二.模拟实现list1.…

py的函数多返回值

前言:之前我们学过了py中函数&#xff0c;这一章我们来学习它的进阶版 目录 一.函数的多返回值 1.1关于函数的多返回值 1.2举例 二.函数多种传参方式 2.1关于多种传参方式 2.2关键字参数 2.2缺省参数 2.3不定长参数 2.4小结 三.匿名函数 3.1关于函数如何作为参数进行…

HEXO搭建个人博客

Hexo是一款基于Node.js的静态博客框架&#xff0c;可以生成静态网页托管在GitHub上。中文文档见HEXO 配置环境 安装Git&#xff1a;下载并安装Git 检查git是否正确安装&#xff1a; git --version 安装Node.js&#xff1a;Node.js 为大多数平台提供了官方的安装程序。注意安装…

Day30- 贪心算法part04

一、柠檬水找零 题目一&#xff1a;860. 柠檬水找零 860. 柠檬水找零 在柠檬水摊上&#xff0c;每一杯柠檬水的售价为 5 美元。顾客排队购买你的产品&#xff0c;&#xff08;按账单 bills 支付的顺序&#xff09;一次购买一杯。 每位顾客只买一杯柠檬水&#xff0c;然后向…

android 自定义八边形进度条

自定义八边形动画效果图如下 绘制步骤&#xff1a; 1.先绘制橙色底部八边形实心 2.黑色画笔绘制第二层&#xff0c;让最外层显示一条线条宽度即可 3.再用黄色画笔绘制黄色部分 4.使用渐变画笔根据当前进度绘制覆盖黄色部分 5.使用黑色画笔根据当前进度绘制刻度条 6.黑色画笔绘制…