flink 最后一个窗口一直没有新数据,窗口不关闭问题

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

窗口类型:滚动窗口
代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);@Overridepublic WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<JSONObject>() {private long maxWatermark;@Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));state.f0 = System.currentTimeMillis();System.out.println("maxWatermark is " + maxWatermark);state.f1 = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime = 3000L;if (maxWatermark - outOfTime <=0){} else {// 10s内没有数据则关闭当前窗口System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));System.out.println("state.f1:" + state.f1);if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));state.f1 = true;System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));} else {System.out.println("正常发送水印");watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
watermark 周期生成()的疑问:
1、默认200ms,会连续生成4次后,不会继续生成了
2、设置了周期生成间隔,env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次

参考:https://blog.csdn.net/lr131425/article/details/127422833

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632103.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10种较流行的网络安全框架及特点分析

网络安全框架主要包括安全控制框架&#xff08;SCF&#xff09;、安全管理框架&#xff08;SMP&#xff09;和安全治理框架&#xff08;SGF&#xff09;等类型。对于那些希望按照行业最佳实践来开展网络安全能力建设的企业来说&#xff0c;理解并实施强大的网络安全框架至关重要…

Demystifying CXL Memory with Genuine CXL-Ready Systems and Devices——论文阅读

MICRO 2023 Paper CXL论文阅读汇总 问题 对更大容量和更高带宽的内存的不断增长的需求推动了基于Compute eXpress Link&#xff08;CXL&#xff09;的内存扩展和分离技术的创新。特别是&#xff0c;基于CXL的内存扩展技术不仅能够经济地扩展内存容量和带宽&#xff0c;还能够…

【数据结构】二叉树(遍历,递归)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​​​ 目录 二叉树遍历规则 前序遍历 ​…

flutter 客户端日志上传定位错误信息

背景 flutter 开发的app 安装到真机上 无法定位报错信息&#xff0c;只能使用usb连接电脑 使用adb logcat来查看日志效率低下。 想法 如果将flutter 开发的app 运行的时候 将日志写进一个日志文件里面去&#xff0c;然后给flutter app搭建一个http服务器&#xff0c;后端知道对…

如何使用VNC实现Win系统远程桌面Ubuntu图形化界面【内网穿透】

文章目录 推荐前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 …

vue:处理base64格式文件pdf、图片预览

一、需求&#xff1a;后端返回是base64数据&#xff0c;需要前端处理来展示文件。 二、实现方法&#xff1a; 解释一下这段代码的功能&#xff1a; &#xff09;preview(item) 是一个函数&#xff0c;接受一个参数 item&#xff0c;其中包含了文件的相关信息。 &#xff09;首…

HTML5+CSS3+JS小实例:实时给中文添加拼音

实例:实时给中文添加拼音 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"&…

使用freessl为网站获取https证书及配置详细步骤

文章目录 一、进入freessl网站二、修改域名解析记录三、创建证书四、配置证书五、服务启动 一、进入freessl网站 首先进入freessl网站&#xff0c;需要注册一个账号 freessl网站 进入网站后填写自己的域名 接下来要求进行DCV配置 二、修改域名解析记录 到域名管理处编辑域名…

智慧水务管理的发展历史有哪些阶段呢

随着科技的飞速发展&#xff0c;智慧水务管理已经成为了城市基础设施的重要组成部分。从传统的人工管理到现代的智能化管理&#xff0c;智慧水务经历了多个阶段的发展历程。本文将带您了解智慧水务管理的历史演变。 一、初级阶段&#xff1a;人工管理 在智慧水务管理发展的初期…

MS2358——96KHz、24bit 音频 ADC

产品简述 MS2358 是带有采样速率 8kHz-96kHz 的立体声音频模数 转换器&#xff0c;适合于面向消费者的专业音频系统。 MS2358 通过使用增强型双位 Δ - ∑ 技术来实现其高精度 的特点。 MS2358 支持单端的模拟输入&#xff0c;所以不需要外部器 件&#xff0c;非常适…

maven环境搭建(打包项目)

Maven:直观来讲就是打包写好的代码封装 Apahche 软件基金会&#xff08;非营业的组织&#xff0c;把一些开源软件维护管理起来&#xff09; maven apahce的一个开宇拿项目&#xff0c;是一个优秀的项目构建&#xff08;管理工具&#xff09; maven 管理项目的jar 以及jar与j…

[C++]:12:模拟实现list

[C]:12:模拟实现list 一.看一看SGI的stl_list的源码&#xff1a;1.基础结构构造函数1.节点结构&#xff1a;2.节点构造函数&#xff1a;3.链表结构&#xff1a;4.链表的构造函数&#xff1a; 2.析构1.节点析构&#xff1a;2.链表的析构&#xff1a; 3.迭代器 二.模拟实现list1.…

py的函数多返回值

前言:之前我们学过了py中函数&#xff0c;这一章我们来学习它的进阶版 目录 一.函数的多返回值 1.1关于函数的多返回值 1.2举例 二.函数多种传参方式 2.1关于多种传参方式 2.2关键字参数 2.2缺省参数 2.3不定长参数 2.4小结 三.匿名函数 3.1关于函数如何作为参数进行…

android 自定义八边形进度条

自定义八边形动画效果图如下 绘制步骤&#xff1a; 1.先绘制橙色底部八边形实心 2.黑色画笔绘制第二层&#xff0c;让最外层显示一条线条宽度即可 3.再用黄色画笔绘制黄色部分 4.使用渐变画笔根据当前进度绘制覆盖黄色部分 5.使用黑色画笔根据当前进度绘制刻度条 6.黑色画笔绘制…

使用Sqoop的并行处理:扩展数据传输

使用Sqoop的并行处理是在大数据环境中高效传输数据的关键。它可以显著减少数据传输的时间&#xff0c;并充分利用集群资源。本文将深入探讨Sqoop的并行处理能力&#xff0c;提供详细的示例代码&#xff0c;以帮助大家更全面地了解和应用这一技术。 Sqoop的并行处理 在开始介绍…

Java网络编程:概述--快速入门

I. 介绍 1.1 什么是网络编程 - 网络编程是指通过计算机网络实现程序之间的通信。在Java中&#xff0c;网络编程通常涉及到数据的传输、通信协议的使用以及与网络相关的各种操作。 1.2. 为什么学习Java网络编程 - Java网络编程是Java开发者重要的技能之一&#xff0c;因为它允许…

mybatis----小细节

1、起别名 在MyBatis中&#xff0c;<typeAliases>元素用于定义类型别名&#xff0c;它可以将Java类名映射为一个更简短的别名&#xff0c;这样在映射文件中可以直接使用别名而不需要完整的类名。 下面是一个示例&#xff1a; 在mybatis核心配置文件中配置typeAliases标…

SSH隧道技术

SSH隧道 简介 SSH隧道是一种通过SSH协议在两个网络节点之间建立安全通信的技术。它可以用于多种用途&#xff0c;包括加密和保护敏感数据传输、绕过防火墙限制、远程访问内部服务等。 应用&#xff1a; 端口转发&#xff1a;SSH隧道可以将本地端口转发到远程主机上&#xf…

合并K个升序链表(LeetCode 23)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;顺序合并方法二&#xff1a;分治合并方法三&#xff1a;使用优先队列合并 参考文献 1.问题描述 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff…

Python——基本语法(二)

一、while 循环 语法&#xff1a; while 条件表达式:条件表达示为真&#xff0c;就执⾏这⾥的代码&#xff0c;必须缩进 4 个空格多⾏代码保持缩进⼀致 条件表达式可以是: True # 布尔值的 True 1 < 10 # 凡是在 if 语句中使⽤的判断表达示&#xff0c;这⾥都可以使…