揭秘“湖仓一体”——Flink+Paimon+StarRocks,打造实时分析新纪元

1.湖仓一体

数据湖仓是 Flink 流批一体发挥重要作用的场景,使用 Flink + Paimon + starRocks 来构建湖仓一体数据分析.
Apache Paimon 是一个专为实时数据处理而设计的湖表格式,它最大的亮点是使用了 LSM Tree 技术。与 Hudi 相比,Paimon 在更新插入(Upsert)操作上速度快了4倍,查询扫描(Scan)速度也提高了10倍。这意味着它能提供更快的响应速度,同时降低数据入湖的成本,并且让开发者用起来更高效。Paimon 社区十分活跃,很多产品都在迅速与其兼容,这让它的生态系统发展得比其他湖库表格式更快、更全面。
StarRocks 是一款高性能分析型数据仓库,使用向量化、MPP 架构、CBO、智能物化视图、可实时更新的列式存储引擎等技术实现多维、实时、高并发的数据分析。StarRocks 既支持从各类实时和离线的数据源高效导入数据,也支持直接分析数据湖上各种格式的数据。StarRocks 兼容 MySQL 协议,可使用 MySQL 客户端和常用 BI 工具对接。同时 StarRocks 具备水平扩展,高可用、高可靠、易运维等特性。广泛应用于实时数仓、OLAP 报表、数据湖分析等场景。
湖仓一体化.jpg

Flink + Paimon + StarRocks 流式湖仓方案将 3 个产品做了非常紧密的结合,首先使用 Flink 流批一体计算引擎将数仓以 Paimon 格式在湖上构建,使用 Flink 完成数仓 ODS 到 DWD 层,DWS 和 ADS 的计算。通过使用 StarRocks 对各层数仓做统一的 OLAP 查询和 ADS 层在线分析。基于 Paimon 可以实现高吞吐入湖;基于 Flink 可以实现全链路的流批一体计算,基于 StarRocks 可以实现高性能的 OLAP 查询,所以整个链路从实时性、时效性、成本几个方面都可以取得比较好的平衡。
使用 StarRocks 统一管理数据湖和数据仓库,将高并发和实时性要求很高的业务放在 StarRocks 中分析,也可以使用 External Catalog 和外部表进行数据湖上的分析。StarRocks 从 3.1 版本开始支持 Paimon Catalog。
Paimon Catalog 是一种 External Catalog。通过 Paimon Catalog,您不需要执行数据导入就可以直接查询 Apache Paimon 里的数据。
在数据湖仓场景下,使用 Flink 可以完成复杂的数据拼接以及聚合计算,并且达到很高的实时性的要求。另外,实时链路在使用的过程中不可避免的会因为一些数据延迟等问题导致会有数据修正和数据回溯的需求。Flink 流批一体的特性能够让用户方便的使用与实时链路一样的作业代码,高效地完成数据修正和数据回溯的需求。

2.演示架构

通过flink-cdc 监听MySQL Binlog数据同步到Paimon ODS层,然后进行DWD数据清洗宽表打通,再到DWS层进行多维度汇总聚合,最后同ADS层进行数据呈现.其中用到streamPark进行作业编排.
2.1 组件使用版本

  • flink1.18.1
  • paimon0.8
  • fink-cdc3.1
  • streamPark2.1.4
  • starRocks3.1
    安装方式请自行安装

2.2 场景说明
在mysql创建3张表:用户表users,订单表orders,商品表products,订单详情表order_details
分析3个指标: 用户的总购买金额 产品的销售数量 用户的平均订单金额
在执行下面操作前,确保mysql,flink,streamPark,starRocks已经启动.

步骤一:准备演示数据

  1. 在MySQL中执行以下命令,创建数据表。
