flink处理函数--副输出功能

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**  http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}

程序运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于YOLOv8的安全帽检测系统

1.Yolov8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&#xff08;SOTA&#xff09;模型&#xff0c;它建立在先前YOLO成功基础上&#xff0c;并引入了新功能和改进&#xff0c;以进一步提升性能和灵活…

树的表示——孩子兄弟表示法

从图中可以看出&#xff0c;树的每个结点&#xff0c;都有不确定的指向他们的孩子的节点&#xff0c;如果我们定义这样一个结构体来便是数的结构的话&#xff1a; struct TreeNode { int val; struct TreeNodep1; struct TreeNodep1; … }; 是不能够表示一棵树的&#xff0c;因…

9.25 day 2

1. 简述方法重写与方法重载的意义与区别&#xff1a; 方法重写&#xff1a; 1.参数列表必须完全与被重写方法相同 //参数列表&#xff08;分为四种&#xff09;&#xff1a; &#xff08;1&#xff09;无参无返回值方法&#xff1b; &#xff08;2&#xff09;有参无返回…

“智慧时代的引领者:探索人工智能的无限可能性“

目录 一.背景 二.应用 2.1金融领域 2.2医疗领域 2.3教育领域 三.发展 四.总结: 一.背景 人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;&#xff0c;是指通过计算机程序模拟人类智能的一种技术。它是计算机科学、工程学、语言学、哲学等多…

【Redis】redis基本数据类型详解(String、List、Hash、Set、ZSet)

目录 RedisString(字符串)List(列表)Hash(字典)Set(集合)ZSet(有序集合) Redis Redis有5种基本的数据结构&#xff0c;分别为&#xff1a;string&#xff08;字符串&#xff09;、list&#xff08;列表&#xff09;、set&#xff08;集合&#xff09;、hash&#xff08;哈希&a…

CUDA、cudNN 、CUDAToolKit三者关系

cudatoolkit和cuda有关系吗 CUDA 是 NVIDIA 推出的一种并行计算平台和编程模型&#xff0c;可以在 NVIDIA GPU 上运行 C/C 代码。 CUDA Toolkit 是 NVIDIA 提供的一套开发工具&#xff0c;它包含了用于开发 CUDA 应用程序所需的各种工具&#xff0c;如编译器、调试器和库。 因…

音频编辑软件Steinberg SpectraLayers Pro mac中文软件介绍

Steinberg SpectraLayers Pro mac是一款专业的音频编辑软件&#xff0c;旨在帮助音频专业人士进行精细的音频编辑和声音处理。它提供了强大的频谱编辑功能&#xff0c;可以对音频文件进行深入的频谱分析和编辑。 Steinberg SpectraLayers Pro mac软件特点 1. 频谱编辑&#xff…

transformers简介

目录 1、前言 2、网络结构 &#xff08;1&#xff09;、Transformers的总体架构可以分为四部分 &#xff08;2&#xff09;、输入文本包含 &#xff08;3&#xff09;、输出部分包含 &#xff08;4&#xff09;、编码器部分 &#xff08;5&#xff09;、解码器部分 1、前…

sci投稿流程(从投稿到录用全流程解析)

论文投稿流程 1.初稿:准备好论文&#xff0c;给编辑的信(cover letter&#xff09; 2.返修: 一.会返回两个东西: 1&#xff09;编辑的决定 (小修minior review&#xff0c;大修major review&#xff0c;拒稿reject等等)。只要不是审稿人质疑创新性问题和数据造假问题&…

Mybatis配置文件(mybatis-config.xml)和Mapper映射文件(XXXMapper.xml)模板

配置文件 ${dirver} ---> com.mysql.jdbc.Driver ${url} ---> jdbc:mysql://localhost:3306/数据库名 <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE configurationPUBLIC "-//mybatis.org//DTD Config 3.0//EN""h…

SpringBoot 如何解决跨域问题

Spring Boot 中的跨域请求&#xff08;Cross-Origin Request&#xff09;问题与解决方案 跨域请求是指浏览器从一个域名的网页去请求另一个域名的资源&#xff0c;它是为了增强 Web 安全性而产生的限制。Spring Boot 应用程序通常会面临跨域请求的问题&#xff0c;本文将介绍跨…

c++ 关键字和标识符

在本教程中&#xff0c;我们将学习关键字&#xff08;C 编程中的保留关键字&#xff0c;它们是语法的一部分&#xff09;。另外&#xff0c;我们还将学习标识符以及如何命名它们。 C 关键字 关键字是预定义的单词&#xff0c;对编译器具有特殊的含义。例如&#xff0c; 示例…

AWS SAA知识点整理(作成中)

共通 一些信息已经更新了&#xff0c;但参考题的答案还是旧的。 比如&#xff1a; S3的最大读写性能已经提高到 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second 并且不再要求使用random prefix 题目中有时候会让选择Not violation 不合适的一项&#xff…

2022年6月和7月的工作经历

6月 3D打标软件 3D打标软件&#xff0c;要求在Open3d上加几个2D文字。大致有如下几个方案&#xff1a; 依葫芦画瓢&#xff0c;但O3DVisualizer派生于gui::Window&#xff0c;我的程序派生于Visualizer。工作量不小。 利用OpenGL输出文字&#xff0c;Baidu的两种方法一个编…

AI编程助手 Amazon CodeWhisperer 全面解析与实践

目录 引言Amazon CodeWhisperer简介智能编程助手智能代码建议代码自动补全 提升代码质量代码质量提升安全性检测 支持多平台多语言 用户体验和系统兼容性用户体验文档和学习资源个性化体验系统兼容性 功能全面性和代码质量功能全面性代码生成质量和代码安全性 CodeWhisperer的代…

在pycharm中弹出图后,需要关闭才会显示Process finished with exit code 0

在pycharm中弹出图后&#xff0c;需要关闭才会显示Process finished with exit code 0 在PyCharm中&#xff0c;当你运行一个Python程序并弹出一个图形窗口时&#xff0c;程序会等到图形窗口关闭后才会显示 “Process finished with exit code 0” 的消息。 这是 由于代码执行…

毛玻璃员工卡片悬停效果

效果展示 页面结构组成 通过效果展示图&#xff0c;我们可以看出页面布局比较常规&#xff0c;最核心的就是卡片&#xff0c;当鼠标没有悬停在卡片上时&#xff0c;文字和头像处于半透明状态&#xff0c;当鼠标悬停在卡片上是&#xff0c;底部会展示社交图标。 CSS 知识点 b…

AI大模型安装

1、https://blog.csdn.net/m0_63748493/article/details/131914092 环境安装 2、https://www.jianshu.com/p/728b4b7c3efd 3、安装显卡驱动 下载 https://blog.csdn.net/zataji/article/details/123104569 用bash 下载启动文档位置run 报错提示 https://blog.csdn.net/sunming…

1.物联网射频识别,RFID概念、组成、中间件、标准,全球物品编码——EPC码

1.RFID概念 RFID是Radio Frequency Identification的缩写&#xff0c;又称无线射频识别&#xff0c;是一种通信技术&#xff0c;可通过无线电讯号识别特定目标并读写相关数据&#xff0c;而无需与被识别物体建立机械或光学接触。 RFID&#xff08;Radio Frequency Identificati…