flink重温笔记(十六): flinkSQL 顶层 API ——实时数据流结合外部系统

Flink学习笔记

前言:今天是学习 flink 的第 16 天啦!学习了 flinkSQL 与企业级常用外部系统结合,主要是解决大数据领域数据计算后,写入到文件,kafka,还是mysql等 sink 的问题,即数据计算完后保存到哪里的问题!结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 二、FlinkSQL 连接外部系统
      • 1. 输出到文件
      • 2. 更新模式(Update Mode)
        • 2.1 追加模式(Append Mode)
        • 2.2 撤回模式(Retract Mode)
        • 2.3 更新插入模式(Upsert Mode)
      • 3. 写入到 Kafka
      • 4. 写入到 MySQL

二、FlinkSQL 连接外部系统

1. 输出到文件

例子:将表结果输出到文件系统中

package cn.itcast.day01.sink;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 15:25:14* @description TODO*/
public class FsSinkTest {public static void main(String[] args) throws Exception {// todo 1) 配置 table 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置设置环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);// todo 2) 从文件中读取数据String filePath = FsSinkTest.class.getClassLoader().getResource("order.csv").getPath();DataStreamSource<String> inputStream = env.readTextFile(filePath);// todo 3) 将表数据转化类型SingleOutputStreamOperator<OrderInfo> stream = inputStream.map(new MapFunction<String, OrderInfo>() {@Overridepublic OrderInfo map(String data) throws Exception {String[] dataArrary = data.split(",");return new OrderInfo(dataArrary[0],dataArrary[1],Double.parseDouble(dataArrary[2]),dataArrary[3]);}});// todo 4) 将数据流转化为表Table table = bsTableEnv.fromDataStream(stream);// todo 5) 调用 api 方式Table result = table.select($("id"), $("timestamp"), $("money"), $("category")).filter($("category").isEqual("电脑"));// todo 6) 将表转化为流打印bsTableEnv.toAppendStream(result, Row.class).print("结果数据>>>");// todo 7) 将查询的结果写入到文件中ConnectTableDescriptor connectTableDescriptor = bsTableEnv.connect(new FileSystem().path("D:\\IDEA_Project\\BigData_Java\\flinksql_pro\\data\\output\\order.txt")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("name", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()));// todo 8) 将通过connect创建的输出文件注册为表对象connectTableDescriptor.createTemporaryTable("outputOrder");// todo 9) 将表查询的结果插入到临时表中table.executeInsert("outputOrder");// todo 10) 执行程序env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo {private String id;private String timestamp;private Double money;private String category;}
}

结果:在 output 目录下生成了一个 order.txt 的文件

总结:

  • 1- connect 方法:存储在哪里,存储什么地方,存储什么格式
  • 2- createTemporaryTable():创建临时表
  • 3- 将之前结果表插入到临时表中

2. 更新模式(Update Mode)

2.1 追加模式(Append Mode)
  • 表(动态表)和外部连接器只交换插入(Insert)消息。

2.2 撤回模式(Retract Mode)
  • 表和外部连接器交换的是添加(Add)和撤回(Retract)消息。
  • 应用场景:
    • 插入数据时,它会被编码为添加消息。
    • 删除数据时,它会被编码为撤回消息;
    • 更新数据时,会先发送一个已更新行的撤回消息,然后再发送一个更新行的添加消息。
  • 这种模式允许对表中的数据进行修改和删除,但需要注意的是,它不能定义key

2.3 更新插入模式(Upsert Mode)
  • 动态表和外部连接器交换 Upsert 和 Delete 消息。
  • 这种模式需要一个唯一的 key,通过这个 key 可以传递更新消息。
  • 应用场景:
    • 插入数据时,它会使用 Upsert 消息。
    • 删除数据时,它会使用 Delete 消息。
    • 更新数据时,它也会使用 Upsert 消息,并通过 key 来标识要更新的行。
  • 这种模式在效率上更高,因为它只需要发送一条消息即可完成更新操作。

案例演示:从 kafka 读取数据,实时聚合操作,撤回模式

package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 17:33:03* @description TODO*/
public class KafkaSinkTest {public static void main(String[] args) throws Exception {// todo 1) 初始化 flinkSQL 环境// 1.1 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.2 配置setting环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 1.3 配置表表环境StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env, bsSettings);// todo 2) 读取 kafka 数据源tbEnv.connect(new Kafka().version("universal") // 指定 kafka 版本.topic("order") // 定义主题.property("bootstrap.servers","node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()).field("pt", DataTypes.TIMESTAMP(3))// 使用 protime,指定字段名定义处理时间字段// 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema.proctime()).createTemporaryTable("kafkaInputTable");// todo 3) 表的查询操作// 3.1 通过表环境获取到数据表:fromTable orderTable = tbEnv.from("kafkaInputTable");// 3.2 将表转为 stream 后打印tbEnv.toAppendStream(orderTable, Row.class).print("Table API>>>>");// 3.2 调用 table api 进行聚合操作Table aggResultTable = orderTable.groupBy($("category")).select($("category"),$("money").sum().as("totalMoney"),$("id").count().as("cnt"));tbEnv.toRetractStream(aggResultTable,Row.class).print("agg result:>>>");// todo 4) 启动程序,将数据写入到kafka的时候,可以不加execute代码env.execute();}
}

结果:显示 false 即撤回,显示 true 即添加

Table API>>>>> +I[user_001, 1621718199, 10.1, 电脑, 2024-03-12T10:42:18.335Z]
agg result:>>>> (true,+I[电脑, 10.1, 1])
Table API>>>>> +I[user_001, 1621718201, 14.1, 手机, 2024-03-12T10:42:33.626Z]
agg result:>>>> (true,+I[手机, 14.1, 1])
Table API>>>>> +I[user_002, 1621718202, 82.5, 手机, 2024-03-12T10:42:50.130Z]
agg result:>>>> (false,-U[手机, 14.1, 1])
agg result:>>>> (true,+U[手机, 96.6, 2])

总结:

  • 实时聚合操作结果不可以简单 toAppendStream 打印,需要使用更新模式toRetractStream
  • 这种聚合结果更新操作暂时不适合写入 kafka!

3. 写入到 Kafka

例子:将查询的结果数据写入到 kafka 中

package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** @author lql* @time 2024-03-12 19:29:56* @description TODO*/
public class KafkaSinkTest1 {public static void main(String[] args) throws Exception {// Todo 1) 配置 flink SQL 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 flink settings 环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// Todo 2) kafka 数据源tableEnv.connect(new Kafka().version("universal").topic("order").property("bootstrap.servers","node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING()).field("pt", DataTypes.TIMESTAMP(3))// 使用 protime,指定字段名定义处理时间字段// 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema.proctime()).createTemporaryTable("kafkaInputTable");// Todo 3) table API方式提取数据// 3.1 通过表环境获取数据表Table ordertable = tableEnv.from("kafkaInputTable");tableEnv.toAppendStream(ordertable, Row.class).print("Table API>>>");// 3.2 编写逻辑提取数据Table tableResult = ordertable.select($("id"), $("timestamp"), $("money"), $("category")).filter($("category").isEqual("电脑"));// 3.3 将表数据转化为流数据打印tableEnv.toAppendStream(tableResult, Row.class).printToErr("API 抽取的数据>>>");// Todo 4) 将抽取的数据写入到 kafka 中tableEnv.connect(new Kafka().version("universal") //指定版本.topic("orderResult")//定义主题.property("bootstrap.servers", "node1:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.STRING()).field("money", DataTypes.DOUBLE()).field("category", DataTypes.STRING())).createTemporaryTable("kafkaOutputTable");//todo 5)将查询结果输出到kafka中tableResult.executeInsert("kafkaOutputTable");//todo 6) 注意:将数据写入到kafka的时候,可以不加execute代码env.execute();}
}

结果:kafka 的 orderResult 这个主题中保存了数据!

总结:

  • 结果表,调用 executeInsert 写入到 sink 表中!
  • 在读取数据源的时候,添加了一个字段 pt,调用 proctime 方法,作为处理时间!

4. 写入到 MySQL

样本数据:

{"id":1,"timestamp":"2020-05-08T01:03.00Z","category":"电脑","areaName":"石家庄","money":"1450"}
{"id":2,"timestamp":"2020-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}
{"id":3,"timestamp":"2020-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2020-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}
{"id":5,"timestamp":"2020-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}
{"id":6,"timestamp":"2020-05-08T01:01.00Z","category":"电脑","areaName":"深圳","money":"1550"}

mysql 数据表:

DROP TABLE IF EXISTS `order_test`;
CREATE TABLE `order_test` (`id` varchar(255) NOT NULL,`timestamp` varchar(255) DEFAULT NULL,`category` varchar(255) DEFAULT NULL,`areaName` varchar(255) DEFAULT NULL,`money` double DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

案例演示:

package cn.itcast.day01.sink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author lql* @time 2024-03-12 20:35:31* @description TODO*/
public class MySQLSinkTest {public static void main(String[] args) throws Exception {// Todo 1) 配置 flink SQL 环境// 1. 配置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 flink settings 环境EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// 3. 配置表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// Todo 2) 配置 kafka 数据源,使用建表的方式,注意指定数据源是 jsonString sourceTable = "CREATE TABLE KafkaInputTable (\n" +"  `id` varchar,\n" +"  `timestamp` varchar,\n" +"  `category` varchar,\n" +"  `areaName` varchar,\n" +"  `money` double\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'order',\n" +"  'properties.bootstrap.servers' = 'node1:9092',\n" +"  'properties.group.id' = 'testGroup',\n" +"  'scan.startup.mode' = 'earliest-offset',\n" +"  'format' = 'json'\n" +")";// 执行建立数据源表语句tableEnv.executeSql(sourceTable);// Todo 3) 表的查询Table orderTable = tableEnv.from("KafkaInputTable");// 将表数据转化为 datastream,并打印出来tableEnv.toAppendStream(orderTable, Row.class).print("SQL>>>");// Todo 4) 将结果数据写入到 mysql中String sinkTable = "CREATE TABLE order_test (\n" +"  `id` varchar,\n" +"  `timestamp` varchar,\n" +"  `category` varchar,\n" +"  `areaName` varchar,\n" +"  `money` double\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://node1:3306/test?characterEncoding=utf-8&useSSL=false',\n" +"   'table-name' = 'order_test'," +"   'driver'='com.mysql.jdbc.Driver'," +"   'username' = 'root'," +"   'password' = '123456'," +"   'sink.buffer-flush.interval'='1s'," +"   'sink.buffer-flush.max-rows'='1'," +"   'sink.max-retries' = '5'" +")";// 执行建表数据tableEnv.executeSql(sinkTable);// 插入语句逻辑//定义sql语句String insert = "INSERT INTO order_test SELECT * FROM KafkaInputTable";//todo 5)将源表的数据写入到目标表中tableEnv.executeSql(insert);env.execute();}
}

结果:json 数据源源不断,解析到 mysql 里面

总结:

  • kafka 作为数据源,mysql 作为 sink,都可以用建表的方式解决!
  • 原理就是查询一个表,插入到另一个表中!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/744671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为机试题-士兵的任务 2

题目 士兵在迷宫中执行任务,迷宫中危机重重,他需要在在最短的时间内到达指定的位置。你可以告诉土兵他最少需要多长时间吗? 输入一个 n m 的迷宫中,迷宫中 0 为路,1 为墙,2 为起点,3 为终点,4 为陷阱,6 为炸弹。士兵只能向上下左右四个方向移动,如果路径上为墙,不能移动。已知…

商业模式的深度解析:以四个特点构筑成功之路

商业模式&#xff0c;是企业将产品、服务、信息流、资金流等要素组合在一起&#xff0c;形成独特价值主张&#xff0c;并通过特定的渠道和方式传递给目标市场&#xff0c;从而获取利润的一种系统性设计。一个成功的商业模式&#xff0c;往往具备四个显著特点&#xff1a;重点突…

企业微信如何接入第三方应用?

1.登录企业微信管理后台&#xff1a;https://work.weixin.qq.com/wework_admin​​​​​ 2.点击创建应用&#xff1b; ​​​​​​​ 3. 此时可以看到已经创建好的应用&#xff0c;并且生成应用的唯一id&#xff08;agentId&#xff09; 4. 第三方应用申请域名 (举例&…

智慧楼宇物联网建设实施方案(2)

建设方案 楼宇综合管理平台 智慧楼宇物联网应用综合管理系统是对整个物联网系统的集中监控和展示。其主要功能是对各应用子系统的关键监测数据进行数据格式解析并呈现。进而使管理者能够从整体上对整个物联网系统运行状态有个直观的了解。其不同于各专业子系统的管理软件,重…

flstudio教程如何设置成中文 flstudio基础教程 flstudio免费

Fl studio编曲软件总共有英文和中文两种语言供用户选择&#xff0c;对于我们来说&#xff0c;更习惯于使用中文版本的flstudio编曲软件&#xff0c;包括我自己也比较习惯于使用中文版本的flstudio&#xff0c;同时也能提高工作效率。Flstudio编曲软件默认语言是英文&#xff0c…

vue:功能【xlsx】动态行内合并

场景&#xff1a;纯前端导出excel数据&#xff0c;涉及到列合并、行合并。 注&#xff09;当前数据表头固定&#xff0c;行内数据不固定。以第一列WM为判断条件&#xff0c;相同名字的那几行数据合并单元格。合并的那几行数据&#xff0c;后面的列按需求进行合并。 注&#x…

vue2 elementui 封装一个动态表单复杂组件

封装一个动态表单组件在 Vue 2 和 Element UI 中需要考虑到表单字段的动态添加、删除以及验证等复杂功能。下面是一个简单的例子&#xff0c;展示如何创建一个可以动态添加和删除字段的表单组件。 首先&#xff0c;你需要安装并引入 Element UI&#xff1a; bash 复制 npm in…

CV论文--2024.3.13

1、Sora Generates Videos with Stunning Geometrical Consistency 中文标题:Sora 生成具有惊人几何一致性的视频。 简介&#xff1a;最近发布的 Sora 模型展示了在视频生成领域的出色表现&#xff0c;引发了人们对其模拟真实世界现象能力的激烈讨论。尽管该模型越来越受欢迎&…

如何保证Redis和数据库数据一致性

缓存可以提升性能&#xff0c;减轻数据库压力&#xff0c;在获取这部分好处的同时&#xff0c;它却带来了一些新的问题&#xff0c;缓存和数据库之间的数据一致性问题。 想必大家在工作中只要用了咱们缓存势必就会遇到过此类问题 首先我们来看看一致性&#xff1a; 强一致性…

前端实现生成图片并批量下载,下载成果物是zip包

简介 项目上有个需求&#xff0c;需要根据表单填写一些信息&#xff0c;来生成定制的二维码图片&#xff0c;并且支持批量下载二维码图片。 之前的实现方式是直接后端生成二维码图片&#xff0c;点击下载时后端直接返回一个zip包即可。但是项目经理说后端实现方式每次改个东西…

python基础——列表【创建,下标索引,常见操作方法】

&#x1f4dd;前言&#xff1a; 这篇文章主要讲解一下python中常见的数据容器之一——列表 本文主要讲解列表的创建以及我们常用的列表操作方法 &#x1f3ac;个人简介&#xff1a;努力学习ing &#x1f4cb;个人专栏&#xff1a;C语言入门基础以及python入门基础 &#x1f380…

泰迪智能科技3月线上培训计划

有学习意向可到 泰迪智能科技官网 咨询了解

Visual Basic6.0零基础教学(3)—焦点概念和深入学习属性

焦点概念和深入学习属性 文章目录 焦点概念和深入学习属性前言一、什么是焦点(Focus)?焦点的特点 二、窗体属性一、窗体的结构二、窗体的属性三、事件四、方法 一.控件属性一. 标签 Label二.文本框 TextBox2.常用事件 三.命令按钮事件 总结 前言 今天我们来继续学习VB中的属性…

Java全系工程源码加密,防止反编译

一、前言 在大约2015年左右&#xff0c;由于公司产品序列逐渐增加&#xff0c;涉及到N多项目部署交付&#xff0c;为了有效防止产品被滥用&#xff0c;私自部署&#xff0c;当时技术中心决定开发一套统一的授权认证管理系统&#xff0c;对公司所有产品序列进行 License 控制。…

【DevSecOps】10种静态应用程序安全测试SAST工具对比

【DevSecOps】10种静态应用程序安全测试SAST工具对比 目录 【DevSecOps】10种静态应用程序安全测试SAST工具对比关于静态应用程序安全测试(SAST)工具的一切知识(常见问题解答)什么是静态应用程序安全测试(SAST)工具?静态应用程序安全测试(SAST)工具是如何工作的?静态…

蓝桥杯历年真题省赛java b组 2016年 第七届 抽签

一、题目 抽签 X星球要派出一个5人组成的观察团前往W星。 其中&#xff1a; A国最多可以派出4人。 B国最多可以派出2人。 C国最多可以派出2人。 .... 那么最终派往W星的观察团会有多少种国别的不同组合呢&#xff1f; 下面的程序解决了这个问题。 数组a[] 中既是每个国家可…

区块链技术的革命性影响

1. 区块链技术的基本原理&#xff1a; 区块链是一种去中心化的分布式数据库技术&#xff0c;通过不断增长的记录&#xff08;块&#xff09;构成一个链式结构。每个区块包含了交易数据的加密信息以及上一个区块的哈希值&#xff0c;从而形成了不可篡改的交易记录。这种去中心化…

Kotlin:为什么创建类不能被继承

一、为什么创建类不能被继承 class或data class 默认情况下&#xff0c;Kotlin 类是最终&#xff08;final&#xff09;的&#xff1a;它们不能被继承。 示例&#xff1a;data class PsersonBean 反编译data class PsersonBean 生成 public final class PsersonBean 示例&…

材料科学类3区SCI,仅13天超快上线见刊,国人友好!!

录用案例 JCR3区材料类SCI (3.31截稿) 【期刊简介】IF&#xff1a;3.0-4.0&#xff0c;JCR3区&#xff0c;中科院4区&#xff1b; 【检索情况】SCI在检&#xff1b; 【征稿领域】低温环境下新型生物降解材料的开发相关或结合研究均可&#xff1b; 【案例分享】重要时间节点…

数据类型(面向对象)

一.基本数据类型 Java中的基本数据类型包括八种&#xff0c;它们都是Java语言内置的&#xff0c;可以直接使用。这八种基本数据类型分别是&#xff1a; byte&#xff1a;字节类型&#xff0c;占用1个字节&#xff08;8位&#xff09;&#xff0c;取值范围从-128到127。short&…