大数据-玩转数据-Flink SQL编程

一、概念

在这里插入图片描述

1.1 Apache Flink 两种关系型 API

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。

Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

1.2 动态表(Dynamic Tables)

动态表是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
动态表是随时间变化的,可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

对动态表的一般处理过程: 流->动态表->连续查询处理->动态表->流

二、导入Flink Table API依赖

pom.xml 中添加

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.21</version>
</dependency>

三、表与DataStream的混合使用简单案例

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;//必须添加此类才能在表达式中运用$符号
import static org.apache.flink.table.api.Expressions.$;public class Table_Api_BasicUse {public static void main(String[] args) throws Exception {// 流运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行参数env.setParallelism(1);// 数据源DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 创建表的执行环境StreamTableEnvironment TableEnv = StreamTableEnvironment.create(env);// 创建表,将流转换成动态表. 表的字段名从pojo的属性名自动抽取Table table = TableEnv.fromDataStream(waterSensorStream);// 对动态表进行查询Table resultTable = table.where($("id").isEqual("sensor_1")).select($("id"),$("vc"));//把动态表转化为流DataStream<Row> dataStream = TableEnv.toAppendStream(resultTable,Row.class);dataStream.print();env.execute();}
}

四、表到流的转换

动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流
仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流
retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。

五、通过Connector声明读入数据

前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据

5.1 File source

// 创建表
//表的元数据信息
Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new FileSystem().path("input/sensor.txt")).withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n")).withSchema(schema).createTemporaryTable("sensor");
// 做成表对象, 然后对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable.groupBy($("id")).select($("id"), $("id").count().as("cnt"));
//  把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

5.2 Kafka Source

// 创建表
// 表的元数据信息
Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new Kafka().version("universal").topic("sensor").startFromLatest().property("group.id", "bigdata").property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")).withFormat(new Json()).withSchema(schema).createTemporaryTable("sensor");
//对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable.groupBy($("id")).select($("id"), $("id").count().as("cnt"));
//把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

六、通过Connector声明写出数据

6.1 File Sink

package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/11 21:43*/
public class Flink02_TableApi_ToFileSystem {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream);Table resultTable = sensorTable.where($("id").isEqual("sensor_1") ).select($("id"), $("ts"), $("vc"));// 创建输出表Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());tableEnv.connect(new FileSystem().path("output/sensor_id.txt")).withFormat(new Csv().fieldDelimiter('|')).withSchema(schema).createTemporaryTable("sensor");// 把数据写入到输出表中resultTable.executeInsert("sensor");}
}

6.2 Kafka Sink

package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/11 21:43*/
public class Flink03_TableApi_ToKafka {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream);Table resultTable = sensorTable.where($("id").isEqual("sensor_1") ).select($("id"), $("ts"), $("vc"));// 创建输出表Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());tableEnv.connect(new Kafka().version("universal").topic("sink_sensor").sinkPartitionerRoundRobin().property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")).withFormat(new Json()).withSchema(schema).createTemporaryTable("sensor");// 把数据写入到输出表中resultTable.executeInsert("sensor");}
}

七、基本使用

7.1 查询未注册的表

package com.lyh.flink12;import org.apache.flink.types.Row;
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Connect_File_source {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> dataStreamSource =env.fromElements(new WaterSensor("sensor_1", 1000L, 20),new WaterSensor("sensor_1", 2000L, 30),new WaterSensor("sensor_1", 3000L, 40),new WaterSensor("sensor_1", 4000L, 50),new WaterSensor("sensor_1", 5000L, 60));// 创建动态表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用SQL查询未注册的表// 从流中得到一个表Table inputTable = tableEnv.fromDataStream(dataStreamSource);Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id = 'sensor_1'");tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();}
}

7.2 查询已注册的表

package com.lyh.flink12;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Flink05_SQL_BaseUse_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用sql查询一个已注册的表// 1. 从流得到一个表Table inputTable = tableEnv.fromDataStream(waterSensorStream);// 2. 把注册为一个临时视图tableEnv.createTemporaryView("sensor", inputTable);// 3. 在临时视图查询数据, 并得到一个新表Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'");// 4. 显示resultTable的数据tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();}
}

