Flink--API 之Transformation-转换算子的使用解析

目录

一、常用转换算子详解

(一)map 算子

(二)flatMap 算子

(三)filter 算子

(四)keyBy 算子

元组类型

POJO

(五)reduce 算子

二、合并与连接操作

(一)union 算子

(二)connect 算子

三、侧输出流(Side Outputs)

四、总结


        在大数据处理领域,Apache Flink 凭借其强大的流处理和批处理能力备受青睐。而转换算子作为 Flink 编程模型中的关键部分,能够对数据进行灵活多样的处理操作,满足各种复杂业务场景需求。本文将深入介绍 Flink 中常见的转换算子,包括 map、flatMap、filter、keyBy、reduce 等,并结合详细代码示例讲解其使用方法,同时探讨 union、connect 等合并连接操作以及侧输出流等特性,帮助读者全面掌握 Flink 转换算子的精髓。

一、常用转换算子详解

(一)map 算子

功能概述
        map 算子主要用于对输入流中的每个元素进行一对一的转换操作,基于用户自定义的映射逻辑将输入元素转换为新的输出元素。

代码示例
假设我们有一份访问日志数据,格式如下:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

        我们要将其转换为一个 LogBean 对象,包含访问 ip、用户 userId、访问时间戳 timestamp、访问方法 method、访问路径 path 等字段。

假如需要用到日期工具类,可以导入lang3包:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version>
</dependency>

以下是代码实现:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class MapDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});//4. sink-数据输出mapStream.print();//5. execute-执行env.execute();}
}

        在上述代码中,通过 map 函数的自定义逻辑,将每行日志字符串按空格拆分后,进行相应字段提取与时间戳转换,最终封装成 LogBean 对象输出。

第二个版本:

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;@Data
@AllArgsConstructor
class LogBean{private String ip;      // 访问ipprivate int userId;     // 用户idprivate long timestamp; // 访问时间戳private String method;  // 访问方法private String path;    // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换  17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}

(二)flatMap 算子

功能概述
        flatMap 算子可以将输入流中的每个元素转换为零个、一个或多个输出元素。它适用于需要对输入元素进行展开、拆分等操作的场景。

代码示例
        假设有数据格式为“张三,苹果手机,联想电脑,华为平板”这样的文本文件,我们要将其转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等形式。

flatmap.log文件如:

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}

        这里在 flatMap 函数内部,按逗号拆分每行数据,遍历拆分后的数组(除第一个元素作为名称外),通过 collector 将新组合的字符串收集输出。

(三)filter 算子

功能概述
        filter 算子依据用户定义的过滤条件,对输入流元素进行筛选,满足条件的元素继续向下游传递,不满足的则被过滤掉。

代码示例
        读取map算子中的访问日志数据,过滤出访问 IP 是 83.149.9.216 的访问日志,代码实现如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class FilterDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");// 数据处理转换DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String ip = line.split(" ")[0];return ip.equals("83.149.9.216");}});// 数据输出filterStream.print();// 执行env.execute();}
}

        在 filter 函数中,通过拆分每行日志获取 IP 地址,并与目标 IP 进行比较决定是否保留该条日志数据。

(四)keyBy 算子

功能概述
        keyBy 算子在流处理中用于对数据按照指定的键进行分组,类似于 SQL 中的 group by,后续可基于分组进行聚合等操作,支持对元组类型 POJO 类型数据按不同方式指定分组键。

流处理中没有groupBy,而是keyBy

KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]

元组类型

单个字段keyBy

//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
wordAndOne.keyBy(v -> v.f0)

多个字段keyBy

//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});

类似于sql中的group by

select sex,count(1) from student group by sex;

group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组 

POJO

public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}

单个字段keyBy

source.keyBy(a -> a.getProvince());

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});

代码示例
        假设有数据表示不同球类的数量,格式为 Tuple2(球类名称,数量),如 Tuple2.of("篮球", 1) 等,需求是统计篮球、足球各自的总数量。代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// 数据处理转换/*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});*/KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);keyedStream.sum(1).print();// 执行env.execute();}
}

这里通过 lambda 表达式指定 Tuple2 的第一个元素(球类名称)作为分组键,对数据分组后使用 sum 聚合统计每种球类的数量总和。

pojo演示

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-13 14:32:52**/
public class Demo07 {@Data@AllArgsConstructorstatic class Ball{private String ballName;private int num;}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出// 以下演示数据是pojoDataStreamSource<Ball> ballSource = env.fromElements(new Ball("篮球", 1),new Ball("篮球", 2),new Ball("篮球", 3),new Ball("足球", 3),new Ball("足球", 2),new Ball("足球", 3));ballSource.keyBy(ball -> ball.getBallName()).print();ballSource.keyBy(new KeySelector<Ball, String>() {@Overridepublic String getKey(Ball ball) throws Exception {return ball.getBallName();}});//5. execute-执行env.execute();}
}

