Flink Transformation - 转换算子全面解析

Flink Transformation - 转换算子全面解析

一、引言

在Flink的数据流处理中,转换算子(Transformation Operators)扮演着极为关键的角色。它们能够对输入的数据流进行各种处理和转换操作,以满足不同的业务需求。本文将详细介绍Flink中常见的转换算子,包括mapflatMapfilterkeyByreduceunionconnect以及各种物理分区算子,并结合代码示例进行深入讲解。

二、常用转换算子

(一)map算子

map算子用于将一个数据流中的每个元素进行一对一的转换。例如,假设有如下数据,我们可以将其转换为一个LogBean对象并输出。首先,读取本地文件的方式如下:

DataStream<String> lines = env.readTextFile("./data/input/flatmap.log");

假设LogBean类有相应的字段定义(例如String field1; String field2;等),map算子的使用示例如下:

DataStream<LogBean> logBeanStream = lines.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {// 解析line并创建LogBean对象LogBean logBean = new LogBean();// 设置LogBean的各个字段值return logBean;}
});

(二)FlatMap算子

FlatMap算子将数据流中的每个元素转换为零个、一个或多个元素。例如,读取flatmap.log文件中的数据,如“张三,苹果手机,联想电脑,华为平板”,可以转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等。代码演示如下:

DataStream<String> lines = env.readTextFile("./data/input/flatmap.log");
DataStream<String> resultStream = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] items = line.split(",");for (int i = 1; i < items.length; i++) {collector.collect(items[0] + "有" + items[i]);}}
});

(三)Filter算子

Filter算子用于根据指定的条件过滤数据流中的元素。例如,读取a.log文件中的访问日志数据,过滤出访问IP是83.149.9.216的访问日志:

DataStream<String> lines = env.readTextFile("./data/input/a.log");
DataStream<String> filteredStream = lines.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {// 解析line获取IP并判断是否为目标IPString ip = parseIPFromLine(line);return "83.149.9.216".equals(ip);}
});

(四)KeyBy算子

在流处理中,KeyBy算子类似于批处理中的groupBy,用于按照指定的键对数据进行分组。KeySelector对象可以支持元组类型,也可以支持POJO(如EntryJavaBean)。

  • 元组类型
    • 单个字段keyBy:例如,对于一个包含Tuple2<String, Integer>类型的数据流,如果要按照第一个字段(String类型)进行分组,可以这样写:
DataStream<Tuple2<String, Integer>> tupleStream =...;
KeyedStream<Tuple2<String, Integer>, String> keyedStream = tupleStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
});
- **多个字段`keyBy`**:类似于SQL中的`group by`多个字段,例如对于`Tuple3<String, Integer, Double>`类型的数据流,按照第一个和第二个字段进行分组:
DataStream<Tuple3<String, Integer, Double>> tuple3Stream =...;
KeyedStream<Tuple3<String, Integer, Double>, Tuple2<String, Integer>> keyedStream = tuple3Stream.keyBy(new KeySelector<Tuple3<String, Integer, Double>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple3<String, Integer, Double> tuple) throws Exception {return Tuple2.of(tuple.f0, tuple.f1);}
});
  • POJO类型
    • 单个字段keyBy:假设User类有idname等字段,要按照id字段进行分组:
DataStream<User> userStream =...;
KeyedStream<User, String> keyedStream = userStream.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User user) throws Exception {return user.getId();}
});
- **多个字段`keyBy`**:例如按照`User`类的`id`和`age`字段进行分组:
DataStream<User> userStream =...;
KeyedStream<User, Tuple2<String, Integer>> keyedStream = userStream.keyBy(new KeySelector<User, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(User user) throws Exception {return Tuple2.of(user.getId(), user.getAge());}
});

(五)Reduce算子

Reduce算子可以对一个数据集或一个分组来进行聚合计算,最终聚合成一个元素。例如,读取a.log日志,统计ip地址访问pv数量,使用reduce操作聚合成一个最终结果:

