Fiink的简单学习一

一 相关大数据概念

1.1 根据时间

1.实时计算:

数据实时处理,结果实时存储

是一种持续、低时延、事件触发的任务

2.离线计算:

数据延迟处理,结果N+1模式(昨天的数据今天存储)

是一种批量、高时延、主动发起的计算任务

1.2 处理方式

1.流式处理:

一次处理一条或者少量;状态小

2.批量处理:

处理大量数据;处理完返回最终结果

二 Flink的架构以及工作原理

2.1 相关概念

Apache Flink 是一个实时计算框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

2.2 特性

1.支持高吞吐、低延迟、高性能的流处理

2.支持带有事件时间的窗口(Window)操作

3.支持有状态计算的Exactly-once语义

4.支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作

5.支持具有反压功能的持续流模型

6.支持基于轻量级分布式快照(Snapshot)实现的容错

7.一个运行时同时支持Batch on Streaming处理和Streaming处理

8.Flink在JVM内部实现了自己的内存管理,避免了出现oom

9.支持迭代计算

10.支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

2.3 wordConut 代码

2.3.1 流处理

1.流处理的模式

setRuntimeMode(RuntimeExecutionMode.STREAMING)(持续流模型)

2.读取文件的方法

无界流:socketTextStream

有界流:readTextFile

3.输出结果是:连续型的(累加的)

