Flink之转换算子Transformation

转换算子Transformation

  • 概述
  • 基本转换算子
    • 映射Map
    • 扁平映射flatMap
    • 过滤Filter
  • 聚合算子
    • 按键分区keyBy
    • 归约聚合reduce
    • 简单聚合sum、min、max、minBy、maxBy
  • 物理分区算子
    • 随机分配
    • 轮询分配
    • 重缩放
    • 广播
    • 全局分区
    • 自定义分区
  • 分流操作
    • Filter分流
    • SideOutPut分流
    • Split分流
  • 合流操作
    • 联合Union
    • 连接Connect
    • CoMap、CoFlatMap
    • CoProcessFunction
  • 算子链和资源组
    • 创建新链
    • 禁止链接
    • 配置Slot共享组
  • 用户自定义函数UDF
    • 函数类
    • 匿名函数
    • 富函数类

概述

转换算子(Transformation)是ApacheFlink中用于对数据流进行处理和转换的操作。在Flink中,数据流被抽象为一个有向无环图(DAG),转换算子可以将数据流的每个元素进行操作,并生成新的数据流。

因此,Flink中的转换算子是指对输入数据流进行转换操作的一类算子,它是将一个或多个DataStream转换为新的DataStream

特点:

转换算子接受一个或多个输入数据流,并产生一个或多个输出数据流。每个转换算子都代表一个具体的数据处理操作,可以在数据流上执行诸如映射、过滤、聚合、分组等操作。转换算子可以按照不同的方式组合在一起,形成复杂的数据流处理逻辑。可以通过方法链式调用来连接多个转换算子,形成具体的数据处理流程。

常见Flink转换算子:

通过组合这些转换算子,可以构建出复杂的数据处理流程,实现各种业务逻辑的数据处理和分析。

Map:对输入数据流中的每个元素应用一个函数,并将函数的输出作为输出数据流中的元素FlatMap:与 Map 类似,但可以通过一个函数返回多个输出元素Filter:根据指定的条件过滤输入数据流中的元素,并将符合条件的元素作为输出数据流中的元素KeyBy:按照指定的键对输入数据流分组,并返回分组后的数据流Reduce:对输入数据流中的每个分组应用一个函数进行聚合,并将聚合结果作为输出数据流中的元素Aggregations:针对分组后的数据流进行聚合,可以使用 Sum、Max、Min、Avg 等内置函数Window:按照指定的时间或者大小划分输入数据流,并在窗口上应用聚合函数Join:将两个输入数据流中的元素按照指定的键进行连接

基本转换算子

Flink中基本的转换算子有Map、Filter 和 FlatMap。它们分别用于对每个输入元素应用一个函数,根据指定的条件过滤输入元素,以及将每个输入元素映射为多个输出元素。

映射Map

Map算子接受一个函数作为参数,该函数将输入数据流中的每个元素映射为一个新的元素,并将这些新元素组成一个输出数据流。

注意:通常在使用Flink算子的时候,可以使用匿名类、Lambda、实现类

方式一:传入匿名类,实现MapFunction

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));/*** MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。* * 实现MapFunction接口时,需要指定两个泛型,分别是输入和输出的类型,还需要重写map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑*/SingleOutputStreamOperator<String> map = stream.map(new MapFunction<Integer, String>() {@Overridepublic String map(Integer integer) throws Exception {return "数字: " + integer;}});map.print();env.execute();}

方式二:使用Lambda表达式

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperator<String> map = stream.map((MapFunction<Integer, String>) integer -> "数字: " + integer);map.print();env.execute();}

方式三:传入MapFunction的实现类

    public static void main(String[] args) throws Exception {class MyMap implements MapFunction<Integer, String> {@Overridepublic String map(Integer integer) throws Exception {return "数字: " + integer;}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));stream.map(new MyMap()).print();env.execute();}

扁平映射flatMap

