Flink基本转换算子map/filter/flatmap

map

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
在这里插入图片描述
我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。

public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:传入匿名类,实现MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:传入MapFunction的实现类// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}

面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

Filter

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
在这里插入图片描述
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:传入匿名类实现FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}).print();// 方式二:传入FilterFunction实现类// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

FlatMap

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
在这里插入图片描述
案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。

public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}/*** TODO flatmap: 一进多出(包含0出)*      对于s1的数据,一进一出*      对于s2的数据,一进2出*      对于s3的数据,一进0出(类似于过滤的效果)**    map怎么控制一进一出:*      =》 使用 return**    flatmap怎么控制的一进多出*      =》 通过 Collector来输出, 调用几次就输出几条***/public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/212296.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

案例026:基于微信小程序的原创音乐系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

什么是Restful?

Rest简介 REST是英文representational state transfer(表象性状态转变)或者表述性状态转移。Rest是web服务的一种架构风格。使用HTTP,URI,XML,JSON,HTML等广泛流行的标准和协议。轻量级,跨平台,跨语言的架构设计。它是一种设计风格,不是一种标准,是一种思想。 Rest架构的主要…

java程序定时器

目录 1.java定时器原生方法 1.java定时器原生方法 实现每天早上8点执行任务的示例代码 import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class TimeTest{pub…

汽车网络安全--关于UN R155认证的思考

1.UN R155概述 2020年6月25日,联合国颁布了全球首个汽车网络安全强制性法规 -- UN 155,详细规定了关于评估网络安全措施的审核条款、制造商和供应商降低网络安全风险的方法以及实施风险评估的义务等。 法规适用于与信息安全相关的M类(4轮及以上载客汽车)、N类(四轮载货汽车)…

SpringBoot项目连接Graylog

直接用logback将控制台输出的日志发送到graylog上 1.导入logback依赖 <dependency> <groupId>de.siegmar</groupId> <artifactId>logback-gelf</artifactId> <version>1.1.0</version> </dependency> 2.创建logback-spring.x…

浅谈低代码

低代码开发是近年来迅速崛起的软件开发方法&#xff0c;让编写应用程序变得更快、更简单。有人说它是美味的膳食&#xff0c;让开发过程高效而满足&#xff0c;但也有人质疑它是垃圾食品&#xff0c;缺乏定制性与深度。你认为低代码到底是美以下方向仅供参考。味的膳食还是垃圾…

SpringBoot - 四种常见定时器

常见实现方案 Scheduled注解&#xff1a;基于注解Timer().schedule创建任务&#xff1a;基于封装类Timer线程&#xff1a;使用线程直接执行任务即可&#xff0c;可以与thread、线程池、ScheduleTask等配合使用quartz配置定时器&#xff1a;基于spring的quartz框架 Scheduled注…

golang学习笔记——编写最简单的命令行工具

编写最简单的命令行工具 用户输入bufio 使用go语言编写最简单的命令行工具 mkdir hello-cli-demo cd hello-cli-demo # 查看环境变量 go envgo mod初始化 go mod init gitcode.com/m打开vscode&#xff0c;创建main.go package mainimport ("fmt""bufio&qu…

RK3568 CIF和ISP的关联

1. 引言 在本文档中&#xff0c;我们将介绍RK3568芯片的CIF&#xff08;Camera Interface&#xff09;和ISP&#xff08;Image Signal Processor&#xff09;模块。这两个模块是RK3568芯片的关键组成部分&#xff0c;用于图像采集和处理。 CIF是一个标准接口&#xff0c;用于…

快速测试 3节点的redis sentinel集群宕机2个节点以后是否仍能正常使用

有同事问我&#xff0c;三个redis sentinel节点&#xff0c;宕机两个节点以后&#xff0c;是否还能够正常的通过redis sentinel正常访问redis的数据。我想了想&#xff0c;理论上是可以的&#xff0c;但是我没试过&#xff0c;今天有时间就测试了一下。搭建环境和测试代码的过程…

Java并发(十七)----变量的线程安全分析

1、成员变量和静态变量是否线程安全 如果它们没有共享&#xff0c;则线程安全 如果它们被共享了&#xff0c;根据它们的状态是否能够改变&#xff0c;又分两种情况 如果只有读操作&#xff0c;则线程安全 如果有读写操作&#xff0c;则这段代码是临界区&#xff0c;需要考虑线…

深入了解Python pydash库

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在数据处理和分析领域&#xff0c;Python一直是一种强大的编程语言。然而&#xff0c;在处理大规模数据集和执行复杂操作时&#xff0c;有时候需要更高效的工具。在本文中&#xff0c;我们将深入探讨pydash库&am…

语义分割 简介及数据集简介

参考文章 MS COCO数据集介绍以及pycocotools简单使用-CSDN博客

[MySQL--进阶篇]存储引擎的体系结构、简介、特点、选择

前言 ⭐Hello!这里是欧_aita的博客。 ⭐今日语录&#xff1a;不要在乎别人怎么看你&#xff0c;因为他们根本就没有时间&#xff0c;他们只关心他们自己。 ⭐个人主页&#xff1a;欧_aita ψ(._. )>⭐个人专栏&#xff1a; 数据结构与算法 MySQL数据库 存储引擎 前言MySQL体…

代码随想录算法训练营第四十一天|343. 整数拆分、96.不同的二叉搜索树

代码随想录算法训练营第四十一天|343. 整数拆分、96.不同的二叉搜索树 整数拆分 343. 整数拆分 文章讲解&#xff1a;https://programmercarl.com/0343.%E6%95%B4%E6%95%B0%E6%8B%86%E5%88%86.html 题目链接&#xff1a;https://leetcode.cn/problems/integer-break/ 视频讲解…

李宏毅gpt个人记录

参考&#xff1a; 李宏毅机器学习--self-supervised&#xff1a;BERT、GPT、Auto-encoder-CSDN博客 用无标注资料的任务训练完模型以后&#xff0c;它本身没有什么用&#xff0c;GPT 1只能够把一句话补完&#xff0c;可以把 Self-Supervised Learning 的 Model做微微的调整&am…

32.768KHz时钟RTC晶振精度PPM值及频差计算

一个数字电路就像一所城市的交通&#xff0c;晶振的作用就是十字路口的信号灯&#xff0c;因此晶振的品质及其电路应用尤其关键。数字电路又像生命体&#xff0c;它的运行就像人身体里的血液流通&#xff0c;它不是由单一的某个器件或器件单元构成&#xff0c;而是由多个器件及…

【Spring Boot 源码学习】ApplicationListener 详解

Spring Boot 源码学习系列 ApplicationListener 详解 引言往期内容主要内容1. 初识 ApplicationListener2. 加载 ApplicationListener3. 响应应用程序事件 总结 引言 书接前文《初识 SpringApplication》&#xff0c;我们从 Spring Boot 的启动类 SpringApplication 上入手&am…

如何查询川菜食材配料的API接口

在当今的美食文化中&#xff0c;菜谱不只是一张简单的食谱&#xff0c;更是了解美食文化和饮食知识的重要途径。然而&#xff0c;若没有准确的食材配料&#xff0c;烹制出的每道菜品都将难以达到完美的味道。因此&#xff0c;为了更好地满足人们对于菜谱和食谱的需求&#xff0…

C语言习题集(026)

//写一个函数&#xff0c;输入一个4位数字&#xff0c;要求输出这4个 //数字字符&#xff0c;但每两个数字间空一个空格。如输入 //1990&#xff0c;应输出"1 9 9 0"。 /* */ //解答&#xff1a; #include<stdio.h> void change(int a) { if(a/10!0) { chang…