Flink多流转换(1)—— 分流合流

目录

分流

代码示例

使用侧输出流

合流

联合(Union)

连接(Connect)


简单划分的话,多流转换可以分为“分流”和“合流”两大类

目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream

代码示例

调用.filter()方法进行筛选,将符合条件的数据拣选出来放到对应的流里

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 筛选Mary的浏览行为放入MaryStream流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 筛选Bob的购买行为放入BobStream流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 筛选其他人的浏览行为放入elseStream流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob") ;}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}
}

缺点:上述操作将原始流复制了三份,对每一份分别进行筛选,因此代码冗余,不够高效

解决:①.split()方法(但限制了数据类型转换,已经废弃)

②测输出流

使用侧输出流

改进后的代码如下:

public class SplitStreamByOutputTag {// 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {if (value.user.equals("Mary")){ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));} else if (value.user.equals("Bob")){ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}

①定义OutputTag作为标签

②使用ctx.output方法将符合筛选条件的数据写入侧输出流

③使用getSideOutput方法从侧输出流中获得数据

合流

对于来源不同的多条流中的数据进行联合处理(与分流相比,合流操作更为普遍)

联合(Union)

直接将多条流合在一起(要求必须流中的数据类型必须相同),合并之后的新流会包括所有流中的元素,数据类型不变

操作:基于 DataStream 直接调用.union()方法

参数:其他 DataStream

返回值:一个 DataStream

stream1.union(stream2, stream3, ...)

水位线时效性:多流合并时处理的时效性是以最慢的那个流为准的(多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程)

代码示例:

public class UnionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream2.print("stream2");// 合并两条流stream1.union(stream2).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("水位线:" + ctx.timerService().currentWatermark());}}).print();env.execute();}
}

测试:

分别在两台机器上输入以下数据:

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位线的推进如下:

连接(Connect)

连接操作允许流的数据类型不同

连接流(ConnectedStreams)

连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的

要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

代码实现

①基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams

②调用同处理方法得到 DataStream(可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Integer> stream1 = env.fromElements(1,2,3);DataStream<Long> stream2 = env.fromElements(1L,2L,3L);ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {@Overridepublic String map1(Integer value) {return "Integer: " + value;}@Overridepublic String map2(Long value) {return "Long: " + value;}});result.print();env.execute();}
}

①ConnectedStreams有两个类型参数,分别是stream1和stream2的类型;

②map方法中实现了一个CoMapFunction,表示分别对两条流中的数据执行 map 操作

类型参数<IN1, IN2, OUT>,分别表示第一条流、第二条流,以及合并后的流中的数据类型

这里我们将一条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时,调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出,合并成了一条字符串流

③补充:ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

传入的参数是两条流中各自的键选择器

这样的操作就是把两条流中key相同的数据放到了一起,然后针对来源的流各自进行处理;

同样也可以在合并之前先使用KeyBy进行分区,然后基于两条KeyedStream进行连接操作;

要注意两条流定义的键的类型必须相同,否则会抛出异常


CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源码如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}public abstract class Context {...}
...
}

简单示例:实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息

