Flink系列六:Flink SQl 之常用的连接器(Connector)

一、动态表 & 连续查询(Continuous Query)

1、动态表(Dynamic Tables)

        当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

        动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。

2、 连续查询

         动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

下图显示了流、动态表和连续查询之间的关系:

3、在流上定义表

为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。本质上我们正在从一个 INSERT-only 的 changelog 流构建表。

下图显示了单击事件流(左侧)如何转换为表(右侧)。当插入更多的单击流记录时,结果表将不断增长。

4、三种查询机制

动态表可以像普通数据库表一样通过 INSERTUPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。

在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

(1)Append-only 流: 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。(追加查询

(2)Upsert 流:当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。

二、准备工作

1、sql-client准备

基于yarn-session模式

1.1启动flink集群
     yarn-session.sh -d

1.2进入sql命令行
     sql-client.sh

2、sql命令行打印结果模式

2.1、表格模式(table mode)

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来,为默认模式。执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'table';

2.2   变更日志模式(changelog mode)

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'changelog';

2.3 Tableau模式(tableau mode)

更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type),执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'tableau';

3、处理模式

3.1 流处理模式

1、可以用于处理有界流和无界流

2、流处理模式输出连续结果

3、流处理模式底层时持续流模型

执行如下命令启用:

SET 'execution.runtime-mode' = 'streaming'; 

3.2  批处理模式

1、批处理模式只能用于处理有界流

2、输出最终结果

3、底层是MapReduce模型

执行如下命令启用:

SET 'execution.runtime-mode' = 'batch'; 

3.3 关系型表/SQL与流处理对比

三、连接器(Connector)

1、kafka

kafka source

-- 创建表 --- 无界流
-- TIMESTAMP(3): 是flink总的时间字段
CREATE TABLE students_kafka (id STRING,name STRING,age INT,sex STRING,clazz STRING,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 获取kfka时间戳`partition` BIGINT METADATA VIRTUAL, -- 获取kafka数据所在的分区`offset` BIGINT METADATA VIRTUAL,-- 偏移量-- 指定时间字段和水位线生成策略WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'students','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset',   --指定读取数据的模式'format' = 'csv','csv.ignore-parse-errors' ='true' -- 当有脏数据时是否跳过当前行
);select id,name,event_time,`partition`,`offset` from students_kafka;-- 每隔5秒统计每个班级的人数
select 
clazz,
TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,
TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,
count(id) as num
from 
students_kafka
group by
clazz,
-- 滚动的事件时间窗口
TUMBLE(event_time,INTERVAL '5' SECOND);

kafka sink