use emp;-- 用户表
CREATE TABLE users (user_id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '用户ID,主键',user_name VARCHAR(50) NOT NULL COMMENT '用户名',email VARCHAR(100) NOT NULL COMMENT '邮箱',registration_date DATE NOT NULL COMMENT '注册日期',PRIMARY KEY (`user_id`) USING BTREE
) COMMENT '用户表';INSERT INTO users (user_id, user_name, email, registration_date) VALUES (1, '张山', 'alice@example.com', '2023-01-15');
INSERT INTO users (user_id, user_name, email, registration_date) VALUES (2, '李四', 'bob@example.com', '2023-02-20');
INSERT INTO users (user_id, user_name, email, registration_date) VALUES (3, '刘博', 'charlie@example.com', '2023-03-10');-- 订单表
CREATE TABLE orders (order_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '订单ID,主键',user_id INT NOT NULL COMMENT '用户ID,外键,关联到users表',order_date DATE NOT NULL COMMENT '订单日期',total_amount DECIMAL(10, 2) NOT NULL COMMENT '订单总金额',PRIMARY KEY (`order_id`) USING BTREE
) COMMENT '订单表';-- 演示数据
INSERT INTO orders (user_id, order_date, total_amount) VALUES
(1, '2023-04-01', 150.00),
(2, '2023-04-05', 200.00),
(3, '2023-04-10', 250.00),
(1, '2023-04-15', 300.00);-- 商品表
CREATE TABLE products (product_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '产品ID,主键',product_name VARCHAR(100) NOT NULL COMMENT '产品名',price DECIMAL(10, 2) NOT NULL COMMENT '产品价格',PRIMARY KEY (`product_id`) USING BTREE
) COMMENT '产品表';-- 演示数据
INSERT INTO products (product_name, price) VALUES
('笔记本', 50.00),
('手表', 75.00),
('耳机', 100.00);-- 订单详情表
CREATE TABLE order_details (order_detail_id bigint AUTO_INCREMENT PRIMARY KEY COMMENT '订单详情ID,主键',order_id INT NOT NULL COMMENT '订单ID,外键,关联到orders表',product_id INT NOT NULL COMMENT '产品ID,外键,关联到products表',quantity INT NOT NULL COMMENT '购买数量',subtotal DECIMAL(10, 2) NOT NULL COMMENT '小计金额(quantity * price)',PRIMARY KEY (`order_detail_id`) USING BTREE
) COMMENT '订单详情表';-- 演示数据
INSERT INTO order_details (order_id, product_id, quantity, subtotal) VALUES
(1, 1, 2, 100.00),
(1, 2, 1, 50.00),
(2, 1, 1, 50.00),
(2, 3, 2, 150.00),
(3, 2, 2, 150.00),
(3, 3, 1, 100.00),
(4, 3, 3, 300.00);

步骤二:mysql数据同步paimon

确保mysql已经开启binlog

1.编写flink-cdc同步任务,在flink-cdc的创建job文件夹,然后在里面创建mysql-to-paimon.yml

source:
type: mysql
name: MySQL Source
hostname: 192.168.1.72
port: 3306
username: root
password: dory@2022
tables: emp.users,emp.products,emp.order_details,emp.orders
server-id: 5401-5404sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /opt/software/paimonpipeline:
name: MySQL to Paimon Pipeline
parallelism: 1
  1. 在 flink-cdc的lib文件夹下添加:
    flink-cdc-pipeline-connector-mysql-3.1.0.jar
    flink-cdc-pipeline-connector-paimon-3.1.0.jar
    mysql-connector-java-8.0.27.jar
  2. 在保证flink集群启动的情况下,进行启动flink-cdc
 ./bin/flink-cdc.sh job/mysql-to-paimom.yaml --jar lib/mysql-connector-java-8.0.27.jar
# 执行成功会出现jobid
Pipeline has been submitted to cluster.
Job ID: b68bfad5753ae600eeb1efed17d957ff
Job Description: MySQL to Paimon Pipeline

4.来到flink工作台进行查询任务
image.png

5.在服务器上查看同步文件信息

 cd /opt/software/paimon/cd emp.db/ls#显示已经同步过来
order_details  orders  products  users

已经完成ODS层数据同步.

步骤三: DWD数据清洗宽表打通

1.打开streamPark,进行开始编写flink sql
image.png

SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 创建CATALOG
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'file:/opt/software/paimon'
);-- 切换CATALOG
USE CATALOG paimon_catalog;
create DATABASE IF NOT EXISTS emp;
-- 切换database
use emp;-- 创建dwd_user_orders表
CREATE TABLE IF NOT EXISTS dwd_user_orders (
order_id bigint,
user_id bigint,
user_name STRING,
order_date date,
total_amount decimal,
PRIMARY KEY (order_id) NOT ENFORCED
);-- 创建dwd_orders_products_details表
CREATE TABLE IF NOT EXISTS dwd_orders_products_details (
order_detail_id bigint,
order_id bigint,
product_id bigint,
product_name STRING,
price decimal,
quantity bigint,
subtotal decimal,
PRIMARY KEY (order_detail_id) NOT ENFORCED
);INSERT INTO
dwd_user_orders
SELECT
o.order_id,o.user_id,u.user_name,o.order_date,o.total_amount
FROM orders o join users u ON o.user_id=u.user_id;
INSERT INTO
dwd_orders_products_details
SELECT
d.order_detail_id,d.order_id,d.product_id,p.product_name,p.price,d.quantity,d.subtotal
FROM order_details d join products p ON p.product_id=d.product_id;

发布启动任务
image.png

flink-web-ui查看任务
image.png

步骤三:进行维度分析

创建DWS层进行多维度汇总聚合,还是在streamPark编写DWS层任务

统计维度指标:

  • 用户的总购买金额
  • 产品的销售数量
  • 订单的平均金额
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 创建CATALOG
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'file:/opt/software/paimon'
);-- 切换CATALOG
USE CATALOG paimon_catalog;
create DATABASE IF NOT EXISTS emp;
-- 切换database
use emp;
-- 创建用户的总购买金额表
CREATE TABLE IF NOT EXISTS dws_user_total_amount (
user_id bigint,
user_name STRING,
total_spent decimal,
PRIMARY KEY (user_id) NOT ENFORCED
);-- 创建产品的销售数量
CREATE TABLE IF NOT EXISTS dws_product_sales_quantity (
product_id bigint,
product_name STRING,
total_quantity BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
);-- 创建订单的平均金额
CREATE TABLE IF NOT EXISTS dws_order_average_amount (
order_id bigint,
average_order_amount decimal,
PRIMARY KEY (order_id) NOT ENFORCED
);-- 用户的总购买金额
INSERT INTO
dws_user_total_amount
SELECT user_id,user_name, sum(total_amount) AS total_spent
FROM dwd_user_orders
group by user_id,user_name;
-- 产品的销售数量
INSERT INTO
dws_product_sales_quantity
SELECT product_id,product_name,SUM(quantity) AS total_quantity
FROM dwd_orders_products_details
group by product_id,product_name;
-- 订单的平均金额
INSERT INTO
dws_order_average_amount
SELECT order_id,AVG(total_amount) AS average_order_amount
FROM dwd_user_orders
group by order_id;

发布启动任务
image.png

flink-web-ui查看任务
image.png

步骤四:ADS查看维度结果数据

这里要使用starRocks进行查询paimon catalog数据表.在starRock 中paimon catalog是一种外部catalog.可以直接进行查询数据.
保证starRock正常启动.安装方式参考:https://www.cnblogs.com/freeweb/p/18137023
DBeaver连接上starRocks
image.png

查询对应维度数据
image.png

-- 查询用户的总购买金额
SELECT * FROM paimon_catalog.emp.dws_user_total_amount;
-- 结果
user_id|user_name|total_spent|
-------+---------+-----------+1|张山       |        450|2|李四       |        200|3|刘博       |        250|-- 查询产品的销售数量
SELECT * FROM paimon_catalog.emp.dws_product_sales_quantity;
-- 结果
product_id|product_name|total_quantity|
----------+------------+--------------+1|笔记本         |             3|2|手表          |             3|3|耳机          |             6|
-- 查询订单的平均金额
SELECT * FROM paimon_catalog.emp.dws_order_average_amount;
-- 结果
order_id|average_order_amount|
--------+--------------------+1|                 150|2|                 200|3|                 250|4|                 300|