DataStream<String> lines = env.readTextFile("./data/input/a.log");
DataStream<Tuple2<String, Integer>> ipCountStream = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String line) throws Exception {// 解析line获取IP并设置初始计数为1String ip = parseIPFromLine(line);return Tuple2.of(ip, 1);}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}
});

(六)flatMap/map/filter/keyby/reduce综合练习

需求是对流数据中的单词进行统计,排除敏感词“TMD”(腾讯美团滴滴)。首先启动netcat服务端(在Windows上解压相关软件后,在路径中输入cmd,然后启动服务端),客户端双击nc.exe即可。代码示例如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> socketStream = env.socketTextStream("localhost", 8888);
DataStream<String> filteredWords = socketStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {if (!"TMD".equals(word)) {collector.collect(word);}}}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1);}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}
});
filteredWords.print();
env.execute();

三、合并和连接算子

(一)Union算子

Union算子可以合并多个同类型的流,将多个DataStream合并成一个DataStream。需要注意的是,union合并的DataStream的类型必须是一致的,并且union可以取并集,但是不会去重。例如:

DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("c", "d", "e");
DataStream<String> unionStream = stream1.union(stream2);

(二)Connect算子

Connect算子可以连接2个不同类型的流(最后需要处理后再输出)。DataStreamDataStream连接后得到ConnectedStreams:连接两个保持它们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化(类似“一国两制”),两个流相互独立,作为对比Union后是真的变成一个流了。和union类似,但是connect只能连接两个流,两个流之间的数据类型可以不同,对两个流的数据可以分别应用不同的处理逻辑。例如:

DataStream<Integer> streamA = env.fromElements(1, 2, 3);
DataStream<String> streamB = env.fromElements("a", "b", "c");
ConnectedStreams<Integer, String> connectedStreams = streamA.connect(streamB);

四、Side Outputs侧道输出(侧输出流)

侧输出流可以对流中的数据按照特定规则进行分流。例如,对流中的数据按照奇数和偶数进行分流,并获取分流后的数据。具体实现时,可以在ProcessFunction中使用OutputTag来定义侧输出流,并根据条件将数据发送到不同的侧输出流中。

五、物理分区算子

Flink提供了以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

(一)Global Partitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。例如:

DataStream<Long> stream =...;
stream.global();

(二)Shuffle Partitioner

根据均匀分布随机划分元素。使用示例:

DataStream<Long> stream =...;
stream.shuffle();

(三)Broadcast Partitioner

发送到下游所有的算子实例,是将上游的所有数据,都给下游的每一个分区一份。例如:

DataStream<Long> stream =...;
stream.broadcast();

(四)Rebalance Partitioner(重分区)

通过循环的方式依次发送到下游的task,用于解决数据倾斜问题(当某一个分区数据量过大时)。可以通过人为制造数据不平衡,然后使用rebalance方法让其平衡,并通过观察每一个分区的总数来观察效果。例如:

DataStream<Long> stream =...;
DataStream<Long> rebalancedStream = stream.rebalance();

(五)Forward Partitioner

发送到下游对应的第一个task,保证上下游算子并行度一致,即上下游算子与下游算子是1:1的关系。在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner。对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。例如:

DataStream<Long> stream =...;
stream.forward();

(六)Custom(自定义)Partitioning

使用用户定义的Partitioner为每个元素选择目标任务。例如:

class CustomPartitioner implements Partitioner<Long>{@Overridepublic int partition(Long key, int numPartitions) {System.out.println(numPartitions);if(key < 10000){return 0;}return 1;}
}
DataStreamSource<Long> streamSource = env.fromSequence(1, 15000);
DataStream<Long> dataStream = streamSource.partitionCustom(new CustomPartitioner(), new KeySelector<Long, Long>() {@Overridepublic Long getKey(Long value) throws Exception {return value;}
});

六、总结

Flink的转换算子为数据流的处理提供了丰富而强大的功能。通过合理地组合和运用这些算子,可以构建出复杂而高效的数据流处理逻辑,以满足各种大数据处理场景下的业务需求。在实际应用中,需要根据数据的特点、业务逻辑以及性能要求等因素,灵活选择和配置合适的转换算子,从而充分发挥Flink在大数据处理领域的优势。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Easyexcel(5-自定义列宽)