package com.shujia.flink.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo1StreamWordCount {public static void main(String[] args)throws Exception {//1、创建flink的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);//2、读取数据//nc -lk 8888//DataStreamSource是DataStream的子类,本来读文件返回的类型是DataStreamSource//但是为了整洁使用DataStreamDataStream<String> wordDS = env.socketTextStream("master", 8888);//统计单词数量//SingleOutputStreamOperator是DataStream的子类,map返回的类型是SingleOutputStreamOperator//但是为了整洁使用DataStream/*使用的是Tuple2类中的of方法,转化成kv形式的二元组但是这样后面需要加上二元组的类型,需要手动加入Types:是flink中的类*/DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word-> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));//3、统计单词的数量KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);//对下标为1的列求和//SingleOutputStreamOperator是DataStream的子类,sum返回的类型是SingleOutputStreamOperator//但是为了整洁使用DataStreamDataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);//打印数据countDS.print();//启动flinkenv.execute();}
}

2.3.2 批处理

1.RuntimeExecutionMode.BATCH(MR模型)

2.批处理模式只能用于处理有界流

3.输出结果是最终结果

package com.shujia.flink.core;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2BatchWordCount {public static void main(String[] args)throws Exception {//1、创建flink的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//修改处理模式/**处理模式* RuntimeExecutionMode.BATCH:批处理模式(MapReduce模型)* 1、输出最终结果* 2、批处理模式只能用于处理有界流** RuntimeExecutionMode.STREAMING:流处理模式(持续流模型)* 1、输出连续结果* 2、流处理模式,有界流核无界流都可以处理*/env.setRuntimeMode(RuntimeExecutionMode.BATCH);//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);//2、读取文件--有界流//DataStreamSource是DataStream的子类,本来读文件返回的类型是DataStreamSource//但是为了整洁使用DataStreamDataStream<String> wordDS = env.readTextFile("filnk/data/words.txt");//统计单词数量//SingleOutputStreamOperator是DataStream的子类,map返回的类型是SingleOutputStreamOperator//但是为了整洁使用DataStream/*使用的是Tuple2类中的of方法,转化成kv形式的二元组但是这样后面需要加上二元组的类型,需要手动加入Types:是flink中的类*/DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word-> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));//3、统计单词的数量KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);//对下标为1的列求和//SingleOutputStreamOperator是DataStream的子类,sum返回的类型是SingleOutputStreamOperator//但是为了整洁使用DataStreamDataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);//打印数据countDS.print();//启动flinkenv.execute();}
}

2.4 原理

1.创建环境 socket

2.使用socketTextStream读取文件,形成一个task任务(因为读取文件只支持单线程,所以只有一个task)

3.map是分组,形成两个task任务

4.keyBy,分组(相同的key被分到一个task中),里面有一个shuffle,也是两个task任务。这里之前被称为上游,之后称为下游。

5.sum。求和。将keyBy里面已经分好的task任务里面的元素求和

6.打印。

 spark与flink的区别

spark是MR模型,先执行map task 再执行reduce task。MR模型可以再map端进行预聚合,shuffle减少了数据的传输

flink是持续流模型,上游task与下游tsak同时启动,等待数据到达,每一条数据都会进行计算,出来结果。

三 source

3.1 集合source

1.将java中的集合变成一个source

package com.shujia.flink.source;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class Demo1ListSource {public static void main(String[] args)throws Exception {//创建flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);ArrayList<String> list = new ArrayList<>();list.add("java");list.add("java");list.add("java");list.add("java");list.add("java");list.add("java");//DataStream<String> listDS = env.fromCollection(list);listDS.print();env.execute();}
}

3.2 fileSource

3.2.1.fileSource的有界流读法

1.创建一个FileSource对象。使用forRecordStreamFormat方法,里面传的参数是格式与编码的对象TextLineInputFormat,与读取文件路径的对象Path。然后整体使用built创建

2.无界流读取的是文件

3.使用fileSource。使用flink环境对象env中的fromSource方法,里面传的三个参数是:

fileSource对象,WatermarkStrategy.noWatermarks(),与数据source的名字。这里创建的对象是DS对象

4.代码

package com.shujia.flink.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FileSource {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);/** 1、老版本读取文件  -- 有界流*/DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();/**2、型版本  --可有界读取也可以无界读取*///构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),new Path("flink/data/students.csv")).build();//使用fileSourceDataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");fileDS.print();env.execute();}
}

5.结果

 3.2.2 fileSource的无界流读法

1.与有界流读法的差别就是在创建FileSource对象时,多设置了一步,每隔一段时间读取目录下新的文件,构建无界流。方法是:

monitorContinuously(Duration.ofSeconds(5)),

2.这个读取是文件夹,所以设置forRecordStreamFormat里面传的参数Path读取的是文件夹

3.代码

package com.shujia.flink.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class Demo2FileSource {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);/** 1、老版本读取文件  -- 有界流*/DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();/**2、型版本  --可有界读取也可以无界读取*///构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),new Path("flink/data/stu"))//每隔一段时间读取目录下新的文件,构建无界流.monitorContinuously(Duration.ofSeconds(5)).build();//使用fileSourceDataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");fileDS.print();env.execute();}
}

4.结果

3.2.3 fileSource批流处理

1. 流批统一:同一套算子代码既能做流处理也能做批处理

                      同一个file数据源,既能有界读取也能无界读取 

2.批处理(有界流)

package com.shujia.flink.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class Demo2FileSource {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);//设置处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);/** 1、老版本读取文件  -- 有界流*/DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();/**2、型版本  --可有界读取也可以无界读取*///构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),new Path("flink/data/stu"))//每隔一段时间读取目录下新的文件,构建无界流
//                .monitorContinuously(Duration.ofSeconds(5)).build();//使用fileSourceDataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);countDS.print();
//        fileDS.print();env.execute();}
}

 运行结果:是最终结果

3.流处理(无界流)

package com.shujia.flink.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class Demo2FileSource {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task//这个不给值默认值是电脑的核数env.setParallelism(2);//数据从上游发送到下游的延迟时间,默认200毫秒env.setBufferTimeout(200);//设置处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);/** 1、老版本读取文件  -- 有界流*/DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();/**2、型版本  --可有界读取也可以无界读取*///构建fileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),new Path("flink/data/stu"))//每隔一段时间读取目录下新的文件,构建无界流.monitorContinuously(Duration.ofSeconds(5)).build();//使用fileSourceDataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);countDS.print();
//        fileDS.print();env.execute();}
}

