Flink系列二:DataStream API中的Source,Transformation,Sink详解(^_^)

在上面篇文章中已经对flink进行了简单的介绍以及了解了Flink API 层级划分,这一章内容我们主要介绍DataStream API

 

流程图解:

 

3908739ad07d4f6b987e0e15b257e0ac.png

 

一、DataStream API Source

Flink 在流处理和批处理上的 source 大概有 4 类:

(1)基于本地集合的 source

(2)基于文件的 source

(3)基于网络套接字的 source,具体来说就是从远程服务器或本地端口上的套接字连接中接收数据,比如上一篇文章中的入门案例就属于这一种。

(4)自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source,灵活度较高,看个人需求。

 

下面就是纯代码演示了,具体细节会在注释中说明

1、本地集合的source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class Demo1ListSource {public static void main(String[] args) throws Exception{//创建flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//创建集合ArrayList<String> arrayList = new ArrayList<>();arrayList.add("java");arrayList.add("java");arrayList.add("java");arrayList.add("java");arrayList.add("java");/**基于集合的Source ----- 属于有界流*/DataStream<String> listDS = env.fromCollection(arrayList);listDS.print();//启动Flink作业执行env.execute();}
}

结果:

16787a01a3a847538ef32dcecc29f79f.png

在这解释一下结果图中的数字前缀,这个前缀的主要目的是不同并行实例的输出。什么都不设置的话取决于你电脑的内存了,比如我电脑是16G的内存,那么当数据较多时默认分配给该作业分了16个task。

 

2、本地文件的source

注意:同一个File数据源,既能有界读取,也能无界读取

2.1 有界读取

/**流批统一:* 1、同一套算子代码既能作流处理也能做批处理* 2、同一个File数据源,既能有界读取,也能无界读取*/
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FileSource1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/**有界读取*///老版本方式:简单但不灵活DataStream<String> lineDS = env.readTextFile("flink/data/student.csv");
//        lineDS.print();//新版本方式:复杂一点但更灵活,使用这种既能有界读取,也能无界读取//构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(//指定编码new TextLineInputFormat("UTF-8")//指定路径, new Path("flink/data/student.csv")).build();//使用fileSourceDataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");fileDS.print();env.execute();}
}

 