// 实时对账
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自app的支付日志SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(Tuple3.of("order-1", "app", 1000L),Tuple3.of("order-2", "app", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(Tuple4.of("order-1", "third-party", "success", 3000L),Tuple4.of("order-3", "third-party", "success", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配,不匹配就报警appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{// 定义状态变量,用来保存已经到达的事件private ValueState<Tuple3<String, String, Long>> appEventState;private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;@Overridepublic void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)));}@Overridepublic void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() != null){out.collect("对账成功:" + value + "  " + thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);}}@Overridepublic void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {if (appEventState.value() != null){out.collect("对账成功:" + appEventState.value() + "  " + value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来if (appEventState.value() != null) {out.collect("对账失败:" + appEventState.value() + "  " + "第三方支付平台信息未到");}if (thirdPartyEventState.value() != null) {out.collect("对账失败:" + thirdPartyEventState.value() + "  " + "app信息未到");}appEventState.clear();thirdPartyEventState.clear();}}}

运行结果如下:

运行结果解析:
①在CoProcessFunction的实现中,声明了两个状态变量用来保存App的支付信息和第三方的支付信息

②App支付信息到达之后,触发processElement1中的操作,检查第三方的支付信息是否已经到达(如果先到达会保存在相应的状态变量中);如果已经到达,则对账成功;如果没有到达,则等待5s,仍未到达则对账失败;

③第三方支付信息到达后,流程同②

④对于order-1,时间戳为1000的数据(App)到达后,第三方支付信息未到达,等待5s,接着时间戳未3000的数据(第三方)到达后,发现App支付信息已经到达,因此对账成功

⑤对于order-2和order-3,均是等待5s后没有检测到App(第三方)数据到达而发出报警信息

广播连接流(BroadcastConnectedStream)

DataStream 调用.connect()方法时可以传入一个广播流(BroadcastConnectedStream)

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)

如何创建广播流


基于DataStream调用.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor),说明状态的名称和类型;

因为广播状态底层是用一个“映射”(map)结构来保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

数据流和广播流的连接

得到“广播连接流”(BroadcastConnectedStream),然后基于广播连接流调用.process()方法,就可以同时获取规则和数据,进行动态处理

DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement().processBroadcastElement()

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/643173.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac Idea安装后无法启动

1、起因 想安装一个新版的idea2023.3.2&#xff0c;结果安装完之后直接无法启动 以为是卸载不干净&#xff0c;下载了一个腾讯柠檬&#xff0c;结果将2018版也一并卸载了 好家伙&#xff0c;彻底没得用 2、找原因 1&#xff09;查看idea报错信息 网上找了一圈&#xff0c;其…

Unity 适配器模式(实例详解)

文章目录 简介1. **Input Adapter 示例**2. **Component Adapter 示例**3. **网络数据解析适配器**4. **物理引擎适配**5. **跨平台服务适配** 简介 Unity中的适配器模式&#xff08;Adapter Pattern&#xff09;主要用于将一个类的接口转换为另一个接口&#xff0c;以便于原本…

ctfshow-命令执行(web53-web72)

目录 web53 web54 web55 web56 web57 web58 web59 web60 web61 web62 web63 web64 web65 web66 web67 web68 web69 web70 web71 web72 web53 …

麒麟系统—— openKylin 安装到虚拟机以及开放SSH通过工具连接

麒麟系统—— openKylin 安装到虚拟机以及开放SSH通过工具连接 1. 在VMware中安装openKylin麒麟系统步骤1&#xff1a;准备VMware环境步骤2&#xff1a;创建新的虚拟机步骤3&#xff1a;安装openKylin麒麟系统步骤4&#xff1a;调整分别率步骤5&#xff1a;安装SSH 2. 使用Open…

x-cmd pkg | perl - 具有强大的文本处理能力的通用脚本语言

目录 介绍首次用户技术特点竞品进一步阅读 介绍 Perl 是一种动态弱类型编程语言。Perl 内部集成了正则表达式的功能&#xff0c;以及巨大的第三方代码库 CPAN;在处理文本领域,是最有竞争力的一门编程语言之一 生态系统&#xff1a;综合 Perl 档案网络 (CPAN) 提供了超过 25,0…

flink-java使用介绍,flink,java

1、环境准备 文档&#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/ 仓库&#xff1a;https://github.com/apache/flink 下载&#xff1a;https://flink.apache.org/zh/downloads/ 下载指定版本&#xff1a;https://archive.apache.org/dist/flink…

c语言-柔性数组

文章目录 前言一、柔性数组的介绍1.1 柔性数组的定义 二、柔性数组的使用2.1 使用说明2.2 结构体中的成员只包含一个柔性数组成员2.3 结构体中的成员包含其他成员和一个柔性数组成员 三、模拟柔性数组总结 前言 本篇文章介绍c语言中的柔性数组。 一、柔性数组的介绍 1.1 柔性…

玩客云Armbian 23.8.1 Bullseye安装PrometheusGrafana

Welcome to Armbian 23.8.1 Bullseye with bleeding edge Linux 6.4.13-edge-meson prometheus 参考Monitoring – How to install Prometheus/Grafana on arm – Raspberry PI/Rock64 | Blogs (mytinydc.com) cd /usr/local/srcwget https://github.com/prometheus/prometh…

MySQL也开始支持JavaScript了

2023 年 12 月 16 日&#xff0c;Oracle 公司在一篇名为 《Introducing JavaScript support in MySQL》的文章中宣布 MySQL 数据库服务器将开始支持 JavaScript 语言。 这个举措标志着继PostgreSQL之后&#xff0c; MySQL 也支持使用 JavaScript 编写函数和存储过程了。作为最…

微信小程序(十一)表单组件(进阶)

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a;&#xff08;涉及内容较多&#xff0c;建议细看源码&#xff09; 1.radio-group的使用与数据处理 2.checkbox-group的使用与数据处理 3.picker的使用与数据同步处理(此处示范了地域与日期) 源码&#xff1a; form…

多协议转BACnet网关BA110

随着通讯技术和控制技术的发展&#xff0c;为了实现楼宇的高效、智能化管理&#xff0c;集中监控管理已成为楼宇智能管理发展的必然趋势。在此背景下&#xff0c;高性能的楼宇暖通数据传输解决方案——协议转换网关应运而生&#xff0c;广泛应用于楼宇自控和暖通空调系统应用中…

《WebKit 技术内幕》学习之六(2): CSS解释器和样式布局

2 CSS解释器和规则匹配 在了解了CSS的基本概念之后&#xff0c;下面来理解WebKit如何来解释CSS代码并选择相应的规则。通过介绍WebKit的主要设施帮助理解WebKit的内部工作原理和机制。 2.1 样式的WebKit表示类 在DOM树中&#xff0c;CSS样式可以包含在“style”元素中或者使…

unity刷新grid,列表

获取UIGrid 组件&#xff0c;更新列表 listParent.GetComponent().repositionNow true;

自然语言处理--概率最大中文分词

自然语言处理附加作业--概率最大中文分词 一、理论描述 中文分词是指将中文句子或文本按照语义和语法规则进行切分成词语的过程。在中文语言中&#xff0c;词语之间没有明显的空格或标点符号来分隔&#xff0c;因此需要通过分词工具或算法来实现对中文文本的分词处理。分词的…

[C++]使用yolov8的onnx模型仅用opencv和bytetrack实现目标追踪

【官方框架地址】 yolov8: https://github.com/ultralytics/ultralytics bytetrack: https://github.com/ifzhang/ByteTrack 【算法介绍】 随着人工智能技术的不断发展&#xff0c;目标追踪已成为计算机视觉领域的重要研究方向。Yolov8和ByTetrack作为当前先进的算法&…

Python实现两因素独立设计方差分析,简单效应分析

# Python实现两因素独立设计方差分析 1. 背景 1. 有研究者探讨了在不同企业文化下&#xff0c;管理者的不同语言风格所产生的影响 有的企业注重员工的独立性&#xff0c;强调个人努力和内部竞争&#xff1b;有的企业注重员工的整体性&#xff0c;强调团队合作和团队绩效。 …

LCD液晶屏驱动详解(3)

2.2、LCD控制寄存器LCDCON2 用于设置垂直方向各信号的时间参数&#xff0c;格式如下表所示&#xff1a; 功能位说明VBPD[31:24]VSYNC信号脉冲之后&#xff0c;还要经过(VBPD1)个HSYNC信号周期&#xff0c;有效的行数据才出现&#xff1b;LINEVAL[23:14]LCD的垂直宽度&#xf…

2024PMP考试新考纲-【过程领域】近期典型真题和很详细解析(11)

华研荟继续为您分享【过程Process领域】的新考纲下的真题&#xff0c;帮助大家体会和理解新考纲下PMP的考试特点和如何应用所学的知识和常识&#xff08;经验&#xff09;来解题&#xff0c;并且举一反三&#xff0c;一次性3A通过2024年PMP考试。 2024年PMP考试新考纲-【过程领…

探索文件与交互:使用PyQt5构建一个高级文件选择器

在当今的应用程序开发中&#xff0c;文件管理和交互是一个重要的组成部分。特别是对于桌面应用程序&#xff0c;提供一个直观、功能丰富的文件选择器是提高用户体验的关键。 本篇博客&#xff0c;我将介绍如何使用Python和PyQt5来构建一个高级的文件选择器&#xff0c;它不仅能…

数据挖掘笔记1

课程&#xff1a;清华大学-数据挖掘&#xff1a;理论与算法&#xff08;国家级精品课&#xff09;_哔哩哔哩_bilibili 一、Learning Resources 二、Data 数据是最底层的一种表现形式。数据具有连续性。从存储上来讲&#xff0c;数据分为逻辑上的和物理层的。大数据&#xff1…