目录
Flink程序的基本构成
获得执行环境(environment)
加载/初始化数据(source)
基于文件
基于socket
基于集合
自定义
转换操作(transformation)
基本转换
物理分区
任务链和资源组
名称和描述
指定计算结果放置在何处(sink)
触发程序执行(execution)
Flink程序的基本构成
一个Flink程序的基本构成如下:
1.获得一个执行环境(environment)
2.加载/创建初始数据(source)
3.在此数据上指定转换(transformation)
4.指定将计算结果放置在何处(sink)
5.触发程序执行(execution)
获得执行环境(environment)
获得流处理执行环境的三种方式:
1.根据上下文实际情况的执行环境
StreamExecutionEnvironment.getExecutionEnvironment();
2.本地执行环境
StreamExecutionEnvironment.createLocalEnvironment();
3.远程执行环境
createRemoteEnvironment(String host, int port, String... jarFiles);
例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081,"path/to/jarfile");
通常情况下,直接使用getExecutionEnvironment()来获取执行环境,因为程序运行时根据上下文条件自动选择相应的环境。如果在IDE中执行程序,将返回本地的执行环境。如果将程序打成jar文件,并通过命令提交jar到flink集群,此时将返回flink集群环境。
加载/初始化数据(source)
基于文件
- readTextFile(path)
- readFile(fileInputFormat, path)
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
例如:
DataSource<String> lineDs = env.readTextFile("data/words.txt");
基于socket
socketTextStream(hostname, port)
例如:
DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);
基于集合
- fromCollection(Collection)
- fromCollection(Iterator, Class)
- fromElements(T ...)
- fromParallelCollection(SplittableIterator, Class)
- generateSequence(from, to)
例如:
DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(0, 1, 2));
DataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);
DataStreamSource<Long> source1 = env.generateSequence(1, 10);
自定义
- 旧的方式:addSource(SourceFunction<OUT> function)
例如:读取kafka的数据
env.addSource(new FlinkKafkaConsumer<>(...))
- 新的方式:fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
例如:读取kafka的数据
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
转换操作(transformation)
转换操作:运算符将一个或多个数据流转换为新的数据流。
程序可以将多个转换组合成复杂的数据流拓扑结构。
本节描述了基本转换、应用这些转换后的有效物理分区以及对Flink运算符链的深入了解。
基本转换
- Map
- FlatMap
- Filter
- KeyBy
- Reduce
- Window
- WindowAll
- Window Apply
- WindowReduce
- Union
- Window Join
- Interval Join
- Window CoGroup
- Connect
- CoMap, CoFlatMap
- Cache
在Flink WordCount工程 的基础上操作,把以下案例代码放在org.example.transformations
包或者其他自定义的包下。
Map
DataStream → DataStream
对流数据里的每个元素进行转换,得到另一个流数据。
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return 2 * value;} });
将数据流里的每个元素乘以2,得到新的数据流并输出,完整代码如下:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*Takes one element and produces one element.A map function that doubles the values of the input stream:*/
public class OperatorMap {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);// transformationsSingleOutputStreamOperator<Integer> data = dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return 2 * value;}});// sinkdata.print();// executeenv.execute();}
}
运行结果
2> 10
7> 4
6> 2
8> 6
1> 8
FlatMap
DataStream → DataStream
将数据流中的每个元素转换得到0个,1个 或 多个元素
dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}} });
把句子中的单词取出来,完整代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*Takes one element and produces zero, one, or more elements.A flatmap function that splits sentences to words:*/
public class OperatorFlatMap {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStream<String> dataStream = env.fromElements("hello world", "hello flink", "hello hadoop");// transformationsSingleOutputStreamOperator<String> data = dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for (String word : value.split(" ")) {out.collect(word);}}});// sinkdata.print();// executeenv.execute();}
}
运行结果
5> hello
7> hello
6> hello
7> hadoop
5> world
6> flink
Filter
DataStream → DataStream
为每个元素计算一个布尔函数,并保留函数返回true的元素。
dataStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value != 0;} });
输出不是0的元素,完整代码如下:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*Evaluates a boolean function for each element and retains those for which the function returns true.A filter that filters out zero values*/
public class OperatorFilter {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);// transformationsSingleOutputStreamOperator<Integer> data = dataStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value != 0;}});// sinkdata.print();// executeenv.execute();}
}
运行结果
8> 5
6> 3
4> 1
KeyBy
DataStream → KeyedStream
在逻辑上将流划分为不相交的分区。具有相同键的所有记录都被分配到同一个分区。在内部,keyBy()是通过散列分区实现的。
dataStream.keyBy(value -> value.getSomeKey()); dataStream.keyBy(value -> value.f0);
根据key进行分组,并对值进行求和,完整代码如下:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;/*Logically partitions a stream into disjoint partitions.All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning.There are different ways to specify keys.*/
public class OperatorKeyBy {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceList<Tuple2<String, Integer>> dataSource = Arrays.asList(Tuple2.of("hello", 3),Tuple2.of("flink", 2),Tuple2.of("hadoop", 4),Tuple2.of("flink", 5));DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);// transformationsSingleOutputStreamOperator<Tuple2<String, Integer>> data = dataStream.keyBy(value -> value.f0).sum(1);// sinkdata.print();// executeenv.execute();}
}
运行结果
3> (hello,3)
7> (flink,2)
8> (hadoop,4)
7> (flink,7)
Reduce
KeyedStream → DataStream
对键控数据流进行“滚动”缩减。将当前元素与上一个缩减值组合并发出新值。
keyedStream.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2)throws Exception {return value1 + value2;} });
对有相同key的值进行规约运算,这里做求和运算,完整代码如下:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;/*KeyedStream → DataStreamA “rolling” reduce on a keyed data stream.Combines the current element with the last reduced value and emits the new value.A reduce function that creates a stream of partial sums:*/
public class OperatorReduce {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceList<Tuple2<String, Integer>> dataSource = Arrays.asList(Tuple2.of("hello", 3),Tuple2.of("flink", 2),Tuple2.of("hadoop", 3),Tuple2.of("flink", 5),Tuple2.of("hello", 1),Tuple2.of("hadoop", 1));DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);// transformationsKeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(value -> value.f0);SingleOutputStreamOperator<Tuple2<String, Integer>> data = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, (value1.f1 + value2.f1));}});// sinkdata.print();// executeenv.execute();}
}
运行结果
7> (flink,2)
8> (hadoop,3)
3> (hello,3)
7> (flink,7)
8> (hadoop,4)
3> (hello,4)
Window
KeyedStream → WindowedStream
可以在已分区的KeyedStreams上定义窗口(Windows)。窗口根据某些特性(例如,最后10秒内到达的数据)对每个键中的数据进行分组。
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)));
可以对窗口的数据进行计算,例如:计算10秒滚动窗口中每个单词出现的次数,案例代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WindowWordCount {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source and transformationsSingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = env.socketTextStream("node1", 7777).flatMap(new Splitter()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1);// sinkdataStream.print();// executionenv.execute("Window WordCount");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}
}
启动nc监听
[hadoop@node1 ~]$ nc -lk 7777
运行flink程序
发送测试数据
[hadoop@node1 ~]$ nc -lk 7777 hello world hello flink hello hadoop hello java hello
运行结果
5> (world,1) 8> (hadoop,1) 3> (hello,3) 7> (flink,1) 2> (java,1) 3> (hello,1) 3> (hello,1)
注意:输入数据的速度不一样,会导致数据分配到不同的窗口,计算出的结果也会不一样。
WindowAll
DataStream → AllWindowedStream
可以在常规数据流上定义窗口。 Windows 根据某些特征(例如,最近 10 秒内到达的数据)对所有流事件进行分组.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
注意:很多情况下WindowAll是一种非并行转换。所有记录将被收集到同一个任务中进行计算,数据量大可能会出现OOM问题。
把所有窗口中的数据进行规约运算,这里使用逗号来拼接每个单词,完整代码如下:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*Windows can be defined on regular DataStreams.Windows group all the stream events according to some characteristicThis is in many cases a non-parallel transformation. (非并行)All records will be gathered in one task for the windowAll operator.*/
public class OperatorWindowAll {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source and transformationsSingleOutputStreamOperator<String> result = env.socketTextStream("node1", 7777).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<String>() {@Overridepublic String reduce(String value1, String value2) throws Exception {return value1 + "," + value2;}});// sinkresult.print();// executeenv.execute();}
}
测试数据
[hadoop@node1 ~]$ nc -lk 7777 hello world hadoop flink hello
运行结果
4> hello,world 5> hadoop,flink,hello
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用函数应用到整个窗口。
注意:如果使用 windowAll 转换,则需要使用 AllWindowFunction。
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {public void apply (Tuple tuple,Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));} }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {public void apply (Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));} });
对窗口内元素根据key相同进行求和运算,完整代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class OperatorWindowApply {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source and transformationsDataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream.flatMap(new Splitter()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<Integer> applyStream = windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {int sum = 0;for (Tuple2<String, Integer> value : values) {sum += value.f1;}out.collect(new Integer(sum));}});// sinkapplyStream.print();// executeenv.execute();}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}
}
发送测试数据
[hadoop@node1 ~]$ nc -lk 7777 hello world hello hadoop hello flink flink
运行结果
5> 1 3> 1 3> 2 7> 2 8> 1
注意:输入速度不一样,导致数据分配到不同的窗口,运行结果也会不一样。
分析结果
第一行hello world在一个窗口,每个单词都出现1次,所以输出1 、 1
第二行、第三行、第四行 在同一窗口,hello出现2次, flink出现2次, hadoop出现一次,所以输出 2 、 2、 1
WindowReduce
WindowedStream → DataStream
将函数式Reduce函数应用于窗口并返回Reduce后的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);} });
使用Reduce实现词频统计,完整代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class OperatorWindowReduce {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source and transformationsDataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream.flatMap(new Splitter()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);}});// sinkresult.print();// executeenv.execute();}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}
}
测试数据
[hadoop@node1 ~]$ nc -lk 7777 hello hello world hello flink flink flink hadoop hadoop hello
运行结果
5> (world,1) 3> (hello,2) 7> (flink,3) 3> (hello,1) 8> (hadoop,2) 3> (hello,1)
Union
DataStream* → DataStream
两个或多个相同类型的数据流联合创建一个包含所有流中所有元素的新流。
dataStream.union(otherStream1, otherStream2, ...);
两个相同类型的数据流联结,完整代码如下:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*
Union of two or more data streams creating a new stream containing all the elements from all the streams.
Note: If you union a data stream with itself you will get each element twice in the resulting stream.*/
public class OperatorUnion {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// sourceDataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);DataStream<Integer> dataStream2 = env.fromElements(4, 5, 6);// transformationsDataStream<Integer> res = dataStream1.union(dataStream2);// sinkres.print();// executeenv.execute();}
}
运行结果
1
2
3
4
5
6
Window Join
DataStream,DataStream → DataStream
连接给定键和公共窗口上的两个数据流
dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new JoinFunction () {...});
两个数据流的窗口联结,案例完整代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class OperatorWindowJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4),Tuple2.of("c", 12)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env.fromElements(Tuple3.of("a", 1,1),Tuple3.of("a", 11,1),Tuple3.of("b", 2,1),Tuple3.of("b", 12,1),Tuple3.of("c", 14,1),Tuple3.of("d", 15,1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0).equalTo(r2 -> r2.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}
运行结果
(a,1)<----->(a,1,1) (a,2)<----->(a,1,1) (b,3)<----->(b,2,1) (c,12)<----->(c,14,1)
Interval Join
KeyedStream,KeyedStream → DataStream
在给定时间间隔内使用公共key连接两个KeyedStream的两个元素 e1 和 e2,以便 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound。
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // 时间下限,时间上限.upperBoundExclusive(true) // 可选项.lowerBoundExclusive(true) // 可选项.process(new IntervalJoinFunction() {...});
间隔连接,完整代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class OperatorIntervalJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//调用 interval joinks1.intervalJoin(ks2)// 连接时间间隔.between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {out.collect(left + "<------>" + right);}}).print();env.execute();}
}
运行结果
(a,1)<------>(a,1,1) (a,2)<------>(a,1,1) (b,3)<------>(b,2,1)
Window CoGroup
DataStream,DataStream → DataStream
将给定键和公共窗口上的两个数据流联合分组。
dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new CoGroupFunction () {...});
两个数据流联合分组,完整代码如下:
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.example.jiaocai.chapter5.CoGroupExample;public class OperatorWindowCoGroup {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> socketSource1 = env.socketTextStream("node1", 7777);DataStream<String> socketSource2 = env.socketTextStream("node1", 8888);DataStream<Tuple2<String, Integer>> input1 = socketSource1.map(line -> {String[] arr = line.split(" ");String id = arr[0];int t = Integer.parseInt(arr[1]);return Tuple2.of(id, t);}).returns(Types.TUPLE(Types.STRING, Types.INT));DataStream<Tuple2<String, Integer>> input2 = socketSource2.map(line -> {String[] arr = line.split(" ");String id = arr[0];int t = Integer.parseInt(arr[1]);return Tuple2.of(id, t);}).returns(Types.TUPLE(Types.STRING, Types.INT));DataStream<String> coGroupResult = input1.coGroup(input2).where(i1 -> i1.f0).equalTo(i2 -> i2.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupExample.MyCoGroupFunction());coGroupResult.print();env.execute("window cogroup function");}public static class MyCoGroupFunction implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {@Overridepublic void coGroup(Iterable<Tuple2<String, Integer>> input1, Iterable<Tuple2<String, Integer>> input2, Collector<String> out) {input1.forEach(element -> System.out.println("input1 :" + element.f1));input2.forEach(element -> System.out.println("input2 :" + element.f1));}}
}
测试数据
[hadoop@node1 ~]$ nc -lk 7777 hello 2 hello 1 [hadoop@node1 ~]$ nc -lk 8888 hello 3 hello 4
运行结果
input1 :2 input1 :1 input2 :3 input2 :4
Connect
DataStream,DataStream → ConnectedStream
“连接”两个保留其类型的数据流,连接允许两个流之间共享状态。两个流的数据类型可以不一样。
DataStream<Integer> someStream = //... DataStream<String> otherStream = //...ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
connect连接后,得到ConnectedStreams流,对于ConnectedStreams流转换时需要实现CoMapFunction或CoFlatMapFunction接口,重写里面的两个方法分别来处理两个流数据,也就是第一个方法处理第一个流的数据,第二个方法处理第二个流的数据。传入的数据类型如下:
// IN1 表示第一个流的数据类型 // IN2 表示第二个流的数据类型 // IN3 表示处理后输出流的数据类型 public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
两个不同数据类型的数据流的联结,完整代码如下:
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class OperatorConnect {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// sourceDataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);DataStream<String> dataStream2 = env.fromElements("hello", "flink", "spark");// transformationsConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);SingleOutputStreamOperator<String> res = connectedStreams.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer input1) throws Exception {return input1.toString();}@Overridepublic String map2(String input2) throws Exception {return input2;}});// sinkres.print();// executeenv.execute();}
}
运行结果
1
hello
2
flink
3
spark
CoMap, CoFlatMap
ConnectedStream → DataStream
将连接流转换为数据流,其中转换与map、flatMap类似
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {@Overridepublic Boolean map1(Integer value) {return true;}@Overridepublic Boolean map2(String value) {return false;} });connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) {for (String word: value.split(" ")) {out.collect(word);}} });
将连接流转换为数据流,完整代码如下:
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;public class OperatorCoFlatMap {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);DataStream<String> dataStream2 = env.fromElements("hello world", "hello flink");// transformationsConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);SingleOutputStreamOperator<String> res = connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) throws Exception {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) throws Exception {for (String word : value.split(" ")) {out.collect(word);}}});// sinkres.print();// executeenv.execute();}
}
运行结果
8> hello
8> flink
4> 1
7> hello
7> world
5> 2
6> 3
Cache
缓存转换的中间结果。 目前仅支持以批量执行模式运行的作业。 缓存中间结果是在第一次计算中间结果时延迟生成的,以便后续作业可以重用该结果。 如果缓存丢失,将使用原始转换重新计算。
DataStream<Integer> dataStream = //... CachedDataStream<Integer> cachedDataStream = dataStream.cache();//缓存数据 cachedDataStream.print();
物理分区
- 自定义分区
- 随机分区
- 重新缩放
- 广播
自定义分区
dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);
随机分区
dataStream.shuffle();
重新缩放
dataStream.rescale();
广播
dataStream.broadcast();
任务链和资源组
- 启动新链
- 禁用链接
- 设置插槽共享组
启动新链
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
someStream.filter(...).map(...).startNewChain().map(...);
禁用链接
Do not chain the map operator.
someStream.map(...).disableChaining();
设置插槽共享组
someStream.filter(...).slotSharingGroup("name");
名称和描述
someStream.filter(...).setName("filter").setDescription("一些描述内容");
指定计算结果放置在何处(sink)
- writeAsText(path):输出到文件中
- writeAsCsv(...):输出csv文件中
- print():打印到控制台
- writeUsingOutputFormat()
- writeToSocket:输出到Socket中
- addSink:自定义sink,例如输出到kafka中
例如:
DataStreamSource<Integer> dataStream = env.fromElements(1, 2, 3); dataStream.writeAsText("sinkout"); dataStream.print();
触发程序执行(execution)
- execute:同步执行,会阻塞其他作业
execute() execute(String jobName) execute(StreamGraph streamGraph)
例如:
env.execute();
- executeAsync:异步执行,不会阻塞其他作业
executeAsync() executeAsync(String jobName) executeAsync(StreamGraph streamGraph)
例如:
env.executeAsync();
完成!enjoy it!