【API篇】五、Flink分流合流API

文章目录

  • 1、filter算子实现分流
  • 2、分流:使用侧输出流
  • 3、合流:union
  • 4、合流:connect
  • 5、connect案例

分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样,不过是一个个水滴替换成了一条条数据。

在这里插入图片描述

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

在这里插入图片描述

1、filter算子实现分流

Demo案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。

实现思路:针对同一个流,多次条用filter算子来拆分

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527).map(Integer::valueOf);//将ds 分为两个流 ,一个是奇数流,一个是偶数流//使用filter 过滤两次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶数");ds2.print("奇数");env.execute();}
}

以上实现的明显缺陷是,同一条数据,被多次处理。以上其实是将原始数据流stream复制两份,然后对每一份分别做筛选,冗余且低效。

2、分流:使用侧输出流

基本步骤为:

  • 使用process算子(Flink分层API中的最底层的处理函数)
  • 定义OutputTag对象,即输出标签对象,用于后面标记和提取侧流
  • 调用上下文ctx的.output()方法
  • 通过主流获取侧流
案例:实现将WaterSensor按照Id类型进行分流

先定义下MapFunction的转换规则,用来将输入的数据转为自定义的WaterSensor对象:

public class WaterSensorMapFunction implements MapFunction<StringWaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] strArr = value.split( regex: ",");//String组装对象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}

使用侧流:

public class SplitStreamByOutputTag {    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());//定义两个输出标签对象,用于后面标记和提取侧流OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Override//形参为别为:流中的一条数据、上下文对象、收集器public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("侧流s1");  //区别主流,让控制台输出标红s2DS.printToErr("侧流s2");env.execute();}
}

相关传参说明,首先是创建OutputTag对象时的传参:

  • 第一个参数为标签名,用于区分是哪一个侧流
  • 第二个是放入侧流中的数据的类型,且必须是Flink的类型(TypeInfomation,借助Types类)
  • OutputTag的泛型,是流到对应的侧流的数据类型

ProcessFunction接口的泛型中:

  • 第一个是输入的数据类型
  • 第二个是输出到主流上的数据类型

ctx.output方法的形参:

  • 第一个为outputTag对象
  • 第二个为数据,上面代码中传value即直接输出数据本身,也可输出处理后的数据,主流侧流数据类型不用一致

看下运行效果:

在这里插入图片描述

3、合流:union

将来源不同的多条流,合并成一条来联合处理,即合流。最简单的合流操作,就是直接将多条流合在一起,叫作流的联合(union)

在这里插入图片描述

union的条件是:

  • 每条流中要合并的数据类型必须相同(原始不同,可先借助map,在union)
  • 合并之后的新流会包括所有流中的元素,数据类型不变
stream1.union(stream2, stream3, ...)  //可变长参数
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//输出:
1
2
3
2
2
3
2
2
3

4、合流:connect

union合并流受限于数据类型,因此还有另一种合流操作:connect

在这里插入图片描述

public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527).map(i -> Integer.parseInt(i));//String流DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);/*** 总结: 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于原source1流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于原source2流:" + value;}});result.print();env.execute();    }
}

使用 connect 合流的总结:

  • 一次只能连接 2条流,因为connect返回的是一个ConnectedStreams对象,不再是DataStreamSource或其子类了
  • 两条流中的数据类型可以不一样
  • 连接后可以调用 map、flatmap、process来处理,但是各处理各的

以map为例,其形参是一个CoMapFuntion接口类型,泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出,虽然两个流合并成一个了,但处理数据时还是各玩各的。

  • .map1()就是对第一条流中数据的map操作
  • .map2()则是针对第二条流

在这里插入图片描述

connect 就类比被逼相亲后结婚,两个人看似成一家了,但实际上各自玩各自的。往大了举例就相当于一国两制。

5、connect案例

和connect以后的map传CoMapFunction一样,process算子也不再传ProcessFunction,而是CoProcessFunction,实现两个方法:

  • processElement1():针对第一条流
  • processElement2():针对第二条流

connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区,分区后返回的还是一个ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是两条流中各自的键选择器

ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理

案例需求:连接两条流,输出能根据id匹配上的数据,即两个流里元组f0相同的数据(类似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元组流DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));//三元组流DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s1数据,都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s2数据,都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}

运行效果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113049.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能电表上的模块发热正常吗?

智能电表是一种可以远程抄表、计费、控制和管理的电力计量设备&#xff0c;它可以实现智能化、信息化和网络化的电力用电管理。智能电表的主要组成部分包括电能计量模块、通信模块、控制模块和显示模块等。其中&#xff0c;通信模块和控制模块是智能电表的核心部件&#xff0c;…

虹科 | 解决方案 | 机械免拆压力测试方案

对于发动机的气门卡滞或气门开闭时刻错误、活塞环磨损、喷油嘴泄漏/堵塞等故障&#xff0c;往往需要解体发动机或拆卸部件才能发现&#xff1b;而对于某些轻微的故障&#xff0c;即使解体了发动机后也经常难于肉眼判别 虹科Pico提供的WPS500压力测试方案&#xff0c;可以动态测…

为什么索引要用B+树来实现呢,而不是B树

首先&#xff0c;常规的数据库存储引擎&#xff0c;一般都是采用 B 树或者 B树来实现索引的存储。 B树 因为 B 树是一种多路平衡树&#xff0c;用这种存储结构来存储大量数据&#xff0c;它的整个高度会相比二叉树来说&#xff0c;会矮很多。 而对于数据库来说&#xff0c;所有…

修改echarts的tooltip样式 折线图如何配置阴影并实现渐变色和自适应

图片展示 一、引入echarts 这里不用多解释 vue里使用 import echarts from “echarts”; html页面引用js文件或用script标签引用 二、定义一个具有宽高的dom div <div id"echart-broken" style"width:400px;height: 200px;"></div>三、定义…

layui框架实战案例(21):layui table单元格显示图片导致复选框冗余的解决方案

图片自适应表格CSS 为防止单元格内的图片不能正常显示&#xff0c;需本地重写CSS。 /*layui-table图片自适应*/ .layui-table-cell {height: auto;line-height: 20px;}.layui-table-cell img {height: 50%;max-width: 50%; }列代码 , cols: [[{type: checkbox,fixed:left, w…

非科班,补基础

大家好&#xff0c;我是大彬~ 今天跟大家分享知识星球小伙伴关于【非科班转码如何补基础】的提问。 往期星球提问整理&#xff1a; 读博&#xff1f;找工作&#xff1f; 性格测试真的很重要 想找一份实习工作&#xff0c;需要准备什么 球友提问&#xff1a; 大彬大佬&#xf…

Vue 3使用 Iconify 作为图标库与图标离线加载的方法、 Icones 开源在线图标浏览库的使用

之前一直naive-ui搭配使用的是xicons&#xff0c;后来发现Iconify支持的图标合集更多&#xff0c;因此转而使用Iconify。 与FontAwesome不同的是&#xff0c;Iconify配合Icones相当于是一个合集&#xff0c;Iconify提供了快捷引入图标的方式&#xff0c;而Icones是一个大的图标…

数据结构与算法-(10)---列表(List)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

力扣每日一题52:N皇后问题||

题目描述&#xff1a; n 皇后问题 研究的是如何将 n 个皇后放置在 n n 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回 n 皇后问题 不同的解决方案的数量。 示例 1&#xff1a; 输入&#xff1a;n 4 输出&#xff1a;2 解释&#…

【微信小程序】后台数据交互于WX文件使用

目录 一、前期准备 1.1 数据库准备 1.2 后端数据获取接口编写 1.3 前端配置接口 1.4 封装微信的request请求 二、WXS文件的使用 2.1 WXS简介 2.2 WXS使用 三、后台数据交互完整代码 3.1 WXML 3.2 JS 3.3 WXSS 效果图 一、前期准备 1.1 数据库准备 创建数据库&…