步骤五: 演示数据实时更新

在mysql表进行修改数据查询维度表数据是否发生计算结果变更
添加一条人员信息,产品信息,订单信息,订单详情信息,看维度表数据是否发生变化

  1. 在mysql中添加下面数据
INSERT INTO users (user_id, user_name, email, registration_date) 
VALUES (4, '刘晓天', 'charlie@example.com', '2024-06-17');INSERT INTO orders (order_id,user_id, order_date, total_amount) VALUES
(5,4, '2024-06-17', 1800000.00);INSERT INTO products (product_id,product_name, price) VALUES
(4,'天启坦克', 1800000.00);INSERT INTO order_details (order_id, product_id, quantity, subtotal) VALUES
(5, 4, 1, 1800000.00);

sleep 5s

  1. 查看维度分析结果,已经发生结果变化.

删除,修改mysql表同样会触发维度结果变化
image.png

image.png
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/28886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaWeb项目-ssm+vue大学生心理健康管理系统功能介绍

本项目源码:java-springboot大学生心理健康管理系统的设计与实现源码资源-CSDN文库 项目关键技术 开发工具:IDEA 、Eclipse 编程语言: Java 数据库: MySQL5.7 框架:ssm、Springboot 前端:Vue、ElementUI 关键技术:spr…

celery使用 Zookeeper 或 kafka 作为broker,使用 mysql 作为 backend

背景介绍: 先贴上celery官方文档:Celery - Distributed Task Queue — Celery 5.4.0 documentation xxx项目中单点环境运行celery + redis,使用流畅,不做过多介绍。 切换高可用环境时,客户redis使用的是cluster集群,官方文档中并没有对redis cluster的支持,查看githu…

Kubernetes 1.18 部署 Traefik2.0

Kubernetes 1.18部署 Traefik2.0 参考资料: Traefik 2.0 官方文档:https://doc.traefik.io/traefik/v2.0/Kubernetes 1.18.3 部署 Traefik2.0:https://www.cnblogs.com/heian99/p/14608414.html 1. Traefik 介绍 traefik 是一款反向代理、…

CRMEB多门店的门店后台首页路由

如何在输入 http://localhost:8080/、http://localhost:8080/store/、http://localhost:8080/custom-store/ 这三个中任意一个链接都能正确跳转到 http://localhost:8080/store/home/index 。要实这个要求,有两种方式: 重定向 const router new VueRo…

App端接口用例设计方法和测试方法

🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 前言 接口测试作为测试的重要一环,重点关注的是数据层面的输入输出,今天…

【JVM】JVisualVM的介绍、使用和GC过程