FlatMap算子接受一个函数作为参数,该函数将输入数据流中的每个元素映射为多个新的元素,并将这些新元素组成一个输出数据流。

flatMap是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。具有消费一个元素,可以产生0到多个元素的特点。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.fromCollection(Arrays.asList("h e l l o"));stream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] split = s.split(" ");for (String word : split) {collector.collect(word);}}}).print();env.execute();}
6> h
6> e
6> l
6> l
6> o

过滤Filter

Filter算子接受一个函数作为参数,该函数返回一个布尔值,表示是否应该保留输入数据流中的当前元素。如果该函数返回true,则当前元素被保留,否则,当前元素被丢弃。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a != 2;}}).print();env.execute();}

聚合算子

Flink提供了多种聚合算子,用于对数据流进行各种不同类型的聚合操作。

按键分区keyBy

在Flink中要做聚合,需要先进行分区,分区操作是通过keyBy来完成的。keyBy 是 Flink 中的一个操作符,用于将数据流按照指定的 key 进行分区

在 Flink 中,数据流被分为多个分区,每个分区都有一个或多个并行的任务来处理数据。keyBy 操作符会将数据流中具有相同 key 的数据分配到同一个分区中,从而保证相同 key 的数据被同一个任务处理。

keyBy 操作符常用于聚合操作,例如对某个字段进行求和、求平均值等。在使用 keyBy 操作符时,需要指定一个或多个字段作为 key,这些字段的值相同的数据将被分配到同一个分区中。

在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

注意:

在Flink内部,它是通过计算key的哈希值来对分区数进行取模运算实现的。因此,如果Key是一个POJO对象,应该必须重写 hashCode()方法。

KeyBy是将DataStreamKeyedStream

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.fromCollection(Arrays.asList("ab", "abc", "bc", "ab", "bc"));// 以数据源本身作为 key 做一个分区操作KeyedStream<String, String> keyedStream =stream.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s;}});keyedStream.print();env.execute();}

keyBy得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream

11> abc
4> ab
3> bc
4> ab
3> bc

归约聚合reduce

Flink的归约聚合(reduce)算子是一种常用的聚合操作,它对数据流中的元素进行逐个聚合,将当前元素与上一个聚合结果合并,得到一个单一的结果

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

reduce对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。在相同key的数据流上持续滚动执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

注意:

Reduce是将KeyedStreamDataStream

定义一个输入数据流,其包含了一系列整数,通过reduce算子对流进行归约聚合,将相邻元素相加得到最终结果。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);keyBy.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println("value1 = " + value1 + " value2 = " + value2); 	return value1 + value2; // 将两个值相加}}).print();env.execute();}

在流处理的底层实现过程中,实际上是将中间合并结果作为任务的一个状态保存起来,之后每来一个新的数据,就和之前的聚合状态进一步做归约

1> 1
8> 4
value1 = 1 value2 = 3
value1 = 4 value2 = 6
1> 4
8> 10
value1 = 4 value2 = 2
value1 = 10 value2 = 5
1> 6
8> 15

简单聚合sum、min、max、minBy、maxBy

有了按键分区的数据流 KeyedStream,就可以基于它进行聚合操作。使用聚合算子可以方便地对数据流进行各种不同类型的聚合操作,从而帮助实现各种复杂的分析和处理需求。

聚合算子主要包括以下几种:

sum:对数据流中的元素求和min:求数据流中的最小值minBy:根据指定的键值,求数据流中的最小值,并返回该最小值所在的全部元素max:求数据流中的最大值maxBy:根据指定的键值,求数据流中的最大值,并返回该最大值所在的全部元素

转换算子需要实现自定义函数,聚合需要指定字段即可。指定字段的方式有两种:指定位置,和指定名称。

对于元组类型的数据,可以使用这两种方式来指定字段。需注意:元组中字段的名称,是以 f0、f1、f2、…来命名。数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定。

sum

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);SingleOutputStreamOperator<Integer> max = keyBy.sum(0);max.print();env.execute();}

