Flink基本转换算子map/filter/flatmap

map

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
在这里插入图片描述
我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。

public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:传入匿名类,实现MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:传入MapFunction的实现类// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}

面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

Filter

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
在这里插入图片描述
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:传入匿名类实现FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}).print();// 方式二:传入FilterFunction实现类// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

FlatMap

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
在这里插入图片描述
案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。

public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}/*** TODO flatmap: 一进多出(包含0出)*      对于s1的数据,一进一出*      对于s2的数据,一进2出*      对于s3的数据,一进0出(类似于过滤的效果)**    map怎么控制一进一出:*      =》 使用 return**    flatmap怎么控制的一进多出*      =》 通过 Collector来输出, 调用几次就输出几条***/public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/212296.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

案例026:基于微信小程序的原创音乐系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

汽车网络安全--关于UN R155认证的思考

1.UN R155概述 2020年6月25日,联合国颁布了全球首个汽车网络安全强制性法规 -- UN 155,详细规定了关于评估网络安全措施的审核条款、制造商和供应商降低网络安全风险的方法以及实施风险评估的义务等。 法规适用于与信息安全相关的M类(4轮及以上载客汽车)、N类(四轮载货汽车)…

SpringBoot项目连接Graylog

直接用logback将控制台输出的日志发送到graylog上 1.导入logback依赖 <dependency> <groupId>de.siegmar</groupId> <artifactId>logback-gelf</artifactId> <version>1.1.0</version> </dependency> 2.创建logback-spring.x…

golang学习笔记——编写最简单的命令行工具

编写最简单的命令行工具 用户输入bufio 使用go语言编写最简单的命令行工具 mkdir hello-cli-demo cd hello-cli-demo # 查看环境变量 go envgo mod初始化 go mod init gitcode.com/m打开vscode&#xff0c;创建main.go package mainimport ("fmt""bufio&qu…

快速测试 3节点的redis sentinel集群宕机2个节点以后是否仍能正常使用

有同事问我&#xff0c;三个redis sentinel节点&#xff0c;宕机两个节点以后&#xff0c;是否还能够正常的通过redis sentinel正常访问redis的数据。我想了想&#xff0c;理论上是可以的&#xff0c;但是我没试过&#xff0c;今天有时间就测试了一下。搭建环境和测试代码的过程…

Java并发(十七)----变量的线程安全分析

1、成员变量和静态变量是否线程安全 如果它们没有共享&#xff0c;则线程安全 如果它们被共享了&#xff0c;根据它们的状态是否能够改变&#xff0c;又分两种情况 如果只有读操作&#xff0c;则线程安全 如果有读写操作&#xff0c;则这段代码是临界区&#xff0c;需要考虑线…

深入了解Python pydash库

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在数据处理和分析领域&#xff0c;Python一直是一种强大的编程语言。然而&#xff0c;在处理大规模数据集和执行复杂操作时&#xff0c;有时候需要更高效的工具。在本文中&#xff0c;我们将深入探讨pydash库&am…

语义分割 简介及数据集简介

参考文章 MS COCO数据集介绍以及pycocotools简单使用-CSDN博客

[MySQL--进阶篇]存储引擎的体系结构、简介、特点、选择

前言 ⭐Hello!这里是欧_aita的博客。 ⭐今日语录&#xff1a;不要在乎别人怎么看你&#xff0c;因为他们根本就没有时间&#xff0c;他们只关心他们自己。 ⭐个人主页&#xff1a;欧_aita ψ(._. )>⭐个人专栏&#xff1a; 数据结构与算法 MySQL数据库 存储引擎 前言MySQL体…

李宏毅gpt个人记录

参考&#xff1a; 李宏毅机器学习--self-supervised&#xff1a;BERT、GPT、Auto-encoder-CSDN博客 用无标注资料的任务训练完模型以后&#xff0c;它本身没有什么用&#xff0c;GPT 1只能够把一句话补完&#xff0c;可以把 Self-Supervised Learning 的 Model做微微的调整&am…

32.768KHz时钟RTC晶振精度PPM值及频差计算

一个数字电路就像一所城市的交通&#xff0c;晶振的作用就是十字路口的信号灯&#xff0c;因此晶振的品质及其电路应用尤其关键。数字电路又像生命体&#xff0c;它的运行就像人身体里的血液流通&#xff0c;它不是由单一的某个器件或器件单元构成&#xff0c;而是由多个器件及…

【Spring Boot 源码学习】ApplicationListener 详解

Spring Boot 源码学习系列 ApplicationListener 详解 引言往期内容主要内容1. 初识 ApplicationListener2. 加载 ApplicationListener3. 响应应用程序事件 总结 引言 书接前文《初识 SpringApplication》&#xff0c;我们从 Spring Boot 的启动类 SpringApplication 上入手&am…

如何查询川菜食材配料的API接口

在当今的美食文化中&#xff0c;菜谱不只是一张简单的食谱&#xff0c;更是了解美食文化和饮食知识的重要途径。然而&#xff0c;若没有准确的食材配料&#xff0c;烹制出的每道菜品都将难以达到完美的味道。因此&#xff0c;为了更好地满足人们对于菜谱和食谱的需求&#xff0…

linux权限管理以及shell

1.shell 1.1什么是shell? shell即外壳&#xff0c;是运行在linux系统上的一个脚本语言&#xff0c;包裹在linux内核的外面。我们常说的linux操作系统实际上是linux内核。我们使用的所有指令都是一个个程序&#xff0c;而shell指令就是一个将我们用户的操作翻译给linux内核的程…

软件设计之组合模式

组合模式&#xff1a;将对象组合成树形结构。 案例&#xff1a;公司管理。一个公司可以分总公司和分公司&#xff0c;无论是总公司还是分公司都有自己的部门&#xff0c;如人力资源管理部门、财务部门。分公司可以建立自己在不同地域的办事处。请使用组合模式打印出某个公司的…

SpringSecurity6 | 登陆后的跳转

SpringSecurity6 | 自定义认证规则 ✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; Ja…

第九天:信息打点-CDN绕过篇amp;漏洞回链amp;接口探针amp;全网扫描amp;反向邮件

信息打点-CDN绕过篇 cdn绕过文章&#xff1a;https://www.cnblogs.com/qiudabai/p/9763739.html 一、CDN-知识点 1、常见访问过程 1、没有CDN情况下传统访问&#xff1a;用户访问域名-解析服务器IP–>访问目标主机 2.普通CDN&#xff1a;用户访问域名–>CDN节点–>…

网络层重点协议——IP协议详解

✏️✏️✏️今天给大家分享的是网络层的重点协议——IP协议。 清风的CSDN博客 &#x1f6e9;️&#x1f6e9;️&#x1f6e9;️希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff01; ✈️✈️✈️动动你们发财的…

阿里内部教程Jmeter 性能测试常用图表、服务器资源监控

性能测试常用图表 插件安装 步骤 1&#xff1a;安装插件管理器 在 Jmeter 官网上下载插件管理器 Plugins-manager-1.3.jar将 jar 包放入到 lib\ext 目录下重启 Jmeter&#xff0c;可以在选项下看到 Plugins Manager 选项 步骤 2&#xff1a;安装指定的插件 打开 Plugins Ma…

JVM虚拟机系统性学习-运行时数据区(堆)

运行时数据区 JVM 由三部分组成&#xff1a;类加载系统、运行时数据区、执行引擎 下边讲一下运行时数据区中的构成 根据线程的使用情况分为两类&#xff1a; 线程独享&#xff08;此区域不需要垃圾回收&#xff09; 虚拟机栈、本地方法栈、程序计数器 线程共享&#xff08;数…