Flink实时数仓同步:实时表实战详解

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓库等,选择取决于业务需求和数据特性。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表数据,示例如下:

  1. [Mysql] 业务数据 - 用户表全量数据:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

根据上述需求,我们可以得出需要构建实时表以满足业务数据的实时分析需求。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建实时表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、实现方式

FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {public static void main(String[] args) throws Exception {// 配置数据源和处理逻辑...// 实时任务启动env.execute("Print MySQL Snapshot + Binlog");}
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
# 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';create table mysql_user( 
# ...
) WITH ( 
# ...
);create table doris_user( 
# ...
) WITH ( 
# ...
);insert into doris_user select * from mysql_user;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: dorisname: Doris Sinkfenodes: 127.0.0.1:8030username: rootpassword: passpipeline:name: MySQL to Doris Pipelineparallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

四、sql脚本模式 + 实时表实现

4.1、实时表设计

背景需求需要实时查看业务表数据,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_user_real`
(`id` INT NOT NULL COMMENT '用户id',`name` STRING NULL COMMENT '用户昵称',`phone` STRING NULL COMMENT '手机号',`gender` CHAR(5) NULL COMMENT '用户性别',`create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',`update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

4.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH ( 
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username' 
);insert into doris_user select * from mysql_user;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2dorisFlink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql]业务数据2023-06-02日新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time备注
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00(手机号从333->444)
4tony5552023-06-02 10:00:002023-06-02 10:00:00(新增tony用户)
  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

五、总结

本文介绍了实时数仓同步的实操案例,通过 FlinkCDC 和 Doris 实现了实时表的构建和数据同步。在实现过程中,尤其压迫注意数据类型的转换问题,以确保不同数据存储之间的兼容性。

此外要根据具体需求和场景,选择合适的实现方式,本文选择了 sql-client --f file 模式来实现实时表需求,旨在为读者提供了不同的实践体验。

六、相关资料

  • Doris 数据模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/728034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

细说券商VIP快速交易通道与交易所报单一文看懂

最近的行情渐入佳境,很多朋友都喜欢做短线或者打板交易。可以往往发现打板进去的要么就是量能不足没有封住的,或者说是炸板的,想要交易一字板又打板不进去,这种就很让人苦恼。今天我们就来解答下这种到底该怎么处理?首…

Vue中如何处理组件间的耦合问题?

在Vue中处理组件间的耦合问题是前端开发中常见的挑战之一。耦合问题指的是组件之间的依赖关系过于紧密,一旦某个组件发生改动,则可能导致其它组件也需要作出相应调整。为了解决这个问题,我们可以采取以下几种方法: 使用事件总线&…

[笔记] 使用 Java Swing 实现一个简单的窗口

Java Swing 是一个用于构建图形用户界面(GUI)的Java库,它提供了丰富的组件和工具,用于创建交互式的桌面应用程序。Swing 是 Java Foundation Classes(JFC)的一部分,它是 Java 平台的一种标准用户…

金三银四求职季,这个AI神器助你斩获高薪Offer!

金三银四将至,又到了求职的高峰季,不管是招聘方,还是求职者,肉眼可见都会忙到飞起。 过去准备招聘 JD 或求职简历,都依赖人工编辑和包装,而眼下已进入 AI 时代,善用 AI 的人,无形中…

在矩池云上使用CogVLM的具体方法(附与GPT4、Gemini测试效果对比)

CogVLM 是由智谱AI&清华KEG基于对视觉和语言信息之间融合的理解,所推出的多模态大模型。在本文中,我们将展示在矩池云上使用CogVLM的方法。 硬件要求 使用 CogVLM 需要 CUDA 11.8 及以上环境,推理总显存需要40G以上,可以直接…

评估需求优先级的方法

Kano模型: 1.前言 在大量的需求需要进行迭代时,由于时间、人力、财力等相关因素干扰,无法在有限的时间内容对所有的需求进行满足,此时需要我们对需求进行优先级的排列。最大化的合理的提高有限资源的使用。 在常见的产品优先级…

【笔记】Android Telephony 漫游SPN显示定制(Roaming Alpha Tag)

一、功能名词简介和显示规则 Alpha Tag:运营商名称标识符,也是用于标识运营商的一个名称。客户需求描述常用名词,对开发而言都是SPN/PLMN功能模块的内容,状态栏左上角的运营商名称显示。 SPN相关文章: 【笔记】SPN和…

重装系统后正版office如何安装

前言 重装系统后,正版office如何安装 登录官网 https://www.microsoft.com 下载office https://account.microsoft.com/services