max

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStream<Integer, Boolean> keyBy = stream.keyBy(number -> number > 3);SingleOutputStreamOperator<Integer> max = keyBy.max(0);max.print();env.execute();}

min

SingleOutputStreamOperator<Integer> max = keyBy.min(0);

minBy

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 2), Tuple2.of("key1", 3), Tuple2.of("key2", 4));KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}});// 按第二个字段选择最小值SingleOutputStreamOperator<Tuple2<String, Integer>> minBy = keyedStream.minBy(1);minBy.print();env.execute();}

maxBy

 // 按第二个字段选择最大值
SingleOutputStreamOperator<Tuple2<String, Integer>> minBy = keyedStream.maxBy(1);

物理分区算子

Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

物理分区算子(Physical Partitioning)是Flink中用于将数据流划分到不同物理分区的一种技术。通过将数据流划分到不同的分区,可以实现负载均衡、并行计算和提高吞吐量等优化。

Flink提供了多种物理分区算子,可以根据不同的需求选择适合的分区策略。常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)

随机分配

随机分配(Random)是一种将数据随机分配到各个分区的策略。每个分区接收的数据量可能不同,适用于无需考虑数据倾斜的场景。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);DataStream<Integer> partition = dataStreamSource.shuffle();partition.print();env.execute();}
2> 4
1> 1
2> 5
1> 2
1> 3
1> 6

轮询分配

轮询分配(Round-Robin)是一种均匀地按照轮询顺序将数据分配到各个分区的策略。每个分区依次接收一个元素,然后再轮到下一个分区,如此循环。

 DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);// rebalance可以解决数据源倾斜的场景
DataStream<Integer> partition = dataStreamSource.rebalance();
2> 1
2> 3
2> 5
1> 2
1> 4
1> 6

重缩放

重缩放分区和轮询分区非常相似。缩放(Rescale)是一种根据数据的大小动态调整分区的策略。它根据每个分区中的数据量进行动态的分区调整,使各个分区的数据负载均衡。

DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
DataStream<Integer> partition = dataStreamSource.rescale();
1> 1
2> 2
1> 3
2> 4
1> 5
2> 6

广播

广播(Broadcast)是一种将数据复制到所有分区的策略,即所有分区都会接收到相同的数据。广播适合于需要在所有分区上进行全局操作的场景。

DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
DataStream<Integer> partition = dataStreamSource.broadcast();
1> 1
2> 1
1> 2
2> 2
1> 3
2> 3
1> 4
2> 4
1> 5
2> 5
1> 6
2> 6

注意:广播会在每个并行任务之间复制数据,并占用更多的内存和网络带宽,因此应慎重使用

全局分区

全局分区(Global Partitioning)是一种特殊的分区方式。它将所有的输入流数据都发送到下游算子的第一个并行子任务中去。

注意:

它可能会产生数据倾斜问题。由于所有数据都发送到同一个并行任务,这个任务可能会成为瓶颈,并导致性能下降相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力
DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
// 全部数据发往第一个子任务
DataStream<Integer> partition = dataStreamSource.global();
1> 1
1> 2
1> 3
1> 4
1> 5
1> 6

自定义分区

在Flink中,可以使用partitionCustom()方法来实现自定义的分区策略。通过自定义分区,可以根据自己的业务需求将数据合理地分发到不同的分区中。

自定义分区需要实现Partitioner接口,并重写其中的partition()方法。partition()方法接收一个键和键的总数作为参数,并返回要分配到的分区编号(从0开始)。

通过自定义分区策略,可以根据具体的业务场景将数据分配到合适的分区中,用于实现更精确的数据处理和控制。

自定义分区器将奇数和偶数分到不同的分区中

public class MyPartitioner implements Partitioner<Integer> {@Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 == 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}
}

