目录
函数类(Function Classes)
富函数类(Rich Function Classes)
函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
java:
public class MapFunctionExample { public static void main(String[] args) throws Exception { // 创建流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建一个数据源,例如从本地生成整数序列 DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5); // 使用 MapFunction 将整数翻倍 DataStream<Integer> doubledNumbers = numbers.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value * 2; } }); // 打印结果到控制台 doubledNumbers.print(); // 执行流处理 env.execute("MapFunction Example"); }
}
scala:
object MapFunctionExample { def main(args: Array[String]): Unit = { // 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建输入数据源 val input = env.fromElements(1, 2, 3, 4, 5) // 使用 MapFunction 将每个元素乘以 2 val output = input.map(new MapFunction[Int, Int] { def map(value: Int): Int = { value * 2 } }) // 打印结果到控制台 output.print() // 执行流处理作业 env.execute("MapFunction Example") }
}
富函数类(Rich Function Classes)
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- .................
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
- open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
// 富函数,可以获取到运行时上下文,还有一些生命周期class MyRichMap extends RichMapFunction[SensorReading, String]{override def open(parameters: Configuration): Unit = {//做一些初始化操作。比如map方法需要交互数据库,数据库连接可以在open里边做//getRuntimeContext()}override def map(value: SensorReading): String = {value.id + " temperature"}override def close(): Unit = {//map调用完之后。一般做收尾工作,比如关闭连接,或者清空状态}}
java:
public class RichMapExample { public static void main(String[] args) throws Exception { // 设置执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建数据源 env.fromElements("Hello", "World", "Flink") .map(new RichMapFunction<String, Tuple2<String, Integer>>() { private int count = 0; @Override public Tuple2<String, Integer> map(String value) throws Exception { count++; System.out.println("Mapped value: " + value); return new Tuple2<>(value, count); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 在这里可以添加一些初始化代码,比如日志记录、度量等。 } }) // 添加简单的打印操作作为例子 .print(); // 执行任务 env.execute("Rich Map Function Example"); }
}
scala:
object RichMapFunctionExample { def main(args: Array[String]): Unit = { // 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建一个简单的数据源 val stream = env.fromElements(1, 2, 3, 4, 5) // 使用 RichMapFunction 转换数据流 val transformedStream = stream.map(new RichMapFunction[Int, Int]() { var stateValue: Option[Int] = None override def open(config: Configuration): Unit = { // 初始化状态描述器 val descriptor = new ValueStateDescriptor[Int]("state", classOf[Int]) stateValue = getRuntimeContext.getState(descriptor) } override def map(value: Int): Int = { // 使用状态值进行转换 val transformedValue = stateValue match { case Some(prevValue) => value + prevValue case None => value } // 更新状态值 stateValue = Some(transformedValue) transformedValue } }) // 打印结果到控制台 transformedStream.print() // 执行作业 env.execute("RichMapFunction Example") }
}