【Flink快速入门-5.流处理之多流转换算子】

流处理之多流转换算子

实验介绍

前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。

知识点
  • Union
  • filter

算子演示

Union

union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:
在这里插入图片描述

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。

首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:

package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}

我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。

运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。
在这里插入图片描述

filter

使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。

疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。

在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:

package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}

上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。

打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:

张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0

在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。
在这里插入图片描述

实验总结

本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【STM32】DRV8833驱动电机

1.电机如何转动 只需要给电机两个端子加一正一负的极性就会转起来了&#xff0c;但是要注意的是不要将电机两端直接接在5v和gnd之间&#xff0c;这种电机一般要提供几百毫安的电流&#xff0c;而GPIO口只能提供几毫安&#xff0c;所以我们使用一个DRV8833来驱动 DRV8833输入口…

vue2老版本 npm install 安装失败_安装卡主

vue2老版本 npm install 安装失败_安装卡主 特别说明&#xff1a;vue2老版本安装慢、运行慢&#xff0c;建议升级vue3element plus vite 解决方案1&#xff1a; 第一步、修改npm 镜像为国内镜像 使用淘宝镜像&#xff1a; npm config set registry https://registry.npmmir…

Linux相关概念和易错知识点(27)(认识线程、页表与进程地址空间、线程资源划分)

目录 1.认识线程 &#xff08;1&#xff09;进程与线程的关系 &#xff08;2&#xff09;最小执行流 &#xff08;3&#xff09;轻量级进程&#xff08;LWP&#xff09; ①对task_struct的理解 ②轻量级进程 ③LWP和TCB的区别 2.页表与进程地址空间 &#xff08;1&…

GitHub基本操作及Git简单命令

GitHub简介 GitHub就是一个远程仓库&#xff0c;远程仓库可以理解为就是一个可以保存自己代码的地方&#xff0c;在实际开发当中一个项目往往是有多个人来共同协作开发完成的&#xff0c;那么就需要一个统一代码保存的地方&#xff0c;而GitHub就是起到一个共享和汇总代码的作…

数据结构(陈越,何钦铭)第三讲 树(上)

3.1 树与数的表示 3.1.1 顺序查找 int SequentialSearch(List Tbl,ElementType K){int i;Tbl->Element[0]K;for(iTbl->Length;Tbl->Element[i]!K;i--);return i; } typedef struct LNode *List; struct LNode{ElementType Element[MAXSIZE];int Length; };3.1.2 二分…

【PYTORCH】官方的turoria实现中英文翻译

参考 https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html 背景 pytorch官方的是seq2seq是法语到英文&#xff0c;做了一个中文到英文的。 数据集 下载后解压&#xff0c;使用的data\testsets\devset\UNv1.0.devset.zh和UNv1.0.devset.en&#x…

仿叮咚买菜鸿蒙原生APP

# DingdongShopping 这是一个原生鸿蒙版的仿叮咚买菜APP项目 鸿蒙Next发布至今已经有一年多的时间了&#xff0c;但有时候我们想要实现一些复杂的功能或者效果&#xff0c;在开发文档上查阅一些资料还是比较费时的&#xff0c;有可能还找不到我们想要的内容。而社会层面上分享…

VSCode 接入DeepSeek V3大模型,附使用说明

VSCode 接入DeepSeek V3大模型,附使用说明 由于近期 DeepSeek 使用人数激增,服务器压力较大,官网已 暂停充值入口 ,且接口响应也开始不稳定,建议使用第三方部署的 DeepSeek,如 硅基流动 或者使用其他模型/插件,如 豆包免费AI插件 MarsCode、阿里免费AI插件 TONGYI Lin…

中上211硕对嵌入式AI感兴趣,如何有效规划学习路径?

今天给大家分享的是一位粉丝的提问&#xff0c;中上211硕对嵌入式AI感兴趣&#xff0c;如何有效规划学习路径&#xff1f; 接下来把粉丝的具体提问和我的回复分享给大家&#xff0c;希望也能给一些类似情况的小伙伴一些启发和帮助。 同学提问&#xff1a; 中上211&#xff0c;…

Linux 目录结构与基础命令学习记录