在Flink程序中使用自定义分区

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromCollection(Arrays.asList(1, 2, 3));//        DataStream<Integer> partitionCustom = dataStreamSource.partitionCustom(new MyPartitioner(), value -> value);DataStream<Integer> partitionCustom = dataStreamSource.partitionCustom(new MyPartitioner(), new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value;}});partitionCustom.print();env.execute();}
2> 1
1> 2
2> 3

分流操作

分流就是将一条数据流拆分成完全独立的两条、甚至多条流。分流就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

把输入源按照需要进行拆分,比如期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。

Filter分流

Filter算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,可以做多次 filter,把需要的不同数据生成不同的流。

针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流。

根据奇偶性将输入流分流成两个输出流:

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));SingleOutputStreamOperator<Integer> even = stream.filter(value -> value % 2 == 0);SingleOutputStreamOperator<Integer> odd = stream.filter(value -> value % 2 == 1);even.print("偶数-job");odd.print("奇数-job");env.execute();}
偶数-job:1> 2
奇数-job:1> 5
偶数-job:3> 4
奇数-job:9> 1
奇数-job:11> 3

直接使用filter来实现分流效果,存在缺点

1.代码显得有些冗余2.同一个数据,要被处理两遍(调用两次filter),不够高效

SideOutPut分流

使用侧输出流(SideOutput)也可以实现数据的分流操作。侧输出流允许将不符合主要流处理逻辑的数据发送到一个或多个辅助输出流中。

使用步骤:

1.使用OutputTag定义两个侧输出流的标签:evenTag和oddTag2.通过processElement()方法的逻辑,主要处理逻辑将负数发送到主输出流,而偶数、奇数通过ctx.output()方法发送到侧输出流evenTag与oddTag3.最后使用getSideOutput()方法获取侧输出流

通过侧输出流机制,可以将不符合主要处理逻辑的数据单独处理,以实现更灵活的数据分流和处理

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6));/*** 创建OutputTag对象* 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)*/OutputTag<Integer> evenTag = new OutputTag<>("even", Types.INT);OutputTag<Integer> oddTag = new OutputTag<>("odd", Types.INT);// 使用process算子SingleOutputStreamOperator<Integer> process = stream.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value > 0) {if (value % 2 == 0) {// 偶数放到侧输出流evenTag中// 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据ctx.output(evenTag, value);} else if (value % 2 == 1) {// 奇数放到侧输出流oddTag中ctx.output(oddTag, value);}} else {// 负数 数据,放到主流中out.collect(value);}}});// 在主流中,根据标签 获取 侧输出流SideOutputDataStream<Integer> even = process.getSideOutput(evenTag);SideOutputDataStream<Integer> odd = process.getSideOutput(oddTag);// 打印主流process.printToErr("主流-负数-job");//打印 侧输出流even.print("偶数-job");odd.print("奇数-job");env.execute();}
奇数-job:1> 1
偶数-job:2> 2
奇数-job:1> 3
偶数-job:2> 4
奇数-job:1> 5
主流-负数-job:2> -6

Split分流

Split也是将流进行切分的方法,需要在split算子中定义OutputSelector,然后重写其中的select方法,将不同类型的数据进行标记,最后对返回的SplitStream使用select方法将对应的数据选择出来。

注意:

使用split算子切分过的流,是不能进行二次切分的

Split算子在Flink较新版本中已弃用,推荐使用SideOutPut进行流的拆分

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//获取数据源List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();data.add(new Tuple3<>(0,1,0));data.add(new Tuple3<>(0,2,2));data.add(new Tuple3<>(0,1,1));DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {@Overridepublic Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {List<String> tags = new ArrayList<>();if (value.f0 == 0) {tags.add("zero");} else if (value.f0 == 1) {tags.add("one");}return tags;}});splitStream.select("zero").print();splitStream.select("one").printToErr();//打印结果String jobName = "zero one streaming";env.execute(jobName);}

合流操作

联合Union

联合操作将两个或多个数据流联合来创建一个包含所有流中数据的新流。这种合并操作会保留所有输入流的顺序。流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

注意:

如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

