Flink系列六:Flink SQl 之常用的连接器(Connector)

一、动态表 & 连续查询(Continuous Query)

1、动态表(Dynamic Tables)

        当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

        动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。

2、 连续查询

         动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

下图显示了流、动态表和连续查询之间的关系:

3、在流上定义表

为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。本质上我们正在从一个 INSERT-only 的 changelog 流构建表。

下图显示了单击事件流(左侧)如何转换为表(右侧)。当插入更多的单击流记录时,结果表将不断增长。

4、三种查询机制

动态表可以像普通数据库表一样通过 INSERTUPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。

在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

(1)Append-only 流: 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。(追加查询

(2)Upsert 流:当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。

二、准备工作

1、sql-client准备

基于yarn-session模式

1.1启动flink集群
     yarn-session.sh -d

1.2进入sql命令行
     sql-client.sh

2、sql命令行打印结果模式

2.1、表格模式(table mode)

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来,为默认模式。执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'table';

2.2   变更日志模式(changelog mode)

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'changelog';

2.3 Tableau模式(tableau mode)

更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type),执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'tableau';

3、处理模式

3.1 流处理模式

1、可以用于处理有界流和无界流

2、流处理模式输出连续结果

3、流处理模式底层时持续流模型

执行如下命令启用:

SET 'execution.runtime-mode' = 'streaming'; 

3.2  批处理模式

1、批处理模式只能用于处理有界流

2、输出最终结果

3、底层是MapReduce模型

执行如下命令启用:

SET 'execution.runtime-mode' = 'batch'; 

3.3 关系型表/SQL与流处理对比

三、连接器(Connector)

1、kafka

kafka source

-- 创建表 --- 无界流
-- TIMESTAMP(3): 是flink总的时间字段
CREATE TABLE students_kafka (id STRING,name STRING,age INT,sex STRING,clazz STRING,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 获取kfka时间戳`partition` BIGINT METADATA VIRTUAL, -- 获取kafka数据所在的分区`offset` BIGINT METADATA VIRTUAL,-- 偏移量-- 指定时间字段和水位线生成策略WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'students','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset',   --指定读取数据的模式'format' = 'csv','csv.ignore-parse-errors' ='true' -- 当有脏数据时是否跳过当前行
);select id,name,event_time,`partition`,`offset` from students_kafka;-- 每隔5秒统计每个班级的人数
select 
clazz,
TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,
TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,
count(id) as num
from 
students_kafka
group by
clazz,
-- 滚动的事件时间窗口
TUMBLE(event_time,INTERVAL '5' SECOND);

kafka sink

