Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。完整分享可以观看 Meetup 视频回顾 :https://developer.aliyun.com/live/1416

演示代码已经开源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。

通过本实战,你将学到:

  1. 如何使用 Blink Planner
  2. 一个简单的 SqlSubmit 是如何实现的
  3. 如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表
  4. 运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业
  5. 设置调优参数,观察对作业的影响

SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 读取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通过正则表达式匹配前缀,来区分不同的 SQL 语句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根据不同的 SQL 语句,调用 TableEnvironment 执行
for (SqlCommandCall call : calls) {switch (call.command) {case SET:String key = call.operands[0];String value = call.operands[1];// 设置参数tEnv.getConfig().getConfiguration().setString(key, value);break;case CREATE_TABLE:String ddl = call.operands[0];tEnv.sqlUpdate(ddl);break;case INSERT_INTO:String dml = call.operands[0];tEnv.sqlUpdate(dml);break;default:throw new RuntimeException("Unsupported command: " + call.command);}
}
// 提交作业
tEnv.execute("SQL Job");

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,大概长这个样子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

为了模拟真实的 Kafka 数据源,笔者还特地写了一个 source-generator.sh 脚本(感兴趣的可以看下源码),会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中。

有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql)。

CREATE TABLE user_log (user_id VARCHAR,item_id VARCHAR,category_id VARCHAR,behavior VARCHAR,ts TIMESTAMP
) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior',  -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append','format.type' = 'json',  -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)

注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (dt VARCHAR,pv BIGINT,uv BIGINT
) WITH ('connector.type' = 'jdbc', -- 使用 jdbc connector'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url'connector.table' = 'pvuv_sink', -- 表名'connector.username' = 'root', -- 用户名'connector.password' = '123456', -- 密码'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
)

PV UV 计算

假设我们的需求是计算每小时全网的用户访问量,和独立用户数。很多用户可能会想到使用滚动窗口来计算。但这里我们介绍另一种方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECTDATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 这个内置函数,将日志时间归一化成“年月日小时”的字符串格式,并根据这个字符串进行分组,即根据每小时分组,然后通过 COUNT(*) 计算用户访问量(PV),通过 COUNT(DISTINCT user_id) 计算独立用户数(UV)。这种方式的执行模式是每收到一条数据,便会进行基于之前计算的值做增量计算(如+1),然后将最新结果输出。所以实时性很高,但输出量也大。

我们将这个查询的结果,通过 INSERT INTO 语句,写到了之前定义的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我们有对这种查询的性能调优做了深度的介绍。

实战演示

环境准备

本实战演示环节需要安装一些必须的服务,包括:

  • Flink 本地集群:用来运行 Flink SQL 任务。
  • Kafka 本地集群:用来作为数据源。
  • MySQL 数据库:用来作为结果表。
  • Flink 本地集群安装

1.下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下载以下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。因为我们运行时需要依赖各个 connector 实现。

  • flink-sql-connector-kafka_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
    http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
    https://dev.mysql.com/downloads/connector/j/5.1.html

3.将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们的演示任务可能会消耗多于1个的 slot。
4.在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。

运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。

另外,还需要将 Flink 的安装路径填到 flink-sql-submit 项目的 env.sh 中,用于后面提交 SQL 任务,如我的路径是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安装

下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

将安装路径填到 flink-sql-submit 项目的 env.sh 中,如我的路径是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。

在命令行执行 jps,如果看到 Kafka 进程和 QuorumPeerMain 进程即表明启动成功。

MySQL 安装

可以在官方页面下载 MySQL 并安装:
https://dev.mysql.com/downloads/mysql/
如果有 Docker 环境的话,也可以直接通过 Docker 安装
https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中创建一个 flink-test 的数据库,并按照上文的 schema 创建 pvuv_sink 表。

提交 SQL 任务

1.在 flink-sql-submit 目录下运行 ./source-generator.sh,会自动创建 user_behavior topic,并实时往里灌入数据。

2.在 flink-sql-submit 目录下运行 ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓扑。

在 MySQL 客户端,我们也可以实时地看到每个小时的 pv uv 值在不断地变化

结尾

本文带大家搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习了解如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql 中还有一些注释掉的调优参数,感兴趣的同学可以将参数打开,观察对作业的影响。关于这些调优参数的原理,可以看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内幕和最佳实践》。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517983.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java的基础语法和数据类型,IDEA