流的联合受限于数据类型不能改变,缺乏灵活性

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> source2 = env.fromElements(4, 5, 6);DataStreamSource<String> source3 = env.fromElements("7", "8", "9");DataStream<Integer> union = source1.union(source2).union(source3.map(number -> Integer.valueOf(number)));
//        DataStream<Integer> union = source1.union(source2, source3.map(number -> Integer.valueOf(number)));union.print();env.execute();}
1
2
3
4
5
6
7
8
9

连接Connect

在Flink中,可以使用connect()操作符将两个或多个流连接在一起以形成ConnectedStreams。连接流提供一种将不同类型的流合并在一起的方式,通过它可以对每个流应用不同的处理逻辑,但它们会共享相同的上下文信息。

使用connect合并流,一次只能连接2条流,流的数据类型可以不一样,连接后可以调用 map、flatmap、process来处理

DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<String> source2 = env.fromElements("4", "5", "6");
ConnectedStreams<Integer, String> connect = source1.connect(source2);

CoMap、CoFlatMap

类似于在连接的数据流上进行 map 和 flatMap。

如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<String> source2 = env.fromElements("4", "5", "6");ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<Integer> result = connect.map(new CoMapFunction<Integer, String, Integer>() {@Overridepublic Integer map1(Integer value) throws Exception {return value;}@Overridepublic Integer map2(String value) throws Exception {return Integer.parseInt(value);}});result.print();env.execute();}

调用.flatMap()

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) {for (String word: value.split(" ")) {out.collect(word);}}
});

CoProcessFunction

调用.process()时,传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法,

假设有两个输入流,将这两个流合并计算得到每个key对应的合计,并输出结果流

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 4), Tuple2.of("key1", 2));DataStreamSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("key1", 3), Tuple2.of("key2", 5), Tuple2.of("key2", 6));ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connect = source1.connect(source2);// 进行keyby操作,将key相同数据放到一起ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);/*** 对2个流中相同key的值求和*/SingleOutputStreamOperator<String> process = connectKeyby.process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {Map<String, Integer> map = new HashMap<>();/*** 第一条流的处理逻辑* @param value 第一条流的数据* @param ctx   上下文* @param out   采集器* @throws Exception*/@Overridepublic void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {String key = value.f0;if (!map.containsKey(key)) {// 如果key不存在,则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key) + value.f1;map.put(key, total);}out.collect("processElement1  key = " + key + " value = " + value + "total = " + map.get(key));}/*** 第二条流的处理逻辑* @param value 第二条流的数据* @param ctx   上下文* @param out   采集器* @throws Exception*/@Overridepublic void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {String key = value.f0;if (!map.containsKey(key)) {// 如果key不存在,则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key) + value.f1;map.put(key, total);}out.collect("processElement2  key = " + key + " value = " + value + "total = " + map.get(key));}});process.print();env.execute();}
3> processElement1  key = key2 value = (key2,4)total = 4
4> processElement1  key = key1 value = (key1,1)total = 1
4> processElement2  key = key1 value = (key1,3)total = 4
4> processElement1  key = key1 value = (key1,2)total = 6
3> processElement2  key = key2 value = (key2,5)total = 9
3> processElement2  key = key2 value = (key2,6)total = 15

算子链和资源组

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink默认会将能链接的算子尽可能地进行链接(例如两个map转换操作)。

此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:

如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是,这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain() 这样。

一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看 slots 。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。

任务链和资源组 ( Task chaining and resource groups ) 也是 Flink 提供的底层 API,用于控制任务链和资源分配。默认情况下,如果操作允许 (例如相邻的两次 map 操作) ,则 Flink 会尝试将它们在同一个线程内进行,从而可以获取更好的性能。但是 Flink 也允许用户自己来控制这些行为,这就是任务链和资源组 API:

创建新链

使用startNewChain基于当前算子创建一个新的算子链。

例如:后面两个map将被链接起来,而 filter 和第一个 map 不会链接在一起。