VisualVM介绍 VisualVM 是Netbeans的profile子项目,已在JDK6.0 update 7 中自带,能够监控线程,内存情况,查看方法的CPU时间和内存中的对 象,已被GC的对象,反向查看分配的堆栈(如100个String对象分别由哪几…

eclipse maven打包报错: 致命错误: 在类路径或引导类路径中找不到程序包 java.lang的解决

还是上来帖张图: 1、系统之前是运行在mac上的,打包一切正常,但是现在在win11的eclipse打包就报错了。 2、致命错误: 在类路径或引导类路径中找不到程序包 java.lang,上面的问题应该是找不到java中的jar中的class导致。 解决&…

【C++】模板初级

【C】模板初级 泛型编程函数模板函数模板的概念函数模板格式函数模板的原理函数模板的实例化模板参数的匹配原则 类模板类模板格式类模板的实例化 泛型编程 当我们之前了解过函数重载后可以知道,一个程序可以出现同名函数,但参数类型不同。 //整型 voi…

树形喇叭状异形创意LED显示屏正在成为设计师们手中的神来之笔

异形创意LED显示屏以其独特的形状和强大的视觉冲击效果,正逐渐改变着我们的视觉体验。不同于传统的矩形、平面板状的LED显示屏,异形屏以其形状各异、造型奇特的特点,为商业显示、展览展示、文旅旅游等行业带来了全新的变化。本文将重点介绍异…

解决ubuntu22.04共享文件夹问题

刚开机发现ubuntu里面的共享文件夹访问不了了 ubuntuwxy:/mnt/hgfs$ ls找了几篇博客,设置如下指令即可,记得退出当前目录重新进入刷新一下 sudo vmhgfs-fuse .host:/ /mnt/hgfs/ -o allow_other -o uid1000 仅供参考

【YOLOv8改进[注意力]】在YOLOv8中添加ECA高效通道注意力(2020.4)的实践 + 含全部代码和详细修改方式 + 手撕结构图

本文将进行在YOLOv8中添加ECA高效通道注意力的实践,助力YOLOv8目标检测效果的实践,文中含全部代码、详细修改方式以及手撕结构图。助您轻松理解改进的方法。 改进前和改进后的参数对比: 目录 一 ECA 二 在YOLOv8中添加ECA注意力

高速公路视频监控系统与车牌抓拍:EasyCVR视频监控技术助力交通道路安全监控

随着科技的不断发展,高速公路视频监控与车牌抓拍系统作为智能交通的重要组成部分,日益发挥着不可或缺的作用。这些先进的技术不仅提高了道路交通的管理效率,也为保障行车安全提供了新的手段。 高速公路视频监控系统的应用,极大地…

手表化身车钥匙:智慧控车,优雅随行

智能汽车时代来临,传统车钥匙正在逐渐被取代。HUAWEI WATCH 4 Pro及HUAWEI WATCH Ultimate系列手表配对问界M9等,不仅可以化身 UWB 数字车钥匙,无感解锁车辆,还可以实现智能语音控车等功能,让你从容出行,优…

使用 C# 学习面向对象编程:第 7 部分

多态性 我们在程序中使用多态的频率是多少?多态是面向对象编程语言的第三大支柱,我们几乎每天都在使用它,却不去想它。 这是一个非常简单的图表,它将解释多态性本身。 简单来说,我们可以说,只要我们重载类…

iconfont的使用(超简单)

iconfont的使用(超简单) 1、iconfont 是什么?2、使用2.1、新建项目2.2、搜图标 添加 至项目中2.3、下载iconfont的包文件![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/91a0a07cd4b74798b7fb333dddca7724.png)2.4、画一个文件夹…

【Qt项目专栏】贪吃蛇小游戏1.0

博客主页:Duck Bro 博客主页系列专栏:Qt 专栏关注博主,后期持续更新系列文章如果有错误感谢请大家批评指出,及时修改感谢大家点赞👍收藏⭐评论✍ 贪吃蛇小游戏1.0 项目编号:01 文章目录 贪吃蛇小游戏1.0一…

我一直看不明白:“C++会被java/python等这些语言替代”

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!! 有些程序,是既可以…

使用 Cheerio 和 Node.js 进行网络搜刮 2024

Web scraping 是一种强大的技术,用于从网站提取数据,广泛应用于数据分析、市场研究和内容聚合。截至2024年,利用 Cheerio 和 Node.js 进行 web scraping 仍然是一种流行且高效的方法。本文将深入探讨使用 Cheerio 和 Node.js 进行 web scrapi…

SRAM和DRAM

1.SRAM(静态RAM) 把存放一个二进制位的物理器件称为存储元,它是存储器最基本的构件。 地址码相同的多个存储元构成一个存储单元。 存储单元的集合构成存储体。 静态RAM的存储元是用双稳态触发器(六晶体管MOS)来记忆…

C#结合JS 修改解决 KindEditor 弹出层问题

目录 问题现象 原因分析 范例运行环境 解决问题 修改 kindeditor.js C# 服务端更新 小结 问题现象 KindEditor 是一款出色的富文本HTML在线编辑器,关于编辑器的详细介绍可参考我的文章《C# 将 TextBox 绑定为 KindEditor 富文本》,这里我们讲述在…