【大数据】基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

  • 1.准备阶段
    • 1.1 准备教程所需要的组件
    • 1.2 下载 Flink 和所需要的依赖包
    • 1.3 准备数据
      • 1.3.1 在 MySQL 数据库中准备数据
      • 1.3.2 在 Postgres 数据库中准备数据
  • 2.启动 Flink 集群和 Flink SQL CLI
  • 3.在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 4.关联订单数据并且将其写入 Elasticsearch 中
  • 5.环境清理

这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java / Scala 代码,也无需安装 IDE。

假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink MySQL / Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

在这里插入图片描述

1.准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:postgres:image: debezium/example-postgres:1.1ports:- "5432:5432"environment:- POSTGRES_DB=postgres- POSTGRES_USER=postgres- POSTGRES_PASSWORD=postgresmysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwelasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"

该 Docker Compose 中包含的容器有:

  • MySQL商品表 products订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders
  • Postgres:物流表 shipments 将存储在该数据库中。
  • Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch。
  • Kibana:用来可视化 ElasticSearch 的数据。

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

1.2 下载 Flink 和所需要的依赖包

下载 Flink 1.18.0 并将其解压至目录 flink-1.18.0

下载以下列出的依赖包,并将它们放到目录 flink-1.18.0/lib/ 下:

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar
  • flink-sql-connector-postgres-cdc-2.5-SNAPSHOT.jar

注:下载链接只对已发布的版本有效,SNAPSHOT 版本需要本地基于 masterrelease 分支编译。

1.3 准备数据

1.3.1 在 MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec mysql mysql -u root -p 123456

创建数据库和表 productsorders,并插入数据。

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

1.3.2 在 Postgres 数据库中准备数据

进入 Postgres 容器:

docker-compose exec postgres psql -h localhost -U postgres

创建表 shipments,并插入数据。

-- PG
CREATE TABLE shipments (shipment_id SERIAL NOT NULL PRIMARY KEY,order_id SERIAL NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),(default,10003,'Shanghai','Hangzhou',false);

2.启动 Flink 集群和 Flink SQL CLI

使用下面的命令跳转至 Flink 目录下

cd flink-1.18.0

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

在这里插入图片描述
使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:
在这里插入图片描述

3.在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔 3 秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 productsordersshipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQL
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'products');Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders');Flink SQL> CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ('connector' = 'postgres-cdc','hostname' = 'localhost','port' = '5432','username' = 'postgres','password' = 'postgres','database-name' = 'postgres','schema-name' = 'public','table-name' = 'shipments','slot.name' = 'flink');

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中。

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders');

4.关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQL
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.idLEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern,创建 index patternenriched_orders

在这里插入图片描述

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。

在这里插入图片描述

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana 中显示的订单数据也将实时更新:

在 MySQL 的 orders 表中插入一条数据:

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

在 Postgres 的 shipment 表中插入一条数据:

--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

在这里插入图片描述

在 MySQL 的 orders 表中更新订单的状态:

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

在 Postgres 的 shipment 表中更新物流的状态:

--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

在这里插入图片描述

在 MySQL 的 orders 表中删除一条数据:

--MySQL
DELETE FROM orders WHERE order_id = 10004;

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

在这里插入图片描述

5.环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

在 Flink 所在目录 flink-1.18.0 下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/603790.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STL标准库与泛型编程(侯捷)笔记1

STL标准库与泛型编程(侯捷) 本文是学习笔记,仅供个人学习使用。如有侵权,请联系删除。 参考链接 Youbute: 侯捷-STL标准库与泛型编程 B站: 侯捷 - STL Github:STL源码剖析中源码 https://github.com/SilverMaple/STLSourceCo…

Java流程控制的陷阱

文章目录 1. switch中break的作用2. switch支持的数据类型3. else隐含的条件4. 省略花括号的陷阱5. for循环的结构6. 使用标签跳出双层for循环 流程控制三种:顺序结构、分支结构、循环结构 分支机构两种:if语句、switch语句 循环结构:while循…

JumpServer一键安装脚本

JumpServer 一键安装命令如下: curl -sSL https://resource.fit2cloud.com/jumpserver/jumpserver/releases/latest/download/quick_start.sh | bash上述quick_start.sh脚本详细内容如下: #!/usr/bin/env bash #VERSIONv3.10.1 DOWNLOAD_URLhttps://re…

【APACHE】的认识和基础配置参数

#主页传送:江南的江 #每日鸡汤:人生没有如果和假设,只有后果和结果。生活有进有退,输什么也不能输心情。生活简单就是迷人的,学会简单其实就是不简单。要学会平静地接受现实,学会对自己说声顺其自然,学会坦…

MS4553S用于开漏模式和推拉模式的 2bit 双向电平转换器,可替代TXS0102/PCA9306等

产品简述 MS4553S 是一款双向电平转换器,可以用作混合电压的数字信 号系统中。其使用两个独立构架的电源供电, A 端供电电压范围是 1.65V 到 5.5V , B 端供电电压范围是 2.3V 到 5.5V 。可用在电压为 1.8V 、 2.5V 、 3.3V 和 5V 的信号转…

C++系列十四:结构体

C中的结构体 一、结构体的定义 在C中,结构体是一种自定义的数据类型,它允许我们将不同类型的数据组合在一起。结构体可以包含任意类型的数据,包括基本数据类型、指针、数组、其他结构体等。 定义结构体的语法如下: struct 结构…