结果是 连续型结果 

 3.3 自定义source

1.使用addSource方法,里面传入的是实现SourceFunction接口的对象,该接口有一个泛型

2.自己定义类实现SourceFunction,重写run()方法,run方法里面使用collect方法提交数据

package com.shujia.flink.source;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class Demo3MySource {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用自定义//addSource里面需要传一个实现了SourceFunction接口类的对象DataStream<Integer> linesDS = env.addSource(new MySource());linesDS.print();env.execute();}
}class MySource implements SourceFunction<Integer> {@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (true) {//发送数据//ctx是SourceContext对象ctx.collect(100);Thread.sleep(5);}}//cancel方法再任务取消的时候执行,一般用于回收资源@Overridepublic void cancel() {}
}

3.实例:连接Mysql数据库,读取数据库其中一个表的内容

其实Student类中使用了小辣椒的插件,简化的代码的冗余度

package com.shujia.flink.source;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class Demo4MySQLSource {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用自定义SourceDataStream<Student> lineDS = env.addSource(new MySQlSource());//        lineDS.print();DataStream<Tuple2<String, Integer>> kvDS = lineDS.map(line ->Tuple2.of(line.getClazz(), 1),Types.TUPLE(Types.STRING, Types.INT));DataStream<Tuple2<String, Integer>> countDS = kvDS.keyBy(kv -> kv.f0).sum(1);countDS.print();env.execute();}
}class MySQlSource implements SourceFunction<Student>{@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {//加载mysql驱动Class.forName("com.mysql.jdbc.Driver");//创建数据库连接对象Connection connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false","root","123456");//编写查询语句PreparedStatement sta = connection.prepareStatement("select * from students");//执行查询ResultSet resultSet = sta.executeQuery();//解析数据while (resultSet.next()){int id = resultSet.getInt("id");String name = resultSet.getString("name");int age = resultSet.getInt("age");String gender = resultSet.getString("gender");String clazz = resultSet.getString("clazz");//将数据发送到下游sourceContext.collect(new Student(id,name,age,gender,clazz));}//关闭查询连接sta.close();//关闭数据库连接connection.close();}@Overridepublic void cancel() {}
}/*** lombok插件作用:可以再代码编译时动态给代码增加方法(相当于scala中的case class)* /@Getter* /@Setter* 上面两个可以和起来成为一个@Data:这个是除了构造方法,其他方法都有,toString等等* / @AllArgsConstructor:这个是构造方法*/
@Data
@AllArgsConstructor
class Student{private Integer id;private String name;private Integer age;private String gender;private String clazz;
}

四 算子

4.1 map

1.map里面需要传入一个实现MapFunction的接口的对象,而MapFunction继承与Function。

2.重写map方法

package com.shujia.flink.tf;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo1Map {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);/** 1、使用匿名内部类的方式*///map里面需要传入一个MapFunction的对象//public interface MapFunction<T, O> extends FunctionSingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//        kvDS.print();/** 2、使用lambda表达式的方式  -- 简化的写法*/linesDS.map(word->Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).print();env.execute();}
}

4.2 flatMap

1.flapMap里面传入的是实现FlatMapFunction的接口的对象,而FlatMapFunction继承与Function。

2.重写flatMap方法,里面需要传两个参数,一个是传进来的数据,另一个是发送数据的接口类Collector。故使用lambda表达式时,->前面需要传2个值。

3.使用lambda表达式,最后要指定返回的类型,要不然会报错。

4.无论是lambda表达式还是用匿名内部类,里面的主要步骤还是循环发送数据到下游。这里的数据要循环,但是循环之前怎么做都可以,切分啊 转化啊等等。

package com.shujia.flink.tf;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Demo2FlatMap {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDs = env.socketTextStream("master", 8888);//flatMap(FlatMapFunction<T, R> flatMapper)//public interface FlatMapFunction<T, O> extends FunctionDataStream<String> flapMapDS = linesDs.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) throws Exception {for (String word : line.split(",")) {out.collect(word);}}});//        flapMapDS.print();linesDs.flatMap( (line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING).print();env.execute();}
}

4.3 filter

1.filter里面传入的是实现FilterFunction的接口的对象,而FilterFunction继承与Function。

2.lambda表达式中不需要传入返回值类型,因为他是boolean的值。

package com.shujia.flink.tf;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo3Filter {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");//filter(FilterFunction<T> filter)//public interface FilterFunction<T> extends Function// boolean filter(T value)DataStream<String> filterDS = linesDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {String clazz = value.split(",")[4];return "文科一班".equals(clazz);}});
//        filterDS.print();linesDS.filter(line->"文科一班".equals(line.split(",")[4])).print();env.execute();}
}

 4.4 keyBy

1.keyBy里面传入的是实现KeySelector接口的对象,KeySelector继承于Function

2.重写getKey方法,直接返回参数即可

3.他的作用就是将相同key发送到同一个task任务里面

4.他的返回值是kv形式的DS

package com.shujia.flink.tf;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo4KeyBy {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);//keyBy(KeySelector<T, K> key)//KeySelector<IN, KEY> extends FunctionKeyedStream<String, String> keyByDS = linesDS.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {return value;}});//将相同的key发送到同一个task中linesDS.keyBy(word->word).print();env.execute();}
}

 4.5 reduce