-- 创建sink表
CREATE TABLE students_kafka_sink (id STRING,name STRING
) WITH ('connector' = 'kafka','topic' = 'id_name','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);-- 1、将仅插入的结果写入sink表
insert into students_kafka_sink
select id,name from 
students_kafka;-- 查看结果
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic id_name
select * from students_kafka_sink;-- 2、将更新更改查询结果写入kafka
-- 将更新更改的流写入kafka需要使用canal-json格式,
-- canal-json中带上了数据操作的类型
-- {"data":[{"clazz":"理科六班","num":377}],"type":"INSERT"}CREATE TABLE clazz_num (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'canal-json'
);insert into clazz_num
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;-- 查看结果
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
select * from clazz_num;

2、mysql

准备工作:

# 将依赖包上传到flink的lib目录下
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.47.jar

# 依赖更新后需要重启集群才会生效
yarn application -list
yarn application -kill [appid]
yarn-session.sh -d

sql-client.sh

mysql source

-- 字段名称和字段类型需要和数据库中保存一致
CREATE TABLE students_mysql (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata29','table-name' = 'student','username' ='root','password' = '123456'
);

mysql sink

-- 创建mysql sink表。需要增加主键约束,flink会通过主键更新数据
CREATE TABLE clazz_num_mysql (clazz STRING,num BIGINT,PRIMARY KEY (clazz) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8','table-name' = 'clazz_num_mysql', -- 需要手动创建'username' ='root','password' = '123456'
);insert into clazz_num_mysql
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;

3、HDFS

hdfs source

-- 创建hdfs source表 -- 有界流
CREATE TABLE students_hdfs (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/bigdata29/data/students.csv',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);CREATE TABLE clazz_num_batch (clazz STRING,num BIGINT
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num_batch',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);-- 查询数据
insert into clazz_num_batch
select 
clazz,
count(1) as num
from
students_hdfs
group by clazz;-- 创建hdfs source表 -- 无界流
CREATE TABLE students_hdfs_stream (id int,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/students',  -- 必选:指定路径'format' = 'csv',                    -- 必选:文件系统连接器指定 format'source.monitor-interval' = '5000' -- 指定扫描目录的间隔时间
);

hdfs sink

-- 将仅追加的结果流写入hdfs
CREATE TABLE students_hdfs_sink (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/students_sink',  -- 必选:指定路径'format' = 'csv'                    -- 必选:文件系统连接器指定 format
);
insert into students_hdfs_sink
select id,name,age,sex,clazz from students_kafka;-- 2、将更新更改的结果写入hdfs
CREATE TABLE clazz_num_hdfs (clazz STRING,num BIGINT
) WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num',  -- 必选:指定路径'format' = 'canal-json'                    -- 必选:文件系统连接器指定 format
);insert into clazz_num_hdfs
select 
clazz,
count(1) as num
from
students_kafka
group by clazz;

 

4、hbase

准备工作:

# 将依赖包上传到flink的lib目录下
flink-sql-connector-hbase-2.2-1.15.2.jar

# 依赖更新后需要重启集群才会生效
yarn application -list
yarn application -kill [appid]
yarn-session.sh -d

sql-client.sh

-- 1、在hbase中创建表
create 'students_flink','info'-- 创建hbase sink表
CREATE TABLE students_hbase (id STRING, info ROW<name STRING,age INT,sex STRING,clazz STRING>, -- 指定列簇中的列PRIMARY KEY (id) NOT ENFORCED -- 设置hbase 的 rowkey
) WITH ('connector' = 'hbase-2.2','table-name' = 'students_flink','zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'  --指定zookeeper集群列表
);insert into students_hbase
select 
id,
ROW(name,age,sex,clazz) as info
from students_kafka;-- 查看结果
select * from students_hbase;
scan 'students_flink'

5、datagen

用于随机生成测试数据,可以用于高性能测试

CREATE TABLE students_datagen (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'datagen','rows-per-second'='5', -- 指定每秒生成的数据量'fields.id.length'='5','fields.name.length'='3','fields.age.min'='1','fields.age.max'='100','fields.sex.length'='1','fields.clazz.length'='4'
);

 

6、print

在task manager中打印结果

CREATE TABLE print_table (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'print'
);insert into print_table
select * from students_datagen;

7、BlackHole

黑洞,数据进入就出不来,用于高性能测试

CREATE TABLE blackhole_table (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'blackhole'
);insert into blackhole_table
select * from students_datagen;

四、数据格式

1、csv

数据中字段的顺序需要和建表语句字段的顺序保持一致 (顺序映射) 默认按照逗号分割

CREATE TABLE students_csv (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'csv', -- 指定数据的格式'csv.field-delimiter' = ',' ,-- 指定分隔符'csv.ignore-parse-errors' ='true' -- 跳过脏数据
);

2、json

flink表中的字段和类型需要和json中保持一致(同名映射)

CREATE TABLE cars (car STRING,city_code STRING,county_code STRING,card BIGINT,camera_id STRING,orientation STRING,road_id BIGINT,`time` BIGINT,speed DOUBLE
) WITH ('connector' = 'kafka','topic' = 'cars', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'json', -- 指定数据的格式'json.ignore-parse-errors' ='true'
);

3、canal-json

用于保存更新结果流

CREATE TABLE clazz_num (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'canal-json'
);insert into clazz_num
select 
clazz,
count(1) as num
from 
students_kafka
group by 
clazz;

五、案例

数据为json格式:

{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}

实时统计道路拥堵情况

CREATE TABLE cars (car STRING,city_code STRING,county_code STRING,card BIGINT,camera_id STRING,orientation STRING,road_id BIGINT,`time` BIGINT,speed DOUBLE,event_time as TO_TIMESTAMP(FROM_UNIXTIME(`time`)),-- 生成新的字段-- 指定事件时间和水位线WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'cars', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'json', -- 指定数据的格式'json.ignore-parse-errors' ='true'
);--每隔1分钟计算最近15分钟每个道路的车流量和平均车速(滑动事件时间窗口)
select road_id,HOP_start(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES) as win_start,HOP_end(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES) as win_end,count(distinct car) as flow,avg(speed) as avg_speed
from cars 
group by road_id,HOP(event_time,INTERVAL '1' MINUTES, INTERVAL '15' MINUTES);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/21846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL Developer 导入CSV数据

之前已经写过一篇文章&#xff1a;将文本文件导入Oracle数据库的简便方法&#xff1a;SQL Developer 本文是类似的&#xff0c;只不过使用的是官方提供的 CSV文件&#xff0c;确实是标准的CSV&#xff08;comma separated values&#xff09;。 COL1,COL2,COL3 "e40a9db…

2024年文艺文化与社会发展国际会议(ICLCSD 2024)

2024年文艺文化与社会发展国际会议 2024 International Conference on Literature, Culture and Social Development 【1】会议简介 2024年文艺文化与社会发展国际会议是一场汇集全球文艺文化和社会科学领域精英的盛会。本次会议以“文艺文化与社会发展”为主题&#xff0c;旨在…

[Java基础揉碎]坦克大战 java事件处理机制

目录 坦克大战游戏演示 ​编辑 为什么要写这个项目 java绘图坐标体系 java绘图技术 Graphics的常用方法 // 画直线 ​编辑 // 画矩形边框 // 画填充矩形 // 画填充椭圆 // 获取图片资源 // 写字 绘出坦克 新建一个tankgame包, 新建一个类Tank, 里面包含横…

01_初识微服务

文章目录 一、微服务概述1.1 什么是微服务1.2 对比微服务架构与单体架构1.3 微服务设计原则1.4 微服务开发框架1.5 简单理解分布式部署与集群部署 二、微服务的核心概念2.1 服务注册与发现2.2 微服调用&#xff08;通信&#xff09;2.3 服务网关2.4 服务容错2.5 链路追踪参考链…

CSAPP Lab04——Cache Lab大师手笔,匠心制作

浮沉浪似人潮 哪会没有思念 你我伤心到 讲不出再见 ——讲不出再见 完整代码见&#xff1a;CSAPP/cachelab-handout at main SnowLegend-star/CSAPP (github.com) Part A: Cache Simulator 这个lab描述背大锅&#xff0c;开始我是真有点没看懂题目的描述。特别是“M 20,1”“…

构建大型语言模型(LLM)产品的实战指南

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

基于多尺度相关小波分解的单幅图像去雾和去噪方法(MATLAB)

小波变换具有优美的数学背景和强大的多分辨率分析能力。它集成和发展了短时傅里叶变换的思想并克服了其时间窗口不可变的缺点。小波变换通过使用具有局部感受野和多尺度的基函数。形成了同时具有局部和全局性质的信号表征。与DCT等全局变换相比&#xff0c;小波变换可以防止局部…

Java面试八股之Executors可以创建哪几种类型的线程池

Executors可以创建哪几种类型的线程池 newSingleThreadExecutor&#xff1a; 创建一个单线程的线程池&#xff0c;此线程池确保所有的任务都在同一个线程中按顺序执行。适用于需要保证任务顺序执行&#xff0c;或者在单线程中运行的任务。 newFixedThreadPool&#xff1a; …

每日两题 / 34. 在排序数组中查找元素的第一个和最后一个位置 33. 搜索旋转排序数组(LeetCode热题100)

34. 在排序数组中查找元素的第一个和最后一个位置 - 力扣&#xff08;LeetCode&#xff09; 根据二分函数&#xff0c;得到>target和<target的两个&#xff0c;分别是答案的l和r class Solution { public:vector<int> searchRange(vector<int>& nums,…

Python | Leetcode Python题解之第130题被围绕的区域

题目&#xff1a; 题解&#xff1a; class Solution:def solve(self, board: List[List[str]]) -> None:if not board:returnn, m len(board), len(board[0])que collections.deque()for i in range(n):if board[i][0] "O":que.append((i, 0))board[i][0] &q…

github有趣项目:Verilog在线仿真( DigitalJS+edaplayground)

DigitalJS https://github.com/tilk/digitaljs这个项目是一个用Javascript实现的数字电路模拟器。 它旨在模拟由硬件设计工具合成的电路 像 Yosys&#xff08;这里是 Github 存储库&#xff09;&#xff0c;它有一个配套项目 yosys2digitaljs&#xff0c;它可以转换 Yosys 将文…

【多视图聚类】COMPLETER:Incomplete Multi-view Clustering via Contrastive Prediction

CVPR 2021 0.摘要 在本文中&#xff0c;我们研究了不完全多视图聚类分析中的两个具有挑战性的问题&#xff0c;即i&#xff09;如何在没有标签的帮助下学习不同视图之间的信息性和一致性表示&#xff0c;以及ii&#xff09;如何从数据中恢复缺失的视图。为此&#xff0c;我们…

英伟达开源新利器NV-Embed向量模型,基于双向注意力的LLM嵌入模型,MTEB 56项任务排名第一

前言 文本嵌入模型能够将文本信息转化为稠密的向量表示&#xff0c;并在信息检索、语义相似度计算、文本分类等众多自然语言处理任务中发挥着关键作用。近年来&#xff0c;基于解码器的大型语言模型 (LLM) 开始在通用文本嵌入任务中超越传统的 BERT 或 T5 嵌入模型&#xff0c…

Centos 7之Hadoop搭建

介绍 Hadoop Distributed File System简称 HDFS&#xff0c;是一个分布式文件系统。HDFS 有着高容错性&#xff08;fault-tolerent&#xff09;的特点&#xff0c;并且设计用来部署在低廉的&#xff08;low-cost&#xff09;硬件上。而且它提供高吞吐量&#xff08;high throu…

三分钟“手撕”队列与习题

代码放开头&#xff0c;方便大家查阅 目录 一、实现代码 二、什么是队列 三、队列常见方法 入队push&#xff08;&#xff09; 出队 四、Queue使用 Java自带的Queue 双端队列 五、习题 循环队列 用队列实现栈 用栈实现队列 一、实现代码 package demo2;publi…

一款小众清新的Typecho主题

源码介绍 DearLicy主题&#xff0c;一款小众化小清新风格的博客主题 主题支持Typecho所支持的所有版本PHP 简约、小众、优雅 源码截图 安装教程 将主题上传至/usr/themes/文件夹下解压后台进行启用访问前台查看效果 源码下载 https://www.qqmu.com/3378.html

一键设置常用纸张和页面边距-Word插件-大珩助手

Word大珩助手是一款功能丰富的Office Word插件&#xff0c;旨在提高用户在处理文档时的效率。它具有多种实用的功能&#xff0c;能够帮助用户轻松修改、优化和管理Word文件&#xff0c;从而打造出专业而精美的文档。 【新功能】常用纸张和常用边距 1、一键设定符合中国人常用…

273 基于matlab的改进型节点重构小波包频带能量谱与 PNN(概率神经网络)的联合故障诊断新方法

基于matlab的改进型节点重构小波包频带能量谱与 PNN&#xff08;概率神经网络&#xff09;的联合故障诊断新方法。针对风电机组故障信号的非平稳性以及故障与征兆的非线性映射导致的故障识别困难问题&#xff0c;提出了改进型的节点重构小波包频带能量谱与PNN&#xff08;概率神…

大数据数据治理工具

大数据数据治理-CSDN博客 大数据数据治理工具&#xff1a; 开源工具&#xff1a; Apache Atlas&#xff1a; 一个开源的数据治理和元数据框架&#xff0c;为Hadoop生态系统提供数据分类、管理和安全功能。 Apache Ranger&#xff1a; 一个集中式安全管理框架&#xff0c;用于…

Java Web学习笔记2——Web开发介绍

什么是Web&#xff1f; Web&#xff1a;全球广域网&#xff0c;也称为万维网&#xff08;WWW World Wide Web&#xff09;&#xff0c;能够通过浏览器访问的网站。 1&#xff09;淘宝、京东、唯品会等电商系统&#xff1b; 2&#xff09;CRM、OA、ERP企业管理系统&#xff1…