(五)reduce 算子

功能概述
        reduce 算子可对一个数据集或一个分组进行聚合计算,将多个元素逐步合并为一个最终元素,常用于求和、求最值等场景,sum 底层其实也是基于 reduce 实现。

代码示例
        读取访问日志,统计每个 IP 地址的访问 PV 数量,代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class ReduceDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return Tuple2.of(logBean.getIp(), 1);}});//4. sink-数据输出KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);// sum的底层是 reduce// keyByStream.sum(1).print();//  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {// t1 => ("10.0.0.1",10)// t2 => ("10.0.0.1",1)return Tuple2.of(t1.f0,  t1.f1 + t2.f1);}}).print();//5. execute-执行env.execute();}
}

        先将日志数据处理成包含 IP 和计数 1 的 Tuple2 格式,按 IP 分组后,在 reduce 函数中对相同 IP 的计数进行累加,得到每个 IP 的访问 PV 数。

二、合并与连接操作

(一)union 算子

功能概述
        union 算子能够合并多个同类型的流,将多个 DataStream 合并成一个 DataStream,但注意合并时流的类型必须一致,且不会对数据去重

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");// 合并流DataStream<String> unionStream = stream1.union(stream2);unionStream.print();// 执行env.execute();}
}

上述代码将两个包含字符串元素的流进行合并输出。

(二)connect 算子

功能概述
        connect 算子可连接 2 个不同类型的流,连接后形成 ConnectedStreams,内部两个流保持各自的数据和形式独立,之后需通过自定义处理逻辑(如 CoMapFunction 等)处理后再输出,且处理后的数据类型需相同。

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<Long> stream3 = env.fromSequence(1, 10);// 连接流ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);// 处理流DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return value;}@Overridepublic String map2(Long value) throws Exception {return Long.toString(value);}});// 输出mapStream.print();// 执行env.execute();}
}

        这里连接了一个字符串流和一个长整型序列流,通过 CoMapFunction 分别将字符串按原样、长整型转换为字符串后合并输出。

三、侧输出流(Side Outputs)

功能概述
        侧输出流可根据自定义规则对输入流数据进行分流,将满足不同条件的数据输出到不同的“分支”流中,方便后续针对性处理。