-- 创建sink表
CREATE TABLE students_kafka_sink (id STRING,name STRING
) WITH ('connector' = 'kafka','topic' = 'id_name','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);-- 1、将仅插入的结果写入sink表
insert into students_kafka_sink
select id,name from 
students_kafka;-- 查看结果
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic id_name
select * from students_kafka_sink;-- 2、将更新更改查询结果写入kafka
-- 将更新更改的流写入kafka需要使用canal-json格式,
-- canal-json中带上了数据操作的类型
-- {"data":[{"clazz":"理科六班","num":377}],"type":"INSERT"}CREATE TABLE clazz_num (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'canal-json'
);insert into clazz_num
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;-- 查看结果
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
select * from clazz_num;

2、mysql

准备工作:

# 将依赖包上传到flink的lib目录下
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.47.jar

# 依赖更新后需要重启集群才会生效
yarn application -list
yarn application -kill [appid]
yarn-session.sh -d

sql-client.sh

mysql source

-- 字段名称和字段类型需要和数据库中保存一致
CREATE TABLE students_mysql (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata29','table-name' = 'student','username' ='root','password' = '123456'
);

mysql sink

-- 创建mysql sink表。需要增加主键约束,flink会通过主键更新数据
CREATE TABLE clazz_num_mysql (clazz STRING,num BIGINT,PRIMARY KEY (clazz) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8','table-name' = 'clazz_num_mysql', -- 需要手动创建'username' ='root','password' = '123456'
);insert into clazz_num_mysql
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;

3、HDFS

hdfs source

-- 创建hdfs source表 -- 有界流
CREATE TABLE students_hdfs (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/bigdata29/data/students.csv',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);CREATE TABLE clazz_num_batch (clazz STRING,num BIGINT
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num_batch',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);-- 查询数据
insert into clazz_num_batch
select 
clazz,
count(1) as num
from
students_hdfs
group by clazz;-- 创建hdfs source表 -- 无界流
CREATE TABLE students_hdfs_stream (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/students',  -- 必选:指定路径'format' = 'csv',                    -- 必选:文件系统连接器指定 format'source.monitor-interval' = '5000' -- 指定扫描目录的间隔时间
);

hdfs sink

-- 将仅追加的结果流写入hdfs
CREATE TABLE students_hdfs_sink (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/students_sink',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);
insert into students_hdfs_sink
select id,name,age,sex,clazz from students_kafka;-- 2、将更新更改的结果写入hdfs
CREATE TABLE clazz_num_hdfs (clazz STRING,num BIGINT
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num',  -- 必选:指定路径'format' = 'canal-json'                    -- 必选:文件系统连接器指定 format
);insert into clazz_num_hdfs
select 
clazz,
count(1) as num
from
students_kafka
group by clazz;

 

4、hbase

准备工作:

# 将依赖包上传到flink的lib目录下
flink-sql-connector-hbase-2.2-1.15.2.jar

# 依赖更新后需要重启集群才会生效
yarn application -list
yarn application -kill [appid]
yarn-session.sh -d

sql-client.sh

-- 1、在hbase中创建表
create 'students_flink','info'-- 创建hbase sink表
CREATE TABLE students_hbase (id STRING, info ROW<name STRING,age INT,sex STRING,clazz STRING>, -- 指定列簇中的列PRIMARY KEY (id) NOT ENFORCED -- 设置hbase 的 rowkey
) WITH ('connector' = 'hbase-2.2','table-name' = 'students_flink','zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'  --指定zookeeper集群列表
);insert into students_hbase
select 
id,
ROW(name,age,sex,clazz) as info
from students_kafka;-- 查看结果
select * from students_hbase;
scan 'students_flink'

5、datagen

用于随机生成测试数据,可以用于高性能测试

CREATE TABLE students_datagen (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'datagen','rows-per-second'='5', -- 指定每秒生成的数据量'fields.id.length'='5','fields.name.length'='3','fields.age.min'='1','fields.age.max'='100','fields.sex.length'='1','fields.clazz.length'='4'
);

 

6、print

在task manager中打印结果

CREATE TABLE print_table (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'print'
);insert into print_table
select * from students_datagen;

7、BlackHole

黑洞,数据进入就出不来,用于高性能测试

CREATE TABLE blackhole_table (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'blackhole'
);insert into blackhole_table
select * from students_datagen;

四、数据格式

1、csv

数据中字段的顺序需要和建表语句字段的顺序保持一致 (顺序映射) 默认按照逗号分割

CREATE TABLE students_csv (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'csv', -- 指定数据的格式'csv.field-delimiter' = ',' ,-- 指定分隔符'csv.ignore-parse-errors' ='true' -- 跳过脏数据
);

2、json

flink表中的字段和类型需要和json中保持一致(同名映射)

CREATE TABLE cars (car STRING,city_code STRING,county_code STRING,card BIGINT,camera_id STRING,orientation STRING,road_id BIGINT,`time` BIGINT,speed DOUBLE
) WITH ('connector' = 'kafka','topic' = 'cars', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'json', -- 指定数据的格式'json.ignore-parse-errors' ='true'
);

3、canal-json

用于保存更新结果流

CREATE TABLE clazz_num (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'canal-json'
);insert into clazz_num
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;

五、案例

数据为json格式:

{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}

实时统计道路拥堵情况

CREATE TABLE cars (car STRING,city_code STRING,county_code STRING,card BIGINT,camera_id STRING,orientation STRING,road_id BIGINT,`time` BIGINT,speed DOUBLE,event_time as TO_TIMESTAMP(FROM_UNIXTIME(`time`)),-- 生成新的字段-- 指定事件时间和水位线WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'cars', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'json', -- 指定数据的格式'json.ignore-parse-errors' ='true'
);--每隔1分钟计算最近15分钟每个道路的车流量和平均车速(滑动事件时间窗口)
select road_id,HOP_start(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES) as win_start,HOP_end(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES) as win_end,count(distinct car) as flow,avg(speed) as avg_speed
from cars 
group by road_id,HOP(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/21846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL Developer 导入CSV数据

之前已经写过一篇文章&#xff1a;将文本文件导入Oracle数据库的简便方法&#xff1a;SQL Developer 本文是类似的&#xff0c;只不过使用的是官方提供的 CSV文件&#xff0c;确实是标准的CSV&#xff08;comma separated values&#xff09;。 COL1,COL2,COL3 "e40a9db…

2024年文艺文化与社会发展国际会议(ICLCSD 2024)

2024年文艺文化与社会发展国际会议 2024 International Conference on Literature, Culture and Social Development 【1】会议简介 2024年文艺文化与社会发展国际会议是一场汇集全球文艺文化和社会科学领域精英的盛会。本次会议以“文艺文化与社会发展”为主题&#xff0c;旨在…

新一代AI的崛起——GPT-4o深度评析

目录 引言 一、版本间的对比分析 1.1 GPT-4与GPT-4o 1.2 GPT-4o与GPT-3 二、GPT-4o的技术能力 2.1 多模态处理能力 2.2 强化学习与自监督学习 2.3 高效的推理能力 三、个人整体感受 3.1 使用体验 3.2 实际应用 四、未来展望 4.1 持续优化与创新 4.2 加强人机协作 …

[Java基础揉碎]坦克大战 java事件处理机制

目录 坦克大战游戏演示 ​编辑 为什么要写这个项目 java绘图坐标体系 java绘图技术 Graphics的常用方法 // 画直线 ​编辑 // 画矩形边框 // 画填充矩形 // 画填充椭圆 // 获取图片资源 // 写字 绘出坦克 新建一个tankgame包, 新建一个类Tank, 里面包含横…

01_初识微服务

文章目录 一、微服务概述1.1 什么是微服务1.2 对比微服务架构与单体架构1.3 微服务设计原则1.4 微服务开发框架1.5 简单理解分布式部署与集群部署 二、微服务的核心概念2.1 服务注册与发现2.2 微服调用&#xff08;通信&#xff09;2.3 服务网关2.4 服务容错2.5 链路追踪参考链…

前端科举八股文-VUE篇

前端科举八股文-VUE篇 Vue响应式的基本原理?computed和watch的区别computed和methods的区别Slot是什么 ? 作用域插槽是什么?组件缓冲keep-alive是什么&#xff1f; 讲讲原理v-if&#xff0c;v-show的区别v-modal如何实现双向绑定组件中的data属性为什么是一个函数而不是对象…

python SciPy 和 NumPy 版本冲突

UserWarning: A NumPy version >1.19.5 and <1.23.0 is required for this version of SciPy (detected version 1.17.2) warnings.warn(f"A NumPy version >{np_minversion} and <{np_maxversion}"在使用 Python 的科学计算库时&#xff0c;经常会遇到各…

CSAPP Lab04——Cache Lab大师手笔,匠心制作

浮沉浪似人潮 哪会没有思念 你我伤心到 讲不出再见 ——讲不出再见 完整代码见&#xff1a;CSAPP/cachelab-handout at main SnowLegend-star/CSAPP (github.com) Part A: Cache Simulator 这个lab描述背大锅&#xff0c;开始我是真有点没看懂题目的描述。特别是“M 20,1”“…

基于Nodejs的简易邮件SMTP服务器

仅用于内部开发环境无法访问邮件服务器的情况下&#xff0c;测试SMTP邮件发送有没有成功。收到邮件发送请求后仅打印内容和保存附件用于测试验证。 安装库 实验环境使用了Nodejs版本16。 npm install smtp-server mailparser JS版SMTP服务器代码 没有使用TSL/SSL&#xff0c…

构建大型语言模型(LLM)产品的实战指南

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

双击bat文件查看ip地址

echo off echo Running batch file as administrator...:: 下面这行是以管理员身份运行 ipconfig 命令&#xff0c;并将结果输出到控制台 cmd /c "ipconfig"echo Batch file execution completed. pause注意文件名不能为 ipconfig.bat 可以是 ipconfigs.bat 另一种…

【JavaScript】JS 的 btoa 和 atob 全局函数

在 JavaScript 中&#xff0c;btoa 和 atob 是两个处理 Base64 编码的全局函数&#xff0c;它们通常用于在浏览器环境中对二进制数据进行编码和解码。 不过&#xff0c;需要注意的是&#xff0c;这两个函数并非 JavaScript 标准规范&#xff08;ECMAScript&#xff09;的一部分…

MATLAB .m文件的命名规则

matlab的.m文件的命名规则&#xff1a; 文件名命名要用英文字符&#xff0c;首字符不能是数字或下划线。 文件名不能与matlab的内部函数名相同。 .m文件名的命名尽量不要是简单的英文单词&#xff0c;最好是由大小写英文/数字/下划线等组成。 原因是简单的单词命名容易与mat…

基于多尺度相关小波分解的单幅图像去雾和去噪方法(MATLAB)

小波变换具有优美的数学背景和强大的多分辨率分析能力。它集成和发展了短时傅里叶变换的思想并克服了其时间窗口不可变的缺点。小波变换通过使用具有局部感受野和多尺度的基函数。形成了同时具有局部和全局性质的信号表征。与DCT等全局变换相比&#xff0c;小波变换可以防止局部…

58、试除法求约数

试除法求约数 题目描述 给定n个正整数ai&#xff0c;对于每个整数ai,请你按照从小到大的顺序输出它的所有约数。 输入格式 第一行包含整数n。 接下来n行&#xff0c;每行包含一个整数ai。 输出格式 输出共n行&#xff0c;其中第 i 行输出第 i 个整数ai的所有约数。 数据…

oracle查看序列

在Oracle数据库中&#xff0c;查看序列的方式主要有以下几种&#xff1a; 查看当前用户下的所有序列名称&#xff1a; sql复制代码 SELECT sequence_name FROM user_sequences; 查看所有用户的序列&#xff1a; sql复制代码 SELECT sequence_name FROM all_sequences; 查看…

Java面试八股之Executors可以创建哪几种类型的线程池

Executors可以创建哪几种类型的线程池 newSingleThreadExecutor&#xff1a; 创建一个单线程的线程池&#xff0c;此线程池确保所有的任务都在同一个线程中按顺序执行。适用于需要保证任务顺序执行&#xff0c;或者在单线程中运行的任务。 newFixedThreadPool&#xff1a; …

知识蒸馏——讨论区

更多内容请了解&#xff1a; 知识蒸馏——基础知识 知识蒸馏——学生模型 知识蒸馏——代码实现 知识蒸馏——讨论区 知识蒸馏——讨论区 一、教师模型的预测结果&#xff08;软标签&#xff09;与传统标签的区别&#xff1f;二、教师模型的软标签与真实标签的关系三、为什么学…

YOLOv5改进 | Conv篇 | 利用YOLOv10提出的UIB模块二次创新C3(附代码 + 完整修改教程)

一、本文介绍 本文给大家带来的改进机制是利用利用YOLOv10提出的UIB模块二次创新C3助力YOLOv5进行有效涨点,其中C2fUIB模块所用到的CIB模块是一种紧凑的倒置块结构,它采用廉价的深度卷积进行空间混合,并采用成本效益高的点卷积进行通道混合。本文针对该方法给出多种使用方法…

每日两题 / 34. 在排序数组中查找元素的第一个和最后一个位置 33. 搜索旋转排序数组(LeetCode热题100)

34. 在排序数组中查找元素的第一个和最后一个位置 - 力扣&#xff08;LeetCode&#xff09; 根据二分函数&#xff0c;得到>target和<target的两个&#xff0c;分别是答案的l和r class Solution { public:vector<int> searchRange(vector<int>& nums,…