函数类(Function Classes)和 富函数类(Rich Function Classes)

   

目录

函数类(Function Classes)

富函数类(Rich Function Classes)


函数类(Function Classes)

      Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

java:

public class MapFunctionExample {  public static void main(String[] args) throws Exception {  // 创建流执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 创建一个数据源,例如从本地生成整数序列  DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);  // 使用 MapFunction 将整数翻倍  DataStream<Integer> doubledNumbers = numbers.map(new MapFunction<Integer, Integer>() {  @Override  public Integer map(Integer value) throws Exception {  return value * 2;  }  });  // 打印结果到控制台  doubledNumbers.print();  // 执行流处理  env.execute("MapFunction Example");  }  
}

scala:

object MapFunctionExample {  def main(args: Array[String]): Unit = {  // 创建流处理环境  val env = StreamExecutionEnvironment.getExecutionEnvironment  // 创建输入数据源  val input = env.fromElements(1, 2, 3, 4, 5)  // 使用 MapFunction 将每个元素乘以 2  val output = input.map(new MapFunction[Int, Int] {  def map(value: Int): Int = {  value * 2  }  })  // 打印结果到控制台  output.print()  // 执行流处理作业  env.execute("MapFunction Example")  }  
}

富函数类(Rich Function Classes)

        “富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  •  RichMapFunction
  •  RichFlatMapFunction
  •  RichFilterFunction
  •  .................

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
// 富函数,可以获取到运行时上下文,还有一些生命周期class MyRichMap extends RichMapFunction[SensorReading, String]{override def open(parameters: Configuration): Unit = {//做一些初始化操作。比如map方法需要交互数据库,数据库连接可以在open里边做//getRuntimeContext()}override def map(value: SensorReading): String = {value.id + " temperature"}override def close(): Unit = {//map调用完之后。一般做收尾工作,比如关闭连接,或者清空状态}}

java:

public class RichMapExample {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 创建数据源  env.fromElements("Hello", "World", "Flink")  .map(new RichMapFunction<String, Tuple2<String, Integer>>() {  private int count = 0;  @Override  public Tuple2<String, Integer> map(String value) throws Exception {  count++;  System.out.println("Mapped value: " + value);  return new Tuple2<>(value, count);  }  @Override  public void open(Configuration parameters) throws Exception {  super.open(parameters);  // 在这里可以添加一些初始化代码,比如日志记录、度量等。  }  })  // 添加简单的打印操作作为例子  .print();  // 执行任务  env.execute("Rich Map Function Example");  }  
}

scala:

object RichMapFunctionExample {  def main(args: Array[String]): Unit = {  // 创建流处理环境  val env = StreamExecutionEnvironment.getExecutionEnvironment  // 创建一个简单的数据源  val stream = env.fromElements(1, 2, 3, 4, 5)  // 使用 RichMapFunction 转换数据流  val transformedStream = stream.map(new RichMapFunction[Int, Int]() {  var stateValue: Option[Int] = None  override def open(config: Configuration): Unit = {  // 初始化状态描述器  val descriptor = new ValueStateDescriptor[Int]("state", classOf[Int])  stateValue = getRuntimeContext.getState(descriptor)  }  override def map(value: Int): Int = {  // 使用状态值进行转换  val transformedValue = stateValue match {  case Some(prevValue) => value + prevValue  case None => value  }  // 更新状态值  stateValue = Some(transformedValue)  transformedValue  }  })  // 打印结果到控制台  transformedStream.print()  // 执行作业  env.execute("RichMapFunction Example")  }  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/650823.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【STM32】STM32学习笔记-Unix时间戳(41)

00. 目录 文章目录 00. 目录01. Unix时间戳02. UTC/GMT03. 时间戳转换04. C 标准库 <time.h>05. 时间相关函数示例5.1 time函数5.2 gmtime函数5.3 localtime函数5.4 mktime函数5.5 ctime函数5.6 asctime函数5.7 strftime函数 06. 预留07. 附录 01. Unix时间戳 •Unix 时…

2024-macOS系统或Kail系统重——破解ZIP压缩的文件密码

2024-macOS系统或Kail系统重——破解ZIP压缩的文件密码 1. 你们有遇见这样子的情况么&#xff1a; 别人给你发的zip或者下载的zip文件&#xff0c;没有密码打不开么网上都是win系统的&#xff0c;都是没有macOS系统的&#xff0c;所以比较烦恼 2. 所以我就想到了代码&#x…

gradle简单入门

安装 需要有Java环境 下载地址&#xff1a;https://gradle.org/releases/ 8.5版本仅有二进制文件&#xff1a;https://gradle.org/next-steps/?version8.5&formatbin 8.5版本包含文档和源码及二进制文件&#xff1a;https://gradle.org/next-steps/?version8.5&f…

PyTorch 之 rand() 与 randn() 函数

文章目录 torch.rand()示例: torch.randn()示例: 当然&#xff0c;让我更详细地介绍 torch.rand() 和 torch.randn()&#xff0c;以及它们在 PyTorch 中的用法。 torch.rand() torch.rand(*sizes, outNone, dtypeNone, layouttorch.strided, deviceNone, requires_gradFalse)…

无线路由探索

实验大纲 第一部分&#xff1a; 探索无线网络 步骤 1&#xff1a; 探索拓扑 步骤 2&#xff1a; 验证连接 第二部分&#xff1a; Wi-Fi 连接添加到董事会议室 步骤 1&#xff1a; 安装新的 LAP-PT 设备以覆盖新的董事会议室 步骤 2&#xff1a; 检验连接 第三部分&#…

Git初识

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 在学习Git之前我们先引入一…

美易平台:捷蓝航空股价上涨3%,Spirit Airlines股价下跌17%:合并协议可能面临终止

捷蓝航空&#xff08;JetBlue Airways&#xff09;的股价在美股盘前交易中上涨了3%&#xff0c;而Spirit Airlines的股价则下跌了近17%。这一波动是由于捷蓝航空在此前宣布可能会终止与Spirit Airlines的合并协议&#xff0c;并表示将继续评估合并协议下的选择。 捷蓝航空是一…

C/C++ - 面向对象编程

面向对象 面向过程编程&#xff1a; 数据和函数分离&#xff1a;在C语言中&#xff0c;数据和函数是分开定义和操作的。数据是通过全局变量或传递给函数的参数来传递的&#xff0c;函数则独立于数据。函数为主导&#xff1a;C语言以函数为主导&#xff0c;程序的执行流程由函数…

外卖跑腿系统开发:构建高效、安全的服务平台

在当今快节奏的生活中&#xff0c;外卖跑腿系统的开发已成为技术领域的一个重要课题。本文将介绍如何使用一些常见的编程语言和技术框架&#xff0c;构建一个高效、安全的外卖跑腿系统。 1. 技术选择 在开始开发之前&#xff0c;我们需要选择适合的技术栈。常用的技术包括&a…

Java聚类分析

聚类 聚类1 解决什么问题KMean聚类Kmedoids聚类2 java实现计算二维点的聚类案例KMean实现输出 K-medoids实现输出 聚类 1 解决什么问题 假设二维坐标轴上有一些点&#xff0c;现在让你把这些点分个类。于是对我们来说&#xff0c;这个分类似乎就是把距离相近的点画到一类中去。…

DDT数据驱动测试

简单介绍 ​ DDT&#xff08;Date Driver Test&#xff09;&#xff0c;所谓数据驱动测试&#xff0c; 简单来说就是由数据的改变从而驱动自动化测试的执行&#xff0c;最终引起测试结果的改变。通过使用数据驱动测试的方法&#xff0c;可以在需要验证多组数据测试场景中&…

【LeetCode-452】用最少数量的箭引爆气球(贪心)

LeetCode452.用最少数量的箭引爆气球 题目描述 原题链接 在二维空间中有许多球形的气球。对于每个气球&#xff0c;提供的输入是水平方向上&#xff0c;气球直径的开始和结束坐标。由于它是水平的&#xff0c;所以纵坐标并不重要&#xff0c;因此只要知道开始和结束的横坐标…

详细分析Java的树形工具类(含注释)

目录 前言1. 基本框架2. 实战应用 前言 对应的每个子孙属于该父亲&#xff0c;这其实是数据结构的基础知识&#xff0c;那怎么划分怎么归属呢 对应的基本知识推荐如下&#xff1a; 【数据结构】树和二叉树详细分析&#xff08;全&#xff09;【数据结构】B树和B树的笔记详细…

面试 HTML 框架八股文十问十答第一期

面试 HTML 框架八股文十问十答第一期 作者&#xff1a;程序员小白条&#xff0c;个人博客 相信看了本文后&#xff0c;对你的面试是有一定帮助的&#xff01;关注专栏后就能收到持续更新&#xff01; ⭐点赞⭐收藏⭐不迷路&#xff01;⭐ 1&#xff09;src和href的区别 src和…

数据结构与算法教程,数据结构C语言版教程!(第六部分、数据结构树,树存储结构详解)二

第六部分、数据结构树&#xff0c;树存储结构详解 数据结构的树存储结构&#xff0c;常用于存储逻辑关系为 "一对多" 的数据。 树存储结构中&#xff0c;最常用的还是二叉树&#xff0c;本章就二叉树的存储结构、二叉树的前序、中序、后序以及层次遍历、线索二叉树、…

300. 最长递增子序列(动态规划)

动态规划&#xff1a; 状态定义&#xff1a;dp[i]表示以索引为第i个字符结尾的最长递增子序列的长度&#xff0c;d[n-1]表示以第n-1个字符作为结尾的最长递增子序列的长度&#xff0c;但是这并不是答案&#xff0c;因为整个序列中的最长递增子序列不一定以n-1结尾&#xff0c;…

Docker部署思维导图工具SimpleMindMap并实现公网远程访问

文章目录 1. Docker一键部署思维导图2. 本地访问测试3. Linux安装Cpolar4. 配置公网地址5. 远程访问思维导图6. 固定Cpolar公网地址7. 固定地址访问 SimpleMindMap 是一个可私有部署的web思维导图工具。它提供了丰富的功能和特性&#xff0c;包含插件化架构、多种结构类型&…

如何重置某个css属性值(unset)

场景 你用了别人的UI框架&#xff0c;然后你发现&#xff0c;你给css动态赋的值&#xff0c;被UI框架的优先级更高的css覆盖了。你可以写js来改变它&#xff0c;但是如果有很多层循环操作&#xff0c;你需要写一大段的js&#xff0c;此时js并不是最优的选择&#xff0c;你真正…

【Unicode】Character ‘ENQUIRY‘ (U+0005)

询问 result.append("\u0005");Unicode Character ‘ENQUIRY’ (U0005)

js中字符串string,遍历json/Object【匹配url、邮箱、电话,版本号,千位分割,判断回文】

目录 正则 合法的URL 邮箱、电话 字符串方法 千位分割&#xff1a;num.slice(render, len).match(/\d{3}/g).join(,) 版本号比较 判断回文 json/Object 遍历 自身属性 for...inhasOwnProperty(key) Object.获取数组(obj)&#xff1a;Object.keys&#xff0c;Object…