IDEA的用法 快捷方法 主函数&#xff1a;psvm输出语句&#xff1a;sout java基础语法注释 注释&#xff1a;必须要写注释 单行注释&#xff1a;//多行注释&#xff1a;/**/文档注释javaDoc&#xff1a;/***/ 平时写代码要注意规范 标识符和关键字 所有标识符应该都以字母&…

手把手教你配置VS Code 远程开发工具,工作效率提升N倍

来源 | 后端技术学堂责编 | Carol封图 | CSDN 付费下载于视觉中国今天和大家分享一个远程开发解决方案&#xff0c;聊一聊我平常是如何用 VS Code 进行远程开发工作的&#xff0c;以及一步步教你搭建远程开发环境&#xff0c;拥有比德芙还丝滑的远程开发体验。我们厂里为了最大…

蚂蚁金服隗华:十五年时间见证分布式数据库的崛起

北大计算所启蒙 “做中国人自己的技术” 如果用一句话来评价读书时的隗华&#xff08;花名&#xff1a;风羿&#xff09;&#xff0c;那一定是“德智体美劳全面发展的好学生”。本科在北航读的计算机专业&#xff0c;硕士则就读于北大的计算机研究所。 北大&#xff0c;中国高…

用户数从 0 到亿,我的 K8s 踩坑血泪史

导读&#xff1a;容器服务 Kubernetes 是目前炙手可热的云原生基础设施&#xff0c;作者过去一年上线了一个用户数极速增长的应用&#xff1a;该应用一个月内日活用户从零至四千万&#xff0c;用户数从零到一亿的裂变式增长&#xff0c;充分享受了容器服务快速简便的扩容操作和…

行,Python玩大了!​取代Excel,程序员:太牛!你怎么看?

Python真的玩大了吗&#xff1f;2020年&#xff0c;Python程序员究竟怎么样&#xff1f;A与B程序员与远方近日日本最大的证券公司之一野村证券首席数字官马修汉普森&#xff0c;在Quant Conference上发表讲话&#xff1a;“用Excel的人越来越少&#xff0c;大家都在用Python。”…

云原生计算重塑企业IT架构 - 分布式应用架构

进入21世纪以来&#xff0c;我们见证了企业分布式应用架构从SOA(Service-oriented Architecture)&#xff0c;到微服务架构&#xff0c;再到云原生应用架构的演化。 为了说明企业架构演化背后的思考&#xff0c;我们先谈一些玄学。 第一&#xff0c;企业IT系统的复杂性&#…

首发!《长安十二时辰背后的技术秘籍》正式公开,速来下载

一名死囚如何在十二时辰内利用“唐代黑科技”&#xff0c;拯救长安百姓于水火中&#xff1f; 这就是《长安十二时辰》的故事&#xff0c;剧中有恢弘的长安美景、让人流口水的水晶柿子/水盆羊肉&#xff0c;还有张小敬和檀棋“在一起”呼声……然而&#xff0c;最让人刮目相看的…

ETL异构数据源Datax_MySQL同步Oracle(全量)_07

