【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】
- 1)使用 filter 简单实现
- 2)使用侧输出流实现
所谓“分流”
,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
1)使用 filter 简单实现
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
public class SplitByFilterDemo {public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);/*** TODO 使用filter来实现分流效果* 缺点: 同一个数据,要被处理两遍(调用两次filter)*/SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);even.print("偶数流");odd.print("奇数流");env.execute();}
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
2)使用侧输出流实现
关于处理函数中侧输出流的用法,我们已经在 flatmap 课节做了详细介绍。简单来说,只需要调用上下文 ctx 的 .output() 方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”
(OutputTag),指定了侧输出流的 id 和类型。
代码实现:将 WaterSensor 按照 id 类型进行分流。
准备好自定义的 MapFunction:
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}
实现:
public class SideOutputDemo {public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 使用侧输出流 实现分流* 需求: watersensor的数据,s1、s2的数据分别分开** TODO 总结步骤:* 1、使用 process算子* 2、定义 OutputTag对象* 3、调用 ctx.output* 4、通过主流 获取 测流*//*** 创建OutputTag对象* 第一个参数: 标签名* 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation*/OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<WaterSensor> process = sensorDS.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {String id = value.getId();if ("s1".equals(id)) {// 如果是 s1,放到侧输出流s1中/*** 上下文ctx 调用ouput,将数据放入侧输出流* 第一个参数: Tag对象* 第二个参数: 放入侧输出流中的 数据*/ctx.output(s1Tag, value);} else if ("s2".equals(id)) {// 如果是 s2,放到侧输出流s2中ctx.output(s2Tag, value);} else {// 非s1、s2的数据,放到主流中out.collect(value);}}});// 从主流中,根据标签 获取 侧输出流SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);// 打印主流process.print("主流-非s1、s2");//打印 侧输出流s1.printToErr("s1");s2.printToErr("s2");env.execute();}
}
要点:
1、使用 process();
(是最底层 API)。
2、process 每次处理一条数据
。
3、定义 OutputTag 对象:
(1)第一个参数:标签名
。
(2)第二个参数:放入侧输出流中的数据的类型,Typeinformation
。
4、调用 ctx.output();
5、通过主流获取测流
。