Flink Environment
- getExecutionEnvironment()
Returns the execution environment for the current platform. If no parallelism is set, the parallelism configured in flink-conf.yaml is used (default: 1).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- createLocalEnvironment()
Creates a local environment. Parallelism defaults to the number of CPU cores and can also be set via a constructor argument.
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
- createRemoteEnvironment()
Creates a remote environment and submits the jar to it for execution.
StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");
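A minimal runnable sketch combining the three factory methods and the two ways of setting parallelism; the class name EnvDemo is hypothetical, and the host, port, and jar path are the same placeholders used above:

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvDemo {
    public static void main(String[] args) {
        // Local or cluster environment, depending on how the job is launched;
        // without an explicit setting, parallelism falls back to flink-conf.yaml.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // overrides the config-file default

        // Local environment; parallelism can be passed as an argument
        // (it defaults to the number of CPU cores when omitted).
        LocalStreamEnvironment local = StreamExecutionEnvironment.createLocalEnvironment(4);

        // Remote environment: submits the given jar to a running cluster.
        StreamExecutionEnvironment remote = StreamExecutionEnvironment
                .createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");
    }
}
```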
Flink Sources
- Using collection data as the source
env.fromCollection(new ArrayList<>());
env.fromElements(1, 2, 3);
- Using a file as the source
env.readTextFile("/home/test.txt");
- Using a message queue as the source
For example, to use Kafka as the source, add the connector dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```

env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
- User-defined source (implement the SourceFunction interface)
Mainly used in testing to generate mock data.
The complete example code:
```java
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

/**
 * @author regotto
 */
public class SourceTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    private static void readFromCollectionAndElement() {
        // Read from a collection; SensorReading is a custom entity (String id, Long timestamp, Double temperature)
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("1", 1111L, 35.1),
                new SensorReading("2", 2222L, 32.1),
                new SensorReading("3", 3333L, 33.1),
                new SensorReading("4", 12345L, 36.1)));
        DataStreamSource<Integer> elements = env.fromElements(1, 2, 3, 4, 5);
        dataStream.print("data");
        elements.print("int");
    }

    private static void readFromText() {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        dataStream.print();
    }

    private static void readFromKafka() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
    }

    /**
     * User-defined source
     */
    private static void readFromUserDefine() {
        // Implement the SourceFunction interface: generate the data in run() and emit it with ctx.collect()
        DataStream<SensorReading> dataStream = env.addSource(new SourceFunction<SensorReading>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<SensorReading> ctx) throws Exception {
                Random random = new Random();
                while (running) {
                    for (int i = 0; i < 10; i++) {
                        ctx.collect(new SensorReading(i + "", System.currentTimeMillis(), random.nextGaussian()));
                    }
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        dataStream.print();
    }

    public static void main(String[] args) throws Exception {
        // Swap in readFromCollectionAndElement / readFromText / readFromKafka to try the other sources
        readFromUserDefine();
        env.execute();
    }
}
```
Transform
Mapping operators
- map: maps each element one-to-one
- flatMap: breaks each element apart and maps the pieces, emitting zero or more outputs per input
- filter: keeps only the elements that satisfy a predicate
Aggregation operators
- keyBy: hash-partitions a stream into disjoint partitions, so that all records with the same key land in the same partition
Rolling aggregations on the keyed stream: sum, min, max, minBy, maxBy. Note that min/max only track the extreme value of the given field, while minBy/maxBy return the whole record containing it (see the sketch after this list).
- reduce: merges the current element with the previous aggregation result; the returned stream contains every intermediate aggregation result
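A small sketch of the min/minBy distinction, assuming the keyedStream of SensorReading records built in the example further below:

```java
// min("temperature"): the temperature field carries the running minimum,
// but the remaining fields may still stem from an earlier record.
keyedStream.min("temperature").print("min");

// minBy("temperature"): emits the complete record that currently holds
// the minimum temperature.
keyedStream.minBy("temperature").print("minBy");
```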
Multi-stream operators
- split and select: split a DataStream into two or more DataStreams based on some criterion
split: tags each element of the DataStream.
select: picks the tagged elements out into separate streams.
- connect and coMap: wrap two DataStreams into a single one
connect: after wrapping, the inner streams keep their own state and stay independent of each other.
coMap/coFlatMap: apply a map/flatMap to the connected streams to merge them.
- union: merges two or more DataStreams of the same type into one stream
The complete example code:
```java
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;

/**
 * @author regotto
 */
public class TransformTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        env.setParallelism(1);

        // map: transform each line into a SensorReading
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        map.print("map");

        // flatMap: break each line apart, then emit the pieces
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(",")) {
                    out.collect(s);
                }
            }
        }).print("flatMap");

        // filter
        dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");

        // Rolling aggregation of the current max temperature;
        // keyBy accepts a field position, a field name, or a custom KeySelector
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
        keyedStream.max("temperature").print("max temperature");

        // reduce: keep the max temperature together with the latest timestamp
        keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {
                return new SensorReading(curData.getId(), newData.getTimestamp(),
                        Math.max(curData.getTemperature(), newData.getTemperature()));
            }
        }).print("latest time at max temperature");

        // split & select: divide the data into high- and low-temperature streams
        SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 36
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");
        high.print("high");
        low.print("low");
        all.print("all");

        // connect & coMap: map the high stream to 2-tuples, connect it with
        // the low stream, and output status information
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =
                high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                }).connect(low);
        connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temperature alarm");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "temperature normal");
            }
        }).print("connect&coMap");

        // union: merge the high, low, and all streams
        high.union(low, all).print("union");

        env.execute();
    }
}
```
Operator transformation diagram:
RichMapFunction
An enhancement of MapFunction that provides access to the RuntimeContext. One runtime context corresponds to one partition (parallel subtask), and every partition runs open on creation and close on destruction, which is where resources can be prepared and released. Extend RichMapFunction and override open and close to implement resource setup and cleanup; this makes the operation more flexible. The other RichXXX functions work the same way.
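A minimal sketch of the idea, reusing the SensorReading entity from the examples above; the class name SubtaskTagMapper is hypothetical, and the println calls merely stand in for real resource setup and teardown such as opening and closing a database connection:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class SubtaskTagMapper extends RichMapFunction<SensorReading, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Invoked once per parallel subtask before the first map() call:
        // open connections, load caches, etc. here.
        System.out.println("open subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public String map(SensorReading value) {
        // The RuntimeContext is also available inside every map() call.
        return value.getId() + " @ subtask " + getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void close() throws Exception {
        // Invoked once per subtask on shutdown: release resources here.
        System.out.println("close subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }
}
```

It plugs into a pipeline like any other mapper, e.g. map.map(new SubtaskTagMapper()).print("rich");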
Problems encountered
When writing the functions, shortening an anonymous inner class to a lambda expression ran into Java's generic type erasure, producing this error:
The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
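Two workarounds, sketched against the dataStream and the flatMap from TransformTest above: keep the anonymous class (as that example does), or keep the lambda and declare the produced type explicitly with returns():

```java
import org.apache.flink.api.common.typeinfo.Types;

// Java compiles the lambda without the Collector's generic parameter, so
// Flink cannot extract the output type on its own; returns(...) supplies it.
dataStream.flatMap((String value, Collector<String> out) -> {
    for (String s : value.split(",")) {
        out.collect(s);
    }
}).returns(Types.STRING).print("flatMap-lambda");
```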
Summary
When computing over and transforming data, always be clear about what each operator's input is and what its output is.