Flink SQL kafka连接器

版本说明

Flink和kafka的版本号有一定的匹配关系,操作成功的版本:

  • Flink1.17.1
  • kafka_2.12-3.3.1

添加kafka连接器依赖

将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下

下载flink-sql-connector-kafka连接器jar包

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

分发flink-connector-kafka-1.17.1.jar

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

启动yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

启动kafka集群

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

创建kafka主题

查看主题
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
​
如果没有ws1,则创建
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
​

普通Kafka表

'connector' = 'kafka'

进入Flink SQL客户端

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
省略若干日志输出
...
Flink SQL> 

创建Kafka的映射表

CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094','properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed','topic' = 'ws1','format' = 'json'
);

可以往kafka读数据,也可以往kafka写数据。

插入数据到Kafka表

如果没有source表,先创建source表,如果source表存在则不需要再创建。

CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);

把source表插入t1表

insert into t1(id,ts,vc) select * from source;

如果报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

查看是否复制成功

$ ls $FLINK_HOME/lib

重启sql-client重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
​

查询Kafka表

select * from t1;

报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord​

重启yarn session,重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
​
​
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
​
Flink SQL> 
​

 

upsert-kafka表

'connector' = 'upsert-kafka'

如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED 
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);

如果没有kafka名为ws2的topic,将自动被创建。

插入upsert-kafka表

insert into t2 select id,sum(vc) sumVC  from source group by id;

查询upsert-kafka表

upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。

设置显示模式

SET sql-client.execution.result-mode=tableau;

 查询t2表数据

select * from t2;

如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。

进入Flink Web UI,cancel掉所有running job,重新操作成功如下:

删除表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set
​
Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

创建表

CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED 
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);

设置显示模式

SET sql-client.execution.result-mode=tableau;

查询表

select * from t2;

 

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867750.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最新整理的机器人相关数据合集(1993-2022年不等 具体看数据类型)

机器人安装数据是指记录全球或特定区域内工业机器人新安装数量的信息,这一数据由国际机器人联合会(IFR)等权威机构定期发布。这些数据不仅揭示了机器人技术的市场需求趋势,还反映了各国和地区自动化水平及产业升级的步伐。例如,数据显示中国在…

数据库测试|Elasticsearch和ClickHouse的对决

前言 数据库作为产品架构的重要组成部分,一直是技术人员做产品选型的考虑因素之一。 ClkLog会经常遇到小伙伴问支持兼容哪几种数据库?为什么是选择ClickHouse而不是这个或那个。 由于目前市场上主流的数据库有许多,这次我们选择其中一个比较典…

深度学习3 基于规则的决策树模型

1.决策树是一种归纳学习算法,从一些没有规则、没有顺序、杂乱无章的数据中,推理出决 策模型。不管是什么算法的决策树,都是一种对实例进行分类的树形结构。决策树有三个要素:节点(Node)、分支(Branches)和结果(Leaf)。 训练决策树…

算法010:无重复字符的最长子串

无重复字符的最长子串. - 备战技术面试?力扣提供海量技术面试资源,帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/longest-substring-without-repeating-characters/ 使用的算法:滑动窗口 在这个…

干货 | 2024大模型增强下的图智能在金融场景的应用(免费下载)

如需下载本方案PPT/WORD原格式,诚挚邀请您微信扫描以下二维码加入方案驿站知识星球,获取上万份PPT/WORD解决方案!!!感谢支持!!!

SSM高校教师教学质量评估系统-计算机毕业设计源码03344

摘要 在高等教育中,教学质量是培养优秀人才的关键。为了提高教学质量,高校需要建立一套科学、有效的教师教学质量评估系统。本研究采用 SSM技术框架,旨在开发一款高校教师教学质量评估系统。 SSM框架作为一种成熟的Java开发框架,具…

Mysql5.7并发插入死锁问题

死锁的产生条件 互斥、请求和保持、不可剥夺、循环等待 MySQL锁类型 死锁复现 环境:Mysql 5.7版本,Innodb引擎,可重复度隔离级别 并发场景下使用duplicate key update插入或更新数据可能会造成死锁,下面就产生死锁的条件进行模…

七大排序-冒泡排序,插入排序,希尔排序(一)