1.他只能作用在kv形式的DS上

//reduce(ReduceFunction<T> reducer)
//public interface ReduceFunction<T> extends Function

2.最后返回一个二元组的形式,无论是组合式还是lambda表达式中

3.reduce中传进来的两个kv形式的DS中,key都是一样的,然后将他们俩的values求值,最后返回他们共同的key,以及value的和

4.最后求和其实可以用sum的算子,里面传入的是1。他也是作用在kv形式的DS上

5.其中kv1是上一次计算的结果(状态),kv2是这次传入的值

package com.shujia.flink.tf;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Demo5Reduce {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);DataStream<Tuple2<String, Integer>> kvDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : value.split(",")) {out.collect(Tuple2.of(word, 1));}}}, Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);//reduce(ReduceFunction<T> reducer)//public interface ReduceFunction<T> extends FunctionDataStream<Tuple2<String, Integer>> reduceDS = keyByDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}});//        reduceDS.print();//使用lambda表达式DataStream<Tuple2<String, Integer>> reduceDS1 = keyByDS.reduce((kv1, kv2) -> {String word = kv1.f0;int count = kv1.f1 + kv2.f1;return Tuple2.of(word, count);});reduceDS1.print();env.execute();}
}

4.6 window

1.这里简单的说一下窗口,后面会细说。

2.也只能作用在kv形式的DS上

package com.shujia.flink.tf;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo6Window {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> lineDS = env.socketTextStream("master", 8888);/** 每隔5秒统计最近15秒每个单词的数量 --- 滑动窗口*///转换成kvDataStream<Tuple2<String, Integer>> kvDS = lineDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));//按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);//划分窗口//SlidingProcessingTimeWindows:滑动的处理时间窗口//WindowedStream<T, KEY, W> windowWindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(16), Time.seconds(5)));//这个时间是每隔5秒处理一次15秒里窗口的数据,如果前15秒过去了,只会计算后面的15秒出现的数据/** 比如 0-5-10-15-20-25,在0-5里面输入了hjx,hjx,java,那么结果是(hjx,2),(java,1)* 然后在5-15里面输入了kkk,lll,hjx,结果是(hjx,3),(java,1)(lll,1),(kkk,1)* 但是这个时候执行了第二个窗口,5-20秒的窗口,里面数据只有kkk,lll,hjx* */windowDS.sum(1).print();env.execute();}
}