文章目录1. 清除Oracle数据库中OTBS1表的数据2. 构建json3. 执行脚本4. 同步验证5. 同步分析7. 同步结果1. 清除Oracle数据库中OTBS1表的数据 Truncate TABLE OTBS1;2. 构建json {"core": {"transport": {"channel": {"speed": {&qu…

左手代码右手滑板 支付宝这个程序员有些酷

走在杭州支付宝z空间的园区&#xff0c;常常可以看到一个脚踩滑板&#xff0c;脑后扎个发髻的男青年。 他叫边柳。来蚂蚁金服三年&#xff0c;除了是一名前端码农&#xff0c;也是一位斜杠青年。捧着程序员的“饭碗”&#xff0c;兼顾着滑板和摇滚的爱好&#xff0c;可以说他过…

2019阿里云910会员节大促主会场全攻略

2019阿里云910会员大促活动已经于8月28日正式开启&#xff0c;从已开放的活动页面来看&#xff0c;整场大促活动由阿里云10年有礼时光机、爆款产品推荐、七大分会场组成。 在910这个秋季大幅度优惠促销日&#xff0c;怎样才能花最少的钱配置最特惠的云服务&#xff1f;云栖社区…

浪潮商用机器与腾讯TDSQL完成互认证 共同拓展Power行业生态

日前&#xff0c;浪潮商用机器有限公司宣布&#xff0c;旗下K1 Power服务器系列产品经过几十项基础功能和高可用功能用例的专业测试&#xff0c;与腾讯新兴国产分布式数据库TDSQL完美兼容&#xff0c;且性能优异&#xff0c;可进行顺利的部署、平稳的运行及对外提供服务。此次互…

历时五天用 SwiftUI 做了一款 APP,阿里工程师如何做的?

作者|姜沂(倾寒) 出品|阿里巴巴新零售淘系技术部 导读&#xff1a;自 2014 年苹果发布会发布 Swift 之后, Swift 经过多年迭代&#xff0c;终于达到了 ABI 稳定版本&#xff0c;也意味着 Swift 做为稳定的得语言&#xff0c;值得用在大型 APP&#xff0c; 用来生产环境中。 2…

Istio从懵圈到熟练 – 二分之一活的微服务

Istio is the future&#xff01;基本上&#xff0c;我相信对云原生技术趋势有些微判断的同学&#xff0c;都会有这个觉悟。其背后的逻辑其实是比较简单的&#xff1a;当容器集群&#xff0c;特别是K8S成为事实上的标准之后&#xff0c;应用必然会不断的复杂化&#xff0c;服务…

数据结构与算法、讲解、动态规划一脸懵?看完之后轻松掌握!

来源 | 昊天码字责编 | Carol封图 | CSDN 付费下载于视觉中国碰到动态规划问题摸不着头脑&#xff1f;总结不出动态规划的类型&#xff1f;有多少人曾经历过这种迷茫与无助&#xff1f;看完本文&#xff0c;让你一脚迈进动态规划的大门。我们在用递归求解问题的过程中&#xff…

搜索场景下的智能推荐演变之路

摘要&#xff1a;传统的推荐手段主要还是深度挖掘用户行为和内容本身相似性的价值&#xff0c;包括但不限于协同过滤&#xff0c;内容表征向量召回&#xff0c;以及各式各样的点击率预估模型&#xff0c;然后这样的推荐行为缺乏内在的逻辑性和可解释性&#xff0c;有一种知其然…

调查了 17,000 多位程序员,当前的云原生开发现状究竟如何?

整理 | 弯月&#xff0c;责编 | 郭芮头图 | CSDN 下载自东方IC出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;容器的标准化使用改变了软件的开发方式&#xff0c;我们迎来了开发运维的时代&#xff0c;基于云原生的开发能够帮助我们构建更灵活、更强大的应用程序。…

阿里研究员:测试稳定性三板斧,我怎么用?

阿里妹导读&#xff1a;如何治理测试稳定性问题&#xff1f;很多人会说&#xff1a;环境、流程管控、监控、工具化、加机器、专人负责、等等。这些都是对的。不过这些都是解决方案层面的&#xff0c;而不是方法论和理论体系层面的。今天&#xff0c;阿里研究员郑子颖来说说测试…

阿里架构总监一次讲透中台架构,13页PPT精华详解,建议收藏!

本文整理了阿里几位技术专家&#xff0c;如架构总监 谢纯良&#xff0c;中间件技术专家 玄难等几位大牛&#xff0c;关于中台架构的几次分享内容&#xff0c;将业务中台形态、中台全局架构、业务中台化、中台架构图、中台建设方法论、中台组织架构、企业中台建设实施步骤等总共…

Redis 6.0 的客户端缓存是怎么肥事?一文带你了解!

来源 | 程序员历小冰责编 | Carol封图 | CSDN 付费下载于视觉中国近日 Redis 6.0.0 GA 版本发布&#xff0c;这是 Redis 历史上最大的一次版本更新&#xff0c;包括了客户端缓存 (Client side caching)、ACL、Threaded I/O 和 Redis Cluster Proxy 等诸多更新。我们今天就依次聊…

AI时代,你的职业会是?99%的人都无法直面!

在我10岁的时候&#xff0c;算命先生曾对说我30岁时我会每天与八阿哥玩在一起。 当时懵懂的我一脸茫然&#xff0c;想着谁是我的八阿哥&#xff0c;却在30岁的这一年意识到自己确实日以继夜的与八阿哥在一起。 曾经&#xff0c;我们也担心自己未来的工作岗位是否会被人工智能给…