Flink中的分流
在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。
拆分流数据的方式
- Split,已经废弃,不推荐使用
- Fliter
- SideOut,推荐使用
Fliter分流的Java实现
public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指标明细DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env).map((MapFunction<String, String>) kafkaMessage -> {JSONObject jsonobject = null;try {jsonobject = JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn("报文格式错误:{}", kafkaMessage);}if (null == jsonobject || jsonobject.isEmpty()) {LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 将原始流中包含demo的数据筛选出来DataStream<String> diagnosisMessages = detailMessage.filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo"))).map((MapFunction<String, String>) sparkMessage -> {// 为达到实验效果,进行日志输出LOG.info("[is demo message]:{}", sparkMessage);return sparkMessage;});env.execute("Flink Streaming Java API Skeleton");}
SideOut分流的Java实现
public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.out.println("【SideOutputDemo】");// 指标明细DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env).map((MapFunction<String, String>) kafkaMessage -> {JSONObject jsonobject = null;try {jsonobject = JSONObject.parseObject(kafkaMessage);} catch (Exception e) {LOG.warn("报文格式错误:{}", kafkaMessage);}if (null == jsonobject || jsonobject.isEmpty()) {LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));} else {if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));}}return JSONObject.toJSONString(jsonobject);});// 定义一个切分(旁路输出)final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {};SingleOutputStreamOperator<String> sp = mainMessage.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String s, Context context, Collector<String> collector) throws Exception {// 向常规流(主流)中添加数据collector.collect(s);// 向旁路输出流中添加数据if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {context.output(outputTag, s);}}});sp.map((MapFunction<String, String>) sparkMessage -> {LOG.info("主流的数据: {}", sparkMessage);return sparkMessage;});DataStream<String> tag = sp.getSideOutput(outputTag);tag.map((MapFunction<String, String>) sparkMessage -> {LOG.info("旁路[{}]的数据: {}", outputTag.getId(), sparkMessage);return sparkMessage;});env.execute("Flink Streaming Java API Skeleton");}
SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:
-
为每个分支流定义一个 SideOutPut。
-
为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象,向旁路输出的SideOutPut发送数据。
- ProcessFunction:处理函数,单流输入函数
- KeyedProcessFunction:处理函数,单流输入函数
- CoProcessFunction:处理函数,双流流输入函数
- KeyedCoProcessFunction:处理函数,双流流输入函数
- ProcessWindowFunction:窗口函数,全量计算函数
- ProcessAllWindowFunction:窗口函数,全量计算函数,它与 ProcessWindowFunction 类似,但是它会对窗口中的所有数据进行处理,而不是仅处理触发窗口计算的数据。
例子中使用ProcessFunction实现流拆分。
-
根据SideOutPut 的ID标识获取旁路输出流,进行数据继续处理。
拆分方式 | 对比 |
---|---|
Split | 不支持链式拆分,切分得到的流,是不能进行再次切分的 |
Fliter | 多分支流,需要多次遍历原始流进行筛选。浪费集群的资源 |
SideOut | 以多次进行拆分的,支持链式拆分。 |