7.3 Kafka到Kafka

使用sql从Kafka读数据, 并写入到Kafka中

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Sql_kafka_kafka {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("+ "'connector' = 'kafka',"+ "'topic' = 'topic_source_sensor',"+ "'properties.bootstrap.servers' = 'hadoop100:9029',"+ "'properties.group.id' = 'atguigu',"+ "'scan.startup.mode' = 'latest-offset',"+ "'format' = 'json'"+ ")");// 2. 注册SinkTable: sink_sensortableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("+ "'connector' = 'kafka',"+ "'topic' = 'topic_sink_sensor',"+ "'properties.bootstrap.servers' = 'hadoop100:9029',"+ "'format' = 'json'"+ ")");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86406.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS 学习笔记(基础)

用来控制网页表现的语言&#xff0c;CSS&#xff08;Cascading Style Sheet&#xff09;&#xff1a;层叠样式表。然后我们继续看看 W3C 标准&#xff1a; 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript CSS导入方式、选择器&属性 由于网页的框架…

linux进程杀不死

项目场景&#xff1a; 虚拟机 问题描述 linux进程杀不死 无反应 原因分析&#xff1a; 进程僵死zombie 解决方案&#xff1a; 进proc或者find命令找到进程所在地址 cat status查看进程杀死子进程

linux系统中mysql 连接出现“too many connections”问题解决办法

问题内容&#xff1a; 原因: mysql配置参数中设定的并发连接数太少或者系统繁忙导致连接数被占满。连接数超过了 MySQL 设置的值&#xff0c; 与 max_connections 和 wait timeout 都有关&#xff0c;wait_timeout 的值越大&#xff0c;连接的空闲等待就越长&#xff0c; 这样就…

Linux忘记密码

在虚拟机安装了centOS7&#xff0c;但是忘记了root密码&#xff0c;登录的时候发现登录不上了&#xff0c;然后重新修改密码。 1、重启虚拟机 2、进入到该页面之后&#xff0c;选中第一个&#xff08;高亮显示即为选中&#xff09;选项&#xff0c;然后按下键盘的“E”键 3…

mybatis日志体系

title: “java日志体系” createTime: 2021-12-08T12:19:5708:00 updateTime: 2021-12-08T12:19:5708:00 draft: false author: “ggball” tags: [“mybatis”] categories: [“java”] description: “java日志体系” java日志体系 常用日志框架 Log4j&#xff1a;Apache …

74、SpringBoot 整合 Spring Data JDBC

总结&#xff1a;用起来跟 Spring Data JPA 差不多 什么是 JdbcTemplate&#xff1f;&#xff08;Template译为模板&#xff09; Spring 框架对 JDBC 进行封装&#xff0c;使用 JdbcTemplate 方便实现对数据库操作 ★ Spring Data JDBC 既不需要JPA、Hibernate这种ORM框架&a…

离线部署 python 3.x 版本

文章目录 离线部署 python 3.x 版本1. 下载版本2. 上传到服务器3. 解压并安装4. 新建软连信息5. 注意事项 离线部署 python 3.x 版本 1. 下载版本 python 各版本下载地址 本次使用版本 Python-3.7.0a2.tgz # linux 可使用 wget 下载之后上传到所需服务器 wget https://www.py…

gma 2 成书计划

随着 gma 2 整体构建完成。下一步计划针对库内所有功能完成一个用户指南&#xff08;非网站&#xff09;。 封皮 主要章节 章节完成度相关链接第 1 章 GMA 概述已完成第 2 章 地理空间数据操作已完成第 3 章 坐标参考系统已完成第 4 章 地理空间制图已完成第 5 章 数学运算模…

3288S Android11 适配红外遥控功能(超详细)

目录 一、rk3288平台红外遥控介绍二、原理图分析三、配置设备树并使能红外遥控功能四、打开红外打印功能&#xff0c;查看红外遥控的用户码和键值五、将查看到的红外遥控用户码和键值添加到设备树和.kl文件六、Android红外遥控.kl文件映射知识和使用添加新的.kl文件七、补充&am…

Unity中关于多线程的一些事