目前最完整的WebRTC资源平台 —— 筑梦之路

webrtcwork.com 是一个非常好的网站,笔者从那里获得了很多有价值的学习资源,比如服务器端压力测试,商业WebRTC部署等资料。 地址:webrtcwork - Resources for those working with WebRTC 做个笔记

如何用UE5 的小白人替换成自己的 metahumen 数字人

1、用QuixelBridge 插件导入制作好的metahumen数字人 2、创建项目时如有选择第三人称游戏,在内容目录中找到第三人称游戏小白人的蓝图类,对其进行复制一个,重命名,我这里命名为BP_METAHUMEN, 并移到Metahumen目录下方便…

安全基础~信息搜集3

文章目录 知识补充APP信息搜集php开发学习理解漏洞 知识补充 端口渗透总结 python Crypto报错:https://blog.csdn.net/five3/article/details/86160683 APP信息搜集 1. AppInfoScanner 移动端(Android、iOS、WEB、H5、静态网站)信息收集扫描工具 使用教程 演示&…

第三十八周周报:文献阅读 +BILSTM+GRU+Seq2seq

目录 摘要 Abstract 文献阅读:耦合时间和非时间序列模型模拟城市洪涝区洪水深度 现有问题 提出方法 创新点 XGBoost和LSTM耦合模型 XGBoost算法 ​编辑 LSTM(长短期记忆网络) 耦合模型 研究实验 数据集 评估指标 研究目的 洪…

适合前后端开发的可视化编辑器(拖拽控件)

分享一个面向研发人群使用的前后端分离的低代码软件——JNPF。 JNPF与市面上其他的低代码(轻流、宜搭、微搭、简道云、轻流、活字格等等),后者更倾向于非编程人员使用,让业务线人员自行构建应用程序。而 JNPF 这款低代码产品是面向…

Linux学习记录——삼십유 传输层TCP协议(1)

文章目录 1、TCP协议报文1、报头和有效载荷的分离2、TCP可靠性3、序号和确认序号4、16位窗口大小5、6个标志位和紧急指针 2、TCP可靠性1、应答机制2、超时重传机制3、连接管理机制握手挥手 3、流量控制 1、TCP协议报文 UDP属于TCP/IP协议族。 1、报头和有效载荷的分离 从头…

1、Excel工作场景和知识点总结

参考: 戴师兄–戴你玩转数据分析 Excel发挥战斗力的场景 地量级数据的存储 我们日常所用的各种数据表格,基本都以excel的.xlsx或者.xls格式进行存储。并且因为大家电脑上都有excel,这就使excel的通用性很高(我用excel做好一个表发给你&#x…

uniapp选择android非图片文件的方案踩坑记录

这个简单的问题我遇到下面6大坑,原始需求是选择app如android的excel然后读取到页面并上传表格数据json 先看看效果 uniapp 选择app excel文件读取 1.uniapp自带不支持 uniapp选择图片和视频非常方便自带已经支持可以直接上传和读取 但是选择word excel的时候就出现…

设计模式篇章(3)——七种结构型模式

结构型设计模式主要思考的是如何将对象进行合理的布局来组成一个更大的功能体或者结构体,这个现在讲有点抽象,用大白话讲就是利用现有的对象进行组合或者配合,使得组合后的这个系统更加好。好是相对于不使用设计模式,按照自己的堆…

【小沐学CAD】开源Assimp库导入三维模型(C++、Python)

文章目录 1、简介2、下载编译3、代码测试3.1 C3.2 pyassimp(Python) 结语 1、简介 https://github.com/assimp/assimp Open Asset Import Library 是一个库,用于将各种 3D 文件格式加载为共享的内存格式。它支持 40 多种用于导入的文件格式和…

API集群负载统计 - 华为OD统一考试

OD统一考试 分值: 100分 题解: Java / Python / C++ 题目描述 某个产品的RESTful API集合部署在服务器集群的多个节点上,近期对客户端访问日志进行了采集,需要统计各个API的访问频次,根据热点信息在服务器节点之间做负载均衡,现在需要实现热点信息统计查询功能。 RESTf…

自定义View之重写onMeasure

一、重写onMeasure()来修改已有的View的尺寸 步骤: 重写 onMeasure(),并调用 super.onMeasure() 触发原先的测量用 getMeasuredWidth() 和 getMeasuredHeight() 取到之前测得的尺寸,利用这两个尺寸来计算出最终尺寸使用 setMeasuredDimensio…

鱼类识别Python+深度学习人工智能+TensorFlow+卷积神经网络算法

一、介绍 鱼类识别系统。使用Python作为主要编程语言开发,通过收集常见的30种鱼类(‘墨鱼’, ‘多宝鱼’, ‘带鱼’, ‘石斑鱼’, ‘秋刀鱼’, ‘章鱼’, ‘红鱼’, ‘罗非鱼’, ‘胖头鱼’, ‘草鱼’, ‘银鱼’, ‘青鱼’, ‘马头鱼’, ‘鱿鱼’, ‘鲇…

基于JAVA的中小学教师课程排课系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 角色管理模块2.2 课程档案模块2.3 排课位置模块2.4 排课申请模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 角色表3.2.2 课程表3.2.3 排课位置表3.2.4 排课申请表 四、系统展示五、核心代码5.1 查询课程5.2 新增课…