代码示例
以下示例对流中的数据按照奇数和偶数进行分流并获取分流后的数据:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputExample {public static void main(String[] args) throws Exception {// 1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流// 从0到100生成一个Long类型的数据流作为示例数据DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签,用于区分偶数和奇数的侧输出流OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));// 2. source-加载数据// 使用ProcessFunction来处理每个元素,决定将其输出到哪个侧输出流或者主输出流SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value代表每一个数据,判断是否为偶数if (value % 2 == 0) {// 如果是偶数,将其输出到偶数的侧输出流中ctx.output(tag_even, value);} else {// 如果是奇数,将其输出到奇数的侧输出流中ctx.output(tag_odd, value);}}});// 3. 获取奇数的所有数据,从主输出流中获取对应标签(tag_odd)的侧输出流数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据,同样从主输出流中获取对应标签(tag_even)的侧输出流数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");// 4. sink-数据输出(这里通过打印展示了侧输出流的数据,实际应用中可对接其他下游操作)// 5. 执行任务env.execute();}
}

在上述代码中:

  • 首先,我们创建了 StreamExecutionEnvironment 来准备 Flink 的执行环境,并设置了运行模式为自动(AUTOMATIC)。
  • 接着,通过 fromSequence 方法生成了一个从 0 到 100 的 Long 类型的数据流作为示例的输入数据。
  • 然后,定义了两个 OutputTag,分别用于标记偶数和奇数的侧输出流,并且指定了对应的类型信息(这里都是 Long 类型)。
  • 在 process 函数中,针对每个输入的元素(value),通过判断其是否能被 2 整除来决定将其输出到对应的侧输出流中。如果是偶数,就通过 ctx.output(tag_even, value) 将其发送到偶数侧输出流;如果是奇数,则通过 ctx.output(tag_odd, value) 发送到奇数侧输出流。
  • 最后,通过 getSideOutput 方法分别获取奇数和偶数侧输出流的数据,并进行打印输出,以此展示了侧输出流的分流及获取数据的完整流程。实际应用场景中,这些侧输出流的数据可以进一步对接不同的业务逻辑进行相应处理,比如奇数流进行一种聚合计算,偶数流进行另一种统计分析等。

        这样,利用侧输出流的特性,我们可以很灵活地根据自定义条件对数据进行分流处理,满足多样化的数据处理需求。

四、总结

        本文围绕 Apache Flink 转换算子展开,旨在助力读者洞悉其核心要点与多样应用,以灵活处理复杂业务数据。

  1. 常用转换算子
    • map:基于自定义逻辑,对输入元素逐一变换,如剖析日志字符串、提取关键信息并转换格式,封装为定制对象输出,契合精细化处理需求。
    • flatMap:将输入元素按需拆分为零个及以上输出元素,凭借拆分、重组操作,挖掘数据深层价值,适配展开、细化数据场景。
    • filter:依设定条件甄别筛选,精准把控数据流向,剔除不符元素,保障下游数据贴合业务关注点。
    • keyBy:类比 SQL 分组,依指定键归拢数据,为聚合奠基,以不同方式适配多元数据类型,实现分类统计。
    • reduce:聚焦数据集或分组,渐进聚合,可求和、求最值等,sum 操作底层亦仰仗于此,高效整合数据得出汇总结果。
  2. 合并与连接操作
    • union:整合同类型多流为一,操作简便,唯需留意类型一致,虽不除重但拓宽数据维度。
    • connect:桥接不同类型流,借自定义逻辑协同处理,统一输出类型,达成跨流数据交互融合。
  3. 侧输出流特性:基于自定义规则巧妙分流,借OutputTag标记、ProcessFunction判定,将数据导向不同 “分支”,按需对接各异业务逻辑,于复杂场景中尽显灵活应变优势,全方位满足数据处理多元化诉求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887571.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Top 10 Tools to Level Up Your Prompt Engineering Skills

此文章文字是转载翻译&#xff0c;图片是自已用AI 重新生成的。文字内容来自 https://www.aifire.co/p/top-10-ai-prompt-engineering-tools 供记录学习使用。 Introduction to AI Prompt Engineering AI Prompt Engineering 简介 1&#xff0c;Prompt Engineering 提示工程…

Rust语言俄罗斯方块(漂亮的界面案例+详细的代码解说+完美运行)

tetris-demo A Tetris example written in Rust using Piston in under 500 lines of code 项目地址: https://gitcode.com/gh_mirrors/te/tetris-demo 项目介绍 "Tetris Example in Rust, v2" 是一个用Rust语言编写的俄罗斯方块游戏示例。这个项目不仅是一个简单…

Spring Boot 与 Spring Cloud Alibaba 版本兼容对照

版本选择要点 Spring Boot 3.x 与 Spring Cloud Alibaba 2022.0.x Spring Boot 3.x 基于 Jakarta EE&#xff0c;javax.* 更换为 jakarta.*。 需要使用 Spring Cloud 2022.0.x 和 Spring Cloud Alibaba 2022.0.x。 Alibaba 2022.0.x 对 Spring Boot 3.x 的支持在其发行说明中…

(免费送源码)计算机毕业设计原创定制:Java+ssm+JSP+Ajax SSM棕榈校园论坛的开发

摘要 随着计算机科学技术的高速发展,计算机成了人们日常生活的必需品&#xff0c;从而也带动了一系列与此相关产业&#xff0c;是人们的生活发生了翻天覆地的变化&#xff0c;而网络化的出现也在改变着人们传统的生活方式&#xff0c;包括工作&#xff0c;学习&#xff0c;社交…

Ubuntu Opencv 源码包安装

说明&#xff1a; ubuntu20.04 建议 使用 opencv-4.6.0版本 ubuntu18.04 建议 使用 opencv-4.5.2-版本 安装包准备 1、下载源码包 OpenCV官网 下载相关版本源码 Sources # 克隆方式 OpenCV 源码git clone https://github.com/opencv/opencv.gitcd opencvgit checkout 4.5.2 …

Linux 下自动化之路:达梦数据库定期备份并推送至 GitLab 全攻略

目录 环境准备 生成SSH 密钥对 数据库备份并推送到gitlab脚本 设置定时任务 环境准备 服务器要有安装达梦数据库&#xff08;达梦安装这里就不示例了&#xff09;&#xff0c;git 安装Git 1、首先&#xff0c;确保包列表是最新的&#xff0c;运行以下命令&#xff1a; …

<项目代码>YOLOv8 停车场空位识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

Spring Boot 集成 Knife4j 的 Swagger 文档

在开发微服务应用时&#xff0c;API 文档的生成和维护是非常重要的一环。Swagger 是一个非常流行的 API 文档工具&#xff0c;可以帮助我们自动生成 RESTful API 的文档&#xff0c;并提供了一个友好的界面供开发者测试 API。本文将介绍如何在 Spring Boot 项目中集成 Knife4j …

微信小程序中会议列表页面的前后端实现

题外话&#xff1a;想通过集成腾讯IM来解决即时聊天的问题&#xff0c;如果含语音视频&#xff0c;腾讯组件一年5万起步&#xff0c;贵了&#xff01;后面我们改为自己实现这个功能&#xff0c;这里只是个总结而已。 图文会诊需求 首先是个图文列表界面 同个界面可以查看具体…

git(Linux)

1.git 三板斧 基本准备工作&#xff1a; 把远端仓库拉拉取到本地了 .git --> 本地仓库 git在提交的时候&#xff0c;只会提交变化的部分 就可以在当前目录下新增代码了 test.c 并没有被仓库管理起来 怎么添加&#xff1f; 1.1 git add test.c 也不算完全添加到仓库里面&…

【动手学电机驱动】STM32-FOC(8)MCSDK Profiler 电机参数辨识

STM32-FOC&#xff08;1&#xff09;STM32 电机控制的软件开发环境 STM32-FOC&#xff08;2&#xff09;STM32 导入和创建项目 STM32-FOC&#xff08;3&#xff09;STM32 三路互补 PWM 输出 STM32-FOC&#xff08;4&#xff09;IHM03 电机控制套件介绍 STM32-FOC&#xff08;5&…

5G NR:带宽与采样率的计算

100M 带宽是122.88Mhz sampling rate这是我们都知道的&#xff0c;那它是怎么来的呢&#xff1f; 采样率 子载波间隔 * 采样长度 38.211中对于Tc的定义&#xff0c; 在LTE是定义了Ts&#xff0c;在NR也就是5G定义了Tc。 定义这个单位会对我们以后工作中的计算至关重要。 就是在…

【湿度数据处理】中国地面气候资料日值数据集(V3.0)(MATLAB全代码)

【湿度数据处理】中国地面气候资料日值数据集 处理1:数据范围筛选处理2:缺测数据筛查处理3:缺测数据插补参考基于此博客完成各要素数据提取后-【数据集处理】中国地面气候资料日值数据集(V3.0)(含MATLAB全代码),进行后续数据筛选及缺测处理,此处以湿度数据为例。 提取到的…

MySQL面试-1

InnoDB中ACID的实现 先说一下原子性是怎么实现的。 事务要么失败&#xff0c;要么成功&#xff0c;不能做一半。聪明的InnoDB&#xff0c;在干活儿之前&#xff0c;先将要做的事情记录到一个叫undo log的日志文件中&#xff0c;如果失败了或者主动rollback&#xff0c;就可以通…

大数据-231 离线数仓 - DWS 层、ADS 层的创建 Hive 执行脚本

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…

leetcode_有序数组中的单一元素

540. 有序数组中的单一元素 - 力扣&#xff08;LeetCode&#xff09; 二分查找 使用条件 &#xff1a; 有序 &#xff0c; log n class Solution { public:int singleNonDuplicate(vector<int>& nums) {int left 0, right nums.size() - 1, mid;while (left <…

Python中的简单爬虫

文章目录 一. 基于FastAPI之Web站点开发1. 基于FastAPI搭建Web服务器2. Web服务器和浏览器的通讯流程3. 浏览器访问Web服务器的通讯流程4. 加载图片资源代码 二. 基于Web请求的FastAPI通用配置1. 目前Web服务器存在问题2. 基于Web请求的FastAPI通用配置 三. Python爬虫介绍1. 什…

USRP:B205mini-i

USRP B205mini-i B205mini-i都是采用工业级的FPGA芯片(-I表示industrial-grade)&#xff0c;所以价格贵。 这个工业级会让工作温度从原来 0 – 45 C 变为 -40 – 75 C. 温度的扩宽&#xff0c;会让工作的稳定性变好。但是前提是你需要配合NI的外壳才行&#xff0c;你如果只买一…

基于Redis内核的热key统计实现方案|得物技术

一、Redis热key介绍 Redis热key问题是指单位时间内&#xff0c;某个特定key的访问量特别高&#xff0c;占用大量的CPU资源&#xff0c;影响其他请求并导致整体性能降低。而且&#xff0c;如果访问热key的命令是时间复杂度较高的命令&#xff0c;会使得CPU消耗变得更加严重&…

鸿蒙安全控件之位置控件简介

位置控件使用直观且易懂的通用标识&#xff0c;让用户明确地知道这是一个获取位置信息的按钮。这满足了授权场景需要匹配用户真实意图的需求。只有当用户主观愿意&#xff0c;并且明确了解使用场景后点击位置控件&#xff0c;应用才会获得临时的授权&#xff0c;获取位置信息并…