43、Flink 自定义窗口触发器代码示例

1、方法说明

1)onElement() 方法在每个元素被加入窗口时调用。

返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
CONTINUE: 什么也不做
FIRE: 触发计算
PURGE: 清空窗口内的元素
FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

2)onProcessingTime() 方法在注册的 processing-time timer 触发时调用。

返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
CONTINUE: 什么也不做
FIRE: 触发计算
PURGE: 清空窗口内的元素
FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

3)onEventTime() 方法在注册的 event-time timer 触发时调用。

返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
CONTINUE: 什么也不做
FIRE: 触发计算
PURGE: 清空窗口内的元素
FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

4)clear() 方法处理在对应窗口被移除时所需的逻辑。

5)onMerge() 方法与有状态的 trigger 相关,该方法会在两个窗口合并时,将窗口对应 trigger 的状态合并,比如使用会话窗口时。

2、完整代码示例

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;/*** FIRE 会保留被触发的窗口中的内容,Flink 内置的 trigger 默认使用 `FIRE`。* FIRE_AND_PURGE 不会保留被触发的窗口中的内容* Purge 只会移除窗口中的内容,不会移除关于窗口的 meta-information 和 trigger 的状态*/
public class _08_WindowTriggerCustom {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.socketTextStream("localhost", 8888);ArrayList<String> keyList = new ArrayList<>();keyList.add("c");keyList.add("d");// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);// Processing-Timeinput.keyBy(e -> String.valueOf(e.hashCode() % 2)).window(GlobalWindows.create()).trigger(new MyCustomWindowTrigger<>(keyList)).apply(new WindowFunction<String, String, String, GlobalWindow>() {@Overridepublic void apply(String s, GlobalWindow globalWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {for (String word : iterable) {collector.collect(word);}}}).print();env.execute();}
}class MyCustomWindowTrigger<W extends Window> extends Trigger<String, W> {private List<String> keyWords;private ValueStateDescriptor<String> valueStateDescriptor;public MyCustomWindowTrigger(List<String> keyWords) {this.keyWords = keyWords;this.valueStateDescriptor = new ValueStateDescriptor<>("cnt", String.class);}// onElement() 方法在每个元素被加入窗口时调用。// 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件//- `CONTINUE`: 什么也不做//- `FIRE`: 触发计算//- `PURGE`: 清空窗口内的元素//- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素@Overridepublic TriggerResult onElement(String input, long l, W w, TriggerContext triggerContext) throws Exception {//当窗口内的元素匹配到首个关键字时触发,触发前的元素用 '-' 拼接//a0\b0\a1\b1\c2\d2//FIRE_AND_PURGE=会清除窗口状态//FIRE=不会清除窗口状态//2> b0//2> a1//2> c2////1> a0//1> b1//1> d2ValueState<String> partitionedState = triggerContext.getPartitionedState(valueStateDescriptor);String value = partitionedState.value();for (String keyWord : keyWords) {if (input.startsWith(keyWord)) {if (value == null || value.isEmpty()) {partitionedState.update(input);}value += "-" + input;partitionedState.update(value);}}if (partitionedState.value() != null && !partitionedState.value().isEmpty()) {
//            return TriggerResult.FIRE_AND_PURGE;return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}// onProcessingTime() 方法在注册的 processing-time timer 触发时调用。// 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件//- `CONTINUE`: 什么也不做//- `FIRE`: 触发计算//- `PURGE`: 清空窗口内的元素//- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素@Overridepublic TriggerResult onProcessingTime(long l, W w, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}// onEventTime() 方法在注册的 event-time timer 触发时调用。// 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件//- `CONTINUE`: 什么也不做//- `FIRE`: 触发计算//- `PURGE`: 清空窗口内的元素//- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素@Overridepublic TriggerResult onEventTime(long l, W w, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}// clear() 方法处理在对应窗口被移除时所需的逻辑。@Overridepublic void clear(W w, TriggerContext triggerContext) throws Exception {ValueState<String> partitionedState = triggerContext.getPartitionedState(valueStateDescriptor);partitionedState.clear();}// onMerge() 方法与有状态的 trigger 相关,该方法会在两个窗口合并时,将窗口对应 trigger 的状态合并,比如使用会话窗口时。@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/26336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于docker无法正常下载镜像的问题

文章目录 之前还可以正常下载镜像&#xff0c;但是一段时间之后就无法下载了&#xff0c;猜测可能是政治原因&#xff0c;无法连接到国外服务器&#xff0c;所以我设置了阿里云的镜像加速器。 配置方法如下&#xff1a; 前往阿里云&#xff08;https://help.aliyun.com/zh/acr/…

ATA-3040C功率放大器的正确使用方法

功率放大器是一种用于增强电信号功率的重要设备。正确使用功率放大器可以确保信号的稳定放大&#xff0c;并避免设备损坏。下面将介绍功率放大器的正确使用方法。 确定输入信号的功率范围&#xff1a;在使用功率放大器之前&#xff0c;需要确定输入信号的功率范围。过大的输入功…

Python 最强的5个高级函数,你会几个?

Python是一门灵活而强大的编程语言&#xff0c;具有丰富的内置函数和库。今天&#xff0c;我们将深入探讨五个非常有用的Python高级函数。这些函数不仅能够简化代码&#xff0c;还能提高代码的可读性和可维护性。 首先&#xff0c;了解下什么是高级函数&#xff1f; 高级函数…

C# —— 字符串拼接

字符串拼接的方式一 之前的算术运算符 只是用来数值类型的相加 主要做的是数学的运算 // 而string 不存在算数运算 但是可以通过加号 进行拼接 string str "123" 字符串拼接 str str "456"; Console.WriteLine(str); // "123456&q…

博客摘录「 YOLOv5模型剪枝压缩」2024年5月11日

添加L1正则来约束BN层系数 语义边缘检测和语义分割的关系调研结果为&#xff0c;语义信息可以用来增强语义分割的效果&#xff0c;也有一定的优点和采用理由&#xff0c;但此类论文的数量并不是很多&#xff0c;语义分割的多数方法还是使用深度学习直接做像素分类。在对比两者…

【Unity】如何做一个很平滑的行人动画,且可以根据行人速度动态调整动画速度?

首先我们定一下不同速度对应的行人动作状态&#xff0c;设计为四种状态&#xff1a; 静止站立Stand&#xff1a;0~maxStandSpeed走路Walk&#xff1a;minWalkSpeed~maxWalkSpeed慢跑Jog&#xff1a;minJogSpeed~maxJogSpeed快跑Run&#xff1a;大于MinRunSpeed 我们可以使用A…

FISCO BCOS x GitLink,为国产开源技术生态注入新活力

作为中国领先的区块链底层平台之一&#xff0c;FISCO BCOS 自成立以来始终致力于推动国产开源区块链技术的应用和普及。近期&#xff0c;FISCO BCOS 将开源代码托管到CCF官方代码托管平台 GitLink &#xff08;确实开源&#xff09;&#xff0c;为国产开源技术生态注入新活力。…

C++类型转换深度解析:从基础数据类型到字符串,再到基础数据类型的完美转换指南

前言 在 C 编程中&#xff0c;我们经常需要在基础数据类型&#xff08;如 int、double、float、long、unsigned int 等&#xff09;与 string 类型之间进行转换。这种转换对于处理用户输入、格式化输出、数据存储等场景至关重要。 本文将详细介绍如何在 C 中实现这些转换。 文…

刚实习的大学生如何避免招聘骗局?

大学生在求职过程中&#xff0c;常常成为招聘骗局的受害者。为了避免这种情况&#xff0c;大学生需要提高警惕&#xff0c;采取一系列措施来防范招聘骗局。以下是一些建议&#xff1a; 首先&#xff0c;大学生应当保持警惕&#xff0c;不轻信招聘信息。在求职时&#xff0c;务…

使用 Elasticsearch 设计大规模向量搜索

作者&#xff1a;Jim Ferenczi 第 1 部分&#xff1a;高保真密集向量搜索 简介 在设计向量搜索体验时&#xff0c;可用选项的数量之多可能会让人感到不知所措。最初&#xff0c;管理少量向量很简单&#xff0c;但随着应用程序的扩展&#xff0c;这很快就会成为瓶颈。 在本系列…

Linux下的/etc/resolv.conf

Linux下的/etc/resolv.conf 文件用于配置域名解析器的设置&#xff0c;告诉系统在解析域名时要查询哪些DNS服务器。nameserver&#xff1a;指定DNS服务器的IP地址。你可以列出多个nameserver&#xff0c;系统将按顺序尝试它们&#xff0c;直到找到可用的DNS服务器。 nameserve…

ip地址公和内有什么区别

在数字化世界中&#xff0c;IP地址扮演着至关重要的角色。它不仅是网络设备的身份标识&#xff0c;更是信息传输的桥梁。然而&#xff0c;并非所有IP地址都拥有相同的属性和功能。公有IP地址和私有IP地址&#xff0c;作为IP地址的两大类别&#xff0c;它们存在着显著的差异。虎…

C#A类调用B类的方法,在方法中更新B类的控件

1.首先在B类中定义静态成员 public static B bnull; 其次&#xff0c;在B类构造函数中给静态成员初始化 public B(){B this;InitializeComponent();} 在A类中&#xff0c;调用更新B类控件的方法 B.b.Method("已通过"); 2.如果当前方法所在的线程不是UI线程&…

成都百洲文化传媒有限公司电商服务的领航者

在当今数字化浪潮席卷全球的时代&#xff0c;电商行业以其独特的魅力和无穷的潜力&#xff0c;正成为推动经济发展的重要引擎。在这一领域&#xff0c;成都百洲文化传媒有限公司以其专业的电商服务和创新的营销理念&#xff0c;成为了行业的佼佼者&#xff0c;引领着电商服务的…

芯片后端对于芯片设计公司的重要性

在芯片设计流程中&#xff0c;后端设计是一个至关重要的环节&#xff0c;它直接关系到芯片从设计到实际生产的转化&#xff0c;以及最终产品的性能、可靠性、成本和上市时间。 以下是为什么芯片后端非常重要的几个关键原因&#xff1a; 物理实现&#xff1a;后端设计是芯片从逻…

【车载开发系列】车载电源介绍

【车载开发系列】车载电源介绍 【车载开发系列】车载电源介绍 【车载开发系列】车载电源介绍一. 整车的两个电源二. 整车电源的状态1&#xff09;OFF模式2&#xff09;ON模式3&#xff09;ACC模式4&#xff09;CRANK模式 三. 整车电源相关术语说明 一. 整车的两个电源 发电机&…

git子模块应用和常用用法

概念 子模块&#xff1a;分离项目不同模块&#xff0c;集成一个大的项目&#xff0c;方便模块管理&#xff0c;比如模块各自管理自己的依赖。 命令 1. 初始化拉取&#xff0c;更新子模块 git submodule update --init --recursive --remote参数&#xff1a; –init: 初始化远…

CAP理论

CAP理论 在分布式系统的世界中&#xff0c;CAP理论是一个绕不开的话题。CAP&#xff0c;即Consistency&#xff08;一致性&#xff09;、Availability&#xff08;可用性&#xff09;和Partition tolerance&#xff08;分区容错性&#xff09;&#xff0c;这三个属性在分布式系…

langchain发布了v0.2版本

文章目录 前言1. 文档和可发现性&#xff1a;2. 标准化接口&#xff1a;3. 异步和流媒体支持&#xff1a;4. LangGraph&#xff1a;5. 改进的工具包和回调&#xff1a;6. 向后兼容性和迁移&#xff1a;总结 前言 langchain发布了v0.2版本&#xff0c;LangChain v0.2 相比 v0.1…

Vxe UI vxe-table custom 实现自定义列服务端保存,服务端恢复状态,实现用户个性化列信息保存

Vxe UI vue vxe-table custom 实现自定义列服务端保存&#xff0c;服务端恢复状态&#xff0c;实现用户个性化列信息保存 支持将自定义列状态信息&#xff0c;列宽、冻结列、列排序、列显示隐藏 等状态信息保存到本地或服务端 代码 实现自定义列状态保存功能&#xff0c;只需…