[实时计算flink]基于Paimon的数据库实时入湖快速入门

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文通过Paimon Catalog和MySQL连接器,将云数据库RDS中的订单数据和表结构变更导入Paimon表中,并使用Flink对Paimon表进行简单分析。

背景信息

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云实时计算Flink版,以及开源大数据平台E-MapReduce上常见的计算引擎(例如Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者OSS上构建自己的数据湖存储服务,并接入计算引擎实现数据湖的分析。

前提条件

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

步骤一:准备数据源

  1. 快速创建RDS MySQL实例。

    说明

    RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。

  2. 创建数据库和账号。

    创建名称为orders的数据库,并创建高权限账号或具有数据库orders读写权限的普通账号。

  3. 通过DMS登录RDS MySQL,在orders数据库中创建表orders_1和orders_2。

    CREATE TABLE `orders_1` (orderkey BIGINT NOT NULL,custkey BIGINT,order_status VARCHAR(100),total_price DOUBLE,order_date DATE,order_priority VARCHAR(100),clerk VARCHAR(100),ship_priority INT,comment VARCHAR(100),PRIMARY KEY (orderkey)
    );CREATE TABLE `orders_2` (orderkey BIGINT NOT NULL,custkey BIGINT,order_status VARCHAR(100),total_price DOUBLE,order_date DATE,order_priority VARCHAR(100),clerk VARCHAR(100),ship_priority INT,comment VARCHAR(100),PRIMARY KEY (orderkey)
    );
  4. 插入如下测试数据。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

步骤二:创建Catalog

  1. 进入元数据管理页面。

    1. 登录实时计算控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 单击左侧的元数据管理

  2. 创建Paimon Catalog。

    1. 单击创建Catalog内置Catalog页签,选择Apache Paimon后,单击下一步。

    2. 填写配置信息。

      image.png

      配置项

      说明

      备注

      catalog name

      您自定义的Paimon Catalog名称。

      填写为自定义的英文名。

      metastore

      Paimon表的元数据存储类型:

      • filesystem:仅将元数据存储在OSS中。

      • dlf:除了将元数据存储在OSS上外,还会将元数据同步到阿里云数据湖构建服务DLF中。

      本文选择filesystem。

      warehouse

      Paimon Catalog的存储根目录,是一个OSS目录。可以选择创建实时计算Flink版时使用的OSS Bucket,也可以使用同一账号同一地域下的其他OSS Bucket。

      格式为oss://<bucket>/<object>。其中:

      • bucket:表示您创建的OSS Bucket名称。

      • object:表示您存放数据的路径。

      您可以在OSS管理控制台上查看您的bucket和object名称。

    3. 单击确定

步骤三:创建Flink作业

  1. 数据开发 > ETL页面,单击新建

  2. 选择空白的流作业草稿,单击下一步

  3. 新建作业草稿对话框,填写作业配置信息。

    作业参数

    说明

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    存储位置

    指定该作业的存储位置。

    您还可以在现有文件夹右侧,单击

    新建文件夹

    图标,新建子文件夹。

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。

  4. 单击创建

  5. 输入以下语句,实时捕获orders数据库中相关表的变化,并同步到Paimon表中。

    -- 使用刚刚创建的Paimon Catalog
    USE CATALOG `test`;-- 创建一张MySQL临时表,捕获表名符合正则表达式orders_\d+的MySQL表的变化
    CREATE TEMPORARY TABLE mysql_orders (orderkey BIGINT,custkey BIGINT,order_status VARCHAR(100),total_price DOUBLE,order_date DATE,order_priority VARCHAR(100),clerk VARCHAR(100),ship_priority INT,`comment` VARCHAR(100),PRIMARY KEY (orderkey) NOT ENFORCED
    ) WITH ('connector' = 'mysql','hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com','port' = '3306','username' = 'your_username','password' = '${secret_values.mysql_pw}','database-name' = 'orders','table-name' = 'orders_\d+','server-time-zone' = 'Asia/Shanghai'
    );-- 将MySQL表的变化同步到Paimon表中
    CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;

    ​参数说明如下,您可以根据实际情况进行修改。MySQL连接器更多参数详情请参见MySQL。

    参数

    说明

    备注

    connector

    连接器类型。

    本示例固定值为mysql

    hostname

    MySQL数据库的IP地址或者Hostname。

    本文填写为RDS实例的内网地址。

    username

    MySQL数据库服务的用户名。

    无。

    password

    MySQL数据库服务的密码。

    本示例通过使用名为mysql_pw密钥的方式填写密码值,避免信息泄露,详情请参见变量管理。

    database-name

    MySQL数据库名称。

    本示例填写为步骤一:准备数据源中创建的数据库。

    table-name

    MySQL表名。

    作为源表时,表名支持正则表达式以读取多个表的数据。

    port

    MySQL数据库服务的端口号。

    无。

  6. (可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。

  7. 单击右上方的部署,单击确定

  8. 在左侧导航栏,单击运维中心 > 作业运维,单击目标作业名称,进入作业部署详情页面。

  9. 单击运行参数配置区域右侧的编辑

    本文为了更快观察到任务运行的结果,将系统检查点间隔两次系统检查点之间的最短时间间隔均改为10s,单击保存

    image

  10. 在目标作业部署详情页顶部,单击启动,选择无状态启动

    image.png

  11. 查询Paimon数据。

    1. 数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

      select custkey, sum(total_price) from `test`.`default`.`orders` group by custkey;
    2. 结果浏览完成后,单击左侧的

      image.png

      停止调试。

      image.png

步骤四:更新MySQL表结构

本部分将演示MySQL表结构变更同步到Paimon表的功能。

  1. 登录云数据库RDS控制台。

  2. 在orders数据库,输入如下SQL语句,然后单击执行,为两张数据表添加一列,并填充一些数据。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. 在实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

    select * from `test`.`default`.`orders` where `quantity` is not null;

    结果如下,浏览完成后,可单击左侧的

    image.png

    停止调试。

    Image 32

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(46)MATLAB仿真从正弦波转换为方波

文章目录 前言一、MATLAB代码二、仿真结果画图三、吉布斯效应 前言 本文使用MATLAB仿真的方法&#xff0c;给出从正弦波转换为方波的过程&#xff0c;说明方波的傅里叶级数展开式是如何由奇次谐波的和构成的。另外&#xff0c;说明了在此过程中的吉布斯效应。 一、MATLAB代码 …

pm2 部署vue

1、为什么要使用pm2运行vue项目 为什么&#xff01;&#xff01;&#xff01;我们一般是将打出来的DIST目录上传到服务器发布即可&#xff0c;为啥我会使用PM2来运行部署呢&#xff1f; 前提&#xff1a;vue2mysqlexpress不使用中间服务器&#xff0c;即不要后端人员开发接口服…

Bands Page 乐队页面

“带区”页面提供了用于添加和删除带区、自定义带区设置以及更改带区和列布局的设计时工具。此页面如下图所示。 该页面说明了一个预览部分、一个用于访问所选频段设置的属性网格以及一组按钮&#xff0c;这些按钮提供了下面列表中描述的功能。 添加新乐队…- 创建新带。创建新…

Elasticsearch使用实战以及代码详解

Elasticsearch 是一个使用 Java 语言编写、遵守 Apache 协议、支持 RESTful 风格的分布式全文搜索和分析引擎&#xff0c;它基于 Lucene 库构建&#xff0c;并提供多种语言的 API。Elasticsearch 可以对任何类型的数据进行索引、查询和聚合分析&#xff0c;无论是文本、数字、地…

【论文学习与撰写】,论文word文档中出现乱码的情况,文档中显示的乱码,都是英文字母之类的,但打印预览是正常的

目录 1、问题 2、解决方法 1、问题 写论文的时候&#xff0c;有时会出现乱码的情况&#xff0c; 如下图&#xff0c;这种情况&#xff0c; 可是 在打印预览的时候&#xff0c;就显示的正常 如下图&#xff0c; 2、解决方法 既然是文档正文显示错误&#xff0c;显示乱码&…

【HarmonyOS NEXT】服务端向终端推送消息——获取Push Token

【需求】 获取Push Token 【文档】 https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V5/push-get-token-V5 【代码】 // EntryAbility.ets 文件 import { pushService } from kit.PushKit; export default class EntryAbility extends UIAbility {onCreat…

【详解】下载MySql安装教程(帮助数据库下载)

此版本是我下载的版本&#xff0c;其他版本均可以。 1.官网下载相应的版本&#xff1a;MYSQL&#xff1a;8.0.33 https://www.mysql.com/ 2.点击DOWNLOADS进入 3.在上述界面当中往下翻&#xff0c;找到社区版的下载界面 4.点进社区版的界面 前三个是Linux系统下的安装&a…

1.centos 镜像

centos 它有官网的下载地址&#xff1a;https://vault.centos.org/ 选择想要的版本&#xff0c;我选择 centos7.8 进入到镜像目录 isos 选择 x86_64 选择想要的版本&#xff0c;我选择 CentOS-7-x86_64-DVD-2003.iso 安装就正常安装就行。我选择虚拟机安装。这个参考&…

git的安装以及入门使用

文章目录 git的安装以及入门使用什么是git&#xff1f;git安装git官网 git初始化配置使用方式初始化配置&#xff1a; git的安装以及入门使用 什么是git&#xff1f; Git 是一个免费开源的分布式版本控制系统&#xff0c;使用特殊的仓库数据库记录文件变化。它记录每个文件的…

前端开发设计模式——状态模式

目录 一、状态模式的定义和特点 二、状态模式的结构与原理 1.结构&#xff1a; 2.原理&#xff1a; 三、状态模式的实现方式 四、状态模式的使用场景 1.按钮的不同状态&#xff1a; 2.页面加载状态&#xff1a; 3.用户登录状态&#xff1a; 五、状态模式的优点 1.提…

Matplotlib和Seaborn数据可视化

目录 Matplotlib图表绘制 准备工作 折线图line 柱状图bar 水平条形图barh 饼图pie 散点图scatter 气泡图csatter 箱线图boxplot 直方图hist 蜂巢图hexbin Seaborn图表绘制 准备数据 关系散点图scatterplot 关系散点线形图replot 分类散点图stripplot 分类小提…

Flink窗口分配器WindowAssigner

前言 Flink 数据流经过 keyBy 分组后&#xff0c;下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口&#xff0c;元素可以被分发到一个或多个窗口中&#xff0c;Flink 内置了常用的窗口分配器&#xff0c;包括&#xff1a;tumbling wi…

前缀和和差分算法

文章目录 一维前缀和一维前缀和概念一维前缀和数组的构建 二维前缀和二维前缀和概念二维前缀和数组的构建 一维差分一维差分概念一维差分数组的构建 二维差分二维差分概念二维差分数组的构建 一维前缀和 一维前缀和概念 一维前缀和是一种常用的数据预处理方法&#xff0c;它能…

JS事件和DOM

1. DOM 1.1 基本概念 DOM&#xff0c;全称 Document Object Model&#xff0c;即文档对象模型。它是 Web 上最常用的 API 之一&#xff0c;是加载在浏览器中的文档模型&#xff0c;可以将文档表示为节点树&#xff08;或称 DOM 树&#xff09;&#xff0c;其中每个节点代表文…

【协议】IIC总线协议学习

一、IIC基本介绍 设计I2C的初衷是减少电视机等复杂电子系统内部的布线数量&#xff0c;同时也降低制造成本。通过使用只有两根线的通信总线&#xff0c;它有效地减少了器件间连接的复杂性。 IIC总线是两线制总线&#xff0c;仅有串行数据线SDA和串行时钟线SCL进行通信。减少…

【Python实例】Python读取并绘制tif数据

【Python实例】Python读取并绘制tiff数据 Python实例-以全球不透水面积数据为例数据准备&#xff1a;全球不透水面积数据基于gdal库绘制tif图基于Rasterio库绘制tif图 参考 GeoTIff 是一个标准的.tif 文件或是一个图像文件格式&#xff0c;它包含了一些额外的空间信息&#xff…

prompt learning

prompt learning 对于CLIP&#xff08;如上图所示&#xff09;而言&#xff0c;对其prompt构造的更改就是在zero shot应用到下游任务的时候对其输入的label text进行一定的更改&#xff0c;比如将“A photo of a{obj}”改为“[V1][V2]…[Vn][Class]”这样可学习的V1-Vn的token…

业务开发常见问题-并发工具类

hello&#xff0c;大家好&#xff0c;本讲我们一起聊一下常见的几个并发工具类的使用和坑&#xff01; 在日常工作中&#xff0c;我们经常会遇到多线程并发问题&#xff0c;比如ThreadLocal、锁、ConcurrentHashMap、CopyOnWriteArrayList等。那么如何正常的使用呢&#xff1f;…

【最新通知】2024年Cisco思科认证CCNA详解

CCNA现在涵盖安全性、自动化和可编程性。该计划拥有一项涵盖IT职业基础知识的认证&#xff0c;包括一门考试和一门培训课程&#xff0c;助您做好准备。 CCNA培训课程和考试最近面向最新技术和工作岗位进行了重新调整&#xff0c;为您提供了向任何方向发展事业所需的基础。CCNA认…

blender分离含有多个动作的模型,并导出含有材质的fbx模型

问题背景 笔者是模型小白&#xff0c;需要将网络上下载的fbx模型中的动作&#xff0c;分离成单独的动作模型&#xff0c;经过3天摸爬滚打&#xff0c;先后使用了blender&#xff0c;3d max&#xff0c;unity&#xff0c;最终用blender完成&#xff0c;期间参考了众多网络上大佬…