TX Text Control.NET 32.0 For WPF

TX Text Control 支持VISUAL STUDIO 2022、.NET 5 和 .NET 6 支持 .NET WPF 应用程序的文档处理 将文档编辑、创建和 PDF 生成添加到您的 WPF 应用程序中。 视窗用户界面 功能齐全的文档编辑器 TX Text Control 是一款完全可编程的丰富编辑控件&#xff0c;它在专为 Visual Stu…

同步网盘选择指南:哪个同步网盘更好用?

同步盘是当下热门的云存储服务之一&#xff0c;它可以将您的文件在不同设备之间进行同步&#xff0c;使您可以随时随地访问和共享您的文件&#xff0c;因此受到了许多用户的喜爱。 一、什么是同步盘 首先到底什么是同步盘&#xff1f;同步盘是指一种云存储服务&#xff0c;它…

零基础Linux_20(进程信号)内核态和用户态+处理信号+不可重入函数+volatile

目录 1. 内核态和用户态 1.1 内核态和用户态概念 1.2 内核态和用户态转化 2. 处理信号 2.2 捕捉信号 2.2 系统调用sigaction 3. 不可重入函数 4. volatile关键字 5. SIGCHLD信号&#xff08;了解&#xff09; 6. 笔试选择题 答案及解析 本篇完。 1. 内核态和用户态…

【吞噬星空】又被骂,罗峰杀人目无法纪,但官方留后手,增加审判戏份

Hello,小伙伴们&#xff0c;我是小郑继续为大家深度解析国漫吞噬星空资讯。 吞噬星空动画中&#xff0c;罗峰复仇的戏份&#xff0c;简直是帅翻了&#xff0c;尤其是秒杀阿特金三大巨头&#xff0c;让人看的也是相当的解气&#xff0c;相当的爽&#xff0c;一点都不拖沓&#x…

fastadmin笔记,fastadmin表格功能

fastadmin笔记 官方文档请到&#xff1a; https://ask.fastadmin.net/article/323.html自行查阅 1、默认有个切换功能。 浏览模式可以切换卡片视图和表格视图两种模式&#xff0c;如果不需要此功能 在该控制器对应的js 文件中添加上showToggle:false即可。 2、导出功能 …

linux基础IO

文章目录 前言一、基础IO1、文件预备知识1.1 文件类的系统调用接口1.2 复习c语言接口 2、文件类的系统调用接口2.1 open系统调用2.2 close系统调用2.3 write系统调用2.4 read系统调用 3、文件描述符3.1 文件描述符fd介绍3.2 文件描述符fd分配规则与重定向3.3 重定向原理3.4输入…

armbian安装gcc、g++

文章目录 安装GCC安装G 安装GCC 打开终端&#xff0c;更新软件包列表&#xff1a; sudo apt update安装GCC&#xff1a; sudo apt install gcc如果需要安装特定版本的GCC&#xff0c;可以使用以下命令&#xff1a; sudo apt install gcc-<version> # sudo apt install g…

小白也能成功搭建网站

随着互联网的快速发展&#xff0c;拥有一个个人网站已经成为了越来越多人的追求。然而&#xff0c;对于编程知识不太了解的小白来说&#xff0c;搭建个人网站似乎是一件很困难的事情。但是&#xff0c;现在有了一个不需要编程的方法&#xff0c;小白也能够轻松建立自己的个人网…

如何利用IP定位技术进行反欺诈?

网络欺诈风险是指在互联网和数字领域中&#xff0c;存在各种类型的欺诈活动&#xff0c;旨在欺骗个人、组织或系统以获得非法获益。以下是一些常见的网络欺诈风险类型&#xff1a; 身份盗用&#xff1a;这是一种欺诈行为&#xff0c;涉及盗取他人的个人身份信息&#xff0c;如姓…