Operators transform one or more DataStreams into a new DataStream.
Filter Function
Scala
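import org.apache.flink.streaming.api.scala._

object DataStreamTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    // CustomNonParallelSourceFunction is the custom source defined in an earlier section
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received:" + x) // log every element that reaches map
      x                         // pass it through unchanged
    }).filter(_ % 2 == 0)       // keep only even numbers
      .print()
      .setParallelism(1)
  }
}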
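Any of the data sources defined earlier can be used here.
The map here does nothing substantive: it prints each element it receives and passes it through unchanged. The filter takes each number modulo 2 and keeps only the even ones. The output is as follows: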
received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8
This shows that map receives every element, while filter drops the odd ones before they reach print.
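The record-by-record behaviour above (map sees every element, filter drops the odd ones, and each surviving element is printed right after its "received" line) can be mimicked without Flink. The sketch below is a plain-Java simulation, not Flink code: `FilterSketch` and `run` are hypothetical names, a sequential `java.util.stream` pipeline stands in for the DataStream, and the source is assumed to emit 1, 2, 3, ... as the printed output suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Plain-Java simulation (no Flink): map logs each element, filter keeps the even ones.
final class FilterSketch {

    // Runs the pipeline over 1..count and returns every line that would be printed.
    static List<String> run(long count) {
        List<String> log = new ArrayList<>();
        LongStream.rangeClosed(1, count)                    // stands in for the custom source
                .map(x -> {                                 // like the example's map: log, pass through
                    log.add("received:" + x);
                    return x;
                })
                .filter(x -> x % 2 == 0)                    // keep only even numbers
                .forEach(x -> log.add(String.valueOf(x)));  // stands in for print()
        return log;
    }

    public static void main(String[] args) {
        run(8).forEach(System.out::println);
    }
}
```

Because a sequential stream pushes one element at a time through map, filter, and forEach, the lines come out in the same interleaved order as the Flink output above.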
Java
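// Imports needed at the top of the enclosing class's file
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    // Run the parallel source as a single instance (see the note below)
    data.setParallelism(1)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received:" + value); // log every element
                return value;                            // pass it through unchanged
            }
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0; // keep only even numbers
            }
        })
        .print()
        .setParallelism(1);
}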
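You need to call data.setParallelism(1) before the map; otherwise each number is printed several times. That is because this example uses JavaCustomParallelSourceFunction: with a parallelism greater than 1, every parallel instance of the source emits its own sequence. JavaCustomNonParallelSourceFunction always runs with parallelism 1, so no setting is needed there.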
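Why a parallel source repeats values can be illustrated with a small simulation, assuming (as the explanation above implies) that every parallel instance runs its own counter starting at 1. `ParallelSourceSketch` and `emit` are hypothetical names and Flink is not involved; the sketch simply concatenates the instances' output, whereas in a real job their interleaving is nondeterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: each "subtask" of a parallel source emits its own 1..perInstance sequence.
final class ParallelSourceSketch {

    static List<Long> emit(int parallelism, long perInstance) {
        List<Long> out = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Every instance starts counting from 1, so values repeat across instances.
            for (long v = 1; v <= perInstance; v++) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(4, 3)); // parallelism 4: every number appears 4 times
        System.out.println(emit(1, 3)); // parallelism 1: every number appears once
    }
}
```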
Union Function
Scala
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  // union requires both streams to have the same element type
  data01.union(data02).print().setParallelism(1)
}
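union merges two streams of the same element type into a single DataStream so they can be processed together. Elements from the two inputs are interleaved in no guaranteed order. The code above prints: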
1
1
2
2
3
3
4
4
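The output above alternates between the two inputs because both sources emit at the same pace. The plain-Java sketch below imitates that, assuming each input yields one element per step; `UnionSketch` and `interleave` are hypothetical names, and Flink's union itself promises only that every element of both inputs reaches the merged stream, not this particular ordering.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simulation only: merges two same-typed inputs into one list, one element from each per step.
final class UnionSketch {

    static <T> List<T> interleave(List<T> a, List<T> b) {
        List<T> merged = new ArrayList<>(a.size() + b.size());
        Iterator<T> ia = a.iterator();
        Iterator<T> ib = b.iterator();
        while (ia.hasNext() || ib.hasNext()) {
            if (ia.hasNext()) merged.add(ia.next());
            if (ib.hasNext()) merged.add(ib.next());
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Long> source1 = List.of(1L, 2L, 3L, 4L);
        List<Long> source2 = List.of(1L, 2L, 3L, 4L);
        System.out.println(interleave(source1, source2)); // [1, 1, 2, 2, 3, 3, 4, 4]
    }
}
```

The generic type parameter `T` mirrors the same-type requirement: both inputs must share one element type.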
Java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    // Merge both sources into one stream and print it
    data1.union(data2).print().setParallelism(1);
}
Split Select Function
Scala
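split divides one stream into several named streams, and select chooses which of those streams to process. Note that split/select is deprecated in newer Flink releases (and later removed); side outputs are the recommended replacement.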
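import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  // Tag each element as "even" or "odd"
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  // Select both named streams; pass a single name to keep only that one
  split.select("odd", "even").print().setParallelism(1)
}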
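Downstream processing is chosen by name: select("odd", "even") above keeps both streams, while select("even") would keep only the even numbers.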
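The name-based routing that split and select perform can be sketched in plain Java: a selector assigns one or more names to each element, and selecting a set of names keeps every element tagged with any of them. `SplitSelectSketch`, `tag`, and `select` are hypothetical names for illustration; this is not the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Simulation only: mimics split (tag every element) followed by select (keep chosen tags).
final class SplitSelectSketch {

    // Plays the role of OutputSelector.select: returns the names for one element.
    static List<String> tag(Long value) {
        List<String> names = new ArrayList<>();
        names.add(value % 2 == 0 ? "even" : "odd");
        return names;
    }

    // Keeps elements whose names include at least one of the wanted names.
    static List<Long> select(List<Long> input, Function<Long, List<String>> selector, String... wanted) {
        Set<String> wantedSet = new HashSet<>(Arrays.asList(wanted));
        List<Long> out = new ArrayList<>();
        for (Long v : input) {
            for (String name : selector.apply(v)) {
                if (wantedSet.contains(name)) {
                    out.add(v);
                    break; // emit an element once even if several of its names match
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> data = List.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(select(data, SplitSelectSketch::tag, "odd"));         // [1, 3, 5]
        System.out.println(select(data, SplitSelectSketch::tag, "odd", "even")); // [1, 2, 3, 4, 5]
    }
}
```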
Java
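// Imports needed at the top of the enclosing class's file
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            // Even numbers go to "even", odd numbers to "odd"
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    // Only the odd numbers are printed
    split.select("odd").print().setParallelism(1);
}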