Flink Stream API in Practice

Contents

Basic structure of a Flink program

Obtaining an execution environment (environment)

Loading/creating initial data (source)

File-based

Socket-based

Collection-based

Custom sources

Transformations (transformation)

Basic transformations

Physical partitioning

Task chaining and resource groups

Names and descriptions

Specifying where to put the results (sink)

Triggering program execution (execution)


 

Basic structure of a Flink program

A Flink program consists of the following steps:

1. Obtain an execution environment (environment)
2. Load/create the initial data (source)
3. Specify transformations on the data (transformation)
4. Specify where to put the computed results (sink)
5. Trigger program execution (execution)

Obtaining an execution environment (environment)

There are three ways to obtain a stream execution environment:

1. The environment appropriate to the current runtime context
StreamExecutionEnvironment.getExecutionEnvironment();


2. A local execution environment
StreamExecutionEnvironment.createLocalEnvironment();


3. A remote execution environment
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

For example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081, "path/to/jarfile");

In most cases you should simply call getExecutionEnvironment(), because it picks the right environment for the current context: if the program runs inside an IDE, it returns a local execution environment; if the program is packaged into a jar and submitted to a Flink cluster from the command line, it returns the cluster execution environment.


Loading/creating initial data (source)

File-based

  • readTextFile(path)
  • readFile(fileInputFormat, path)
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

For example:

DataStreamSource<String> lineDs = env.readTextFile("data/words.txt");
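
The longer readFile variant can also monitor a path for new files. A minimal sketch, assuming a data/ directory exists and reusing the env from above (the scan interval is an illustrative choice):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

TextInputFormat format = new TextInputFormat(new Path("data/"));
DataStreamSource<String> lines = env.readFile(
        format,                                   // the FileInputFormat used to parse the files
        "data/",                                  // the path to read and monitor
        FileProcessingMode.PROCESS_CONTINUOUSLY,  // periodically re-scan the path for new data
        10_000L);                                 // scan interval in milliseconds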

Socket-based

socketTextStream(hostname, port)

For example:

DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

Collection-based

  • fromCollection(Collection)
  • fromCollection(Iterator, Class)
  • fromElements(T ...)
  • fromParallelCollection(SplittableIterator, Class)
  • generateSequence(from, to)

For example:

DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(0, 1, 2));
DataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);
DataStreamSource<Long> source1 = env.generateSequence(1, 10);

Custom sources

  • Legacy style: addSource(SourceFunction<OUT> function)

For example, reading data from Kafka:

env.addSource(new FlinkKafkaConsumer<>(...))

  • New style: fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)

For example, reading data from Kafka:

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
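
A sketch of how the kafkaSource above could be built with the KafkaSource builder (the broker address, topic, and group id are placeholder assumptions, not values from the original text):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("node1:9092")                 // Kafka broker list
        .setTopics("test-topic")                           // topic(s) to consume
        .setGroupId("flink-group")                         // consumer group id
        .setStartingOffsets(OffsetsInitializer.earliest()) // where to start reading
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").print();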


Transformations (transformation)

Transformations: operators turn one or more DataStreams into a new DataStream.

Programs can combine multiple transformations into sophisticated dataflow topologies.

This section covers the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.

Basic transformations

  • Map
  • FlatMap
  • Filter
  • KeyBy
  • Reduce
  • Window
  • WindowAll
  • Window Apply
  • WindowReduce
  • Union
  • Window Join
  • Interval Join
  • Window CoGroup
  • Connect
  • CoMap, CoFlatMap
  • Cache

The examples below build on the Flink WordCount project; put the sample code in the org.example.transformations package (or any package of your choice).

Map

DataStream → DataStream

Transforms each element of the stream into another element, producing a new stream.

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

Multiplies every element of the stream by 2 and prints the resulting stream. Full code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Takes one element and produces one element.
A map function that doubles the values of the input stream:
*/
public class OperatorMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);
        // transformations
        SingleOutputStreamOperator<Integer> data = dataStream.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return 2 * value;
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Output

2> 10
7> 4
6> 2
8> 6
1> 8

FlatMap

DataStream → DataStream

Transforms each element of the stream into zero, one, or more elements.

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Splits sentences into individual words. Full code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
Takes one element and produces zero, one, or more elements.
A flatmap function that splits sentences to words:
*/
public class OperatorFlatMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<String> dataStream = env.fromElements("hello world", "hello flink", "hello hadoop");
        // transformations
        SingleOutputStreamOperator<String> data = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Output

5> hello
7> hello
6> hello
7> hadoop
5> world
6> flink

Filter

DataStream → DataStream

Evaluates a boolean function for each element and keeps those for which the function returns true.

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

Keeps only the non-zero elements. Full code:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Evaluates a boolean function for each element and retains those for which the function returns true.
A filter that filters out zero values:
*/
public class OperatorFilter {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);
        // transformations
        SingleOutputStreamOperator<Integer> data = dataStream.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value != 0;
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Output

8> 5
6> 3
4> 1

KeyBy

DataStream → KeyedStream

Logically partitions the stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning.

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

Groups the records by key and sums the values. Full code:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/*
Logically partitions a stream into disjoint partitions.
All records with the same key are assigned to the same partition.
Internally, keyBy() is implemented with hash partitioning.
There are different ways to specify keys.
*/
public class OperatorKeyBy {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        List<Tuple2<String, Integer>> dataSource = Arrays.asList(
                Tuple2.of("hello", 3),
                Tuple2.of("flink", 2),
                Tuple2.of("hadoop", 4),
                Tuple2.of("flink", 5));
        DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);
        // transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> data = dataStream.keyBy(value -> value.f0).sum(1);
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Output

3> (hello,3)
7> (flink,2)
8> (hadoop,4)
7> (flink,7)

Reduce

KeyedStream → DataStream

A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value.

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

Reduces the values that share the same key; here the reduction is a sum. Full code:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/*
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream.
Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
*/
public class OperatorReduce {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        List<Tuple2<String, Integer>> dataSource = Arrays.asList(
                Tuple2.of("hello", 3),
                Tuple2.of("flink", 2),
                Tuple2.of("hadoop", 3),
                Tuple2.of("flink", 5),
                Tuple2.of("hello", 1),
                Tuple2.of("hadoop", 1));
        DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);
        // transformations
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(value -> value.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> data = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, (value1.f1 + value2.f1));
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Output

7> (flink,2)
8> (hadoop,3)
3> (hello,3)
7> (flink,7)
8> (hadoop,4)
3> (hello,4) 

Window

KeyedStream → WindowedStream

Windows can be defined on already-partitioned KeyedStreams. A window groups the data for each key according to some characteristic (for example, the data that arrived within the last 10 seconds).

dataStream
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)));

You can then compute over the windowed data, for example counting how many times each word occurs within a 10-second tumbling window. Sample code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("node1", 7777)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);
        // sink
        dataStream.print();
        // execution
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Start an nc listener:

[hadoop@node1 ~]$ nc -lk 7777

Run the Flink program.

Send test data:

[hadoop@node1 ~]$ nc -lk 7777
hello world
hello flink
hello hadoop
hello java
hello

Output

5> (world,1)
8> (hadoop,1)
3> (hello,3)
7> (flink,1)
2> (java,1)
3> (hello,1)
3> (hello,1)

Note: the speed at which the input is typed affects which window each record lands in, so the computed results may vary from run to run.

WindowAll

DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. A window groups all the stream events according to some characteristic (for example, the data that arrived within the last 10 seconds).

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

Note: in many cases windowAll is a non-parallel transformation. All records are gathered into a single task for processing, so large data volumes can cause OOM problems.

Reduces all the data in each window; here every word is concatenated with a comma. Full code:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/*
Windows can be defined on regular DataStreams.
Windows group all the stream events according to some characteristic.
This is in many cases a non-parallel transformation.
All records will be gathered in one task for the windowAll operator.
*/
public class OperatorWindowAll {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        SingleOutputStreamOperator<String> result = env
                .socketTextStream("node1", 7777)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String value1, String value2) throws Exception {
                        return value1 + "," + value2;
                    }
                });
        // sink
        result.print();
        // execute
        env.execute();
    }
}

Test data:

[hadoop@node1 ~]$ nc -lk 7777
hello      
world
hadoop
flink
hello

Output

4> hello,world
5> hadoop,flink,hello

Window Apply

WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole.

Note: if you are using a windowAll transformation, you need an AllWindowFunction instead.

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple,
                      Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

Sums the elements in each window that share the same key. Full code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OperatorWindowApply {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        DataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<Integer> applyStream = windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> value : values) {
                    sum += value.f1;
                }
                out.collect(sum);
            }
        });
        // sink
        applyStream.print();
        // execute
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Send test data:

[hadoop@node1 ~]$ nc -lk 7777
hello world
hello hadoop
hello flink
flink

Output

5> 1
3> 1
3> 2
7> 2
8> 1

Note: typing speed affects which window each record lands in, so results may vary.

Analysis

The first line, hello world, fell into one window; each word occurred once, so the output is 1 and 1.

The second, third, and fourth lines fell into the same window: hello occurred twice, flink twice, and hadoop once, so the output is 2, 2, and 1.

WindowReduce

WindowedStream → DataStream 

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Implements word counting with reduce. Full code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OperatorWindowReduce {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        DataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
            }
        });
        // sink
        result.print();
        // execute
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Test data:

[hadoop@node1 ~]$ nc -lk 7777
hello hello world
hello flink
flink flink
hadoop hadoop
hello

Output

5> (world,1)
3> (hello,2)
7> (flink,3)
3> (hello,1)
8> (hadoop,2)
3> (hello,1)

Union

DataStream* → DataStream

Unions two or more data streams of the same type, creating a new stream that contains all the elements from all the streams.

dataStream.union(otherStream1, otherStream2, ...);

Unions two data streams of the same type. Full code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Union of two or more data streams creating a new stream containing all the elements from all the streams.
Note: If you union a data stream with itself you will get each element twice in the resulting stream.
*/
public class OperatorUnion {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<Integer> dataStream2 = env.fromElements(4, 5, 6);
        // transformations
        DataStream<Integer> res = dataStream1.union(dataStream2);
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Output

1
2
3
4
5
6

Window Join

DataStream,DataStream → DataStream

Joins two data streams on a given key over a common window.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});

A window join of two data streams. Full code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OperatorWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4),
                        Tuple2.of("c", 12))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "<----->" + second;
                    }
                });

        join.print();
        env.execute();
    }
}

Output

(a,1)<----->(a,1,1)
(a,2)<----->(a,1,1)
(b,3)<----->(b,2,1)
(c,12)<----->(c,14,1)

Interval Join

KeyedStream,KeyedStream → DataStream

Joins two elements e1 and e2 of two KeyedStreams that share a common key over a given time interval, such that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower bound, upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

An interval join. Full code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OperatorIntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

        // call interval join
        ks1.intervalJoin(ks2)
                // the join time interval
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + "<------>" + right);
                    }
                })
                .print();

        env.execute();
    }
}

Output

(a,1)<------>(a,1,1)
(a,2)<------>(a,1,1)
(b,3)<------>(b,2,1)

Window CoGroup

DataStream,DataStream → DataStream

Cogroups two data streams on a given key over a common window.

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});

Cogroups two data streams. Full code:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OperatorWindowCoGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> socketSource1 = env.socketTextStream("node1", 7777);
        DataStream<String> socketSource2 = env.socketTextStream("node1", 8888);

        DataStream<Tuple2<String, Integer>> input1 = socketSource1
                .map(line -> {
                    String[] arr = line.split(" ");
                    String id = arr[0];
                    int t = Integer.parseInt(arr[1]);
                    return Tuple2.of(id, t);
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        DataStream<Tuple2<String, Integer>> input2 = socketSource2
                .map(line -> {
                    String[] arr = line.split(" ");
                    String id = arr[0];
                    int t = Integer.parseInt(arr[1]);
                    return Tuple2.of(id, t);
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        DataStream<String> coGroupResult = input1.coGroup(input2)
                .where(i1 -> i1.f0)
                .equalTo(i2 -> i2.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new MyCoGroupFunction());

        coGroupResult.print();
        env.execute("window cogroup function");
    }

    public static class MyCoGroupFunction implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> input1, Iterable<Tuple2<String, Integer>> input2, Collector<String> out) {
            input1.forEach(element -> System.out.println("input1 :" + element.f1));
            input2.forEach(element -> System.out.println("input2 :" + element.f1));
        }
    }
}

Test data:

[hadoop@node1 ~]$ nc -lk 7777
hello 2
hello 1

[hadoop@node1 ~]$ nc -lk 8888
hello 3
hello 4

Output

input1 :2
input1 :1
input2 :3
input2 :4

Connect

DataStream,DataStream → ConnectedStream

"Connects" two data streams, retaining their types; connect allows state to be shared between the two streams. The two streams may have different data types.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

connect yields a ConnectedStreams. To transform it, implement the CoMapFunction or CoFlatMapFunction interface and override its two methods, one for each input stream: the first method processes elements of the first stream, and the second method processes elements of the second stream. The type parameters are:

// IN1: the data type of the first input stream
// IN2: the data type of the second input stream
// OUT: the data type of the resulting output stream
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

Connects two streams with different data types. Full code:

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class OperatorConnect {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<String> dataStream2 = env.fromElements("hello", "flink", "spark");
        // transformations
        ConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);
        SingleOutputStreamOperator<String> res = connectedStreams.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer input1) throws Exception {
                return input1.toString();
            }

            @Override
            public String map2(String input2) throws Exception {
                return input2;
            }
        });
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Output

1
hello
2
flink
3
spark

CoMap, CoFlatMap

ConnectedStream → DataStream

Transforms a connected stream into a data stream, using transformations analogous to map and flatMap.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Turns a connected stream into a data stream. Full code:

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class OperatorCoFlatMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<String> dataStream2 = env.fromElements("hello world", "hello flink");
        // transformations
        ConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);
        SingleOutputStreamOperator<String> res = connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
            @Override
            public void flatMap1(Integer value, Collector<String> out) throws Exception {
                out.collect(value.toString());
            }

            @Override
            public void flatMap2(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Output

8> hello
8> flink
4> 1
7> hello
7> world
5> 2
6> 3

Cache

Caches the intermediate result of a transformation. Currently this is only supported for jobs that run in batch execution mode. The cached result is produced lazily the first time the intermediate result is computed, so that later jobs can reuse it. If the cache is lost, the result is recomputed using the original transformation.

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache(); // cache the data
cachedDataStream.print();
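
A minimal runnable sketch, assuming Flink 1.16+ (where cache() is available) and batch execution mode:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorCache {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // cache() is only supported in batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        CachedDataStream<Integer> cached = env.fromElements(1, 2, 3).cache();
        cached.print();
        env.execute();
    }
}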

Physical partitioning

  • Custom partitioning
  • Random partitioning
  • Rescaling
  • Broadcasting

Custom partitioning (see the sketch after this list)
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning
dataStream.shuffle();

Rescaling
dataStream.rescale();

Broadcasting
dataStream.broadcast();
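
A runnable sketch of a custom partitioner (the modulo routing and the identity key selector are illustrative assumptions):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorPartitionCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);
        dataStream
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        // route each element to subtask key % numPartitions
                        return key % numPartitions;
                    }
                }, value -> value) // key selector: use the element itself as the key
                .print();
        env.execute();
    }
}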

Task chaining and resource groups

  • Start a new chain
  • Disable chaining
  • Set the slot sharing group

Start a new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

Do not chain the map operator.

someStream.map(...).disableChaining();

Set the slot sharing group

someStream.filter(...).slotSharingGroup("name");

Names and descriptions

someStream.filter(...).name("filter").setDescription("some description of the filter");


Specifying where to put the results (sink)

  • writeAsText(path): write to a text file
  • writeAsCsv(...): write to a CSV file
  • print(): print to the console
  • writeUsingOutputFormat(): write using a custom OutputFormat
  • writeToSocket: write to a socket
  • addSink: a custom sink, for example writing to Kafka

For example:

DataStreamSource<Integer> dataStream = env.fromElements(1, 2, 3);
dataStream.writeAsText("sinkout");
dataStream.print();
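
A sketch of a custom Kafka sink using the newer KafkaSink / sinkTo API (the broker address and topic are placeholder assumptions):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("node1:9092")       // Kafka broker list
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("test-topic")          // target topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

dataStream.map(String::valueOf).returns(Types.STRING).sinkTo(kafkaSink);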


Triggering program execution (execution)

  • execute: synchronous execution; it blocks until the job finishes
execute()
execute(String jobName)
execute(StreamGraph streamGraph)

For example:

env.execute();

  • executeAsync: asynchronous execution; it returns immediately and does not block subsequent jobs
executeAsync()
executeAsync(String jobName)
executeAsync(StreamGraph streamGraph)

For example:

env.executeAsync();
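
executeAsync returns a JobClient that can be used to track and control the running job. A brief sketch (the job name and status polling are illustrative):

import org.apache.flink.core.execution.JobClient;

JobClient jobClient = env.executeAsync("my-async-job");
System.out.println("Job id: " + jobClient.getJobID());
System.out.println("Job status: " + jobClient.getJobStatus().get()); // getJobStatus() returns a CompletableFuture<JobStatus>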

Done! Enjoy it!
