flink的CoProcessFunction使用示例

背景

在flink中对两个流进行connect之后进行出处理的场景很常见,我们本文就以书中的一个例子为例说明下实现一个CoProcessFunction的一些要点

实现CoProcessFunction的一些要点

这个例子举例的是当收到某个传感器放行的控制消息时,从传感器传来的温度流消息会被运行向下游传递一段时间

/*** 展示CoProcessFunction+onTimer使用方法的例子*/
public class CoProcessFunctionTimers {public static void main(String[] args) throws Exception {// set up the streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// use event time for the applicationenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 控制消息流允许传感器消息流通过指定长度的时间DataStream<Tuple2<String, Long>> filterSwitches = env.fromElements(// forward readings of sensor_2 for 10 secondsTuple2.of("sensor_2", 10_000L),// forward readings of sensor_7 for 1 minuteTuple2.of("sensor_7", 60_000L));// 传感器消息流DataStream<SensorReading> readings = env// SensorSource generates random temperature readings.addSource(new SensorSource());//传感器消息流connet控制消息流,并且按照传感器id作为key进行分组DataStream<SensorReading> forwardedReadings = readings//连接控制消息流.connect(filterSwitches)// 按照传感器id分组.keyBy(r -> r.id, s -> s.f0)// 应用CoProcessFunction + onTimer函数.process(new ReadingFilter());forwardedReadings.print();env.execute("Filter sensor readings");}//应用CoProcessFunction + onTimer函数,这已经按照key=传感器id分好组public static class ReadingFilter extends CoProcessFunction<SensorReading, Tuple2<String, Long>, SensorReading> {// 传感器开关状态--键值分区状态,key是传感器idprivate ValueState<Boolean> forwardingEnabled;// 保存传感器开关持续时间的状态--键值分区状态,key是传感器idprivate ValueState<Long> disableTimer;// 初始化键值分区状态 key是传感器idpublic void open(Configuration parameters) throws Exception {forwardingEnabled = getRuntimeContext().getState(new ValueStateDescriptor<>("filterSwitch", Types.BOOLEAN));disableTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG));}@Overridepublic void processElement1(SensorReading r, Context ctx, Collector<SensorReading> out) throws Exception {// 处理传感器消息流,首先检查key是传感器id对应的键值分区状态,如果开启,那么这个传感器消息就可以正常通过Boolean forward = forwardingEnabled.value();if (forward != null && forward) {out.collect(r);}}@Overridepublic void processElement2(Tuple2<String, Long> s, Context ctx, Collector<SensorReading> out) throws Exception {//控制流消息过来后,更新键值分区的开关状态为true, key是传感器idforwardingEnabled.update(true);//控制流消息过来后,更新键值分区的开关状态为true的持续时长的定时器, key是传感器idlong timerTimestamp = ctx.timerService().currentProcessingTime() + s.f1;Long curTimerTimestamp = disableTimer.value();if (curTimerTimestamp == null || timerTimestamp > curTimerTimestamp) {// remove current timerif (curTimerTimestamp != null) {ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);}// register new timerctx.timerService().registerProcessingTimeTimer(timerTimestamp);disableTimer.update(timerTimestamp);}}// 键值开关状态的持续时间定时器,key是传感器id,注意,在ontimer方法中,也可以通过out.collect的方式向下游算子发送消息public void onTimer(long ts, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {// 定时器时间到了之后,清理掉传感器的开关状态forwardingEnabled.clear();disableTimer.clear();}}
}

以上就是实现一个CoProcessFunction的大概逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/137643.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java自学第8课:电商项目(3) - 重新搭建环境

由于之前用的jdk和eclipse&#xff0c;以及mysql并不是视频教程所采用的&#xff0c;在后面运行源码和使用作者提供源码时&#xff0c;总是报错&#xff0c;怀疑&#xff1a; 1 数据库有问题 2 jdk和引入的jar包不匹配 3 其他什么未知的错误&#xff1f; 所以决定卸载jdk e…

SQL入门语句

MySQL和SQL的区别是什么&#xff1f;之间是什么关系&#xff1f; SQL&#xff08;Structured Query Language&#xff09;是用于管理和操作关系型数据库&#xff08;RDBMS&#xff09;的标准语言。SQL还可以用于这些RDBMS&#xff1a;MySQL、Oracle、Microsoft SQL Server、Pos…

ios 开发问题小集 [持续更新]

文章目录 一、如何给列表上的UITableViewCell添加手势二、获取NSIndexPath的方式2.1 根据row, section 来创建2.2 根据point 的位置来找到 indexPath三、tableView在Grouped样式下,设置表头表尾空白一、如何给列表上的UITableViewCell添加手势 给cell添加手势,大家都会这么做…

Kafka中遇到的错误:

1、原因&#xff1a;kafka是一个去中心化结果的&#xff0c;所以在启动Kafka的时候&#xff0c;每一个节点上都需要启动。 启动的命令&#xff1a;kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties

AMESim 2021安装教程

主要是AMESim的安装 写在前面&#xff0c;由于项目需要&#xff0c;需要自学AMESim&#xff0c;因此需要安装这个软件&#xff0c;目前仅仅安装使用&#xff0c;还不涉及到与MATLAB的联合仿真&#xff0c;老板说用 RT LAB半实物仿真平台&#xff0c;但是简单搜了一下&#xff0…

Flutter笔记:绘图示例 - 一个简单的(Canvas )时钟应用

Flutter笔记 绘图示例 - 一个简单的&#xff08;Canvas &#xff09;时钟应用 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_2855…

如何记录血压的波动情况

import pandas as pd from plotnine import * import matplotlib.pyplot as plt plt.rcParams[font.sans-serif] [Microsoft YaHei] 记录时间(time)、收缩压(SBP)、舒张压(DBP)&#xff1a; df pd.DataFrame({ time: [2023-11-01 08:30, 2023-11-02 21:00, 2023-11-0…

正则表达式中(?s)与(?m)的区别

理论&#xff1a; (?m) 和 (?s) 是正则表达式中的两个模式标志&#xff0c;它们具有不同的作用&#xff1a; (?m) 多行模式标志&#xff08;也称为 “multiline” 模式&#xff09;&#xff1a; 默认情况下&#xff0c;正则表达式将整个输入字符串视为单行多行文本中使用…

前端-第一部分-HTML

一.初识HTML 1.1 HTML 简介 HTML 全称为 HyperText Mark-up Language&#xff0c;翻译为超文本标签语言&#xff0c;标签也称作标记或者元素。HTML 是目前网络上应用最为广泛的技术之一&#xff0c;也是构成网页文档的主要基石之一。HTML文本是由 HTML 标签组成的描述性文本&a…

力扣最热一百题——每日温度

Python后面的文章&#xff0c;内容都比较多&#xff0c;但是同时我又想保持每天更新的速度&#xff0c;所以Python的文章我继续打磨打磨&#xff0c;先更新一篇算法的文章。 一身正气报国家&#xff0c;旁无乱境不恋她 ヾ(◍∇◍)&#xff89;&#xff9e; 力扣题号&#xff1a…

css呼吸效果实现

实现一个图片有规律的大小变化&#xff0c;呈现呼吸效果&#xff0c;怎么用CSS实现这个呼吸效果呢 一.实现 CSS实现动态效果可以使用动画( animation)来属性实现&#xff0c;放大缩小效果可以用transform: scale来实现&#xff0c;在这基础上有了动画&#xff0c;就可以设置一个…

ps人像怎么做渐隐的效果?

photoshop怎么制作人像渐隐的图片效果&#xff1f;渐隐效果需要使用渐变来实现&#xff0c;下面我们就来看看详细的教程。 首先&#xff0c;我们打开Photoshop&#xff0c;点击屏幕框选的【打开】&#xff0c;打开一张背景图片。 下面&#xff0c;我们点击左上角【文件】——【…

Exploration by random network distillation论文笔记

Exploration by Random Network Distillation (2018) 随机网络蒸馏探索 0、问题 这篇文章提出的随机网络蒸馏方法与Curiosity-driven Exploration by Self-supervised Prediction中提出的好奇心机制的区别&#xff1f; 猜想&#xff1a;本文是基于随机网络蒸馏提出的intrin…

16-nacos-快速入门

1.4.启动 启动非常简单&#xff0c;进入bin目录 然后执行命令即可&#xff1a; windows命令&#xff1a; startup.cmd -m standalone3.Nacos的依赖 父工程&#xff1a; <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-…

楼宇天台视频AI智能监管方案,时刻保障居民安全

一、背景需求分析 我们经常能看到这样的新闻报道&#xff0c;小孩登上小区的天台玩耍&#xff0c;因为家长和物业人员发现得晚&#xff0c;没有及时制止&#xff0c;结果导致意外事故的发生。此前&#xff0c;在某小区就有居民拍下多名儿童在小区高层住宅的楼顶玩耍跳跃&#…

Pytorch R-CNN目标检测-汽车car

概述 目标检测(Object Detection)就是一种基于目标几何和统计特征的图像分割,它将目标的分割和识别合二为一,通俗点说就是给定一张图片要精确的定位到物体所在位置,并完成对物体类别的识别。其准确性和实时性是整个系统的一项重要能力。 R-CNN的全称是Region-CNN(区域卷积神经…

Nginx实现tcp代理并支持TLS加密实验

Nginx源码编译 关于nginx的搭建配置具体参考笔者之前的一篇文章&#xff1a;实时流媒体服务器搭建试验&#xff08;nginxrtmp&#xff09;_如何在线测试流媒体rtmp搭建成功了吗-CSDN博客中的前半部分&#xff1b;唯一变化的是编译参数&#xff08;添加stream模块并添加其对应ss…

无线城市WiFi解决方案【完整Word】

wx供重浩&#xff1a;创享日记 获取完整无水印高清Word版 文章目录 第1章 项目背景1.1“无线城市”的定义1.2 国内外“无线城市”发展概况1.3 典型案例分析1.4 建设无线城市的必要性1.5 无线城市能为政府带来的价值 第2章 项目需求分析2.1 无线城市的现状分析2.2 无线城市的总体…

Excel中功能区的存放位置很灵活,可以根据需要隐藏或显示

在这个简短的教程中,你将找到5种快速简单的方法来恢复Excel功能区,以防丢失,并学习如何隐藏功能区,为工作表腾出更多空间。 功能区是Excel中所有操作的中心点,也是大多数可用功能和命令所在的区域。你觉得功能区占用了你太多的屏幕空间吗?没问题,只需单击鼠标,它就被隐…

Wsl2 Ubuntu在不安装Docker Desktop情况下使用Docker

目录 1. 前提条件 2.安装Distrod 3. 常见问题 3.1.docker compose 问题无法使用问题 3.1. docker-compose up报错 参考文档 1. 前提条件 win10 WSL2 Ubuntu(截止202308最新版本是20.04.xx) 有不少的博客都是建议直接安装docker desktop&#xff0c;这样无论在windows…