1 Related Big Data Concepts
1.1 By timing
1. Real-time computing:
Data is processed as it arrives and the results are stored immediately.
A continuous, low-latency, event-triggered kind of task.
2. Offline computing:
Data is processed with a delay; results follow the N+1 pattern (yesterday's data is stored today).
A batch-oriented, high-latency, actively launched kind of computation.
1.2 By processing style
1. Stream processing:
Processes one record (or a small batch) at a time; the state it keeps is small.
2. Batch processing:
Processes a large volume of data and returns the final result once processing finishes.
2 Flink Architecture and Working Principles
2.1 Concepts
Apache Flink is a real-time computing framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and can perform computations at in-memory speed and at any scale.
2.2 Features
1. High-throughput, low-latency, high-performance stream processing
2. Window operations based on event time
3. Exactly-once semantics for stateful computations
4. Highly flexible window operations: windows based on time, count, session, and data-driven triggers
5. A continuous streaming model with backpressure support
6. Fault tolerance based on lightweight distributed snapshots
7. A single runtime that supports both batch-on-streaming and pure streaming processing
8. Flink implements its own memory management inside the JVM, avoiding OOM errors
9. Support for iterative computation
10. Automatic program optimization: expensive operations such as shuffles and sorts are avoided where possible, and intermediate results are cached when necessary
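Features 3 and 6 go together: exactly-once state rests on the lightweight snapshot (checkpoint) mechanism. A minimal sketch, not part of the original notes, of how checkpointing could be switched on for the environments used in the examples below; the class name Demo0Checkpointing and the 5-second interval are made-up assumptions for illustration.

package com.shujia.flink.core;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class (not from the original notes): enabling checkpoint-based,
// exactly-once state, which is what features 3 and 6 describe.
public class Demo0Checkpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a lightweight distributed snapshot of all operator state every 5 seconds
        env.enableCheckpointing(5000);

        // EXACTLY_ONCE is already the default mode; it is set explicitly here only to make the guarantee visible
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // ... define sources, transformations and sinks here, then:
        // env.execute();
    }
}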
2.3 WordCount code
2.3.1 Stream processing
1. Stream processing mode:
setRuntimeMode(RuntimeExecutionMode.STREAMING) (the continuous streaming model; this is the default, so the example below does not set it explicitly)
2. Methods for reading data:
Unbounded stream: socketTextStream
Bounded stream: readTextFile
3. The output is continuous (counts accumulate as data arrives).
package com.shujia.flink.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism of the job; one parallel instance corresponds to one task.
        // If not set, it defaults to the number of CPU cores of the machine.
        env.setParallelism(2);

        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // 2. Read the data (start the source first: nc -lk 8888)
        // socketTextStream actually returns DataStreamSource, a subclass of DataStream;
        // DataStream is used here for brevity.
        DataStream<String> wordDS = env.socketTextStream("master", 8888);

        // Count the words.
        // map actually returns SingleOutputStreamOperator, a subclass of DataStream;
        // DataStream is used here for brevity.
        // Tuple2.of turns each word into a (key, value) pair; with a lambda the result type
        // must be given explicitly via Types (a Flink class), otherwise type information is lost.
        DataStream<Tuple2<String, Integer>> kvDS = wordDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // 3. Group by the word (field f0)
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // Sum the field at index 1.
        // sum actually returns SingleOutputStreamOperator, a subclass of DataStream.
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        // Print the result
        countDS.print();

        // Start the Flink job
        env.execute();
    }
}
2.3.2 Batch processing
1. RuntimeExecutionMode.BATCH (MapReduce-style model)
2. Batch mode can only be used on bounded streams.
3. The output is the final result only.
package com.shujia.flink.core;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Execution modes:
         * RuntimeExecutionMode.BATCH: batch mode (MapReduce-style model)
         *   1. Emits only the final result
         *   2. Can only be used on bounded streams
         *
         * RuntimeExecutionMode.STREAMING: streaming mode (continuous streaming model)
         *   1. Emits continuously updated results
         *   2. Works on both bounded and unbounded streams
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Set the parallelism; one parallel instance corresponds to one task
        // (defaults to the number of CPU cores if not set)
        env.setParallelism(2);

        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // 2. Read a file -- a bounded stream
        DataStream<String> wordDS = env.readTextFile("flink/data/words.txt");

        // Turn each word into a (word, 1) tuple; the result type must be given explicitly
        DataStream<Tuple2<String, Integer>> kvDS = wordDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // 3. Group by the word and sum the counts
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        // Print the result
        countDS.print();

        // Start the Flink job
        env.execute();
    }
}
2.4 How it works
1. Create the execution environment.
2. Read data with socketTextStream; this forms a single task, because the socket source only supports a single parallel instance.
3. map transforms every record into a (word, 1) tuple; with parallelism 2 it forms two tasks.
4. keyBy groups the data (records with the same key are sent to the same task) and involves a shuffle; it also runs as two tasks. Everything before this point is called the upstream, everything after it the downstream.
5. sum aggregates the elements inside each keyed task.
6. print outputs the results.
Difference between Spark and Flink
Spark follows the MapReduce model: map tasks run first, then reduce tasks. The MR model can pre-aggregate on the map side, so the shuffle moves less data.
Flink follows the continuous streaming model: upstream and downstream tasks start at the same time and wait for data; every record is computed as soon as it arrives and a result is produced immediately.
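The task graph described above can be inspected directly. A minimal sketch (the class name Demo1ExecutionPlan is made up for this note) that builds the same word-count pipeline as Demo1StreamWordCount and prints its execution plan instead of running it:

package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper class (not from the original notes): prints the JSON execution plan
// of the word-count pipeline so the tasks and the shuffle can be seen.
public class Demo1ExecutionPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Same pipeline as above: source (1 task) -> map (2 tasks) -> keyBy/shuffle -> sum (2 tasks) -> print
        DataStream<String> wordDS = env.socketTextStream("master", 8888);
        DataStream<Tuple2<String, Integer>> kvDS =
                wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
        DataStream<Tuple2<String, Integer>> countDS = kvDS.keyBy(kv -> kv.f0).sum(1);
        countDS.print();

        // getExecutionPlan() returns a JSON description of the operators, their parallelism,
        // and the data exchanges between them (the plan is printed here instead of executing the job)
        System.out.println(env.getExecutionPlan());
    }
}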
3 Source
3.1 Collection source
1. Turn a Java collection into a source.
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Demo1ListSource {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; one parallel instance corresponds to one task
        // (defaults to the number of CPU cores if not set)
        env.setParallelism(2);

        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        ArrayList<String> list = new ArrayList<>();
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");

        // Turn the Java collection into a DataStream (a bounded source)
        DataStream<String> listDS = env.fromCollection(list);
        listDS.print();

        env.execute();
    }
}
3.2 fileSource
3.2.1 Reading a bounded stream with FileSource
1. Create a FileSource object: call forRecordStreamFormat, passing a TextLineInputFormat object (format and encoding) and a Path object (the file to read), then finish with build().
2. The bounded read targets a single file (the unbounded variant in 3.2.2 monitors a directory).
3. Consume the FileSource with the fromSource method of the environment object env, which takes three arguments:
the FileSource object, WatermarkStrategy.noWatermarks(), and a name for the source. The result is a DataStream.
4. Code
package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; one parallel instance corresponds to one task
        env.setParallelism(2);
        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // 1. Old API for reading a file -- bounded stream
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();

        // 2. New API -- can read either bounded or unbounded
        // Build the FileSource
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/students.csv"))
                .build();

        // Consume the FileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        fileDS.print();
        env.execute();
    }
}
5. Result
3.2.2 Reading an unbounded stream with FileSource
1. The only difference from the bounded read is one extra step when building the FileSource: periodically scan the directory for new files, which turns the source into an unbounded stream. The method is
monitorContinuously(Duration.ofSeconds(5)).
2. This variant reads a directory, so the Path passed to forRecordStreamFormat points at a folder.
3. Code
package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; one parallel instance corresponds to one task
        env.setParallelism(2);
        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // 1. Old API for reading a file -- bounded stream
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();

        // 2. New API -- can read either bounded or unbounded
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu"))
                // Periodically scan the directory for new files, turning this into an unbounded stream
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

        // Consume the FileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        fileDS.print();
        env.execute();
    }
}
4. Result
3.2.3 FileSource in batch and streaming mode
1. Unified batch and streaming: the same operator code can be used for both stream and batch processing,
and the same file source can be read either as a bounded or as an unbounded stream.
2. Batch processing (bounded stream)
package com.shujia.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; one parallel instance corresponds to one task
        env.setParallelism(2);
        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // Set the execution mode to batch
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 1. Old API for reading a file -- bounded stream
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();

        // 2. New API -- can read either bounded or unbounded
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu"))
                // Left commented out: monitoring the directory would make this an unbounded stream
                // .monitorContinuously(Duration.ofSeconds(5))
                .build();

        // Consume the FileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        // Count students per class (the class name is the 5th column of students.csv)
        DataStream<Tuple2<String, Integer>> kvDS = fileDS
                .map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        countDS.print();
        // fileDS.print();

        env.execute();
    }
}
Result: only the final counts are emitted.
3. Stream processing (unbounded stream)
package com.shujia.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism; one parallel instance corresponds to one task
        env.setParallelism(2);
        // Delay before data buffered upstream is sent downstream, default 200 ms
        env.setBufferTimeout(200);

        // Set the execution mode to streaming
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // 1. Old API for reading a file -- bounded stream
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();

        // 2. New API -- can read either bounded or unbounded
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu"))
                // Periodically scan the directory for new files, turning this into an unbounded stream
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

        // Consume the FileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        // Count students per class (the class name is the 5th column of students.csv)
        DataStream<Tuple2<String, Integer>> kvDS = fileDS
                .map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        countDS.print();
        // fileDS.print();

        env.execute();
    }
}
Result: the output is continuous (cumulative counts).
3.3 Custom source
1. Use addSource, passing an object that implements the SourceFunction interface; the interface has a type parameter for the emitted records.
2. Define your own class implementing SourceFunction and override the run() method; inside run, emit records with the collect method.
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Demo3MySource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the custom source:
        // addSource takes an object of a class that implements the SourceFunction interface
        DataStream<Integer> linesDS = env.addSource(new MySource());

        linesDS.print();
        env.execute();
    }
}

class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (true) {
            // Emit data downstream; ctx is the SourceContext object
            ctx.collect(100);
            Thread.sleep(5);
        }
    }

    // cancel() runs when the job is cancelled; typically used to release resources
    @Override
    public void cancel() {
    }
}
3. Example: connect to a MySQL database and read the contents of one of its tables.
The Student class uses the Lombok plugin to cut down on boilerplate code.
package com.shujia.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class Demo4MySQLSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the custom source
        DataStream<Student> lineDS = env.addSource(new MySQlSource());
        // lineDS.print();

        // Count students per class
        DataStream<Tuple2<String, Integer>> kvDS = lineDS.map(
                line -> Tuple2.of(line.getClazz(), 1),
                Types.TUPLE(Types.STRING, Types.INT));

        DataStream<Tuple2<String, Integer>> countDS = kvDS
                .keyBy(kv -> kv.f0)
                .sum(1);

        countDS.print();
        env.execute();
    }
}

class MySQlSource implements SourceFunction<Student> {
    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        // Load the MySQL driver
        Class.forName("com.mysql.jdbc.Driver");

        // Create the database connection
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");

        // Prepare and run the query
        PreparedStatement sta = connection.prepareStatement("select * from students");
        ResultSet resultSet = sta.executeQuery();

        // Parse the rows
        while (resultSet.next()) {
            int id = resultSet.getInt("id");
            String name = resultSet.getString("name");
            int age = resultSet.getInt("age");
            String gender = resultSet.getString("gender");
            String clazz = resultSet.getString("clazz");

            // Send each row downstream
            sourceContext.collect(new Student(id, name, age, gender, clazz));
        }

        // Close the statement and the connection
        sta.close();
        connection.close();
    }

    @Override
    public void cancel() {
    }
}

/*
 * The Lombok plugin generates methods for the class at compile time (similar to a Scala case class):
 * @Getter and @Setter can be combined into @Data, which generates everything except the
 * constructor (getters, setters, toString, and so on);
 * @AllArgsConstructor generates the all-arguments constructor.
 */
@Data
@AllArgsConstructor
class Student {
    private Integer id;
    private String name;
    private Integer age;
    private String gender;
    private String clazz;
}
4 Operators
4.1 map
1. map takes an object implementing the MapFunction interface; MapFunction extends Function.
2. Override the map method.
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1Map {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);

        /*
         * 1. Anonymous-inner-class form
         * map takes a MapFunction object:
         * public interface MapFunction<T, O> extends Function
         */
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });
        // kvDS.print();

        /*
         * 2. Lambda form -- the shorter way to write it
         */
        linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
               .print();

        env.execute();
    }
}
4.2 flatMap
1. flatMap takes an object implementing the FlatMapFunction interface; FlatMapFunction extends Function.
2. Override the flatMap method, which takes two parameters: the incoming record and a Collector used to emit data downstream. With a lambda this means two parameters appear before the ->.
3. When using a lambda, the output type must be specified at the end, otherwise the job fails with a type error.
4. Whether you use a lambda or an anonymous inner class, the core step is a loop that emits each piece of data downstream; what you do before the loop (splitting, transforming, and so on) is up to you.
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo2FlatMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDs = env.socketTextStream("master", 8888);

        // flatMap(FlatMapFunction<T, R> flatMapper)
        // public interface FlatMapFunction<T, O> extends Function
        DataStream<String> flapMapDS = linesDs.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    out.collect(word);
                }
            }
        });
        // flapMapDS.print();

        // Lambda form: the output type must be given explicitly
        linesDs.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(word);
            }
        }, Types.STRING).print();

        env.execute();
    }
}
4.3 filter
1. filter takes an object implementing the FilterFunction interface; FilterFunction extends Function.
2. With a lambda there is no need to specify an output type, because the return value is a boolean.
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3Filter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");

        // filter(FilterFunction<T> filter)
        // public interface FilterFunction<T> extends Function
        //     boolean filter(T value)
        DataStream<String> filterDS = linesDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // Keep only the students of class "文科一班"
                String clazz = value.split(",")[4];
                return "文科一班".equals(clazz);
            }
        });
        // filterDS.print();

        // Lambda form
        linesDS.filter(line -> "文科一班".equals(line.split(",")[4])).print();

        env.execute();
    }
}
4.4 keyBy
1. keyBy takes an object implementing the KeySelector interface; KeySelector extends Function.
2. Override the getKey method; here it simply returns the input value.
3. Its job is to send records with the same key to the same task.
4. It returns a KeyedStream (here over key-value style records).
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo4KeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // keyBy(KeySelector<T, K> key)
        // public interface KeySelector<IN, KEY> extends Function
        KeyedStream<String, String> keyByDS = linesDS.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });

        // Lambda form: records with the same key are sent to the same task
        linesDS.keyBy(word -> word).print();

        env.execute();
    }
}
4.5 reduce
1. reduce can only be applied after keyBy, i.e. on a keyed (key-value) stream.
// reduce(ReduceFunction<T> reducer)
// public interface ReduceFunction<T> extends Function
2. The result is again a two-element tuple, whether written with an anonymous inner class or a lambda.
3. The two tuples passed into reduce always carry the same key; their values are combined, and the shared key together with the aggregated value is returned.
4. For a plain sum, the sum operator (passing the field index 1) does the same job; it also works on keyed key-value streams.
5. kv1 is the result of the previous computation (the state), kv2 is the newly arrived value.
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo5Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Split each line into (word, 1) tuples
        DataStream<Tuple2<String, Integer>> kvDS = linesDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : value.split(",")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                }, Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // reduce(ReduceFunction<T> reducer)
        // public interface ReduceFunction<T> extends Function
        DataStream<Tuple2<String, Integer>> reduceDS = keyByDS.reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        // value1 is the previous result (state), value2 is the newly arrived record
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });
        // reduceDS.print();

        // Lambda form
        DataStream<Tuple2<String, Integer>> reduceDS1 = keyByDS.reduce((kv1, kv2) -> {
            String word = kv1.f0;
            int count = kv1.f1 + kv2.f1;
            return Tuple2.of(word, count);
        });

        reduceDS1.print();
        env.execute();
    }
}
4.6 window
1. Windows are only introduced briefly here; they are covered in detail later.
2. Like reduce, window is applied here after keyBy, on a keyed key-value stream (other window types are sketched after the example below).
package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo6Window {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lineDS = env.socketTextStream("master", 8888);

        // Every 5 seconds, count each word over the last 15 seconds --- a sliding window

        // Convert to (word, 1) tuples
        DataStream<Tuple2<String, Integer>> kvDS = lineDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // Assign windows
        // SlidingProcessingTimeWindows: sliding windows based on processing time
        // WindowedStream<T, KEY, W>
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS =
                keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));

        /*
         * Every 5 seconds a 15-second window is evaluated; data older than 15 seconds no longer
         * contributes. For example, with boundaries at 0-5-10-15-20-25:
         * if "hjx,hjx,java" arrives in 0-5, the first result is (hjx,2),(java,1);
         * if "kkk,lll,hjx" then arrives in 5-15, the window covering 0-15 yields
         * (hjx,3),(java,1),(lll,1),(kkk,1);
         * the next window covers 5-20 and only contains kkk, lll, hjx.
         */
        windowDS.sum(1).print();

        env.execute();
    }
}
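The example above only uses a sliding processing-time window. As a complement (a sketch, not from the original notes), the other assigners mentioned in feature 4 of section 2.2 can be applied to the same keyByDS stream; the window sizes (15 s, 100 records, 30 s gap) are arbitrary assumptions for illustration.

// Sketch only: alternative window assigners on the same keyed stream keyByDS from Demo6Window.
// Extra imports assumed on top of that class:
// import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
// import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;

// Tumbling processing-time window: non-overlapping, one result per 15-second window
keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
        .sum(1)
        .print();

// Count window: fires once every 100 records per key
keyByDS.countWindow(100)
        .sum(1)
        .print();

// Session window: a key's window closes after 30 seconds without new records for that key
keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
        .sum(1)
        .print();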
4.7 union
1. The merge is only at the code (logical) level; the data itself is not physically merged.
package com.shujia.flink.tf;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo7Union {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds1 = env.socketTextStream("master", 8888);
        DataStreamSource<String> ds2 = env.socketTextStream("master", 9999);

        /*
         * union: merges two DataStreams of the same type.
         * The data is not physically merged; the merge only exists at the logical level.
         */
        DataStream<String> unionDS = ds1.union(ds2);

        unionDS.print();
        env.execute();
    }
}
4.8 process
1. process is Flink's low-level operator; it can stand in for map, flatMap and filter.
package com.shujia.flink.tf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Demo8ProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // public abstract class ProcessFunction<I, O> extends AbstractRichFunction
        // process(ProcessFunction<T, R> processFunction)
        DataStream<Tuple2<String, Integer>> kvDS = linesDS.process(
                new ProcessFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(String value,
                                               ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,
                                               Collector<Tuple2<String, Integer>> out) throws Exception {
                        /*
                         * value: one line of input
                         * ctx:   the context object
                         * out:   used to emit data downstream
                         */
                        for (String word : value.split(",")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        kvDS.print();
        env.execute();
    }
}
5 Sink
5.1 fileSink
1. The key parameters to remember:
forRowFormat: where the files are written and the encoder to use
withRollingPolicy: the rolling policy
package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class Demo1FileSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        FileSink<String> fileSink = FileSink
                // forRowFormat(final Path basePath, final Encoder<IN> encoder):
                // where the files are written and how records are encoded
                .<String>forRowFormat(new Path("flink/data/words"), new SimpleStringEncoder<>("UTF-8"))
                // Rolling policy: withRollingPolicy(final RollingPolicy<IN, String> policy)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll the file once it contains at least 10 seconds of data
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // Roll the file after 10 seconds without receiving new records
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // Roll the file once it reaches 1 MB (after the last record is written)
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();

        // Use the fileSink
        linesDS.sinkTo(fileSink);

        env.execute();
    }
}
5.2 Custom sink
1. A custom sink implements the SinkFunction interface and overrides the invoke method.
2. addSink takes an object of a class that implements the SinkFunction interface.
3. The type parameter of SinkFunction must match the type of the data being written.
package com.shujia.flink.sink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class Demo2MySInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);

        linesDS.addSink(new MySink());

        env.execute();
    }
}

class MySink implements SinkFunction<String> {
    // A custom sink implements SinkFunction and overrides invoke;
    // invoke can write the record to external storage, or simply print it as done here
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("自定义sink" + value); // prints the custom prefix plus the record read from the socket
    }
}
5.3 Example
1. If the sink only implements SinkFunction, invoke would have to open a database connection for every single record, which wastes resources. So when writing to a database, the custom sink extends the RichSinkFunction abstract class instead: its open and close methods run exactly once, which is just right for opening and closing the database connection.
package com.shujia.flink.sink;

import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.DriverManager;

public class Demo3MySqlSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> countDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .sum(1);

        countDS.addSink(new MySqlSInk());

        env.execute();
    }
}

/*
 * Custom sink that saves the data to MySQL.
 * RichSinkFunction: compared with SinkFunction it adds open and close methods,
 * used here to open and close the database connection exactly once.
 */
class MySqlSInk extends RichSinkFunction<Tuple2<String, Integer>> {
    Connection con;
    PreparedStatement stat;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the MySQL driver and open the connection once
        Class.forName("com.mysql.jdbc.Driver");
        con = DriverManager.getConnection(
                "jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");

        // replace into: inserts if the key does not exist, otherwise updates; the table needs a primary key
        stat = con.prepareStatement("replace into word_count values(?,?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // Called once per record: fill in the placeholders and execute the statement
        stat.setString(1, value.f0);
        stat.setInt(2, value.f1);
        stat.execute();
    }

    @Override
    public void close() throws Exception {
        stat.close();
        con.close();
    }
}