【Flink实战】Flink中的分流

Flink中的分流

在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。

拆分流
正常处理
异常处理
数据读取
合法入库
异常监控

拆分流数据的方式

  • Split,已经废弃,不推荐使用
  • Fliter
  • SideOut,推荐使用

Fliter分流的Java实现

    public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指标明细DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env).map((MapFunction<String, String>) kafkaMessage -> {JSONObject jsonobject = null;try {jsonobject = JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn("报文格式错误:{}", kafkaMessage);}if (null == jsonobject || jsonobject.isEmpty()) {LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 将原始流中包含demo的数据筛选出来DataStream<String> diagnosisMessages = detailMessage.filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo"))).map((MapFunction<String, String>) sparkMessage -> {// 为达到实验效果,进行日志输出LOG.info("[is demo message]:{}", sparkMessage);return sparkMessage;});env.execute("Flink Streaming Java API Skeleton");}

SideOut分流的Java实现

    public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.out.println("【SideOutputDemo】");// 指标明细DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env).map((MapFunction<String, String>) kafkaMessage -> {JSONObject jsonobject = null;try {jsonobject = JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn("报文格式错误:{}", kafkaMessage);}if (null == jsonobject || jsonobject.isEmpty()) {LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 定义一个切分(旁路输出)final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {};SingleOutputStreamOperator<String> sp = mainMessage.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String s, Context context, Collector<String> collector) throws Exception {// 向常规流(主流)中添加数据collector.collect(s);// 向旁路输出流中添加数据if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {context.output(outputTag, s);}}});sp.map((MapFunction<String, String>) sparkMessage -> {LOG.info("主流的数据: {}", sparkMessage);return sparkMessage;});DataStream<String> tag = sp.getSideOutput(outputTag);tag.map((MapFunction<String, String>) sparkMessage -> {LOG.info("旁路[{}]的数据: {}", outputTag.getId(), sparkMessage);return sparkMessage;});env.execute("Flink Streaming Java API Skeleton");}

SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:

  1. 为每个分支流定义一个 SideOutPut。

  2. 为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象,向旁路输出的SideOutPut发送数据。

    1. ProcessFunction:处理函数,单流输入函数
    2. KeyedProcessFunction:处理函数,单流输入函数
    3. CoProcessFunction:处理函数,双流流输入函数
    4. KeyedCoProcessFunction:处理函数,双流流输入函数
    5. ProcessWindowFunction:窗口函数,全量计算函数
    6. ProcessAllWindowFunction:窗口函数,全量计算函数,它与 ProcessWindowFunction 类似,但是它会对窗口中的所有数据进行处理,而不是仅处理触发窗口计算的数据。

    例子中使用ProcessFunction实现流拆分。

  3. 根据SideOutPut 的ID标识获取旁路输出流,进行数据继续处理。

拆分方式对比
Split不支持链式拆分,切分得到的流,是不能进行再次切分的
Fliter多分支流,需要多次遍历原始流进行筛选。浪费集群的资源
SideOut以多次进行拆分的,支持链式拆分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/53159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++ 实现一个简单的自定义报文协议

思路&#xff1a; 将发送的数据按照长度和body的方式一次放入一个大的buffer中&#xff0c;4个字节存放body长度&#xff0c;后面存放报文&#xff0c;依次放入数据。 后续如果想要存储复杂类型&#xff0c;可以拓展头部信息&#xff0c;比如数据类型等 #include <stdio.h&…

如何在不使用任何软件的情况下将 PDF 转换为 Excel

通常&#xff0c;您可能会遇到这样的情况&#xff1a;您需要的数据不在 Excel 工作表中&#xff0c;而是以数据表形式出现在 PDF 文件中。为了将此数据放入 Excel 工作表中&#xff0c;如果您尝试将数字复制并粘贴到电子表格中&#xff0c;则列/行将无法正确复制和对齐。因此&a…

java八股文面试[Spring]——如何实现一个IOC容器

什么是IOC容器 IOC不是一种技术&#xff0c;只是一种思想&#xff0c;一个重要的面向对象编程的法则&#xff0c;它能指导我们如何设计出松耦合&#xff0c;更优良的程序。传统应用程序都是由我们在类内部主动创建依赖对象&#xff0c;从而导致类与类之间高耦合&#xff0c;难于…

从零开始配置Jenkins与GitLab集成:一步步实现持续集成

在软件开发中&#xff0c;持续集成是确保高效协作和可靠交付的核心实践。以下是在CentOS上安装配置Jenkins与GitLab集成的详细步骤&#xff1a; 1.安装JDK 解压JDK安装包并设置环境变量&#xff1a; JDK下载网址 Java Downloads | Oracle 台灣 tar zxvf jdk-11.0.5_linux-x64_b…

关于ros工作空间devel下setup.bash的理解

在创建了ros的工作空间之后 在工作空间的devel文件夹中存在几个setup.*sh形式的环境变量设置脚本 使用source命令运行这些脚本文件&#xff0c;则工作空间的环境变量设置可以生效&#xff08;如可以找到该工作空间内的项目&#xff09;。 source devel/setup.bash 设置环境变量…

# Go学习-Day6

文章目录 Go学习-Day6封装继承接口 Go学习-Day6 个人博客&#xff1a;CSDN博客 封装 类似java的类的封装&#xff0c;这里我们利用大小写和工厂模式来实现封装的功能略过 继承 相似的类具有相似的方法&#xff0c;反复绑定相同的方法&#xff0c;代码冗余&#xff0c;所以引…

Zipkin开源的分布式链路追踪系统

Zipkin是一款开源的分布式链路追踪系统,主要功能包括: 1. 采集跟踪数据 - Zipkin client库负责收集并上报各服务的请求信息。 2. 存储跟踪数据 - 存储层默认采用Zipkin自带的基于内存的快速存储,也支持整合MySQL、Cassandra等外部存储。 3. 查询接口 - 提供RESTful API进行跟…

Vue全局后置守卫

全局后置守卫 一、在 router 目录下的 index.js 文件中配置全局后置守卫。 import Vue from vue import VueRouter from vue-router Vue.use(VueRouter)import Home from ../views/Home.vue import About from ../views/About.vue import Login from ../views/Login.vueconst…

Ubuntu 22.04.3 LTS 维护更新发布

导读近日消息&#xff0c;Canonical 今天发布了代号为 Jammy Jellyfish、长期支持的 Ubuntu 22.04 第 3 个维护版本更新&#xff0c;距离上个版本相隔 6 周时间。 Ubuntu 22.04.3 LTS 最大的亮点在于内核升级到 Linux Kernel 6.2&#xff0c;此外 Mesa 图形堆栈也升级到 23.0.…

qt 实现音视频的分贝检测系统

项目场景&#xff1a; 目前的产品经常播放m3u8流&#xff0c;有的视频声音正常&#xff0c;有的视频声音就偏低&#xff0c;即使放到最大音量声音也是比较小&#xff0c;所以就产生了某种需求&#xff0c;能否自动感知视频声音的大小&#xff0c;如果发现声音比较小的情况&…

C++头文件和std命名空间

C 是在C语言的基础上开发的&#xff0c;早期的 C 还不完善&#xff0c;不支持命名空间&#xff0c;没有自己的编译器&#xff0c;而是将 C 代码翻译成C代码&#xff0c;再通过C编译器完成编译。 这个时候的 C 仍然在使用C语言的库&#xff0c;stdio.h、stdlib.h、string.h 等头…

FFmpeg<第一篇>:环境配置

1、官网地址 http://ffmpeg.org/download.html2、linux下载ffmpeg 下载&#xff1a; wget https://ffmpeg.org/releases/ffmpeg-snapshot.tar.bz2解压&#xff1a; tar xvf ffmpeg-snapshot.tar.bz23、FFmpeg ./configure编译参数汇总 解压 ffmpeg-snapshot.tar.bz2 之后&…

【附安装包】Python-3.9.5安装教程

软件下载 软件&#xff1a;Python版本&#xff1a;3.9.5语言&#xff1a;英文大小&#xff1a;26.9M安装环境&#xff1a;Win11/Win10/Win8/Win7硬件要求&#xff1a;CPU2.5GHz 内存2G(或更高&#xff09;下载通道①百度网盘丨64位下载链接&#xff1a;https://pan.baidu.com/…

CSS概念

1、CSS与HTML结合方式 1.1 第一种方式 内联/行内样式 就是在我们的HTML标签上通过style属性来引用CSS代码。 优点:简单方便 &#xff1b; 缺点:只能对一个标签进行修饰。 1.2 第二种方式 内部样式 我们通过<style>标签来声明我们的CSS. 通常<style>标签我们推荐写在…

Spark最后一课

1.Spark的提交过程(YarnCluster) 1.命令输入脚本启动,启动submit任务 2.解析参数 看是cluster还是yarn单点模式 3.创建客户端YarnClusterApplication 4.封装提交命令交给RM 5.RM在NM上启动ApplicationMaster(AM) 注意AM消耗的资源都是container的 6.AM根据参数启动Driver并且…

大数据Flink(六十七):SQL Table 简介及运行环境

文章目录 SQL & Table 简介及运行环境 一、​​​​​​​​​​​​​​简介 二、案例

【大模型】二 、大语言模型的基础知识

文章目录 大型语言模型国内外大语言模型大模型列表国外大模型 大型语言模型 大型语言模型是近年来机器学习和自然语言处理领域的一个重要发展趋势。以GPT模型为例&#xff0c;阐述其发展 GPT系列基于Transformer架构&#xff0c;进行构建&#xff0c;旨在理解和生成人类语言。…

es的索引管理

概念 &#xff08;1&#xff09;集群&#xff08;Cluster&#xff09;&#xff1a; ES可以作为一个独立的单个搜索服务器。不过&#xff0c;为了处理大型数据集&#xff0c;实现容错和高可用性&#xff0c;ES可以运行在许多互相合作的服务器上。这些服务器的集合称为集群。 &…

【电能质量扰动】基于ML和DWT的电能质量扰动分类方法研究(Matlab实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

c语言调用mciSendString播放音乐

如下所示&#xff0c;这是一个使用c语言调用系统方法mciSendString()&#xff0c;让系统播放音乐的示例&#xff1a; baihuaxiang 代码&#xff1a; #include <graphics.h> #include <Windows.h> #include <mmsystem.h>#pragma comment(lib,"WINMM.LIB…