OA系统看飞书,能把繁杂场景设计的这么流畅,绝对是高手。

OA系统看飞书,能把繁杂场景设计的这么流畅,绝对是高手。 2023-08-18 23:33贝格前端工场 飞书是一款功能强大、操作流畅的企业协作工具,它提供了丰富的功能和灵活的场景设计,使得用户在使用过程中能够更加高效地协作和沟通。 以…

ChatMASTER部署教程

项目简介 ChatMASTER,基于AI大模型api实现的自建后端Chat服务,支出同步响应及流式响应,完美呈现打印机效果。支持一键切换ChatGPT(3.5、4.0)模型、文心一言(支持Stable-Diffusion-XL作图)、通义千问、讯飞星火、智谱清言(ChatGLM)等主流模型…

IP形象设计是什么设计?如何做?

随着市场竞争的激烈,越来越多的企业开始关注品牌形象的塑造和推广。在品牌形象中,知识产权形象设计是一个非常重要的方面。在智能和互联网的趋势下,未来的知识产权形象设计可能更加关注数字和社交网络。通过数字技术和社交媒体平台&#xff0…

ospf虚链路实验简述

1、ospf虚链路实验简述 ospf虚链路配置 为解决普通区域不在骨干区域旁,通过配置Vlink-peer实现不同区域网络设备之间建立逻辑上的连接。 实验拓扑图 r1: sys sysname r1 undo info enable int loopb 0 ip add 1.1.1.1 32 ip add 200.200.200.200 32 quit int e0/0/…

Leetcode 239:滑动窗口最大值

题意 大小为 k 的滑动窗口从整数数组 nums 的最左侧移到最右侧,只能看到滑动窗口中的 k 个数字,窗口每次向右移动一位。 返回滑动窗口的最大值。 示例 1: 输入:nums [1,3,-1,-3,5,3,6,7], k 3 输出:[3,3,5,5,6,7] …

一家新店怎么快速出体验分?教大家一个简单好用的方法,建议收藏

大家好,我是电商花花。 在现在直播电商时代,抖音电商已经成为了一种新兴的商业模式,在抖音小店的项目上,店铺体验分成为了抖音小店能否成功的一个关键因素之一。 店铺的体验分越高,我们店铺的权重才会更高&#xff0…

04.if判断

04.if判断 01.if判断02.运算符2.比较(关系)运算符3.逻辑运算符4.三目运算符(三元表达式) (03)5.if-else6.if-elif结构 04.if嵌套7.if嵌套 01.if判断 if判断基本格式 基本格式 if 要判断的条件:…

1.BOM-获取元素(获取元素、修改属性)

web Api基本认知 作用:通过JS去操作html页面和浏览器(实现浏览器中的某些功能) 分类: DOM(网页):Document Object Model(文档对象模型) BOM(浏览器):Borwser Object Model(浏览器对象模型) DOM DOM树 将网页中标签的关系以树状…

【MySQL知识体系】第2章 数据库与表的创建(一)

第2章 数据库与表的创建 2.1 数据库操作 2.2 表操作 文章目录 第2章 数据库与表的创建2.1 数据库操作2.1.1 创建第一个数据库2.1.2 更新数据库名称(数据库创建后无法修改名称)2.1.3 删除数据库2.1.4 取个合适的数据库名称 第2章 数据库与表的创建 2.1 数…

在 echarts 的 rich 中使用 iconfont 图标库图标作为 backgroundColor.image 值的方法

实现步骤 1、引入 iconfont.js。该脚本执行时&#xff0c;会在 body 下插入一个 svg 标签&#xff0c;标签下包含了图标库中的 svg 图标 path。 <script src"your/iconfont/path/iconfont.js"></script>或者 import your/iconfont/path/iconfont.js2、…

【学习心得】websocket协议简介并与http协议对比

一、轮询和长轮询 在websocket协议出现之前&#xff0c;要想实现服务器和客户端的双向持久通信采取的是Ajax轮询。它的原理是每隔一段时间客户端就给服务器发送请求找服务器要数据。 让我们通过一个生活化的比喻来解释轮询和长轮询假设你正在与一位不怎么主动说话的老大爷&…

基于R语言lavaan的SEM在复杂统计建模中的科研技术新突破

此外&#xff0c;我们还将深入探讨R语言的基础知识、结构方程模型的基本原理、lavaan程序包的使用方法等内容。无论是潜变量分析、复合变量分析&#xff0c;还是非线性/非正态/缺失数据处理、分类变量分析、分组数据处理等复杂问题&#xff0c;我们都将一一为您解析。 希望通过…