flink处理函数--副输出功能

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**  http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}

程序运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于YOLOv8的安全帽检测系统

1.Yolov8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&#xff08;SOTA&#xff09;模型&#xff0c;它建立在先前YOLO成功基础上&#xff0c;并引入了新功能和改进&#xff0c;以进一步提升性能和灵活…

树的表示——孩子兄弟表示法

从图中可以看出&#xff0c;树的每个结点&#xff0c;都有不确定的指向他们的孩子的节点&#xff0c;如果我们定义这样一个结构体来便是数的结构的话&#xff1a; struct TreeNode { int val; struct TreeNodep1; struct TreeNodep1; … }; 是不能够表示一棵树的&#xff0c;因…

9.25 day 2

1. 简述方法重写与方法重载的意义与区别&#xff1a; 方法重写&#xff1a; 1.参数列表必须完全与被重写方法相同 //参数列表&#xff08;分为四种&#xff09;&#xff1a; &#xff08;1&#xff09;无参无返回值方法&#xff1b; &#xff08;2&#xff09;有参无返回…

“智慧时代的引领者:探索人工智能的无限可能性“

目录 一.背景 二.应用 2.1金融领域 2.2医疗领域 2.3教育领域 三.发展 四.总结: 一.背景 人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;&#xff0c;是指通过计算机程序模拟人类智能的一种技术。它是计算机科学、工程学、语言学、哲学等多…

【Redis】redis基本数据类型详解(String、List、Hash、Set、ZSet)

目录 RedisString(字符串)List(列表)Hash(字典)Set(集合)ZSet(有序集合) Redis Redis有5种基本的数据结构&#xff0c;分别为&#xff1a;string&#xff08;字符串&#xff09;、list&#xff08;列表&#xff09;、set&#xff08;集合&#xff09;、hash&#xff08;哈希&a…

音频编辑软件Steinberg SpectraLayers Pro mac中文软件介绍

Steinberg SpectraLayers Pro mac是一款专业的音频编辑软件&#xff0c;旨在帮助音频专业人士进行精细的音频编辑和声音处理。它提供了强大的频谱编辑功能&#xff0c;可以对音频文件进行深入的频谱分析和编辑。 Steinberg SpectraLayers Pro mac软件特点 1. 频谱编辑&#xff…

transformers简介

目录 1、前言 2、网络结构 &#xff08;1&#xff09;、Transformers的总体架构可以分为四部分 &#xff08;2&#xff09;、输入文本包含 &#xff08;3&#xff09;、输出部分包含 &#xff08;4&#xff09;、编码器部分 &#xff08;5&#xff09;、解码器部分 1、前…

SpringBoot 如何解决跨域问题

Spring Boot 中的跨域请求&#xff08;Cross-Origin Request&#xff09;问题与解决方案 跨域请求是指浏览器从一个域名的网页去请求另一个域名的资源&#xff0c;它是为了增强 Web 安全性而产生的限制。Spring Boot 应用程序通常会面临跨域请求的问题&#xff0c;本文将介绍跨…

AWS SAA知识点整理(作成中)

共通 一些信息已经更新了&#xff0c;但参考题的答案还是旧的。 比如&#xff1a; S3的最大读写性能已经提高到 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second 并且不再要求使用random prefix 题目中有时候会让选择Not violation 不合适的一项&#xff…

AI编程助手 Amazon CodeWhisperer 全面解析与实践

目录 引言Amazon CodeWhisperer简介智能编程助手智能代码建议代码自动补全 提升代码质量代码质量提升安全性检测 支持多平台多语言 用户体验和系统兼容性用户体验文档和学习资源个性化体验系统兼容性 功能全面性和代码质量功能全面性代码生成质量和代码安全性 CodeWhisperer的代…

在pycharm中弹出图后,需要关闭才会显示Process finished with exit code 0

在pycharm中弹出图后&#xff0c;需要关闭才会显示Process finished with exit code 0 在PyCharm中&#xff0c;当你运行一个Python程序并弹出一个图形窗口时&#xff0c;程序会等到图形窗口关闭后才会显示 “Process finished with exit code 0” 的消息。 这是 由于代码执行…

毛玻璃员工卡片悬停效果

效果展示 页面结构组成 通过效果展示图&#xff0c;我们可以看出页面布局比较常规&#xff0c;最核心的就是卡片&#xff0c;当鼠标没有悬停在卡片上时&#xff0c;文字和头像处于半透明状态&#xff0c;当鼠标悬停在卡片上是&#xff0c;底部会展示社交图标。 CSS 知识点 b…

1.物联网射频识别,RFID概念、组成、中间件、标准,全球物品编码——EPC码

1.RFID概念 RFID是Radio Frequency Identification的缩写&#xff0c;又称无线射频识别&#xff0c;是一种通信技术&#xff0c;可通过无线电讯号识别特定目标并读写相关数据&#xff0c;而无需与被识别物体建立机械或光学接触。 RFID&#xff08;Radio Frequency Identificati…

Qt扩展-QCustomPlot 简介及配置

QCustomPlot 简介及配置 一、概述二、安装教程三、帮助文档的集成 一、概述 QCustomPlot是一个用于绘图和数据可视化的Qt 控件。它没有进一步的依赖关系&#xff0c;并且有良好的文档记录。这个绘图库专注于制作好看的、发布质量的2D绘图、图形和图表&#xff0c;以及为实时可…

Spring IOC之AnnotationConfigApplicationContext

博主介绍:✌全网粉丝近5W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌ 博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus…

Andriod 简单控件

目录 一、文本显示1.1 设置文本内容1.2 设置文本大小1.3 设置文本颜色 二、视图基础2.1 设置视图宽高2.2 设置视图间距2.3 设置视图对齐方式 三、常用布局3.1 线性布局LinearLayout3.2 相对布局RelativeLayout3.3 网格布局GridLayout3.4 滚动视图ScrollView 四、按钮触控4.1 按…

VSCode 在部分 Linux 设备上终端和文本编辑器显示文本不正常的解决方法

部分Linux设备上运行VSCode时&#xff0c;发现文本编辑器的缩放不明显&#xff0c;终端字体间距过大等。 这里以Kali Linux为例&#xff0c;其他Linux发行版请选择对应的系统内置的等宽字体 我们依次打开 设置 -> 外观 -> 字体 这里我们可以发现&#xff0c;Kali Linux默…

Linux性能优化--性能工具-系统CPU

2.0.概述 本章概述了系统级的Linux性能工具。这些工具是你追踪性能问题时的第一道防线。它们能展示整个系统的性能情况和哪些部分表现不好。 1.理解系统级性能的基本指标&#xff0c;包括CPU的使用情况。 2.明白哪些工具可以检索这些系统级性能指标。 2.1CPU性能统计信息 为…

chrome extensions mv3通过content scripts注入/获取原网站的window数据

开发插件的都知道插件的content scripts和top window只共享Dom不共享window和其他数据&#xff0c;如果想拿挂载在window的数据还有点难度&#xff0c;下面会通过事件的方式传递cs和top window之间的数据写一个例子 代码 manifest.json 这里只搞了2个js&#xff0c;content.…