Flink学习笔记
前言:今天是学习 flink 的第 16 天啦!学习了 flinkSQL 与企业级常用外部系统结合,主要是解决大数据领域数据计算后,写入到文件,kafka,还是mysql等 sink 的问题,即数据计算完后保存到哪里的问题!结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!
Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!
喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"
文章目录
- Flink学习笔记
- 二、FlinkSQL 连接外部系统
- 1. 输出到文件
- 2. 更新模式(Update Mode)
- 2.1 追加模式(Append Mode)
- 2.2 撤回模式(Retract Mode)
- 2.3 更新插入模式(Upsert Mode)
- 3. 写入到 Kafka
- 4. 写入到 MySQL
二、FlinkSQL 连接外部系统
1. 输出到文件
例子:将表结果输出到文件系统中
package cn.itcast.day01.sink;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 15:25:14* @description TODO*/
public class FsSinkTest {public static void main(String[] args) throws Exception {// todo 1) 配置 table 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置设置环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);// todo 2) 从文件中读取数据String filePath = FsSinkTest.class.getClassLoader().getResource("order.csv").getPath();DataStreamSource<String> inputStream = env.readTextFile(filePath);// todo 3) 将表数据转化类型SingleOutputStreamOperator<OrderInfo> stream = inputStream.map(new MapFunction<String, OrderInfo>() {@Overridepublic OrderInfo map(String data) throws Exception {String[] dataArrary = data.split(",");return new OrderInfo(dataArrary[0],dataArrary[1],Double.parseDouble(dataArrary[2]),dataArrary[3]);}});// todo 4) 将数据流转化为表Table table = bsTableEnv.fromDataStream(stream);// todo 5) 调用 api 方式Table result = table.select($("id"), $("timestamp"), $("money"), $("category")).filter($("category").isEqual("电脑"));// todo 6) 将表转化为流打印bsTableEnv.toAppendStream(result, Row.class).print("结果数据>>>");// todo 7) 将查询的结果写入到文件中ConnectTableDescriptor connectTableDescriptor = bsTableEnv.connect(new FileSystem().path("D:\\IDEA_Project\\BigData_Java\\flinksql_pro\\data\\output\\order.txt")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("name", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()));// todo 8) 将通过connect创建的输出文件注册为表对象connectTableDescriptor.createTemporaryTable("outputOrder");// todo 9) 将表查询的结果插入到临时表中table.executeInsert("outputOrder");// todo 10) 执行程序env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo {private String id;private String timestamp;private Double money;private String category;}
}
结果:在 output 目录下生成了一个 order.txt 的文件
总结:
- 1- connect 方法:存储在哪里,存储什么地方,存储什么格式
- 2- createTemporaryTable():创建临时表
- 3- 将之前结果表插入到临时表中
2. 更新模式(Update Mode)
2.1 追加模式(Append Mode)
- 表(动态表)和外部连接器只交换插入(Insert)消息。
2.2 撤回模式(Retract Mode)
- 表和外部连接器交换的是添加(Add)和撤回(Retract)消息。
- 应用场景:
- 当插入数据时,它会被编码为添加消息。
- 当删除数据时,它会被编码为撤回消息;
- 当更新数据时,会先发送一个已更新行的撤回消息,然后再发送一个更新行的添加消息。
- 这种模式允许对表中的数据进行修改和删除,但需要注意的是,它不能定义key。
2.3 更新插入模式(Upsert Mode)
- 动态表和外部连接器交换 Upsert 和 Delete 消息。
- 这种模式需要一个唯一的 key,通过这个 key 可以传递更新消息。
- 应用场景:
- 当插入数据时,它会使用 Upsert 消息。
- 当删除数据时,它会使用 Delete 消息。
- 当更新数据时,它也会使用 Upsert 消息,并通过 key 来标识要更新的行。
- 这种模式在效率上更高,因为它只需要发送一条消息即可完成更新操作。
案例演示:从 kafka 读取数据,实时聚合操作,撤回模式
package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 17:33:03* @description TODO*/
public class KafkaSinkTest {public static void main(String[] args) throws Exception {// todo 1) 初始化 flinkSQL 环境// 1.1 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.2 配置setting环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 1.3 配置表表环境StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env, bsSettings);// todo 2) 读取 kafka 数据源tbEnv.connect(new Kafka().version("universal") // 指定 kafka 版本.topic("order") // 定义主题.property("bootstrap.servers","node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()).field("pt", DataTypes.TIMESTAMP(3))// 使用 protime,指定字段名定义处理时间字段// 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema.proctime()).createTemporaryTable("kafkaInputTable");// todo 3) 表的查询操作// 3.1 通过表环境获取到数据表:fromTable orderTable = tbEnv.from("kafkaInputTable");// 3.2 将表转为 stream 后打印tbEnv.toAppendStream(orderTable, Row.class).print("Table API>>>>");// 3.2 调用 table api 进行聚合操作Table aggResultTable = orderTable.groupBy($("category")).select($("category"),$("money").sum().as("totalMoney"),$("id").count().as("cnt"));tbEnv.toRetractStream(aggResultTable,Row.class).print("agg result:>>>");// todo 4) 启动程序,将数据写入到kafka的时候,可以不加execute代码env.execute();}
}
结果:显示 false 即撤回,显示 true 即添加
Table API>>>>> +I[user_001, 1621718199, 10.1, 电脑, 2024-03-12T10:42:18.335Z]
agg result:>>>> (true,+I[电脑, 10.1, 1])
Table API>>>>> +I[user_001, 1621718201, 14.1, 手机, 2024-03-12T10:42:33.626Z]
agg result:>>>> (true,+I[手机, 14.1, 1])
Table API>>>>> +I[user_002, 1621718202, 82.5, 手机, 2024-03-12T10:42:50.130Z]
agg result:>>>> (false,-U[手机, 14.1, 1])
agg result:>>>> (true,+U[手机, 96.6, 2])
总结:
- 实时聚合操作结果不可以简单 toAppendStream 打印,需要使用更新模式
toRetractStream
! - 这种
聚合结果
更新操作暂时不适合写入 kafka!
3. 写入到 Kafka
例子:将查询的结果数据写入到 kafka 中
package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 19:29:56* @description TODO*/
public class KafkaSinkTest1 {public static void main(String[] args) throws Exception {// Todo 1) 配置 flink SQL 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 flink settings 环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// Todo 2) kafka 数据源tableEnv.connect(new Kafka().version("universal").topic("order").property("bootstrap.servers","node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()).field("pt", DataTypes.TIMESTAMP(3))// 使用 protime,指定字段名定义处理时间字段// 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema.proctime()).createTemporaryTable("kafkaInputTable");// Todo 3) table API方式提取数据// 3.1 通过表环境获取数据表Table ordertable = tableEnv.from("kafkaInputTable");tableEnv.toAppendStream(ordertable, Row.class).print("Table API>>>");// 3.2 编写逻辑提取数据Table tableResult = ordertable.select($("id"), $("timestamp"), $("money"), $("category")).filter($("category").isEqual("电脑"));// 3.3 将表数据转化为流数据打印tableEnv.toAppendStream(tableResult, Row.class).printToErr("API 抽取的数据>>>");// Todo 4) 将抽取的数据写入到 kafka 中tableEnv.connect(new Kafka().version("universal") //指定版本.topic("orderResult")//定义主题.property("bootstrap.servers", "node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING())).createTemporaryTable("kafkaOutputTable");//todo 5)将查询结果输出到kafka中tableResult.executeInsert("kafkaOutputTable");//todo 6) 注意:将数据写入到kafka的时候,可以不加execute代码env.execute();}
}
结果:kafka 的 orderResult 这个主题中保存了数据!
总结:
- 结果表,调用 executeInsert 写入到 sink 表中!
- 在读取数据源的时候,添加了一个字段 pt,调用 proctime 方法,作为处理时间!
4. 写入到 MySQL
样本数据:
{"id":1,"timestamp":"2020-05-08T01:03.00Z","category":"电脑","areaName":"石家庄","money":"1450"}
{"id":2,"timestamp":"2020-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}
{"id":3,"timestamp":"2020-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2020-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}
{"id":5,"timestamp":"2020-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}
{"id":6,"timestamp":"2020-05-08T01:01.00Z","category":"电脑","areaName":"深圳","money":"1550"}
mysql 数据表:
DROP TABLE IF EXISTS `order_test`;
CREATE TABLE `order_test` (`id` varchar(255) NOT NULL,`timestamp` varchar(255) DEFAULT NULL,`category` varchar(255) DEFAULT NULL,`areaName` varchar(255) DEFAULT NULL,`money` double DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
案例演示:
package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author lql* @time 2024-03-12 20:35:31* @description TODO*/
public class MySQLSinkTest {public static void main(String[] args) throws Exception {// Todo 1) 配置 flink SQL 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 flink settings 环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// Todo 2) 配置 kafka 数据源,使用建表的方式,注意指定数据源是 jsonString sourceTable = "CREATE TABLE KafkaInputTable (\n" +" `id` varchar,\n" +" `timestamp` varchar,\n" +" `category` varchar,\n" +" `areaName` varchar,\n" +" `money` double\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'order',\n" +" 'properties.bootstrap.servers' = 'node1:9092',\n" +" 'properties.group.id' = 'testGroup',\n" +" 'scan.startup.mode' = 'earliest-offset',\n" +" 'format' = 'json'\n" +")";// 执行建立数据源表语句tableEnv.executeSql(sourceTable);// Todo 3) 表的查询Table orderTable = tableEnv.from("KafkaInputTable");// 将表数据转化为 datastream,并打印出来tableEnv.toAppendStream(orderTable, Row.class).print("SQL>>>");// Todo 4) 将结果数据写入到 mysql中String sinkTable = "CREATE TABLE order_test (\n" +" `id` varchar,\n" +" `timestamp` varchar,\n" +" `category` varchar,\n" +" `areaName` varchar,\n" +" `money` double\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://node1:3306/test?characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'order_test'," +" 'driver'='com.mysql.jdbc.Driver'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'sink.buffer-flush.interval'='1s'," +" 'sink.buffer-flush.max-rows'='1'," +" 'sink.max-retries' = '5'" +")";// 执行建表数据tableEnv.executeSql(sinkTable);// 插入语句逻辑//定义sql语句String insert = "INSERT INTO order_test SELECT * FROM KafkaInputTable";//todo 5)将源表的数据写入到目标表中tableEnv.executeSql(insert);env.execute();}
}
结果:json 数据源源不断,解析到 mysql 里面
总结:
- kafka 作为数据源,mysql 作为 sink,都可以用建表的方式解决!
- 原理就是查询一个表,插入到另一个表中!