2.1 无界读取

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FileSource2 {public static void main(String[] args) throws Exception {/**使用无界流读取文件数,很简单,其实就是对上面的代码修改运行模式并加个参数就可以了*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//修改运行模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),new Path("spark/data/student.csv")).build();//使用fileSourceDataStreamSource<String> linesDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");linesDS.print();env.execute();}
}

 

3、本地端口的source

上一篇文章中的入门案例就属于这一种,后面在代码中也会用到,在此不在赘述了。

 

4、自定义的 source

举例:使用自定义source读取mysql中的数据

/*实现方式:* 1、实现SourceFunction或ParallelSourceFunction接口来创建自定义的数据源。* 2、然后使用env.addSource(new CustomSourceFunction())或DataStreamSource.fromSource添加你自定义的数据源。*/
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class Demo3MysqlSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用自定义的sourceDataStream<Student> studentDSSource = env.addSource(new MysqlSource());//统计学生表每个班级的人数//取出每一行的班级列并加上人数后缀1DataStream<Tuple2<String, Integer>> clazzKvDS = studentDSSource.map(line -> Tuple2.of(line.getClazz(), 1), Types.TUPLE(Types.STRING, Types.INT));//分组,将相同的键发送给同一个task中KeyedStream<Tuple2<String, Integer>, String> keyByDS = clazzKvDS.keyBy(kv -> kv.f0);//求和SingleOutputStreamOperator<Tuple2<String, Integer>> clazzSum = keyByDS.sum(1);//输出clazzSum.print();env.execute();}}/*** 自定义source读取mysql中的数据*/
class MysqlSource implements SourceFunction<Student> {/*** run()方法会在任务启动的时候执行一次*/@Overridepublic void run(SourceContext ctx) throws Exception {//1、加载mysq驱动Class.forName("com.mysql.jdbc.Driver");//2、创建数据库连接//注意:如果报连不上的错误,将参数补全(useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false)Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");//3、编写sql查询PreparedStatement sql = conn.prepareStatement("select * from students");//4、执行查询ResultSet resultSet = sql.executeQuery();//5、遍历查询出的数据while (resultSet.next()) {int id = resultSet.getInt("id");String name = resultSet.getString("name");int age = resultSet.getInt("age");String gender = resultSet.getString("gender");String clazz = resultSet.getString("clazz");//将数据发送到下游/** collect():从 DataStream 收集所有的元素,并将它们作为列表或其他集合类型返回给客户端*/ctx.collect(new Student(id, name, age, gender, clazz));}//6、释放资源sql.close();conn.close();}@Overridepublic void cancel() {/** cancel(),它用于在任务完成后执行清理操作*/}
}/*** 这里使用了lombok插件(小辣椒)* 这个插件的作用可以在代码编译的时候增加方法(相当于scala中的case class),就不用我们自己手动添加get、set、toString等方法了。* 使用方法:加@就行了*/
@Data
@AllArgsConstructor
class Student {private int id;private String name;private int age;private String gender;private String clazz;
}

 

 

二、DataStream API Transformation

Transformation:数据流转换。

常见算子有 Map / FlatMap / Filter /KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据形式。

其实这些算子在功能上与scala或spark中的基本相同,只是形式和细节上会有些差别。

1、map

DataStream → DataStream    输入一个元素同时输出一个元素


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo1Map {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用nc -lk 8888 模拟实时数据的产生DataStreamSource<String> source = env.socketTextStream("master", 8888);//方式1:匿名内部类形式/** 观察源码map发现:* MapFunction<T, O> 是一个函数接口,用于对流中的每个元素的处理* 这个接口定义了一个 map 方法,该方法接受一个输入元素(类型为 T)并返回一个输出元素(类型为 O)。*/DataStream<String> map1DS = source.map(new MapFunction<String, String>() {@Overridepublic String map(String word) throws Exception {return word.toUpperCase();}});
//        map1DS.print();//方式2:lambda表达式形式(更简洁常用)source.map(String::toUpperCase).print();    //是对source.map(word -> word.toUpperCase())的更简写env.execute();}
}

结果:

45374801cb354dccbc098f73af160b18.png

 

 

2、flatMap

DataStream → DataStream

输入一个元素转换为一个或多个元素输出

/**flatMap 方法用于将输入流中的每个元素转换成一个或多个输出元素*/import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Demo2FaltMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source = env.socketTextStream("master", 8888);//方式1:匿名内部类//看源码,这个方法接受一个FlatMapFunction<T, R>类型的参数,其中T是输入元素的类型,R是输出元素的类型DataStream<String> out2DS = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) throws Exception {for (String word : line.split(",")) {//循环将数据发送到下游out.collect(word);}}});//        out2DS.print();//方式2:lambda表达式DataStream<String> out1DS = source.flatMap((line, out) -> {for (String word : line.split(",")) {//循环将数据发送到下游out.collect(word);}}, Types.STRING);out1DS.print();env.execute();}
}

结果:

c82a409f9f824b6eba5ee6263c064070.png

 

3、filter

DataStream → DataStream 

为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo3Filter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source = env.readTextFile("spark/data/student.csv");//需求:过滤出文科一班的学生的信息//方式一:匿名内部类source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String lines) throws Exception {return "文科一班".equals(lines.split(",")[4]);}}); //.print();//方式2:lambda表达式source.filter(lines->"文科一班".equals(lines.split(",")[4])).print();env.execute();}
}

结果:

d3e93c6a422c4b5abbd8ce6822799a0d.png

 

4、keyBy

作用为:分组

DataStream → KeyedStream

在逻辑层面将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

39272837e4da48c3b64487f8a16d2766.png

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo4KeyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("master", 8888);//方式1:匿名内部类/** public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key)* 其中 T 是输入元素的类型,K 是键的类型*/source.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> kv) throws Exception {return kv.f0;}});//.print();//方式2:lambda表达式source.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).keyBy(kv->kv.f0).print();env.execute();}
}

结果: 可以看出的确作了分区

48db7796907641b9b7c8480b6561ab4b.png

 

5、reduce

作用为:聚合

KeyedStream → DataStream

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo5Reduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source = env.socketTextStream("master", 8888);//方式1:匿名内部类source.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,Tuple2<String, Integer> kv2) throws Exception {//kv1和kv2的key是一样的String word = kv1.f0;int counts = kv1.f1 + kv2.f1;return Tuple2.of(word,counts);}}).print();env.execute();}
}

结果:从结果来看说明reduce是一个有状态算子。 

6003735a7c804c60b0a2652d4f664ee0.png

 

6、Window

KeyedStream → WindowedStream 

可以在已经分区的 KeyedStreams 上定义 Window,Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。