目录 排序冒泡排序插入排序冒泡排序和插入排序的对比希尔排序 排序 先写单趟,再写多趟,这样比较好写 排序可以理解为对商品价格的排序,对数字大小的排序,排序再生活中随处可见 冒泡排序 冒泡排序就是两个相邻的数交换&#xff…

GD32 MCU ADC采样率如何计算?

大家在使用ADC采样的时候是否计算过ADC的采样率,这个问题非常关键! 以下为GD32F303系列MCU中有关ADC的参数,其中ADC时钟最大值为40MHz,12位分辨率下最大采样率为2.86MSPS.如果ADC时钟超频的话,可能会造成ADC采样异常&…

工作两年后,我如何看待设计模式

在软件工程中,设计模式是经过反复验证的最佳实践,用于解决在软件设计中经常遇到的一类问题。它们为开发者提供了一种通用的解决方案和语言,使得复杂的编程问题得以简化,代码结构更加清晰,可维护性大大提高。简而言之&a…

阶段三:项目开发---大数据开发运行环境搭建:任务6:安装配置HBase

任务描述 知识点:安装配置HBase 重 点: 安装配置HBase 难 点:无 内 容: 本阶段任务是安装配置HBase,实时飞行数据是保存在HBase中的,因为HBase具有高效的读写能力,在当前项目中我们是…

矢量绘图设计Sketch中文 Sketch直装安装包

Sketch是一款专为UI设计师和UX专家打造的矢量图形设计软件,以其简洁的界面、强大的功能和高效的协作能力而闻名。Sketch支持快速创建高质量的UI界面、图标、图形和插画,其矢量绘图工具让设计细节更加精准。同时,Sketch内置丰富的插件和组件库…

基于vue的3D高德地图的引入

在引入高德地图的时候需要先注册一个账号 登录下面的网站 账号认证 | 高德控制台 (amap.com) 打开首页应用管理,我的应用 创建新的应用 根据自己的需求进行选择 创建完成之后,点击添加key 不同的服务平台对应不同的可使用服务,选择自己适…

LeetCode刷题之HOT100之完全平方数

2024 7/7 转眼间就到周日啦!昨天下午开组会,开了三个半小时。如坐针毡,会后跑了个步、洗了个澡、洗了衣服、躺床上看了会《罪与罚》,睡着了。早上起来,去拿我昨晚充电的车,当我看到车没有停在昨天的位置&am…

《算法笔记》总结No.3——排序

基础算法之一,相当重要。在普通的机试中如果没有数据类型和时空限制,基本上选择自己最熟悉的就好。本篇只总结选择排序和插入排序,侧重应用,408中要求的种类更加繁多,此处先不扩展难度~总结最常用的两种排序。 一.选择…

14-24 剑和侠客 – 预训练模型三部曲1 - 文本

在这个三部曲中,我们旨在从三个部分深入研究预训练模型:文本、图像和机器人。 我们旨在探索它们的概念、出现以及这些模型的工作原理。还将研究预训练模型的不同架构和类型。 探索哪些是最强大的,以及预训练模型和 Transformers 是否是 LLM…

常用SQL语句(基础篇)

前言 查询的sql的结构是 select...from...where...group by...having...order by...limit... 写查询sql的时候需要按照如下顺序写 from,where(and,or,!),group by,select&#xf…

文章解读与仿真程序复现思路——太阳能学报EI\CSCD\北大核心《计及电-热-氢负荷与动态重构的主动配电网优化调度》

本专栏栏目提供文章与程序复现思路,具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

Simulated Annealing

模拟退火最大值算法: 初始化起始解 x 0 x_0 x0​ 、温度 t 0 t_0 t0​ 以及迭代次数 steps,计算初始值 y 0 y_0 y0​扰动产生新解 x 1 x_1 x1​, 计算对应函数值 y 1 y_1 y1​依据 Δ y y 1 − y 0 \Delta y y_1 - y_0 Δyy1​−y0​ 决策是否接…

缓存-分布式锁-原理和基本使用

分布式锁原理和使用 自旋 public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {Boolean b redisTemplate.opsForValue().setIfAbsent(Lock, Lock, Duration.ofMinutes(1));if (!b) {int i 10;while (i > 0) {Object result redisTe…