Flink 侧输出流(SideOutput)

🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。

🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。ProcessFunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

当使用旁路输出时,首先需要定义一个OutputTag来标识一个旁路输出流

val OutPut=OutputTag[String]("side-output")

注意:OutputTag是如何根据旁路输出流包含的元素类型typed的    

 ✨可以通过以下几种函数发射数据到旁路输出

        ProcessFunction

        CoProcessFunction

        ProcessWindowFunction

        ProcessAllWindowFunction

//将含有特殊字符串的流区分开,数据由两个定义好的工具类向Kafka灌入不同内容的数据,
//然后通过侧输出流(SideOutput)将不同的流进行分离,得到不同的输出import com.alibaba.fastjson.JSON
import com.tech.bean.Person_t
import com.tech.util.KafkaSourceUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject sideOutputPerson_t {def main(args: Array[String]): Unit = {// UI地址访问:http://localhost:8081/#/job/runningval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())val ksu = new KafkaSourceUtil("person_t", "test-consumer-group")val dstream = env.addSource(ksu.getSouceInfo())// 首先需要定义一个OutputTag来标识一个旁路输出流val outputTag = new OutputTag[String]("person_t_side-output")val mainDataStream = dstream.map(line => {JSON.parseObject(line, classOf[Person_t])})val sideOutput = mainDataStream.process(new ProcessFunction[Person_t, String] {override def processElement(value: Person_t,ctx: ProcessFunction[Person_t, String]#Context,out: Collector[String]): Unit = {if (!value.getName.contains("_side")) {out.collect(value.toString)} else {// 测输出流输出的部分ctx.output(outputTag, "sideOutput-> 带有_side标识的数据名称" + value.getName)}}})val sideOutputStream: DataStream[String] = sideOutput.getSideOutput(outputTag)// 测输出流处理sideOutputStream.print("测输出流")// 常规数据处理sideOutput.print("常规数据")env.execute("outSideput")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

操作系统导论-课后作业-ch19

1. 本书在第6章中有过介绍,gettimeofday函数最多精确到us,并且大致精确(并不完全精确),需要多迭代几次减少误差,循环次数太多也会导致结束时间小于开始时间(即回滚)的现象&#xff…

两数相加

2. 两数相加 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外,这两个…

C语言—指针(1)

碎碎念:做指针题的时候我仿佛回到了原点&#xff0c;总觉得目的是为了把框架搭建起来&#xff0c;我胡说的哈31 1.利用指针变量将一个数组中的数据反向输出。 /*1.利用指针变量将一个数组中的数据反向输出。*/#include <stdio.h> #include <time.h> #include <…

一文读懂:AWS 网络对等互连(VPC peering)实用操作指南

VPC peering connection-网络对等互连在您的 Atlas VPC 和云提供商的 VPC 之间建立私有连接。该连接将流量与公共网络隔离以提高安全性。本篇文章有VPC peering的操作指南以及价格等信息。如还有疑问请联系我们MongoDB的销售&#xff0c;客户成功经理或解决方案架构师。 1 使用…

2000-2022年各省环境规制数据(原始数据+计算过程+计算结果)

2000-2022年各省环境规制数据&#xff08;原始数据计算过程计算结果&#xff09; 1、时间&#xff1a;2000-2022年 2、范围&#xff1a;30省 3、来源&#xff1a;各省年鉴、国家统计局、统计年鉴 4、指标&#xff1a;年份、省份、工业污染源治理投资完成实际额、工业增加值…

gitlab,从A仓库迁移某个工程到B仓库,保留提交记录

从A仓库&#xff0c;拷贝 git clone --bare ssh://git192.168.30.88:22/framework/platform.git 在B仓库新建工程&#xff0c;注意&#xff1a;一定要去掉默认的生成README文件进入platform.git 文件夹下&#xff0c;推送到B仓库 git push --mirror ssh://git192.168.30.100…

压缩感知常用的重建算法

重建算法的基本概念 在压缩感知&#xff08;Compressed Sensing, CS&#xff09;框架中&#xff0c;重建算法是指将从原始信号中以低于奈奎斯特率采集得到的压缩测量值恢复成完整信号的数学和计算过程。由于信号在采集过程中被压缩&#xff0c;因此重建算法的目标是找到最符合…

文件上传漏洞--Upload-labs--Pass20--数组绕过

一、漏洞原理 漏洞来源&#xff1a;count()函数漏洞。 现自定义一个数组 arr[]&#xff0c;定义arr[0]1,arr[3]2, 此时count(arr)的值为2&#xff0c;则arr[count[arr]]即为arr[2]&#xff0c;但是arr[2]未定义&#xff0c;即为一个空值&#xff0c;若使用count()函数的本意是…

图形系统开发实战课程:进阶篇(上)——6.图形交互操作:拾取

图形开发学院&#xff5c;GraphAnyWhere 课程名称&#xff1a;图形系统开发实战课程&#xff1a;进阶篇(上)课程章节&#xff1a;“图形交互操作:拾取”原文地址&#xff1a;https://www.graphanywhere.com/graph/advanced/2-6.html 第六章 图形交互操作:拾取 \quad 在图形系统…

【网络编程】okhttp深入理解

newCall 实际上是创建了一个 RealCall 有三个参数&#xff1a;OkHttpClient&#xff08;通用配置&#xff0c;超时时间等&#xff09; Request(Http请求所用到的条件&#xff0c;url等) 布尔变量forWebSocket&#xff08;webSocket是一种应用层的交互方式&#xff0c;可双向交互…

代码随想录算法训练营第23天|669. 修剪二叉搜索树 ● 108.将有序数组转换为二叉搜索树 ● 538.把二叉搜索树转换为累加树

669.修建二叉搜索树 思路:不能直接删除比区间小的或大的节点。例如比low小的根节点,虽然左子树都比根节点小可以全删,但是其右子树可能是存在区间内符合条件的值,所以需要在其根节点右子树继续遍历找到不符合条件的节点删除。比high大的思路和比low小的一样。 代码: TreeNod…

2.22使用GPIO子系统编写LED灯驱动,应用程序测试//注册三个按键的中断

使用GPIO子系统编写LED灯驱动 驱动程序 #include <linux/init.h> #include <linux/module.h> #include <linux/of.h> #include <linux/of_gpio.h> #include <linux/gpio.h> #include <linux/timer.h> #include <linux/interrupt.h>…

高防服务器主要运用在哪些场景?

高防服务器主要是用来防御DDOS攻击的服务器&#xff0c;能够为客户提供安全维护&#xff0c;高防服务器能够帮助网站拒绝服务攻击&#xff0c;定时扫描网络主节点&#xff0c;进行查找可能会出现的安全漏洞的服务类型&#xff0c;高防服务器也会根据不同的IDC机房环境来提供硬防…

Java的编程之旅22——将类作为对象的返回值

在Java中&#xff0c;可以将类作为对象的返回值。这可以通过在方法的声明中指定返回值类型为该类来实现。以下是一个示例&#xff1a; 首先定义一个“人”类 &#xff0c;Person类有三个成员变量&#xff1a;name、age和一个方法eat。 public class Person{public String nam…

ubuntu22.04@Jetson Orin Nano之CSI IMX219安装

ubuntu22.04Jetson Orin Nano之CSI IMX219安装 1. 源由2. 安装2.1 硬件安装2.2 软件配置2.3 新增摄像头 3. 效果4. 参考资料 1. 源由 折腾半天时间&#xff0c;捣鼓这个套装摄像头(IMX219)的安装&#xff0c;死活就是没有这个设备。世界总是这么小&#xff0c;看看遇到问题的大…

排序算法之——快速排序

快速排序 1.基本思想2.图示详解——以升序排列为例3.对基本思想和图示的补充说明4.代码实现5.空间、时间复杂度5.1最好情况5.2最坏情况 6.区间按照基准值划分的方法6.1 Hoare法6.2 挖坑法 7.优化措施7.1三数取中法7.1.1三数取中法——核心代码7.1.2优化效果7.1.3补充说明 7.2 递…

docker 容器内服务随容器自动启动

docker 容器内服务随容器自动启动 背景准备工作方案一&#xff0c;直接修改.bashrc文件&#xff08;简单粗暴&#xff09;方案二&#xff0c;编写启动脚本加入.bashrc文件&#xff08;文明一点&#xff09;制作nginx服务自启动镜像测试新镜像&#xff0c;nginx服务随容器自动启…

Pygame:让Python游戏开发无处不在

Pygame 是一个用于编写视频游戏的 Python 模块集。由于它提供了大量的工具和功能&#xff0c;使得 Python 开发者能够轻松地创建 2D 游戏&#xff0c;因此它已经成为 Python 游戏开发社区中非常受欢迎的选择。Pygame 支持跨平台开发&#xff0c;这意味着使用 Pygame 编写的游戏…

回显服务器的制作方法

文章目录 客户端和服务器TCP和UDP的特点UDP socket api的使用DatagramSocketDatagramPacketInetSocketAddress API 做一个简单的回显服务器UDP版本的回显服务器TCP版本的回显服务器 客户端和服务器 在网络中&#xff0c;主动发起通信的一方是客户端&#xff0c;被动接受的这一方…

Rman全备和增量备份说明

RMAN备份分为全备和增量备份&#xff0c;全备不能成为增量备份策略的一部分&#xff0c;它也不能作为后续增量备份的基础。 RMAN增量备份分为0、1、2三级&#xff0c;其中0级备份是增量备份的基础&#xff0c;备份内容也跟全备份一样&#xff0c;要使用增量备份&#xff0c;必…