4.7 union

1.只是代码层面上的合并,其实数据没有合并

package com.shujia.flink.tf;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo7Union {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> ds1 = env.socketTextStream("master", 8888);DataStreamSource<String> ds2 = env.socketTextStream("master", 9999);/** union:合并两个DS* 在数据层面并没有合并,只是在逻辑层面合并了*/DataStream<String> unionDS = ds1.union(ds2);unionDS.print();env.execute();}
}

4.8 process

1.是flink的底层算子,可以代替map,flatMap,filter。

package com.shujia.flink.tf;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class Demo8ProcessFunction {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);//public abstract class ProcessFunction<I, O> extends AbstractRichFunction// process(ProcessFunction<T, R> processFunction)DataStream<Tuple2<String, Integer>> kvDS = linesDS.process(new ProcessFunction<String, Tuple2<String, Integer>>() {@Overridepublic void processElement(String value,ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,Collector<Tuple2<String, Integer>> out) throws Exception {/**value:一行数据* ctx:上下文对象* out:将数据发送到下游*/for (String word : value.split(",")) {out.collect(Tuple2.of(word, 1));}}});kvDS.print();env.execute();}
}

五 Sink

5.1 fileSink

1.记住核心参数就可以

        写入的位置,编码:forRowFormat

        指定策略:withRollingPolicy

package com.shujia.flink.sink;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;public class Demo1FileSInk {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);FileSink<String> fileSink = FileSink//final Path basePath, final Encoder<IN> encoder指定数据的格式以及存放的位置.<String>forRowFormat(new Path("flink/data/words"),new SimpleStringEncoder<>("UTF-8"))//指定策略//withRollingPolicy(final RollingPolicy<IN, String> policy).withRollingPolicy(DefaultRollingPolicy.builder()包含了至少10秒的数据量.withRolloverInterval(Duration.ofSeconds(10))从没接收延时10秒之外的新纪录.withInactivityInterval(Duration.ofSeconds(10))//文件大小已经达到 1MB(写入最后一条记录之后).withMaxPartSize(MemorySize.ofMebiBytes(1)).build()).build();//使用fileSinklinesDS.sinkTo(fileSink);env.execute();}
}

5.2 自定义sink

1.自定义sink需要实现SinkFunction接口,重写invoke方法

2.addSink里面传入的是一个实现了SinkFunction接口的类的对象

3.SinkFunction的泛型要与写入数据的类型一样

package com.shujia.flink.sink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;public class Demo2MySInk {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);linesDS.addSink(new MySink());env.execute();}
}
class MySink implements SinkFunction<String>{//自定义sink需要实现SinkFunction,重写invoke方法,// invoke里面可以写数据保存的位置,也可以直接输出@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println("自定义sink"+value);//自定义sink+端口里面输出的}
}

5.3 实例

1.如果是实现SinkFunction接口,那么invoke方法每次都要执行一次连接数据库,浪费资源。所以自定义sink写入数据库时候,继承RichSinkFunction抽象类。里面的open与close只执行一次,刚刚好可以用来连接数据库与关闭数据库