相关文章链接 Easyexcel&#xff08;1-注解使用&#xff09;Easyexcel&#xff08;2-文件读取&#xff09;Easyexcel&#xff08;3-文件导出&#xff09;Easyexcel&#xff08;4-模板文件&#xff09;Easyexcel&#xff08;5-自定义列宽&#xff09; 注解 ColumnWidth Data…

C++ 中的模板特化和偏特化

C中的模板特化和偏特化是C模板编程中的两种重要技术&#xff0c;用于在特定情况下提供更优化的代码实现。‌ 模板特化 模板特化是指在模板参数为特定类型时&#xff0c;提供一种特定的实现方式。模板特化分为‌函数模板特化‌和‌类模板特化‌。 函数模板特化‌&#xff1a; …

短信发送业务

Override public void sendCode(String phone) {// 通过正则判断手机号的合法性if (!phone.matches("^1[3-9]\\d{9}$")) {throw new RuntimeException("请输入合法的手机号");}// 判断3次// String.format("code:%s", phone)String numKey Stri…

1+X应急响应(网络)文件包含漏洞:

常见网络攻击-文件包含漏洞&命令执行漏洞&#xff1a; 文件包含漏洞简介&#xff1a; 分析漏洞产生的原因&#xff1a; 四个函数&#xff1a; 产生漏洞的原因&#xff1a; 漏洞利用条件&#xff1a; 文件包含&#xff1a; 漏洞分类&#xff1a; 本地文件包含&#xff1a; …

深入实践 Shell 脚本编程:高效自动化操作指南

一、什么是 Shell 脚本&#xff1f; Shell 脚本是一种用 Shell 编写的脚本程序&#xff0c;用于执行一系列的命令。它是 Linux/Unix 系统中自动化管理任务的利器&#xff0c;能够显著提升工作效率&#xff0c;特别适合批量处理文件、监控系统状态、自动部署等任务。 二、Shell…

HTML5 SVG

HTML5 SVG SVG(Scalable Vector Graphics)是一种基于XML的图像格式,用于在网页上创建矢量图形。与传统的位图图像(如PNG和JPEG)不同,SVG图像可以无限放大而不失真,因为它们是由直线和曲线定义的数学路径,而不是像素点。HTML5支持直接在网页中嵌入SVG,使得网页设计更加…

Flutter:SlideTransition位移动画,Interval动画延迟

配置vsync&#xff0c;需要实现一下with SingleTickerProviderStateMixinclass _MyHomePageState extends State<MyHomePage> with SingleTickerProviderStateMixin{// 定义 AnimationControllerlate AnimationController _controller;overridevoid initState() {super.…

gitlab:使用脚本批量下载项目,实现全项目检索

目的 当需要知道gitlab中所有项目是否存在某段代码时&#xff0c;gitlab免费版只提供了当个项目内的检索&#xff0c;当项目过多时一个个查太过繁琐。下面通过 GitLab API 将指定 Group 下的所有项目克隆到本地。此脚本会自动获取项目列表并逐一克隆它们&#xff0c;再在本地进…

【Android】android compat理解

1&#xff0c;前提 即便是在同一手机上安装的不同apk&#xff0c;其编译的apk不同&#xff0c;也会导致行为上的差异。如SDK34有限制后台启动&#xff0c;但如果安装的apk所依赖的sdk是33&#xff0c;则不会表现出此差异。这是如何实现的呢&#xff1f;其实&#xff0c;本质是…

使用php和Xunsearch提升音乐网站的歌曲搜索效果

文章精选推荐 1 JetBrains Ai assistant 编程工具让你的工作效率翻倍 2 Extra Icons&#xff1a;JetBrains IDE的图标增强神器 3 IDEA插件推荐-SequenceDiagram&#xff0c;自动生成时序图 4 BashSupport Pro 这个ides插件主要是用来干嘛的 &#xff1f; 5 IDEA必装的插件&…

电脑自动关机时间如何定?Wise Auto Shutdown 设置关机教程

在日常使用电脑的过程中&#xff0c;有时我们需要让电脑在特定的时间自动关机&#xff0c;比如在下载大文件完成后、执行长时间的任务结束时&#xff0c;或者只是单纯想在某个预定时间让电脑自动关闭以节省能源。这时候&#xff0c;Wise Auto Shutdown 这款软件就能派上大用场了…

微信小程序被攻击怎么选择高防产品

家人们&#xff0c;微信小程序被攻击了&#xff01;这事儿可不小。你想想&#xff0c;咱们平时用小程序点外卖、购物、玩游戏&#xff0c;现在却可能面临信息泄露风险。卡顿、闪退都算轻的&#xff0c;关键是咱的账号安全、个人数据&#xff0c;就像在“裸奔”。小程序本是方便…

k8s上面的Redis集群链接不上master的解决办法

问题描述 之前在k8s上面部署了一台node&#xff0c;然后创建了6个redis的pod&#xff0c;构建了一个redis的集群&#xff0c;正常运行。 最近添加了一台slave node&#xff0c;然后把其中的几个redis的pod调度到了slave node上面&#xff0c;结果集群就起不来了&#xff0c;…

什么是 C++ 中的智能指针?有哪些类型的智能指针?

智能指针的定义 在 C 中&#xff0c;智能指针是一种类模板&#xff0c;用于管理动态分配的内存。它的主要目的是自动管理内存的生命周期&#xff0c;避免手动释放内存时可能出现的错误&#xff0c;如内存泄漏&#xff08;忘记释放内存&#xff09;和悬空指针&#xff08;访问已…

Oracle热备过程中对数据库崩溃的处理方法

引言 在热备过程中如果发生数据库崩溃、断电等情况该如何处理? 如果正在备份 users 表空间的数据文件过程中,此时的数据文件表头 SCN 会被锁定,此时正在复制数据文件时数据库崩溃,系统断电。 从而导致数据文件表头与控制文件中的不一致,导致数据库无法打开,会要求介质恢…

Python操作neo4j库py2neo使用之创建和查询(二)

Python操作neo4j库py2neo使用之创建和查询&#xff08;二&#xff09; py2neo 创建操作 1、连接数据库 from py2neo import Graph graph Graph("bolt://100.100.20.55:7687", auth(user, pwd), nameneo4j)2、创建Node from py2neo import Node, Subgraph # 创建…

Elasticsearch面试内容整理-高级特性

Elasticsearch 提供了一系列高级特性,这些特性可以极大地增强其搜索、分析和管理能力,使得它在大数据场景中表现出色。以下是 Elasticsearch 的一些重要高级特性: 近实时搜索(Near Real-Time Search) Elasticsearch 的一个关键特性是 近实时搜索(NRT),这意味着数据写入…

算法专题十一: 基础递归

目录 1. 汉诺塔2. 合并两个有序链表3. 反转链表4. 两两交换链表中的节点5. Pow(x, n) 1. 汉诺塔 题目链接&#xff1a; Leetcode汉诺塔 算法原理&#xff1a; 递归&#xff1a;宏观视角理解递归 本道题为什么能用递归&#xff1f; 让我们逐一分析 首先思考我们如何来解决汉诺…

Cmakelist.txt之win-c-udp-client

1.cmakelist.txt cmake_minimum_required(VERSION 3.16) ​ project(c_udp_client LANGUAGES C) ​ add_executable(c_udp_client main.c) ​ target_link_libraries(c_udp_client wsock32) ​ ​ include(GNUInstallDirs) install(TARGETS c_udp_clientLIBRARY DESTINATION $…

Git错误:gnutls_handshake() failed: The TLS connection was non-properly terminated

最终我通过这个博客解决了问题&#xff1a;解决Git错误&#xff1a;gnutls_handshake() failed: The TLS connection was non-properly terminated 解决方案 1. 检查网络连接 首先&#xff0c;确保你的网络连接是稳定的。尝试访问其他HTTPS网站或服务&#xff0c;以排除网络问…