在 Linux 的学习旅程中&#xff0c;熟练掌握基础命令是开启高效操作与系统管理的钥匙。这些命令不仅能帮助我们在 Linux 系统中自由穿梭&#xff0c;还能深入了解系统的运行状态。以下是我对 Linux 基础命令的学习总结&#xff0c;希望能为大家的 Linux 学习提供帮助。 一、Lin…

python学opencv|读取图像(六十五)使用cv2.boundingRect()函数实现图像轮廓矩形标注

【1】引言 前序学习进程中&#xff0c;已经使用cv2.findContours()函数cv2.drawContours()函数实现图像轮廓识别和标注&#xff0c;这种标注沿着图像的轮廓进行&#xff0c;比较细致。相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;六十四&#xff09;使用…

Visionpro 齿轮测量

效果展示 一、题目要求 求出最大值&#xff0c;最小值&#xff0c;平均值 二、分析 1.首先要进行模板匹配 2.划清匹配范围 3.匹配小三角的模板匹配 4.卡尺 5.用找圆工具 工具 1.CogPMAlignTool 2.CogCaliperTool 3.CogFindCircleTool 4.CogFixtureTool 三、模板匹…

【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析②】

ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase02 作者&#xff1a;车端域控测试工程师 更新日期&#xff1a;2025年02月15日 关键词&#xff1a;UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-002测试用例 用例ID测试场景验证要点参考条款预期…

AlmaLinux使用Ansible自动部署k8s集群

一、环境准备 节点规划&#xff08;最低要求&#xff09; 1台Master节点&#xff08;4核/8GB内存&#xff09;2台Worker节点&#xff08;2核/4GB内存&#xff09;1台Ansible控制机&#xff08;可复用Master节点&#xff09; 系统配置 # 所有节点执行 sudo hostnamectl set-hos…

机器学习:十大算法实现汇总

机器学习十大算法代码实现&#xff1a;使用numpy、pandas&#xff0c;不调用机器学习相关库。 已将代码和相关文档上传到了github&#xff1a;golitter/Decoding-ML-Top10: 使用 Python 优雅地实现机器学习十大经典算法。 (github.com) 一元线性回归&#xff1a;机器学习&…

ffmpeg学习:ubuntu下编译Android版ffmpeg-kit

文章目录 前言一. 配置环境1.1 虚拟机版本1.2 安装Android环境1.2.1 Android SDK安装1.2.2 Android NDK安装 1.3 编译前的准备工作1.3.1 libtasn1-1安装1.3.2 meson安装1.3.3 harfbuzz下载 二. 编译ffmpeg-kit三. 总结 前言 ffmpeg-kit是一款跨多个平台的&#xff0c;用于在应…

Qt使用pri和pro文件进行模块化编程

假如我想要做一个功能&#xff0c;这个功能用代码模块化实现出来&#xff0c;方便将来移植&#xff0c;比如音视频播放器的界面&#xff0c;将来想要在其他工程使用时&#xff0c;只需要将widget提升为音视频播放界面即可。 当我们其他工程需要这个功能时&#xff0c;我们在调用…

C# windowForms 的DataGridView控件的使用

C# Windows Forms DataGridView 控件使用详解 DataGridView 是 Windows Forms 中用于显示和编辑表格数据的核心控件。它支持高度自定义的列类型、数据绑定、事件处理和丰富的样式配置。以下是其详细使用方法。 目录 基础使用 数据绑定 列类型与自定义

PyQt 界面编程:QDialog、QWidget、QMainWindow 的面向过程与面向对象编程

文章目录 一、PyQt简介二、面向过程编程三、面向对象编程(推荐)3.1 QWidget窗口3.2 QMainWindow窗口3.3 QDialog窗口文档: https://www.riverbankcomputing.com/static/Docs/PyQt5/ 一、PyQt简介 PyQt简介:PyQt 是一个用于创建图形用户界面(GUI)的 Python 库,它将 Qt …

Jvascript网页设计案例:通过js实现一款密码强度检测,适用于等保测评整改

本文目录 前言功能预览样式特点总结&#xff1a;1. 整体视觉风格2. 密码输入框设计3. 强度指示条4. 结果文本与原因说明 功能特点总结&#xff1a;1. 密码强度检测2. 实时反馈机制3. 详细原因说明4. 视觉提示5. 交互体验优化 密码强度检测逻辑Html代码Javascript代码 前言 能满…