package com.shujia.flink.sink;import com.mysql.jdbc.Connection;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.DriverManager;public class Demo3MySqlSInk {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);DataStream<Tuple2<String, Integer>> countDS = linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0).sum(1);countDS.addSink(new MySqlSInk());env.execute();}
}/*** 自定义sink将数据保存到mysql* RichSinkFunction:多了open和close方法,用于打开和关闭连接* SinkFunction*/
class MySqlSInk extends RichSinkFunction<Tuple2<String, Integer>> {Connection con;PreparedStatement stat;@Overridepublic void invoke(Tuple2<String, Integer> value, Context context) throws Exception {stat.setString(1, value.f0);stat.setInt(2, value.f1);stat.execute();}@Overridepublic void open(Configuration parameters) throws Exception {//创建数据库驱动Class.forName("com.mysql.jdbc.Driver");con = (Connection) DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");//replace into 替换插入,如果没有就插入,如果有就更新,表需要有主键stat = (PreparedStatement) con.prepareStatement("replace into word_count values(?,?)");}@Overridepublic void close() throws Exception {stat.close();con.close();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/846778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法金 | 再见,支持向量机 SVM!

大侠幸会&#xff0c;在下全网同名「算法金」 0 基础转 AI 上岸&#xff0c;多个算法赛 Top 「日更万日&#xff0c;让更多人享受智能乐趣」 一、SVM概述 定义与基本概念 支持向量机&#xff08;SVM&#xff09;是一种监督学习模型&#xff0c;用于解决分类和回归问题。它的核…

软件杯 题目:基于卷积神经网络的手写字符识别 - 深度学习

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

基于springboot+vue的医院信息管理系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

HTML静态网页成品作业(HTML+CSS)—— 节日端午节介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有5个页面。 二、作品演示 三、代…

Rust自动生成文件解析

目录 一、生成目录解析二、生成文件解析2.1 Cargo.toml2.2 main函数解析 一、生成目录解析 先使用cargo clean命令删除所有生成的文件&#xff0c;下图显示了目录结构和 main.rs文件 使用cargo new testrust时自动创建出名为testrust的Rust项目。内部主要包含一个src的源码文…

Qt——升级系列(Level Two):Hello Qt 程序实现、项目文件解析、

Hello Qt 程序实现 使用“按钮”实现 纯代码方式实现&#xff1a; // Widget构造函数的实现 Widget::Widget(QWidget *parent): QWidget(parent) // 使用父类构造函数初始化QWidget&#xff0c;传入父窗口指针, ui(new Ui::Widget) // 创建Ui::Widget类的实例&#xff0c;并…

切勿大意!痉挛性斜颈治疗中的三个重要“禁忌”,后果堪忧!

今天&#xff0c;要给大家讲一个非常重要的话题——痉挛性斜颈的治疗。痉挛性斜颈是一种常见的神经肌肉疾病&#xff0c;患者在日常生活中可能会遇到许多困扰和不便。因此&#xff0c;及早治疗对患者来说至关重要。 然而&#xff0c;在治疗痉挛性斜颈的过程中&#xff0c;千万切…

永磁同步电机高性能控制算法(12)——基于预测电流误差补偿的强鲁棒预测控制有限集预测控制与连续集预测控制的对比

1.文章简介 最近看到一篇比较有意思的文章&#xff0c;24年3月9日才刚刚收录。 众所周知模型预测控制受电机参数影响还是很大的。所以呢&#xff0c;各种观测器、参数辨识等算法都被用到预测控制中。 观测器设计的话就相对而言比较复杂&#xff1b;参数辨识也比较复杂&#x…

0基础学习Elasticsearch-使用Java操作ES

文章目录 1 背景2 前言3 Java如何操作ES3.1 引入依赖3.2 依赖介绍3.3 隐藏依赖3.4 初始化客户端&#xff08;获取ES连接&#xff09;3.5 发送请求给ES 1 背景 上篇学习了0基础学习Elasticsearch-Quick start&#xff0c;随后本篇研究如何使用Java操作ES 2 前言 建议通篇阅读再回…

MaxKey本地运行实战指南

MaxKey 本地运行总结 概述开发环境准备 主页传送门 &#xff1a; &#x1f4c0; 传送 概述 MaxKey单点登录认证系统&#xff0c;谐音为马克思的钥匙寓意是最大钥匙&#xff0c;是业界领先的IAM-IDaas身份管理和认证产品&#xff1b;支持OAuth 2.x/OpenID Connect、SAML 2.0、J…

记一次线上数据库连接超时异常问题

最近其他团队的开发人员告知我&#xff0c;我们项目有个feign接口调用失败了。我查看日志发现&#xff0c;其原因是尝试数据库连接超时&#xff0c;30秒内都没有连接成功。 我首先判断可能是网络不稳定&#xff0c;在一定时间内连接不上数据库。我登录到服务器环境看&#xff0…

德克萨斯大学奥斯汀分校自然语言处理硕士课程汉化版(第五周) - Transformer

Transformer 1. 注意力机制 在语言建模中&#xff0c;注意力(attention)是一个关键机制&#xff0c;用于在给定上下文中访问相关信息以进行预测。注意力机制允许模型根据输入上下文中的重要信息来加权关注不同的部分&#xff0c;并根据其重要性来决定对不同部分的关注程度。 …

【工具】探索 MOU:每用户通话时长

缘分让我们相遇乱世以外 命运却要我们危难中相爱 也许未来遥远在光年之外 我愿守候未知里为你等待 我没想到为了你我能疯狂到 山崩海啸没有你根本不想逃 我的大脑为了你已经疯狂到 脉搏心跳没有你根本不重要 &#x1f3b5; 邓紫棋《光年之外》 什么是 MOU…

discuz点微同城源码34.7+全套插件+小程序前端

discuz点微同城源码34.7全套插件小程序前后端 模板挺好看的 带全套插件 自己耐心点配置一下插件 可以H5可以小程序

YOLOv1深入解析与实战:目标检测算法原理

参考&#xff1a; https://zhuanlan.zhihu.com/p/667046384 https://blog.csdn.net/weixin_41424926/article/details/105383064 https://arxiv.org/pdf/1506.02640 1. 算法介绍 学习目标检测算法&#xff0c;yolov1是必看内容&#xff0c;不同于生成模型&#xff0c;没有特别…

CSAPP Lab07——Malloc Lab完成思路

完整代码见&#xff1a;CSAPP/malloclab-handout at main SnowLegend-star/CSAPP (github.com) Malloc Lab 按照惯例&#xff0c;我先是上来就把mm.c编译了一番&#xff0c;结果产生如下报错。搜索过后看样子应该是编译器的版本不匹配&#xff0c;得建立条软链接。 经过多番…

【数据结构】链式二叉树详解

个人主页~ 链式二叉树基本内容~ 链式二叉树详解 1、通过前序遍历的数组来构建二叉树2、二叉树的销毁3、二叉树节点个数4、二叉树叶子节点个数5、二叉树第k层节点个数6、二叉树查找7、前序遍历8、中序遍历9、后序遍历10、层序遍历与检查二叉树是否为完全二叉树Queue.hQueue.c层序…

WordPress子比内容同步插件

1.支持分类替换 将主站同步过来的文章分类进行替换 2.支持本地化文章图片 &#xff08;使用储存桶可能会导致无法保存图片&#xff09; 3.支持自定义文章作者&#xff08;选择多个作者则同步到的文章作者将会随机分配&#xff09; 4.支持将同步过来的文章自定义文章状态&…

Java | Leetcode Java题解之第128题最长连续序列

题目&#xff1a; 题解&#xff1a; class Solution {public int longestConsecutive(int[] nums) {Set<Integer> num_set new HashSet<Integer>();for (int num : nums) {num_set.add(num);}int longestStreak 0;for (int num : num_set) {if (!num_set.contai…

乡村振兴与文化传承:挖掘乡村历史文化资源,传承乡村优秀传统,打造具有地方特色的美丽乡村文化品牌

目录 一、引言 二、乡村历史文化资源的挖掘与保护 &#xff08;一&#xff09;乡村历史文化资源的内涵 &#xff08;二&#xff09;乡村历史文化资源的挖掘 &#xff08;三&#xff09;乡村历史文化资源的保护 三、乡村优秀传统的传承与创新 &#xff08;一&#xff09;…