【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

  • 1)使用 filter 简单实现
  • 2)使用侧输出流实现

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

1)使用 filter 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选,就可以得到拆分之后的流了。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

public class SplitByFilterDemo {public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);/*** TODO 使用filter来实现分流效果* 缺点: 同一个数据,要被处理两遍(调用两次filter)*/SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);even.print("偶数流");odd.print("奇数流");env.execute();}
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

2)使用侧输出流实现

关于处理函数中侧输出流的用法,我们已经在 flatmap 课节做了详细介绍。简单来说,只需要调用上下文 ctx.output() 方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”OutputTag),指定了侧输出流的 id 和类型。

代码实现:将 WaterSensor 按照 id 类型进行分流。

准备好自定义的 MapFunction:

public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}

实现:

public class SideOutputDemo {public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 使用侧输出流 实现分流* 需求: watersensor的数据,s1、s2的数据分别分开** TODO 总结步骤:*    1、使用 process算子*    2、定义 OutputTag对象*    3、调用 ctx.output*    4、通过主流 获取 测流*//*** 创建OutputTag对象* 第一个参数: 标签名* 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation*/OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<WaterSensor> process = sensorDS.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {String id = value.getId();if ("s1".equals(id)) {// 如果是 s1,放到侧输出流s1中/*** 上下文ctx 调用ouput,将数据放入侧输出流* 第一个参数: Tag对象* 第二个参数: 放入侧输出流中的 数据*/ctx.output(s1Tag, value);} else if ("s2".equals(id)) {// 如果是 s2,放到侧输出流s2中ctx.output(s2Tag, value);} else {// 非s1、s2的数据,放到主流中out.collect(value);}}});// 从主流中,根据标签 获取 侧输出流SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);// 打印主流process.print("主流-非s1、s2");//打印 侧输出流s1.printToErr("s1");s2.printToErr("s2");env.execute();}
}

要点:

1、使用 process();(是最底层 API)。

2、process 每次处理一条数据

3、定义 OutputTag 对象:

(1)第一个参数:标签名

(2)第二个参数:放入侧输出流中的数据的类型,Typeinformation

4、调用 ctx.output();

5、通过主流获取测流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/644416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx 代理服务路径带/和不带/的问题

