KDP数据分析实战:从0到1完成数据实时采集处理到可视化

智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset 等开源组件快速搭建实时、半实时或批量采集、处理、分析的数据流水线以及可视化报表展示,可视化展示效果如下:

247984561aa6a7370c0c0741a330aded.png

以下我们将介绍一个实时订单数据流水线从数据采集到数据处理,最后到可视化展示的详细建设流程。

 1.流水线设计

借助 KDP 平台的开源组件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成数据实时采集处理及可视化分析,架构如下: 

8ea91c86309be540823ed486fe2b0dce.jpeg

1.1 数据流

  • 直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中;

  • ClickHouse从Kafka分别订阅各层数据,将各层数据持久化到ClickHouse中,用于之后的查询分析。

1.2 数据表

本次分析数据基于mock数据,包含数据实时采集处理及可视化分析:

  • 消费者表:customers

字段

字段说明

id

用户ID

name

姓名

age

年龄

gender

性别

  • 订单表:orders

字段

字段说明

order_id

订单ID

order_revenue

订单金额

order_region

下单地区

customer_id

用户ID

create_time

下单时间

1.3 环境说明

在 KDP 页面安装如下组件并完成组件的 QuickStart:

  • MySQL: 实时数据数据源及 Superset/Airflow 元数据库,安装时需要开启binlog

  • Kafka: 数据采集sink

  • Flink: 数据采集及数据处理

  • ClickHouse: 数据存储

  • Superset: 数据可视化

  • Airflow: 作业调度

2. 数据集成与处理

文中使用的账号密码信息请根据实际集群配置进行修改。

2.1 创建MySQL表

2.2 创建 Kafka Topic

进入Kafka broker pod,执行命令创建 Topic,也可以通过Kafka manager 页面创建,以下为进入pod并通过命令行创建的示例:

export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" bin/kafka-topics.sh --create \--topic ods-order \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP bin/kafka-topics.sh --create \--topic ods-customers \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dwd-order-customer-valid \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dws-agg-by-region \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP

2.3 创建 ClickHouse 表

进入clickhouse pod,使用`clickhouse-client`执行命令创建表,以下为建表语句:

CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dwd-order-customer-valid',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECTorder_id,order_revenue,order_region,create_time,customer_id,customer_age,customer_name,customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;CREATE TABLE kdp_demo.kafka_dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dws-agg-by-region',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECTorder_region,order_cnt,order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region;

2.4 创建 Flink SQL 作业

2.4.1 SQL部分

CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(`order_id` INT NOT NULL,`order_revenue` FLOAT NOT NULL,`order_region` STRING NOT NULL,`customer_id` INT NOT NULL,`create_time` TIMESTAMP,PRIMARY KEY(`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'orders'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (`id` INT NOT NULL,`age` FLOAT NOT NULL,`name` STRING NOT NULL,`gender` STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'customers'
);-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (`order_id` INT,`order_revenue` FLOAT,`order_region` VARCHAR(40),`customer_id` INT,`create_time` TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-order','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`gender` STRING,PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-customers','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (`order_id` INT,`order_revenue` FLOAT,`order_region` STRING,`create_time` TIMESTAMP,`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`customer_gender` STRING,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dwd-order-customer-valid','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (`order_region` VARCHAR(40),`order_cnt` BIGINT,`order_total_revenue` FLOAT,PRIMARY KEY (order_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dws-agg-by-region','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTOdwd_order_customer_valid
SELECTo.order_id,o.order_revenue,o.order_region,o.create_time,c.id as customer_id,c.age as customer_age,c.name as customer_name,c.gender as customer_gender
FROMcustomers_src cJOIN orders_src o ON c.id = o.customer_id
WHEREc.id <> -1;
INSERT INTOdws_agg_by_region
SELECTorder_region,count(*) as order_cnt,sum(order_revenue) as order_total_revenue
FROMdwd_order_customer_valid
GROUP BYorder_region;
-- END;

2.4.2 使用 StreamPark 创建 Flink SQL 作业

具体使用参考 StreamPark 文档。

maven 依赖:

<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version>
</dependency>

2.5 创建 Airflow DAG

2.5.1 DAG 文件部分

import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_agodefault_args = {'owner': 'admin','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,
}dag = DAG('kdp_demo_order_data_insert',description='Insert into orders by using random data',schedule_interval=timedelta(minutes=1),start_date=days_ago(1),catchup=False,tags=['kdp-example'],
)# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "广州", "深圳", "成都", "杭州", "重庆", "武汉", "西安", "苏州", "天津", "南京", "郑州","长沙", "东莞", "青岛", "宁波", "沈阳", "昆明", "合肥", "大连", "厦门", "哈尔滨", "福州", "济南", "温州","佛山", "南昌", "长春", "贵阳", "南宁", "金华", "石家庄", "常州", "泉州", "南通", "太原", "徐州", "嘉兴","乌鲁木齐", "惠州", "珠海", "扬州", "兰州", "烟台", "汕头", "潍坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入数据的 BashOperator
insert_data_orders = BashOperator(task_id='insert_data_orders',bash_command=f'''mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"''',dag=dag,
)
insert_data_orders

2.5.2 DAG 说明及执行

当前Airflow安装时,需要指定可访问的git 仓库地址,因此需要将 Airflow DAG 提交到 Git 仓库中。每分钟向orders表插入一条数据。

2.6 数据验证

使用ClickHouse验证数据:

(1)进入ClickHouse客户端

clickhouse-client 
# default pass: ckdba.123

(2)执行查询

SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid;

(3)对比验证MySQL中数据是否一致

select count(*) from kdp_demo.orders;

3. 数据可视化

在2.6中数据验证通过后,可以通过Superset进行数据可视化展示。使用账号`admin/admin`登录Superset页面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io

3.1 创建图表

导入我们制作好的图表:

  1. 下载面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip

  2. 导入面板

(1)选择下载的文件导入

eed49ffb69952693ad5a46da6be81f08.png

(2)输入 ClickHouse 的用户`default`的默认密码`ckdba.123`:

4c07d54c7ad0f2f66085481d5bfe77ab.png

3.2 效果展示

最终的实时订单数据图表展示如下,随着订单数据的更新,图表中的数据也会实时更新:

57bd6fa097e66e1da7dc8aa103a599b4.png

快速体验

🚀GitHub项目:

https://github.com/linktimecloud/kubernetes-data-platform

欢迎您参与开源社区的建设🤝

 - FIN -       

1ad0c68fe5ea3d59a392d306012eef0b.png

更多精彩推

  • 我们开源啦!一键部署免费使用!Kubernetes上直接运行大数据平台!

  • 开源 KDP  v1.1.0 版本正式发布,新增数据集成开发应用场景

  • 在 KubeSphere 上快速安装和使用 KDP 云原生数据平台

  • 在 Rancher 上快速安装和使用 KDP 云原生数据平台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/44336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL数据库中利用定时作业去杀死长时查询以防止数据库死锁风险

MySQL数据库中没有SQLServer数据库中那种传统的定时作业的概念。但是提供了一种【事件】的东西&#xff0c;基本和定时作业貌离神合。 下面我们在MySQL中创建一个事件&#xff0c;它的作用是去监测时间很长的异常查询&#xff0c;并且去主动杀掉该线程以防止数据库发生死锁的风…

探索Perl的自动清洁工:垃圾收集机制全解析

&#x1f9f9; 探索Perl的自动清洁工&#xff1a;垃圾收集机制全解析 Perl是一种高级编程语言&#xff0c;以其强大的文本处理能力而闻名。在Perl中&#xff0c;内存管理对于开发高效且稳定的应用程序至关重要。Perl提供了自动垃圾收集机制&#xff0c;帮助开发者管理内存&…

关于原型和原型链的学习和实践

在前端面试中&#xff0c;原型和原型链始终是一个避不开的问题&#xff0c;今天就弄明白! 原型和原型链 对象的创建方式工厂模式构造函数模式原型模式 原型和原型链实践 对象的创建方式 原型和原型链都是关于对象的内容&#xff0c;先来看一下JavaScript中对象的构建方式。 工…

代码随想录(day3)有序数组的平方

暴力求解法&#xff1a; 注意&#xff1a;需要确定范围&#xff0c;比如nums.sort()是在for循环之外&#xff0c;根据函数的功能来确定 return返回的是nums&#xff0c;而不是nums[i]因为返回的是整个数组 class Solution(object):def sortedSquares(self, nums):for i in r…

人话学Python-基础篇-数字计算

一&#xff1a;数字类型 对于最常见的数据类型,数字在Python中分为三类&#xff1a; 整型(int) 表示的是整数类型的所有数字&#xff0c;包括正整数&#xff0c;负整数和0。和C语言不同的是&#xff0c;Python中的int型没有范围的限制&#xff0c;理论上可以从无限小的整数取到…

RedHat运维-Ansible自动化运维基础22-rhel-system-roles

1. system_roles的官方文档的位置是___________________________________&#xff1b; 2. system_roles的官方文档的位置是___________________________________&#xff1b; 3. system_roles的官方文档的位置是___________________________________&#xff1b; 4. 安装rhel-s…

react基础语法,模板语法,ui渲染,jsx,useState状态管理

创建一个react应用 这里使用create-react-app的脚手架构建项目&#xff08;结构简洁&#xff0c;基于webpack-cli&#xff09;&#xff0c; npx create-react-app [项目名称] 使用其他脚手架构建项目可以参考&#xff1a;react框架&#xff0c;使用vite和nextjs构建react项目…

数学建模国赛入门指南

文章目录 认识数学建模及国赛认识数学建模什么是数学建模&#xff1f;数学建模比赛 国赛参赛规则、评奖原则如何评省、国奖评奖规则如何才能获奖 国赛赛题分类及选题技巧国赛赛题特点赛题分类 国赛历年题型及优秀论文数学建模分工技巧数模必备软件数模资料文献数据收集资料收集…

力扣题解(乘积为正数的最长子数组长度)

1567. 乘积为正数的最长子数组长度 已解答 中等 给你一个整数数组 nums &#xff0c;请你求出乘积为正数的最长子数组的长度。 一个数组的子数组是由原数组中零个或者更多个连续数字组成的数组。 请你返回乘积为正数的最长子数组长度。 本题要求乘积为正数&#xff0c;而整…

白蛇插画:成都亚恒丰创教育科技有限公司

白蛇插画&#xff1a;古韵今风&#xff0c;情深意长 在浩瀚的艺术长河中&#xff0c;插画作为一种独特的艺术形式&#xff0c;以其生动形象的画面、丰富多彩的色彩和深邃悠远的意境&#xff0c;成都亚恒丰创教育科技有限公司深受人们喜爱。而“白蛇插画”&#xff0c;作为融合…

bug - while parsing file included at

bug 如下 找到这个对应文件tb_top.sv的对应行&#xff0c;发现是一个 include "inc_tb_tests_xxx.sv" 问题点&#xff1a;头文件&#xff0c;重复定义&#xff0c;那么 解决方法- 在被include的文件首尾加入 ifndef MY_TRANSACTION__SV define MY_TRANSACTION__SV …

GenAI 技术堆栈架构师指南 - 十种工具

这篇文章于 2024 年 6 月 3 日首次出现在 The New Stack 上。 我之前写过关于现代数据湖参考架构的文章&#xff0c;解决了每个企业面临的挑战——更多的数据、老化的Hadoop工具&#xff08;特别是HDFS&#xff09;以及对RESTful API&#xff08;S3&#xff09;和性能的更大需求…

《javascript语言精粹》学习笔记之函数特性

分析javascript javascript比较好的思想&#xff1a;函数、弱类型、动态对象、对象字面量表示法 不好的思想&#xff1a;基于全局变量的编程模型 函数 函数对象 函数就是对象&#xff0c;新创建的函数会连接到Function.prototype上&#xff0c;没和函数创建时附带有两个隐藏…

前端--第一个前端程序

第一个前端程序 第一步&#xff1a; 使用记事本&#xff0c;编写代码 在你的一个磁盘里面创建一个文件夹&#xff0c;名为前端&#xff0c;然后在里面新建一个记事本&#xff0c;在里面写如下代码&#xff0c;注意一定要使用英文&#xff0c;然后把后缀名称改为.html。 第二…

你明白C++中的多态吗?(暑假提升-多态专题)

内不欺己&#xff0c;外不欺人。———孔子 有趣的多态 1、前言2、概念3、多态定义与产生条件4、多态的重要组成成员-(虚函数)5、虚函数的重写(覆盖)6、辅助关键字override与final(了解即可)7、重载&#xff0c;重定义(隐藏)&#xff0c;重写(覆盖)8、抽象类9、多态的原理9、1、…

PHP老照片修复文字识别图像去雾一键抠图微信小程序源码

&#x1f50d;解锁复古魅力&#xff0c;微信小程序黑科技大揭秘&#xff01;老照片修复&更多神奇功能等你来试&#xff01; &#x1f4f8; 【老照片修复&#xff0c;时光倒流的美颜术】 你是否珍藏着一堆泛黄的老照片&#xff0c;却因岁月侵蚀而模糊不清&#xff1f;现在…

实验02 黑盒测试(组合测试、场景法)

1. 组合测试用例设计技术 指出等价类划分法和边界值分析法通常假设输入变量相互独立&#xff0c;但实际情况中变量间可能存在关联。全面测试&#xff1a;覆盖所有输入变量的所有可能组合&#xff0c;测试用例数量随输入变量的增加而指数增长。 全面测试需要对所有输入的各个取…

2008年上半年软件设计师【上午题】真题及答案

文章目录 2008年上半年软件设计师上午题--真题2008年上半年软件设计师上午题--答案 2008年上半年软件设计师上午题–真题 2008年上半年软件设计师上午题–答案

按模版批量生成定制合同

提出问题 一个仪器设备采购公司&#xff0c;商品合同采购需要按模版生成的固定的文件&#xff0c;模板是固定的&#xff0c;只是每次需要替换信息&#xff0c;然后打印出来寄给客户。 传统方法 如果手工来做这个事情&#xff0c;准备好数据之后&#xff0c;需要从Excel表格中…

Qt5 Ubuntu18 QStackedWidget

1、在实际项目开发过程遇到&#xff0c;如果通过UI插件的属性设置&#xff0c;通过对默认的两个页面进行提升需要切换操作的对象&#xff0c;如果该对象需要外部接口传入数据&#xff0c;实现界面信息的实时刷新&#xff0c;这样会失败&#xff0c;失败的原因很好理解&#xff…