一.线程中不允许调用unity组件api 解决方法&#xff1a;可以使用bool值变化并且在update中监测bool值变化来调用关于unity组件的API. 二.打印并且将信息输出到list列表中 多线程可能同时输出多条信息。输出字符串可以放入Queue队列中。队列可以被多线程插入。 三.启用socke…

Python 网络爬取的时候使用那种框架

尽管现代的网站多采取前后端分离的方式进行开发了&#xff0c;但是对直接 API 的调用我们通常会有 token 的限制和可以调用频率的限制。 因此&#xff0c;在一些特定的网站上&#xff0c;我们可能还是需要使用网络爬虫的方式获得已经返回的 JSON 数据结构&#xff0c;甚至是处理…

【计算机毕业设计】基于SpringBoot+Vue记帐理财系统的设计与实现

博主主页&#xff1a;一季春秋博主简介&#xff1a;专注Java技术领域和毕业设计项目实战、Java、微信小程序、安卓等技术开发&#xff0c;远程调试部署、代码讲解、文档指导、ppt制作等技术指导。主要内容&#xff1a;毕业设计(Java项目、小程序、安卓等)、简历模板、学习资料、…

C语言 coding style

头文件 The #define Guard #define的保护文件的唯一性&#xff0c;防止被多重包含 格式 : <PROJECT>_< FILE>_H_ PROJECT : XS FILE : MV_CTR 头文件的包含顺序 C System FilesOther LibrariesUser LibraryConditional include 作用域 局部变量 -变量定义时需要…

go语言unsafe.Pointer与uintptr

以下内容来源go语言圣经 1、unsafe.Pointer&#xff0c;相当于c语言中的void *类型的指针&#xff0c;如果需要运算需要转成uintptr类型的指针 2. uintptr uintptr是一个无符号的整型&#xff0c;它可以保存一个指针地址。 它可以进行指针运算。 uintptr无法持有对象, GC不把…

Kubernetes的容器批量调度引擎 Volcano

一个用于高性能工作负载场景下基于Kubernetes的容器批量调度引擎 Volcano是在Kubernetes上运行高性能工作负载的容器批量计算引擎。 它提供了Kubernetes目前缺少的一套机制&#xff0c;这些机制通常是许多高性能 工作负载所必需的&#xff0c;包括&#xff1a; - 机器学习/深度…

三.vue2路由知识全总结

Vue Devtools&#xff1a;插件安装&#xff0c;展示模块中的数据 vue-router 应用场景&#xff1a;Vue Router 是 Vue.js 的官方路由。它与 Vue.js 核心深度集成&#xff0c;让用 Vue.js 构建单页应用变得轻而易举。 嵌套的路由/视图表模块化的、基于组件的路由配置路由参数、…

一拖三快充线(USB-C转三充)的解决方案--LDR6020P

DR6020P 是带有 3 组 6 路 DRP USB-C 及 PD 通信协议处理模块和 USB2.0 Device 功能的 16 位 RISC MCU&#xff0c;内置 8K16 位 MTP 程序存储器&#xff08;可烧录 1000 次&#xff09;&#xff0c;512 字节的数据存储器&#xff08;SRAM&#xff09;。内置 LDO 5V 输出&#…

通讯网关软件011——利用CommGate X2ODBC实现DDE数据转入ODBC

本文介绍利用CommGate X2ODBC实将DDE数据源中的数据转入到ODBC数据源。CommGate X2ODBC是宁波科安网信开发的网关软件&#xff0c;软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示&#xff0c;将DDE数据源&#xff08;如Excel&#xff09;的数据写…

postgresql-触发器

postgresql-触发器 触发器概述创建触发器管理触发器删除触发器事件触发器创建事件触发器修改触发器删除事件触发器 触发器概述 PostgreSQL 触发器&#xff08;trigger&#xff09;是一种特殊的函数&#xff0c;当某个数据变更事件&#xff08;INSERT、UPDATE、 DELETE 或者 TR…

【3dmax】怎么将点删除而面保留

在编辑多边形模式下&#xff0c;选择点模式&#xff0c;选择要删除的点&#xff0c;在下拉面板中找到【移除】