nginx初始配置如下 server {listen 6087;location / {#网站主页路径。此路径仅供参考&#xff0c;具体请您按照实际目录操作。#例如&#xff0c;您的网站运行目录在/etc/www下&#xff0c;则填写/etc/www。#允许跨域请求的域&#xff0c;* 代表所有add_header Access-Control-…

AMIS的组件学习使用

部分代码片段 {"id": "filterForm","className": " xysd-zbkb-pubquery","labelWidth": 130,"body": [{"type": "grid","className": "xysd-grid-query-input","c…

第12章_集合框架(Collection接口,Iterator接口,List,Set,Map,Collections工具类)

文章目录 第12章_集合框架本章专题与脉络1. 集合框架概述1.1 生活中的容器1.2 数组的特点与弊端1.3 Java集合框架体系1.4 集合的使用场景 2. Collection接口及方法2.1 添加2.2 判断2.3 删除2.4 其它 3. Iterator(迭代器)接口3.1 Iterator接口3.2 迭代器的执行原理3.3 foreach循…

Android TP方向调试

一、问题描述 拿到一款新TP&#xff0c;适配好驱动后&#xff0c;触摸屏幕发现触摸点位置和Android报点位置不一致&#xff0c;如上滑变成下滑或者左滑右滑等 二、问题分析 1.加载TP驱动&#xff0c;且驱动能正确上报数据 2.使用命令打开触摸报点 settings put system show…

dolphinscheduler节点二次开发需要改动的部分

dolphinscheduler节点二次开发需要改动的部分 前端 在dolphinscheduler-ui/public/images/task-icons/目录下新增两个节点的logo图片&#xff0c;一个为激活状态的一个为非激活状态的&#xff0c;如下。 修改文件dolphinscheduler-ui/src/views/projects/task/constants/task…

实战:加密传输数据解密

前言 下面将分享一些实际的渗透测试经验&#xff0c;帮助你应对在测试中遇到的数据包内容加密的情况。我们将以实战为主&#xff0c;技巧为辅&#xff0c;进入逆向的大门。 技巧 开局先讲一下技巧&#xff0c;掌握好了技巧&#xff0c;方便逆向的时候可以更加快速的找到关键…

HCIE之BGP基础概念(一)

BGP 一、BGP的基本概述二、BGP分类三、BGP的工作原理BGP报文类型&#xff1a;BGP状态机&#xff1a; 四、BGP对等体之间的交互原则解决BGP路由黑洞方法&#xff1a; 五、路由反射器路由反射规则路由反射器下防环联邦 六、BGP属性特点优选协议首选值&#xff08;PrefVal&#xf…

Oracle 数据库恢复删除的数据

需求描述&#xff1a; 同事让删除脏数据&#xff0c;结果删错了&#xff0c;需要恢复数据 思路&#xff1a; 利用闪回恢复数据只能恢复15分钟之内的&#xff0c;后面undo空间会被重写&#xff0c;就恢复不了&#xff0c;所以删除数据后&#xff0c;要谨慎再三确认&#xff0c…

正弦量的有效值

正弦量的有效值 引言正文 引言 相信有很多小伙伴们还记得&#xff0c;初中学习电压电流的时候&#xff0c;对于交变电流&#xff0c;电压&#xff0c;其有效值为正弦信号的最大值除以 1 2 \cfrac{1}{\sqrt{2}} 2 ​1​。 具体推到过程&#xff0c;作者以后有时间了会进行添加…

ESP8266 PlatformIO Arduino中为墨水屏制作u8g2自定义字库

Content 0. 前言1. 工具下载2. 字体下载3. 开始转换4. 添进字库0. 前言 使用的是 U8g2_for_Adafruit_GFX 字库,兴许你会在其他地方遇见更中意的字体,现在我们将它添加进去。 操作系统:Windows10 专业版 1. 工具下载 使用开源的 u8g2FontTool 软件,下载完成后解压。 资…

PHP编程实践:实际商品价格数据采集

引言 在电子商务领域&#xff0c;对商品价格进行数据采集和对比是一项常见的需求。本文将介绍如何使用PHP编程语言实现对1688和淘宝商品价格数据的采集和对比&#xff0c;帮助读者了解实际的编程实践过程。 一、数据采集原理 数据采集是指从互联网上获取数据的过程&#xff…

学习理解Java工厂模式

学习理解Java工厂模式 一、前言二、简单工程模式三、工厂方法模式四、抽象工厂模式五、静态工厂模式和 new 有什么区别&#xff1f;六、总结 一、前言 工厂模式目的是封装对象的创建过程&#xff0c;将对象的创建和使用分离开来&#xff0c;从而提高代码的可维护性和可扩展性。…

腾讯云上linux系统使用nginx,flask构建个人网站SSL证书过期换证书的操作步骤

ssl证书过期的时候&#xff0c;一般腾讯云提前一段时间给通知&#xff0c;让更换ssl证书&#xff0c;现在一般都可以免费更换&#xff0c;一般是一年期的&#xff0c;审核通过之后&#xff0c;需要下载nginx版本的证书&#xff0c;我的是4个文件&#xff0c;替换到nginx/cert文…

【前端web入门第一天】01 开发环境、HTML基本语法文本标签

文章目录: 1. 准备开发环境 1.1 vs Code基本使用 2.HTML文本标签 2.1 标签语法2.2 HTML基本骨架2.3 标签的关系2.4 注释2.5 标题标签2.6 段落标签2.7 换行与水平线标签2.8 文本格式化标签 1. 准备开发环境 VSCode与谷歌浏览器离线版,安装包评论区自提. VSCode默认安装位置:C…

3、非数值型的分类变量

非数值型的分类变量 有很多非数字的数据,这里介绍如何使用它来进行机器学习。 在本教程中,您将了解什么是分类变量,以及处理此类数据的三种方法。 本课程所需数据集夸克网盘下载链接:https://pan.quark.cn/s/9b4e9a1246b2 提取码:uDzP 文章目录 1、简介2、三种方法的使用1…

书生·浦语大模型实战营-学习笔记5

LMDeploy 大模型量化部署实践 大模型部署背景 LMDeploy简介 轻量化、推理引擎、服务 核心功能-量化 显存消耗变少了 大语言模型是典型的访存密集型任务&#xff0c;因为它是decoder-by-decoder 先把数据量化为INT4存起来&#xff0c;算的时候会反量化为FP16 AWQ算法&a…

Angular组件(一) 分割面板ShrinkSplitter

Angular组件(一) 分割面板ShrinkSplitter 前言 分割面板在日常开发中经常使用&#xff0c;可将一片区域&#xff0c;分割为可以拖拽整宽度或高度的两部分区域。模仿iview的分割面板组件&#xff0c;用angular实现该功能&#xff0c;支持拖拽和[(ngModel)]双向绑定的方式控制区…

Docker容器引擎(2)

目录 一.批量删除镜像&#xff0c;容器 二.Docker 网络实现原理 随机映射端口&#xff08;从32768开始&#xff09; 访问自己&#xff1a; 在10服务器上配置路由转发&#xff1a; 指定映射端口&#xff1a; 查看容器的输出和日志信息&#xff1a; 将宿主机目标|文件挂载…

RabbitMQ中交换机的应用及原理,案例的实现

目录 一、介绍 1. 概述 2. 作用及优势 3. 工作原理 二、交换机Exchange 1. Direct 2. Topic 3. Fanout 三、代码案例 消费者代码 1. 直连direct 生产者代码 测试 2. 主题topic 生产者代码 测试 3. 扇形fanout 生产者代码 测试 每篇一获 一、介绍 1. …

Vue的生命周期方法

beforeCreate 在实例初始化之后&#xff0c;数据观测&#xff08;data observe&#xff09;和 event/watcher 事件配置之前被调用。在当前阶段 data、methods、computed 以及 watch 上的数据和方法都不能被访问。 created 实例已经创建完成之后被调用。在这一步&#xff0c;实…