【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

  • 1)函数类(Function Classes)
  • 2)富函数类(Rich Function Classes)

用户自定义函数(user-defined functionUDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类匿名函数富函数类

1)函数类(Function Classes)

Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunctionFilterFunctionReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现 FilterFunction 接口

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> filter = stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implementsFilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

方式二:通过匿名类来实现 FilterFunction 接口

DataStream<String> stream = stream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}});

方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> stream = stream.filter(newFilterFunctionImpl("sensor_1"));public static class FilterFunctionImpl implementsFilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}}}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));
//map 函数使用 Lambda 表达式,不需要进行类型声明SingleOutputStreamOperator<String> filter =stream.filter(sensor -> "sensor_1".equals(sensor.id));filter.print();env.execute();}
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunctionRichFilterFunctionRichReduceFunction 等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。

  • close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throwsException {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/641615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧工厂视频监控平台EasyCVR公网收流后内网设备无法播放是什么原因?

安防视频监控平台EasyCVR采用了开放式的网络结构&#xff0c;支持高清视频的接入和传输、分发&#xff0c;平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力&#xff0c;此外&am…

如何在科技创新中发挥国有企业的战略支撑作用?

要在科技创新中发挥国有企业的战略支撑作用&#xff0c;需要采取以下措施&#xff1a; 1. 强化国有企业创新主体地位&#xff1a;鼓励和支持国有企业加强技术创新、产品创新、组织创新和市场创新&#xff0c;提高自主创新能力。政府可以给予国有企业一定的政策和资金支持&…

机器学习分类模型评价指标总结(准确率、精确率、召回率、Fmax、TPR、FPR、ROC曲线、PR曲线,AUC,AUPR)

为了看懂论文&#xff0c;不得不先学一些预备知识&#xff08;&#xff08;55555 主要概念 解释见图 TP、FP、TN、FN 准确率、精确率&#xff08;查准率&#xff09;、召回率&#xff08;查全率&#xff09; 真阳性率TPR、伪阳性率FPR F1-score2TP/(2*TPFPFN) 最大响应分…

netty源码:(46) TailContext

TailContext是DefaultChannelPipeline中的一个内部类&#xff0c;它是一个ChannelInboundHandler TailContext在我们所添加的自己定义的所有ChannelInboundHandler(比如通过addLast方法&#xff09;之后&#xff0c;是整个入栈消息处理的最后一环&#xff0c;也就是tail. 它的…

Django 手把手教你搭建MYSQL多数据源 实现读写分离

目录 一、创建3.2版本的Django项目 二、配置MYSQL多数据源 三、实现读写分离 一、创建3.2版本的Django项目 第一步&#xff1a;创建虚拟环境 第二步&#xff1a;打开终端安装django pip install django3.2 第三步&#xff1a;创建项目 django-admin startproject django_…

windows用msvc编译opencv、opencv-python、opencv_contrib、cuda

如要用mingw编译opencv&#xff0c;参考我另外一篇文章https://blog.csdn.net/weixin_44733606/article/details/135741806。 如要用Ubuntu编译opencv&#xff0c;参考我另外一篇文章https://blog.csdn.net/weixin_44733606/article/details/131720128。 一、安装VS2022&…

实现组件动画的时候报这个错误

vue3 Component inside &#xff1c;Transition&#xff1e; renders non-element root node that cannot be animated. 然后怎么解决的,在template元素里面放了多个节点所以报这个错误,我们先写一个div里面写就可以解决 <template><div><el-card>123</…

夜视成像应用激光照明方法

在夜视成像领域&#xff0c;激光照明的使用主要集中在提高成像质量和远距离观察上。 以下是几种用于夜视成像的激光照明方法&#xff1a; 直接激光照明&#xff1a; 这种方法涉及直接用激光光束照射目标。激光器发出的光束可以是可见光或红外光&#xff0c;具体取决于应用需求和…

elastic search入门

参考1&#xff1a;Elastic Search 入门 - 知乎 参考2&#xff1a;Ubuntu上安装ElasticSearch_ubuntu elasticsearch-CSDN博客 1、ElasticSearch安装 1.1安装JDK&#xff0c;省略&#xff0c;之前已安装过 1.2创建ES用户 创建用户&#xff1a;sudo useradd esuser 设置密码&…

仿真机器人-深度学习CV和激光雷达感知(项目2)day03【机器人简介与ROS基础】

文章目录 前言机器人简介机器人应用与前景机器人形态机器人的构成 ROS基础ROS的作用和特点ROS的运行机制ROS常用命令 前言 &#x1f4ab;你好&#xff0c;我是辰chen&#xff0c;本文旨在准备考研复试或就业 &#x1f4ab;本文内容是我为复试准备的第二个项目 &#x1f4ab;欢迎…

java实现时间轮算法

Main demo测试代码 public class Main {static int inCount 0;static int runCount 0;public static void main(String[] args) {CountDownLatch countDownLatch new CountDownLatch(1000);Timer timer new Timer();for(int i1;i<1000;i){TimerTask timerTask new Ti…

【vue3】GSAP在vue中的使用

一、获取GSAP npm install gsap 二、开始GSAP 导入GSAP&#xff0c;如果需要导入gsap的插件可以参考这里。 import gasp from gsap; 这里用的是选项式&#xff0c;在methods属性中创建一个方法用来写gsap的动画。 gasp_animation(){let tl gasp.timeline({defaults:{ ease:&…

【zlm】针对单个设备的码率的设置

目录 代码修改 实验数据一 实验数据二 同时拉一路视频后 修改记录 使用方法 代码修改 要被子类引用 &#xff0c;所以放在protected 不能放private 下面的结论&#xff0c;可以在下面的实验数据里引用。“同时拉一路视频后” 实验数据一 https://10.60.3.45:10443/index…

【学习】双线性插值

双线性插值公式 对于一个目的像素&#xff0c;设置坐标通过反向变换得到的浮点坐标为(iu,jv) (其中i、j均为浮点坐标的整数部分&#xff0c;u、v为浮点坐标的小数部分&#xff0c;是取值[0,1)区间的浮点数)&#xff0c;则这个像素得值 f(iu,jv) 可由原图像中坐标为 (i,j)、(i1…

php学习

php基础语法 一 php程序 1.php标记 开始标记<?php 和结束标记 ?>中间写 PHP 代码 当解析一个文件时&#xff0c;PHP 会寻找起始和结束标记&#xff0c;也就是告诉php 开始和停止解析二者之间的代码。此种解析方式使得PHP 可以被嵌入到各种不同的文档中去&#xff…

使用PSIM软件生成DSP28335流水灯程序

最近在学习DSP28335芯片&#xff0c;然后在使用PSIM仿真软件时发现这个仿真软件也支持28335芯片&#xff0c;于是就想学习下如何在PSIM软件中使用DSP28335芯片。在PSIM自带的官方示例中有使用DSP28335芯片的相关例子。 工程下载链接 https://download.csdn.net/download/qq_20…

2401llvm,clang插件

Clang插件 在编译时,Clang插件可运行额外的用户定义操作. 介绍 Clang插件在代码上运行FrontendActions.见FrontendAction教程,了解如何使用RecursiveASTVisitor编写FrontendAction. 这里,演示如何编写简单的clang插件. 编写PluginASTAction插件 与编写普通FrontendActions…

图解CART分类树评估器的参数

图解CART分类树评估器的参数

联邦学习:密码学 + 机器学习 + 分布式 实现隐私计算,破解医学界数据孤岛的长期难题

联邦学习&#xff1a;密码学 机器学习 分布式 提出背景&#xff1a;数据不出本地&#xff0c;又能合力干大事联邦学习的问题 分布式机器学习&#xff1a;解决大数据量处理的问题横向联邦学习&#xff1a;解决跨多个数据源学习的问题纵向联邦学习&#xff1a;解决数据分散在多…

(一)SpringBoot3---尚硅谷总结

目录 示例Demo&#xff1a; 1、我们先来创建一个空工程&#xff1a; 2、我们通过Maven来创建一个Module&#xff1a; 3、让此Maven项目继承父项目: 4、导入web开发的场景启动器 5、创建Springboot项目的主入口程序&#xff1a; 6、举例测试&#xff1a; 7、Springboot还能…