窗口算子有很多,以后会专门出一章具体说明,下面写一个滑动窗口的案例。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo6Window {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/** 每隔5秒统计最近15秒每个单词的数量 --- 滑动窗口*/DataStream<String> wordsDS = env.socketTextStream("master", 8888);//转换成kvDataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));//按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);//划分窗口//SlidingEventTimeWindows:滑动的处理时间窗口//前一个参数为窗口大小(window size),后一个参数为滑动大小(window slide)WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));//统计单词的数量DataStream<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();env.execute();}
}

 

7、Union

DataStream→ DataStream

将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo7Union {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source1 = env.socketTextStream("master", 8888);DataStream<String> source2 = env.socketTextStream("master", 9999);/** 合并两个DataStream* 注意:在数据层面并没有合并,只是在逻辑层面合并了*/DataStream<String> unionDS = source1.union(source2);unionDS.print();env.execute();}
}

结果:

5e95fe6957a1463abe7dde2dce4af9d9.png

 

8、process

DataStream→ DataStream

process算子是flink的底层算子,可以用来代替map、faltMap、filter等算子

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class Demo8Process {public static void main(String[] args) throws Exception {/** process算子是flink的底层算子,可以用来代替map、faltMap、filter等算子** public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction)* 其中 T 是输入数据的类型,R 是输出数据的类型*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source = env.socketTextStream("master", 8888);DataStream<Tuple2<String, Integer>> processDS = source.process(new ProcessFunction<String, Tuple2<String, Integer>>() {/** processElement:在当前代码中相当于flatMap,每一条数据执行一次,可以返回一条或多条数据* ctx:上下文对象(代表flink执行环境)* out:输出,用于将数据发送到下游*/@Overridepublic void processElement(String line, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,Collector<Tuple2<String, Integer>> out) throws Exception {//这里的逻辑与flatMap的逻辑相同for (String word : line.split(",")) {out.collect(Tuple2.of(word, 1));}}});env.execute();/** 注意:该算子不能用lambda表达式改写,因为ProcessFunction它包含了一些生命周期方法和状态管理的方法,* 这些方法使得它不适合直接简化为lambda表达式的形式。** 在底层代码层面来说,ProcessFunction是一个抽象类,该类还有许多复杂的方法,使得它无法直接用lambda表达式来改写* 因为 lambda 表达式只能表示简单的函数接口(即那些只包含一个抽象方法的接口)* public abstract class ProcessFunction<I, O> extends AbstractRichFunction*/}
}

 

三、DataStream API Sink

Flink 将转换计算后的数据发送的地点 。

Flink 常见的 Sink 大概有如下几类:

(1)打印在控制台、写入文件。

(2)写入 socket(具体指的是将数据发送到网络套接字(例如端口))。

(3)自定义的 sink :常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,当然你也可以根据需求定义自己的 sink。

 

1、写入文件

对于写入文件,是否要将所有数据写入同一个文件?由于是流式写入,该文件就一直处于正在写入的状态,而且可能会造成文件过大的问题,所以DataStream API提供了滚动策略的方式来解决这样的问题。

9a5ed8bc0f1c42a4a16f59c8bcf80165.png

 


import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;public class Demo1FileSink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> source = env.socketTextStream("master", 8888);//创建fileSink/**public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(*        final Path basePath, final Encoder<IN> encoder)}**<IN> : The type of the elements that are being written by the sink.*/FileSink<String> fileSink = FileSink.forRowFormat(new Path("flink/data/words"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder()//每10秒进行一次滚动(生成文件).withRolloverInterval(Duration.ofSeconds(10))//当延迟超过10秒进行一次滚动.withInactivityInterval(Duration.ofSeconds(5))//文件大小达到1MB进行一次滚动.withMaxPartSize(MemorySize.ofMebiBytes(1)).build()).build();//使用fileSink,将读取的数据写入另一到文件夹中source.sinkTo(fileSink);env.execute();}
}

结果: 

d9cfbc1a28b94ad79646683db279d104.png

 

2、自定义的 sink

举例:使用自定义sink将数据存到mysql中

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class Demo3MySqlSInk {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> wordsDS = env.socketTextStream("master", 8888);//统计单词的数量DataStream<Tuple2<String, Integer>> countDS = wordsDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0).sum(1);//将统计结果保存到数据countDS.addSink(new MySQlSink());env.execute();}
}/*** 自定义sink将数据保存到mysql* RichSinkFunction:多了open和close方法,用于打开和关闭连接* SinkFunction*/
class MySQlSink extends RichSinkFunction<Tuple2<String, Integer>> {Connection con;PreparedStatement stat;/*** invoke方法每一条数据执行一次*/@Overridepublic void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {stat.setString(1, kv.f0);stat.setInt(2, kv.f1);//执行sqlstat.execute();}/*** open方法会在任务启动的时候,每一个task中执行一次*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建数据库连接");//1、加载启动Class.forName("com.mysql.jdbc.Driver");//2、创建数据库连接con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29", "root", "123456");//3、编写保存数据的sql//replace into 替换插入,如果没有就插入,如果有就更新,表需要有主键stat = con.prepareStatement("replace into word_count values(?,?)");}/*** close方法会在任务取消的时候,每一个task中执行一次*/@Overridepublic void close() throws Exception {//4、关闭数据库连接stat.close();con.close();}

 

 

 

 

 

---------------------------------------------------------------------------------------------------------------------------------

代码注意提示:

如果在写flink代码的过程中出现了以下错误,大概率就是有些算子使用没有写数据类型,与spark不同,spaark底层由scala编写,scala提供了自动类型推断机制,所以不写参数类型也不会报错,但是flink底层是java编写的,java没有这种机制。

23ddb4c8d57549598c2cdb15b640badf.png

 

 

基础的算子到这结束,其他算子后续也会写,以上内容具体详情皆参考apache flink官网,官网详细说明了各种算子的使用,网址贴在下面了:

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/overview/

个人感觉写的很详细了,看不懂建议直接打死作者(^_^)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/19342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python网页处理与爬虫实战:使用Requests库进行网页数据抓取

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

【前端】Vuex笔记(超详细!!)

最近花了两周时间&#xff0c;完完全全的跟着Vuex官方的视频学完了Vuex并且详详细细的做了笔记&#xff0c;其中总结部分是我对于整个视频课程的总结&#xff0c;视频部分是跟着视频做的笔记&#xff0c;如果总结部分有不懂的话&#xff0c;直接去视频部分查找对应的笔记即可&a…

ios:文本框默认的copy、past改成中文复制粘贴

问题 ios 开发&#xff0c;对于输入框的一些默认文案展示&#xff0c;如复制粘贴是英文的&#xff0c;那么如何改为中文的呢 解决 按照路径找到这个文件 ios/项目/Info.plist&#xff0c;增加 <key>CFBundleAllowMixedLocalizations</key> <true/> <…

Keras深度学习框架实战(1):图像分类识别

1、绪论 1.1 图像分类的定义 图像分类是计算机视觉领域中的一项基本任务&#xff0c;其定义是将输入图像分配给预定义类别中的一个或多个。具体来说&#xff0c;图像分类系统接受一个图像作为输入&#xff0c;并输出一个或多个类别标签&#xff0c;这些标签描述了图像中的内容…

第十三届蓝桥杯物联网试题(国赛)

还是那句话不能掉以轻心&#xff0c;全力以赴吧&#xff0c;遇事不要慌&#xff0c;该做的都做了&#xff0c;冷静沉稳的处理&#xff0c;看看配置&#xff0c;看看代码&#xff0c;还是不行就重启&#xff0c;都没问题换个板子 下面对比较复杂的部分的处理过程进行展现&#x…

git报错prohibited by Gerrit: not permitted: update

git push报错&#xff1a; Push to refs/for/[branch] to create a review, or get Push rights to update the branch. Contact an administrator to fix the permissions (prohibited by Gerrit: not permitted: update)原因&#xff1a; 使用Gerrit代码审核时&#xff0c;本…

CentOS 7基础操作02_优化Linux操作系统中的服务

1、实验环境 公司在文件服务器中新安装了CentOS系统.由于默认启动的服务程序较多&#xff0c;系统运行缓慢。现需要对系绞服务进行适当优化&#xff0c;减少一些不必要的自启动服务.并设置系统在开机后直接进入字符模式。 2、需求描述 根据实际使用需求对CentOS 7操作系统中的…

postgressql——PGPROC XLOG(6)

PGPROC相关结构 在共享内存中,核心数据结构围绕PROC_HDR指向的两个list:PROC和XACT PRCO内存连续,维护链表结构方便申请释放,对应每个后台服务进程,PID为OS标识、PGPROCNO为内部标识 XACT内存连续,维护快照需要的xmin和xid,XACT从PROC拆出来是为了更高的cache line命中…

IBM开源Granite Code模型,多尺寸可选,支持多种代码任务,性能媲美 CodeLlama

前言 近年来&#xff0c;大型语言模型&#xff08;LLM&#xff09;在代码领域展现出惊人的潜力&#xff0c;为软件开发流程带来了革命性的改变。代码 LLM 不仅能够生成高质量代码&#xff0c;还能帮助程序员修复错误、解释代码、编写文档等等&#xff0c;极大地提高了软件开发…

【kubernetes】探索k8s集群的存储卷、pvc和pv

目录 一、emptyDir存储卷 1.1 特点 1.2 用途 1.3部署 二、hostPath存储卷 2.1部署 2.1.1在 node01 节点上创建挂载目录 2.1.2在 node02 节点上创建挂载目录 2.1.3创建 Pod 资源 2.1.4访问测试 2.2 特点 2.3 用途 三、nfs共享存储卷 3.1特点 3.2用途 3.3部署 …

Web程序设计-实验05 DOM与BOM编程

题目 【实验主题】 影视网站后台影视记录管理页设计 【实验任务】 1、浏览并分析多个网站后台的列表页面、编辑页面&#xff08;详见参考资源&#xff0c;建议自行搜索更多后台页面&#xff09;的主要元素构成和版面设计&#xff0c;借鉴并构思预期效果。 2、新建 index.h…

正则匹配优化:匹配排除多个字符串的其他字符串

(^entity|^with|...)\w优化 (?!entity|with|has|index|associations|input)\w(?!): 匹配排除项 效果 继续优化 匹配会过滤掉带有关键字的字段&#xff0c;在过滤的时候是可以加上尾部结束匹配符的 效果&#xff1a;

thinkphp6 自定义的查询构造器类

前景需求&#xff1a;在查询的 时候我们经常会有一些通用的&#xff0c;查询条件&#xff0c;但是又不想每次都填写一遍条件&#xff0c;这个时候就需要重写查询类&#xff08;Query&#xff09; 我目前使用的thinkphp版本是6.1 首先自定义CustomQuery类继承于Query <?p…

【C语言回顾】预处理

前言1. 简单概要2. 预处理命令讲解结语 上期回顾: 【C语言回顾】编译和链接 个人主页&#xff1a;C_GUIQU 归属专栏&#xff1a;【C语言学习】 前言 各位小伙伴大家好&#xff01;上期小编给大家讲解了C语言中的编译和链接&#xff0c;接下来我们讲解一下预处理&#xff01; …

【香橙派 AIpro】新手保姆级开箱教程:Linux镜像+vscode远程连接

香橙派 AIpro 开发板 AI 应用部署测评 写在最前面一、开发板概述官方资料试用印象适用场景 二、详细开发前准备步骤1. 环境准备2. 环境搭建3. vscode安装ssh插件4. 香橙派 AIpro 添加连接配置5. 连接香橙派 AIpro6. SSH配置 二、详细开发步骤1. 登录 juypter lab2. 样例运行3. …

【IDEA】-使用IDEA查看类之间的依赖关系

1、父子类的继承、实现关系 1.1、使用CTRL Alt U 选择 java class 依据光标实际指向的类位置 用实心箭头表示泛化关系 是一种继承的关系&#xff0c;指向父类 可以提前设置需要显示的类的属性、方法等信息 快捷键 Ctrl Alt S &#xff0c;然后搜索 Diagrams 1.2、使用…

python知识继续学习

1、计算机表示小数是有误差的&#xff0c;下面的5就是误差 2、在python中&#xff0c;所有的非0数字都是True&#xff0c;零是False。所有的非空字符串都是True&#xff0c;空字符串是False。空列表是False。在python的基本数据类型中&#xff0c;表示空的东西都是False&#x…

数据结构(三)循环链表 约瑟夫环

文章目录 一、循环链表&#xff08;一&#xff09;概念&#xff08;二&#xff09;示意图&#xff08;三&#xff09;操作1. 创建循环链表&#xff08;1&#xff09;函数声明&#xff08;2&#xff09;注意点&#xff08;3&#xff09;代码实现 2. 插入&#xff08;头插&#x…

【linux】运维-基础知识-认知hahoop周边

1. HDFS HDFS&#xff08;Hadoop Distributed File System&#xff09;–Hadoop分布式文件存储系统 源自于Google的GFS论文&#xff0c;HDFS是GFS的克隆版 HDFS是Hadoop中数据存储和管理的基础 他是一个高容错的系统&#xff0c;能够自动解决硬件故障&#xff0c;eg&#xff1a…

【Linux 网络编程】网络的背景、协议的分层知识!

文章目录 1. 计算机网络背景2. 认识 "协议"3. 协议分层 1. 计算机网络背景 网络互联: 多台计算机连接在一起, 完成数据共享; &#x1f34e;局域网&#xff08;LAN----Local Area Network&#xff09;: 计算机数量更多了, 通过交换机和路由器连接。 &#x1f34e; 广…