转换算子Transformation
- 概述
- 基本转换算子
- 映射Map
- 扁平映射flatMap
- 过滤Filter
- 聚合算子
- 按键分区keyBy
- 归约聚合reduce
- 简单聚合sum、min、max、minBy、maxBy
- 物理分区算子
- 随机分配
- 轮询分配
- 重缩放
- 广播
- 全局分区
- 自定义分区
- 分流操作
- Filter分流
- SideOutPut分流
- Split分流
- 合流操作
- 联合Union
- 连接Connect
- CoMap、CoFlatMap
- CoProcessFunction
- 算子链和资源组
- 创建新链
- 禁止链接
- 配置Slot共享组
- 用户自定义函数UDF
- 函数类
- 匿名函数
- 富函数类
概述
转换算子(Transformation)是ApacheFlink中用于对数据流进行处理和转换的操作。在Flink中,数据流被抽象为一个有向无环图(DAG),转换算子可以将数据流的每个元素进行操作,并生成新的数据流。
因此,Flink中的转换算子是指对输入数据流进行转换操作的一类算子,它是将一个或多个DataStream转换为新的DataStream
特点:
转换算子接受一个或多个输入数据流,并产生一个或多个输出数据流。每个转换算子都代表一个具体的数据处理操作,可以在数据流上执行诸如映射、过滤、聚合、分组等操作。转换算子可以按照不同的方式组合在一起,形成复杂的数据流处理逻辑。可以通过方法链式调用来连接多个转换算子,形成具体的数据处理流程。
常见Flink转换算子:
通过组合这些转换算子,可以构建出复杂的数据处理流程,实现各种业务逻辑的数据处理和分析。
Map:对输入数据流中的每个元素应用一个函数,并将函数的输出作为输出数据流中的元素FlatMap:与 Map 类似,但可以通过一个函数返回多个输出元素Filter:根据指定的条件过滤输入数据流中的元素,并将符合条件的元素作为输出数据流中的元素KeyBy:按照指定的键对输入数据流分组,并返回分组后的数据流Reduce:对输入数据流中的每个分组应用一个函数进行聚合,并将聚合结果作为输出数据流中的元素Aggregations:针对分组后的数据流进行聚合,可以使用 Sum、Max、Min、Avg 等内置函数Window:按照指定的时间或者大小划分输入数据流,并在窗口上应用聚合函数Join:将两个输入数据流中的元素按照指定的键进行连接
基本转换算子
Flink中基本的转换算子有Map、Filter 和 FlatMap。它们分别用于对每个输入元素应用一个函数,根据指定的条件过滤输入元素,以及将每个输入元素映射为多个输出元素。
映射Map
Map算子接受一个函数作为参数,该函数将输入数据流中的每个元素映射为一个新的元素,并将这些新元素组成一个输出数据流。
注意:通常在使用Flink算子的时候,可以使用匿名类、Lambda、实现类
方式一:传入匿名类,实现MapFunction
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));/*** MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。* * 实现MapFunction接口时,需要指定两个泛型,分别是输入和输出的类型,还需要重写map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑*/SingleOutputStreamOperator<String> map = stream.map(new MapFunction<Integer, String>() {@Overridepublic String map(Integer integer) throws Exception {return "数字: " + integer;}});map.print();env.execute();}
方式二:使用Lambda表达式
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperator<String> map = stream.map((MapFunction<Integer, String>) integer -> "数字: " + integer);map.print();env.execute();}
方式三:传入MapFunction的实现类
public static void main(String[] args) throws Exception {class MyMap implements MapFunction<Integer, String> {@Overridepublic String map(Integer integer) throws Exception {return "数字: " + integer;}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));stream.map(new MyMap()).print();env.execute();}
扁平映射flatMap
FlatMap算子接受一个函数作为参数,该函数将输入数据流中的每个元素映射为多个新的元素,并将这些新元素组成一个输出数据流。
flatMap是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。具有消费一个元素,可以产生0到多个元素的特点。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.fromCollection(Arrays.asList("h e l l o"));stream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] split = s.split(" ");for (String word : split) {collector.collect(word);}}}).print();env.execute();}
6> h
6> e
6> l
6> l
6> o
过滤Filter
Filter算子接受一个函数作为参数,该函数返回一个布尔值,表示是否应该保留输入数据流中的当前元素。如果该函数返回true,则当前元素被保留,否则,当前元素被丢弃。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a != 2;}}).print();env.execute();}
聚合算子
Flink提供了多种聚合算子,用于对数据流进行各种不同类型的聚合操作。
按键分区keyBy
在Flink中要做聚合,需要先进行分区,分区操作是通过keyBy来完成的。keyBy 是 Flink 中的一个操作符,用于将数据流按照指定的 key 进行分区
在 Flink 中,数据流被分为多个分区,每个分区都有一个或多个并行的任务来处理数据。keyBy 操作符会将数据流中具有相同 key 的数据分配到同一个分区中,从而保证相同 key 的数据被同一个任务处理。
keyBy 操作符常用于聚合操作,例如对某个字段进行求和、求平均值等。在使用 keyBy 操作符时,需要指定一个或多个字段作为 key,这些字段的值相同的数据将被分配到同一个分区中。
在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。
注意:
在Flink内部,它是通过计算key的哈希值来对分区数进行取模运算实现的。因此,如果Key是一个POJO对象,应该必须重写 hashCode()方法。
KeyBy是将
DataStream
转KeyedStream
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.fromCollection(Arrays.asList("ab", "abc", "bc", "ab", "bc"));// 以数据源本身作为 key 做一个分区操作KeyedStream<String, String> keyedStream =stream.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s;}});keyedStream.print();env.execute();}
keyBy得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream
11> abc
4> ab
3> bc
4> ab
3> bc
归约聚合reduce
Flink的归约聚合(reduce)算子是一种常用的聚合操作,它对数据流中的元素进行逐个聚合,将当前元素与上一个聚合结果合并,得到一个单一的结果
在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。
reduce对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。在相同key的数据流上持续滚动执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。
注意:
Reduce是将
KeyedStream
转DataStream
定义一个输入数据流,其包含了一系列整数,通过reduce算子对流进行归约聚合,将相邻元素相加得到最终结果。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);keyBy.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println("value1 = " + value1 + " value2 = " + value2); return value1 + value2; // 将两个值相加}}).print();env.execute();}
在流处理的底层实现过程中,实际上是将中间合并结果作为任务的一个状态保存起来,之后每来一个新的数据,就和之前的聚合状态进一步做归约
1> 1
8> 4
value1 = 1 value2 = 3
value1 = 4 value2 = 6
1> 4
8> 10
value1 = 4 value2 = 2
value1 = 10 value2 = 5
1> 6
8> 15
简单聚合sum、min、max、minBy、maxBy
有了按键分区的数据流 KeyedStream,就可以基于它进行聚合操作。使用聚合算子可以方便地对数据流进行各种不同类型的聚合操作,从而帮助实现各种复杂的分析和处理需求。
聚合算子主要包括以下几种:
sum:对数据流中的元素求和min:求数据流中的最小值minBy:根据指定的键值,求数据流中的最小值,并返回该最小值所在的全部元素max:求数据流中的最大值maxBy:根据指定的键值,求数据流中的最大值,并返回该最大值所在的全部元素
转换算子需要实现自定义函数,聚合需要指定字段即可。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需注意:元组中字段的名称,是以 f0、f1、f2、…来命名。数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定。
sum
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);SingleOutputStreamOperator<Integer> max = keyBy.sum(0);max.print();env.execute();}
max
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);SingleOutputStreamOperator<Integer> max = keyBy.max(0);max.print();env.execute();}
min
SingleOutputStreamOperator<Integer> max = keyBy.min(0);
minBy
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 2), Tuple2.of("key1", 3), Tuple2.of("key2", 4));KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}});// 按第二个字段选择最小值SingleOutputStreamOperator<Tuple2<String, Integer>> minBy = keyedStream.minBy(1);minBy.print();env.execute();}
maxBy
// 按第二个字段选择最大值
SingleOutputStreamOperator<Tuple2<String, Integer>> minBy = keyedStream.maxBy(1);
物理分区算子
Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。
物理分区算子(Physical Partitioning)是Flink中用于将数据流划分到不同物理分区的一种技术。通过将数据流划分到不同的分区,可以实现负载均衡、并行计算和提高吞吐量等优化。
Flink提供了多种物理分区算子,可以根据不同的需求选择适合的分区策略。常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)
随机分配
随机分配(Random)是一种将数据随机分配到各个分区的策略。每个分区接收的数据量可能不同,适用于无需考虑数据倾斜的场景。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);DataStream<Integer> partition = dataStreamSource.shuffle();partition.print();env.execute();}
2> 4
1> 1
2> 5
1> 2
1> 3
1> 6
轮询分配
轮询分配(Round-Robin)是一种均匀地按照轮询顺序将数据分配到各个分区的策略。每个分区依次接收一个元素,然后再轮到下一个分区,如此循环。
DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);// rebalance可以解决数据源倾斜的场景
DataStream<Integer> partition = dataStreamSource.rebalance();
2> 1
2> 3
2> 5
1> 2
1> 4
1> 6
重缩放
重缩放分区和轮询分区非常相似。缩放(Rescale)是一种根据数据的大小动态调整分区的策略。它根据每个分区中的数据量进行动态的分区调整,使各个分区的数据负载均衡。
DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
DataStream<Integer> partition = dataStreamSource.rescale();
1> 1
2> 2
1> 3
2> 4
1> 5
2> 6
广播
广播(Broadcast)是一种将数据复制到所有分区的策略,即所有分区都会接收到相同的数据。广播适合于需要在所有分区上进行全局操作的场景。
DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
DataStream<Integer> partition = dataStreamSource.broadcast();
1> 1
2> 1
1> 2
2> 2
1> 3
2> 3
1> 4
2> 4
1> 5
2> 5
1> 6
2> 6
注意:广播会在每个并行任务之间复制数据,并占用更多的内存和网络带宽,因此应慎重使用
全局分区
全局分区(Global Partitioning)是一种特殊的分区方式。它将所有的输入流数据都发送到下游算子的第一个并行子任务中去。
注意:
它可能会产生数据倾斜问题。由于所有数据都发送到同一个并行任务,这个任务可能会成为瓶颈,并导致性能下降相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力
DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
// 全部数据发往第一个子任务
DataStream<Integer> partition = dataStreamSource.global();
1> 1
1> 2
1> 3
1> 4
1> 5
1> 6
自定义分区
在Flink中,可以使用partitionCustom()方法来实现自定义的分区策略。通过自定义分区,可以根据自己的业务需求将数据合理地分发到不同的分区中。
自定义分区需要实现Partitioner接口,并重写其中的partition()方法。partition()方法接收一个键和键的总数作为参数,并返回要分配到的分区编号(从0开始)。
通过自定义分区策略,可以根据具体的业务场景将数据分配到合适的分区中,用于实现更精确的数据处理和控制。
自定义分区器将奇数和偶数分到不同的分区中
public class MyPartitioner implements Partitioner<Integer> {@Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 == 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}
}
在Flink程序中使用自定义分区
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromCollection(Arrays.asList(1, 2, 3));// DataStream<Integer> partitionCustom = dataStreamSource.partitionCustom(new MyPartitioner(), value -> value);DataStream<Integer> partitionCustom = dataStreamSource.partitionCustom(new MyPartitioner(), new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value;}});partitionCustom.print();env.execute();}
2> 1
1> 2
2> 3
分流操作
分流就是将一条数据流拆分成完全独立的两条、甚至多条流。分流就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
把输入源按照需要进行拆分,比如期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。
Filter分流
Filter算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,可以做多次 filter,把需要的不同数据生成不同的流。
针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流。
根据奇偶性将输入流分流成两个输出流:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));SingleOutputStreamOperator<Integer> even = stream.filter(value -> value % 2 == 0);SingleOutputStreamOperator<Integer> odd = stream.filter(value -> value % 2 == 1);even.print("偶数-job");odd.print("奇数-job");env.execute();}
偶数-job:1> 2
奇数-job:1> 5
偶数-job:3> 4
奇数-job:9> 1
奇数-job:11> 3
直接使用filter来实现分流效果,存在缺点
1.代码显得有些冗余2.同一个数据,要被处理两遍(调用两次filter),不够高效
SideOutPut分流
使用侧输出流(SideOutput)也可以实现数据的分流操作。侧输出流允许将不符合主要流处理逻辑的数据发送到一个或多个辅助输出流中。
使用步骤:
1.使用OutputTag定义两个侧输出流的标签:evenTag和oddTag2.通过processElement()方法的逻辑,主要处理逻辑将负数发送到主输出流,而偶数、奇数通过ctx.output()方法发送到侧输出流evenTag与oddTag3.最后使用getSideOutput()方法获取侧输出流
通过侧输出流机制,可以将不符合主要处理逻辑的数据单独处理,以实现更灵活的数据分流和处理
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6));/*** 创建OutputTag对象* 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)*/OutputTag<Integer> evenTag = new OutputTag<>("even", Types.INT);OutputTag<Integer> oddTag = new OutputTag<>("odd", Types.INT);// 使用process算子SingleOutputStreamOperator<Integer> process = stream.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value > 0) {if (value % 2 == 0) {// 偶数放到侧输出流evenTag中// 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据ctx.output(evenTag, value);} else if (value % 2 == 1) {// 奇数放到侧输出流oddTag中ctx.output(oddTag, value);}} else {// 负数 数据,放到主流中out.collect(value);}}});// 在主流中,根据标签 获取 侧输出流SideOutputDataStream<Integer> even = process.getSideOutput(evenTag);SideOutputDataStream<Integer> odd = process.getSideOutput(oddTag);// 打印主流process.printToErr("主流-负数-job");//打印 侧输出流even.print("偶数-job");odd.print("奇数-job");env.execute();}
奇数-job:1> 1
偶数-job:2> 2
奇数-job:1> 3
偶数-job:2> 4
奇数-job:1> 5
主流-负数-job:2> -6
Split分流
Split也是将流进行切分的方法,需要在split算子中定义OutputSelector,然后重写其中的select方法,将不同类型的数据进行标记,最后对返回的SplitStream使用select方法将对应的数据选择出来。
注意:
使用split算子切分过的流,是不能进行二次切分的
Split算子在Flink较新版本中已弃用,推荐使用SideOutPut进行流的拆分
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//获取数据源List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();data.add(new Tuple3<>(0,1,0));data.add(new Tuple3<>(0,2,2));data.add(new Tuple3<>(0,1,1));DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {@Overridepublic Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {List<String> tags = new ArrayList<>();if (value.f0 == 0) {tags.add("zero");} else if (value.f0 == 1) {tags.add("one");}return tags;}});splitStream.select("zero").print();splitStream.select("one").printToErr();//打印结果String jobName = "zero one streaming";env.execute(jobName);}
合流操作
联合Union
联合操作将两个或多个数据流联合来创建一个包含所有流中数据的新流。这种合并操作会保留所有输入流的顺序。流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
注意:
如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。
流的联合受限于数据类型不能改变,缺乏灵活性
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> source2 = env.fromElements(4, 5, 6);DataStreamSource<String> source3 = env.fromElements("7", "8", "9");DataStream<Integer> union = source1.union(source2).union(source3.map(number -> Integer.valueOf(number)));
// DataStream<Integer> union = source1.union(source2, source3.map(number -> Integer.valueOf(number)));union.print();env.execute();}
1
2
3
4
5
6
7
8
9
连接Connect
在Flink中,可以使用connect()操作符将两个或多个流连接在一起以形成ConnectedStreams。连接流提供一种将不同类型的流合并在一起的方式,通过它可以对每个流应用不同的处理逻辑,但它们会共享相同的上下文信息。
使用connect合并流,一次只能连接2条流,流的数据类型可以不一样,连接后可以调用 map、flatmap、process来处理
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<String> source2 = env.fromElements("4", "5", "6");
ConnectedStreams<Integer, String> connect = source1.connect(source2);
CoMap、CoFlatMap
类似于在连接的数据流上进行 map 和 flatMap。
如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<String> source2 = env.fromElements("4", "5", "6");ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<Integer> result = connect.map(new CoMapFunction<Integer, String, Integer>() {@Overridepublic Integer map1(Integer value) throws Exception {return value;}@Overridepublic Integer map2(String value) throws Exception {return Integer.parseInt(value);}});result.print();env.execute();}
调用.flatMap()
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) {for (String word: value.split(" ")) {out.collect(word);}}
});
CoProcessFunction
调用.process()时,传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法,
假设有两个输入流,将这两个流合并计算得到每个key对应的合计,并输出结果流
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 4), Tuple2.of("key1", 2));DataStreamSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("key1", 3), Tuple2.of("key2", 5), Tuple2.of("key2", 6));ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connect = source1.connect(source2);// 进行keyby操作,将key相同数据放到一起ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);/*** 对2个流中相同key的值求和*/SingleOutputStreamOperator<String> process = connectKeyby.process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {Map<String, Integer> map = new HashMap<>();/*** 第一条流的处理逻辑* @param value 第一条流的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {String key = value.f0;if (!map.containsKey(key)) {// 如果key不存在,则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key) + value.f1;map.put(key, total);}out.collect("processElement1 key = " + key + " value = " + value + "total = " + map.get(key));}/*** 第二条流的处理逻辑* @param value 第二条流的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {String key = value.f0;if (!map.containsKey(key)) {// 如果key不存在,则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key) + value.f1;map.put(key, total);}out.collect("processElement2 key = " + key + " value = " + value + "total = " + map.get(key));}});process.print();env.execute();}
3> processElement1 key = key2 value = (key2,4)total = 4
4> processElement1 key = key1 value = (key1,1)total = 1
4> processElement2 key = key1 value = (key1,3)total = 4
4> processElement1 key = key1 value = (key1,2)total = 6
3> processElement2 key = key2 value = (key2,5)total = 9
3> processElement2 key = key2 value = (key2,6)total = 15
算子链和资源组
将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink默认会将能链接的算子尽可能地进行链接(例如两个map转换操作)。
此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是,这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain() 这样。
一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看 slots 。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
任务链和资源组 ( Task chaining and resource groups ) 也是 Flink 提供的底层 API,用于控制任务链和资源分配。默认情况下,如果操作允许 (例如相邻的两次 map 操作) ,则 Flink 会尝试将它们在同一个线程内进行,从而可以获取更好的性能。但是 Flink 也允许用户自己来控制这些行为,这就是任务链和资源组 API:
创建新链
使用startNewChain基于当前算子创建一个新的算子链。
例如:后面两个map将被链接起来,而 filter 和第一个 map 不会链接在一起。
someStream.filter(...).map(...).startNewChain().map(...);
禁止链接
disableChaining操作用于禁止将其他操作与当前操作放置于同一个任务链中
someStream.map(...).disableChaining();
配置Slot共享组
为某个算子设置slot共享组。Flink会将同一个slot共享组的算子放在同一个slot 中,而将不在同一slot共享组的算子保留在其它slot 中。这可用于隔离slot 。
如果所有输入算子都属于同一个slot共享组,那么slot共享组从将继承输入算子所在的slot。slot共享组的默认名称是 “default”,可以调用slotSharingGroup(“default”) 来显式地将算子放入该组
someStream.filter(...).slotSharingGroup("name");
用户自定义函数UDF
在Flink中,可以通过自定义函数(UDF,User-Defined Function)来实现对数据流的自定义操作。
UDF可以用于转换、过滤、聚合等操作,以满足特定的业务需求。也就是说用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
要定义自定义函数,需要实现相应的函数接口,具体取决于希望实现的功能。
函数类
函数类(Function Classes)是最通用、最灵活的一种用户定义函数的方式。可以创建一个实现特定函数接口的类,并在类中实现函数的逻辑。常见的函数接口有:MapFunction、FilterFunction、ReduceFunction、AggregatgeFunction等等。
public static void main(String[] args) throws Exception {class MyMapFunction implements MapFunction<Integer, String> {@Overridepublic String map(Integer value) {return "数字 = " + value; // 将整数转换为字符串}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<String> map = stream.map(new MyMapFunction());map.print();env.execute();}
2> 数字 = 3
1> 数字 = 6
4> 数字 = 5
12> 数字 = 4
11> 数字 = 1
3> 数字 = 2
匿名函数
匿名函数(Anonymous Function)是一种没有具体函数类定义的函数,它只是在使用时定义。匿名函数通常用于一次性的简单操作,不需要单独定义一个函数类。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<String> map = stream.map(new MapFunction<Integer, String>() {@Overridepublic String map(Integer value) {return "数字 = " + value;}});map.print();env.execute();}
在匿名函数中,还可以自由地引用外部的变量
public static void main(String[] args) throws Exception {int threshold = 3;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<Integer> map = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) {return value > threshold; // 过滤大于阈值的值}});map.print();env.execute();}
10> 6
1> 5
9> 4
富函数类
富函数类(Rich Function Class)是Function Class的一种特殊形式。富函数类不仅支持函数的逻辑定义,还可以使用更多的生命周期和上下文信息,如open()、close()和getRuntimeContext()等。
open()方法
是Rich Function的初始化方法,会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法
生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
public static void main(String[] args) throws Exception {class MyRichMapFunction extends RichMapFunction<Integer, String> {@Overridepublic String map(Integer value) {return "数字 = " + value;}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化资源或状态System.out.println("并行子任务的索引:" + getRuntimeContext().getIndexOfThisSubtask() + " 生命周期开始");}@Overridepublic void close() throws Exception {// 释放资源或保存状态super.close();System.out.println("并行子任务的索引:" + getRuntimeContext().getIndexOfThisSubtask() + " 生命周期结束");}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperator<String> map = stream.map(new MyRichMapFunction());map.print();env.execute();}
并行子任务的索引:0 生命周期开始
数字 = 1
数字 = 2
数字 = 3
并行子任务的索引:0 生命周期结束
env.setParallelism(2);
并行子任务的索引:0 生命周期开始
并行子任务的索引:1 生命周期开始
2> 数字 = 2
1> 数字 = 1
1> 数字 = 3
并行子任务的索引:1 生命周期结束
并行子任务的索引:0 生命周期结束