someStream.filter(...).map(...).startNewChain().map(...);

禁止链接

disableChaining操作用于禁止将其他操作与当前操作放置于同一个任务链中

someStream.map(...).disableChaining();

配置Slot共享组

为某个算子设置slot共享组。Flink会将同一个slot共享组的算子放在同一个slot 中,而将不在同一slot共享组的算子保留在其它slot 中。这可用于隔离slot 。

如果所有输入算子都属于同一个slot共享组,那么slot共享组从将继承输入算子所在的slot。slot共享组的默认名称是 “default”,可以调用slotSharingGroup(“default”) 来显式地将算子放入该组

someStream.filter(...).slotSharingGroup("name");

用户自定义函数UDF

在Flink中,可以通过自定义函数(UDF,User-Defined Function)来实现对数据流的自定义操作。

UDF可以用于转换、过滤、聚合等操作,以满足特定的业务需求。也就是说用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类。

要定义自定义函数,需要实现相应的函数接口,具体取决于希望实现的功能。

函数类

函数类(Function Classes)是最通用、最灵活的一种用户定义函数的方式。可以创建一个实现特定函数接口的类,并在类中实现函数的逻辑。常见的函数接口有:MapFunction、FilterFunction、ReduceFunction、AggregatgeFunction等等。

    public static void main(String[] args) throws Exception {class MyMapFunction implements MapFunction<Integer, String> {@Overridepublic String map(Integer value) {return "数字 = " + value; // 将整数转换为字符串}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<String> map = stream.map(new MyMapFunction());map.print();env.execute();}
2> 数字 = 3
1> 数字 = 6
4> 数字 = 5
12> 数字 = 4
11> 数字 = 1
3> 数字 = 2

匿名函数

匿名函数(Anonymous Function)是一种没有具体函数类定义的函数,它只是在使用时定义。匿名函数通常用于一次性的简单操作,不需要单独定义一个函数类。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<String> map = stream.map(new MapFunction<Integer, String>() {@Overridepublic String map(Integer value) {return "数字 = " + value;}});map.print();env.execute();}

在匿名函数中,还可以自由地引用外部的变量

    public static void main(String[] args) throws Exception {int threshold = 3;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperator<Integer> map = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) {return value > threshold; // 过滤大于阈值的值}});map.print();env.execute();}
10> 6
1> 5
9> 4

富函数类

富函数类(Rich Function Class)是Function Class的一种特殊形式。富函数类不仅支持函数的逻辑定义,还可以使用更多的生命周期和上下文信息,如open()、close()和getRuntimeContext()等。

open()方法

是Rich Function的初始化方法,会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。

close()方法

生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

    public static void main(String[] args) throws Exception {class MyRichMapFunction extends RichMapFunction<Integer, String> {@Overridepublic String map(Integer value) {return "数字 = " + value;}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化资源或状态System.out.println("并行子任务的索引:" + getRuntimeContext().getIndexOfThisSubtask() + " 生命周期开始");}@Overridepublic void close() throws Exception {// 释放资源或保存状态super.close();System.out.println("并行子任务的索引:" + getRuntimeContext().getIndexOfThisSubtask() + " 生命周期结束");}}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperator<String> map = stream.map(new MyRichMapFunction());map.print();env.execute();}
并行子任务的索引:0 生命周期开始
数字 = 1
数字 = 2
数字 = 3
并行子任务的索引:0 生命周期结束
env.setParallelism(2);
并行子任务的索引:0 生命周期开始
并行子任务的索引:1 生命周期开始
2> 数字 = 2
1> 数字 = 1
1> 数字 = 3
并行子任务的索引:1 生命周期结束
并行子任务的索引:0 生命周期结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4K壁纸小程序源码 全内容自动采集

全内容自动采集 4K壁纸小程序源码&#xff0c;带流量主。用的都是一个接口&#xff0c;不过这个不知是谁改的&#xff0c;成了LSP版&#xff0c;是真色啊&#xff0c;专搜小姐姐。 4K壁纸&#xff0c;静态壁纸&#xff0c;头像等都有保留&#xff0c;界面广告位很多&#xff0c…

线性回归原理

1、 线性回归的原理 1.1 线性回归应用场景 房价预测 销售额度预测 金融&#xff1a;贷款额度预测、利用线性回归以及系数分析因子1.2 什么是线性回归 1.2.1定义与公式 线性回归(Linear regression)是利用回归方程(函数)对一个或多个自变量(特征值)和因变量(目标值)之间关系…

导航守卫的使用记录和beforeEach( )死循环的问题

前置导航守卫beforeEach的使用 import Vue from vue import VueRouter from vue-router // 进度条 import NProgress from nprogress import nprogress/nprogress.cssVue.use(VueRouter)// 路由表 const routes [{path: "/",redirect: "/home",},{path: …

Unity3D Shader新手入门教程:3D溶解与腐蚀特效详解

引言 在游戏开发中&#xff0c;特效是非常重要的一部分&#xff0c;它能够增加游戏的趣味性和可玩性。其中&#xff0c;Shader特效是一种非常常见和常用的特效&#xff0c;它能够通过改变物体表面的渲染方式来实现各种各样的特效效果。本文将详细介绍Unity3D中的Shader 3D溶解与…

04 MIT线性代数-矩阵的LU分解 Factorization into A=LU

目的: 从矩阵的角度理解高斯消元法, 完成LU分解得到ALU 1.矩阵乘积的逆矩阵 Inverse of a product 2.矩阵乘积的转置 Transpose of a product 3.转置矩阵的逆矩阵 Inverse of a transpose 4.矩阵的LU分解 U为上三角阵(Upper triangular matrix), L为下三角阵(Lower triangular…

Qt系列-常用控件使用整理

1、QMainWindow介绍 菜单栏最多只有一个 //菜单栏创建 菜单栏最多只能有一个QMenuBar*bar menuBar();//将菜单栏放入到窗口中setMenuBar(bar);//创键菜单QMenu*fileMenubar->addMenu("文件");QMenu*editMenubar->addMenu("编辑");//创建菜单项QActi…

设计模式之六大设计原则

为什么要学习设计模式&#xff1f; 要知道设计模式就是软件工程的方法经验的总结&#xff0c;也是可以认为是过去一段时间软件工程的一个最佳实践&#xff0c;要理解&#xff0c;不要死记硬背。掌握这些方法后&#xff0c;可以让你的程序获得以下好处&#xff1a; 代码重用性…

【LeetCode】38. 外观数列

1 问题 给定一个正整数 n &#xff0c;输出外观数列的第 n 项。 「外观数列」是一个整数序列&#xff0c;从数字 1 开始&#xff0c;序列中的每一项都是对前一项的描述。 你可以将其视作是由递归公式定义的数字字符串序列&#xff1a; countAndSay(1) “1” countAndSay(n…

kali使用docker安装DVWA

上一篇文章我记录了如何使用kali安装DVWA&#xff0c;但是我是一个一个组件安装的&#xff0c;非常麻烦&#xff0c;比如数据库还需要配置&#xff0c;花费时间很多。昨天在逛github时&#xff0c;发现大佬的靶场都是通过docker打包好的&#xff0c;如果我也用docker安装DVWA&a…

4.4 网际控制报文协议ICMP

思维导图&#xff1a; 4.4 网际控制报文协议ICMP - 笔记 --- **定义**: - 网际控制报文协议ICMP(Internet Control Message Protocol)是根据[RFC 792]定义的一种协议。它的主要功能是为了提高IP数据报的转发效率和确保交付的成功率。 **主要功能**: 1. **差错报告**: ICMP允…

阿里云starrocks监控告发至钉钉群

背景&#xff1a;新入职一家公司&#xff0c;现场没有对sr的进行监控&#xff0c;根据开发的需求编写了一个python脚本。 脚本逻辑&#xff1a;抓取sr的be/fe/routine load状态信息&#xff0c;判读是否触发告警&#xff0c;若满足告警条件&#xff0c;则发送告警信息到钉钉群…

二维码智慧门牌管理系统:确保数据准确,强制校验GPS信号强度

文章目录 前言一、数据采集多种方式二、提高工作效率与管理效率 前言 在快速发展的科技时代&#xff0c;我们推出了一款最新的门牌系统解决方案——二维码智慧门牌。这款门牌不仅具备高效的管理功能&#xff0c;还为入口管理提供全新的智慧化解决方案。 一、数据采集多种方式 …

Could not find artifact com.sleepycat;je:jar:7.3.7 in aliyunmaven

在编译inlong源码时报的错误&#xff0c;去本地库里发现只有lastupdate的文件&#xff0c;就又去maven库里看了一下Maven Repository: com.sleepycat je (mvnrepository.com)&#xff0c;发现没有这个版本&#xff0c;将版本进行修改错误解决

【Arduino TFT】 记录使用DMA优化TFT屏帧率

忘记过去&#xff0c;超越自己 ❤️ 博客主页 单片机菜鸟哥&#xff0c;一个野生非专业硬件IOT爱好者 ❤️❤️ 本篇创建记录 2023-10-18 ❤️❤️ 本篇更新记录 2023-10-18 ❤️&#x1f389; 欢迎关注 &#x1f50e;点赞 &#x1f44d;收藏 ⭐️留言&#x1f4dd;&#x1f64…

信息系统漏洞与风险管理制度

1、总则 1.1、目的 为了进一步规范XXXXX单位信息系统风险管理活动&#xff0c;提升风险管理工作的可操纵性和适用性&#xff0c;使信息网络正常运行&#xff0c;防止网络攻击&#xff0c;保证业务的正常进行&#xff0c;依据XXXXX单位员的相关规范和标准规定&#xff0c;特制…

【马蹄集】—— 概率论专题

概率论专题 目录 MT2226 抽奖概率MT2227 饿饿&#xff01;饭饭&#xff01;MT2228 甜甜花的研究MT2229 赌石MT2230 square MT2226 抽奖概率 难度&#xff1a;黄金    时间限制&#xff1a;1秒    占用内存&#xff1a;128M 题目描述 小码哥正在进行抽奖&#xff0c;箱子里有…

Spring Framework :WebClient 取代 RestTemplate

本心、输入输出、结果 文章目录 Spring Framework :WebClient 取代 RestTemplate前言WebClient 优于 RestTemplate 的地方使用示例创建客户端发起同步请求发起异步请求WebClient 简介安装配置如何设置 URL 参数 (REST)配置超时时间免除 SSL 验证弘扬爱国精神Spring Framewor…

vue echarts图表自适应屏幕变化

在Vue中使用ECharts图表实现自适应屏幕变化&#xff0c;可以按照以下步骤进行操作&#xff1a; 安装ECharts和vue-echarts库。 npm install echarts vue-echarts在需要使用图表的组件中导入相关库并注册图表组件。 import ECharts from vue-echarts; import echarts/lib/char…

搭建CNFS文件系统

1.概念&#xff1a; CNFS &#xff08;Cluster Network File System&#xff09;是 GPFS 中的一种模式&#xff0c;用于配置和管理多台服务器&#xff08;节点&#xff09;之间的文件共享和数据访问 它允许多个节点同时访问和共享文件系统的数据&#xff0c;以实现高性能、高可…

Transformer模型 | Transformer模型描述

谷歌推出的BERT模型在11项NLP任务中夺得SOTA结果,引爆了整个NLP界。而BERT取得成功的一个关键因素是Transformer的强大作用。谷歌的Transformer模型最早是用于机器翻译任务,当时达到了SOTA效果。Transformer改进了RNN最被人诟病的训练慢的缺点,利用self-attention机制实现快…