FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610366.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot的毕业生实习与就业管理系统(系统+数据库+文档)

&#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目 希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;一、绪论 1. 研究背景 现在大家…

私域流量转化差,这些问题你都解决了吗?

一、流量不精准 这是一个常见而又经常被忽视的问题。许多企业在私域运营中面临转化率低下的问题&#xff0c;但有没有想过&#xff0c;这可能只是因为你吸引的流量与你的产品不匹配&#xff1f; 从公域引流到私域&#xff0c;数量并非唯一关键&#xff0c;精准度更是重中之重…

【计算机组成原理】计算机系统概论 常用计算机术语英文缩写汇总

集成电路 小规模集成电路 &#xff08;SSIC&#xff09;&#xff1a;Small Scale Integrated Circuits中规模集成电路 &#xff08;MSIC&#xff09;&#xff1a;Medium Scale Integrated Circuits大规模集成电路 &#xff08;LSIC&#xff09;&#xff1a;Large Scale Integr…

【Vue3+Ts项目】硅谷甄选 — 品牌管理+平台属性管理+SPU管理+SKU管理

一、品牌管理模块 1.1 静态模块搭建 使用到element-plus的card、button、table、pagination等组件&#xff1a;src/views/product/trademark/index.vue <template><el-card><!-- 卡片顶部添加品牌按钮 --><el-button type"primary" size&quo…

第十九章 调用Callout Library函数 - 将 $ZF(-5) 与多个库和许多函数调用一起使用

文章目录 第十九章 调用Callout Library函数 - 将 $ZF(-5) 与多个库和许多函数调用一起使用将 $ZF(-5) 与多个库和许多函数调用一起使用 第十九章 调用Callout Library函数 - 将 $ZF(-5) 与多个库和许多函数调用一起使用 将 $ZF(-5) 与多个库和许多函数调用一起使用 Method G…

数字IC后端实现之Innovus TA-152错误解析(分频generated clock定义错误)

**ERROR: (TA-152): A latency path from the ‘Fall’ edge of the master clock at source pin… Error Code TA-152 在数字IC后端实现innovus中我们经常会看到这类Error&#xff0c;具体信息如下所示。 Error Message **ERROR: (TA-152): A latency path from the ‘Fa…

Goby高级食用指南

Goby高级食用指南 1.Goby POC2.自定义字典3.Goby插件生态 - 一些好用的插件分享FOFASubDomainsBruteExportCsvAWVSRedis-cliGoby4waf初级篇参考 - Goby基本使用 1.Goby POC Goby的漏洞模块包含官方自定义的一些初始POC: 红队版的POC会实时更新,普通版则不会 Goby的POC编写…

沈阳数字孪生赋能工业智能制造,助力制造业企业数字化转型

沈阳数字孪生赋能工业智能制造&#xff0c;助力制造业企业数字化转型。在数字经济时代&#xff0c;数字孪生作为实现各行各业智能化、数字化的重要手段之一&#xff0c;受到了各方的广泛重视。随着各项关键使能技术的不断发展&#xff0c;数字孪生的应用价值有望得到进一步释放…

GitLab clone 地址不对的解决办法

1丶问题描述 2丶解决方案 解决方案&#xff1a; 找到挂载到宿主机配置文件&#xff1a;gitlab.rb vi gitlab.rb 改成自己的ip 重启容器 docker restart gitlab 如果发现容器一直重启&#xff0c;可采用粗暴的方法&#xff0c;直接干掉当前容器&#xff0c;重新运行一个 …

一键与图片对话!LLM实现图片关键信息提取与交互

本期文心开发者说邀请到飞桨开发者技术专家徐嘉祁&#xff0c;主要介绍了如何通过小模型与大模型的结合&#xff0c;解决数据分析中的问题。 项目背景 在智能涌现的大模型时代&#xff0c;越来越多的企业和研究机构开始探索如何利用大模型来提升工作效率&#xff0c;助力业务智…

企业数字化转型指南,12步实现企业转型之路

引言 在这个数字化时代&#xff0c;企业面临着前所未有的机遇和挑战。随着科技的飞速发展和市场竞争的加剧&#xff0c;传统商业模式正在经历翻天覆地的变革。数字化转型&#xff0c;已经不再只是一种选择&#xff0c;而是企业生存和发展的必然路径。它不仅仅是技术的升级&…

Open CASCADE学习|基于visual studio 2022编译源码

目录 1、简介 2、下载 2.1下载visual studio 2022 community 2.2下载下载cmake工具 2.3下载源码 2.4下载第三方插件 3、安装 3.1安装visual studio 2022 community 3.2安装cmake 4、编译源码 5、测试 1、简介 Open CASCADE&#xff08;简称…

C //练习 4-8 假定最多只压回一个字符。请相应地修改getch与ungetch这两个函数。

C程序设计语言 &#xff08;第二版&#xff09; 练习 4-8 练习 4-8 假定最多只压回一个字符。请相应地修改getch与ungetch这两个函数。 注意&#xff1a;代码在win32控制台运行&#xff0c;在不同的IDE环境下&#xff0c;有部分可能需要变更。 IDE工具&#xff1a;Visual St…

【OCR】 - Tesseract OCR在Windows系统中安装

Tesseract OCR 在Windows环境下安装Tesseract OCR&#xff08;Optical Character Recognition&#xff09;通常包括以下几个步骤&#xff1a; 下载Tesseract 访问Tesseract的GitHub发布页面&#xff1a;https://github.com/tesseract-ocr/tesseract/releases找到适合你操作系…

羊奶制作工艺揭秘,如何打造丰富多样的口味品种?

羊奶制作工艺揭秘&#xff0c;如何打造丰富多样的口味品种&#xff1f; 羊奶一直以来都是人们健康饮食的选择之一&#xff0c;它不仅营养丰富&#xff0c;而且口感独特。但是&#xff0c;你是否好奇羊奶是如何做到各种口味的呢&#xff1f;下面就跟随小编羊大师一起揭秘羊奶制…

SpringBoot集成Minio

pom文件导入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/P…

Hadoop之mapreduce参数大全-3

51.指定Shuffle传输过程中可以同时连接的节点数 mapreduce.shuffle.max.connections是Hadoop MapReduce中的一个配置参数&#xff0c;用于指定Shuffle传输过程中可以同时连接的节点数。该参数用于控制Shuffle传输的并发度&#xff0c;以保障任务的稳定性和性能。 可以通过以下…

RHCE9学习指南 第17章 进程管理

17.1 进程介绍 在Windows下打开任务管理器就可以查看到系统所有进程&#xff0c;如图17-1所示。 图17-1 Windows下的任务管理器 这里列出了系统中所有的进程。不过也可以使用命令行工具来查看进程。每个进程都会有一个process ID&#xff0c;简称为pid。 17.2 查看进程 也可…

实用Unity3D Log打印工具XDebug

特点 显示时间&#xff0c;精确到毫秒显示当前帧数&#xff08;在主线程中的打印才有意义&#xff0c;非主线程显示为-1&#xff09;有三种条件编译符(如下图) 注&#xff1a;要能显示线程中的当前帧数&#xff0c;要在app启动时&#xff0c;初始化mainThreadID字段条件编译符…

uniapp日期加减切换,点击切换

先上完成后的页面&#xff1a;当前年年份不显示&#xff0c;不然完整显示。 可以切换和自定义选择。 html:样式和图片自定义。 <view class"image-text_30"><image click"delMonth" :src"require(/static/home/zuo.png)" class"…