09 flink-sql 中基于 mysql-cdc 的 select * from test_user 的具体实现

前言

这也是最近帮一个朋友看问题 遇到的一个问题 

然后 引发了一下 对于 flink-sql 里面的一些 常规处理的思考, 理解 

原始问题主要是 在测试库可以使用 flink-sql 可以正常同步, 但是 在生产环境 无法正常同步数据 

这个问题 我们后面单独 记录一篇文章 

87fe04f3239e4e768da72132e7774269.png

 

 

测试用例

下载 flink-1.13.6, 首先启动一个 standalone 的集群 

master:flink-1.13.6 jerry$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.

 

启动 flink sql-client 

master:flink-1.13.6 jerry$ ./bin/sql-client.sh 
Listening for transport dt_socket at address: 5007
No default environment specified.
Searching for '/Users/jerry/Downloads/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /Users/jerry/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

 

创建 flink-sql 的表结构 

CREATE TABLE test_user (
`name` string,
`age` string,
PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'postgres',
'database-name' = 'test',
'table-name' = 'test_user'
);

 

源表数据如下 

d53a95f8b01a4676981c365726074779.png

 

然后 flink-sql 这边查询 结果如下 

1fbd5138ad994a5383fbcb31bd47d9f4.png

 

然后 我们来看一下 这里的整个处理流程 

 

 

flink-sql 中 select * from test_user 获取全量数据的调试

首先这里交互的角色抽象的可以理解为两个, 一个是 flink 集群, 一个是 flink sql-client 

然后 flink sql-client 这边组合查询, 相关业务, 然后创建一个 flink 任务, 抛给 flink 集群 

然后 两者进行交互, 首先是拿到 test_user 的快照全量数据, 然后 flink sql-client 这边做业务展示 

然后 test_user 的之后的增删改查, 的处理是基于 mysql binlog 这边来做增量处理 

我们这里 仅仅演示 test_user 的快照数据获取 以及 在 test_user 中增加一条记录, 然后 flink sql-client 这边是 怎么获取到的这个整个流程 

 

如下这里是 flink sql-client 这边将用户输入的 " select * from test_user " 转换为 flink sql 上下文的 operations, 然后封装成 pipeline, 提交给 flink 集群 

11a2df70ea0a4b589f0a9e7c2cfaef2a.png

 

 

看一下 flink 集群这边任务的执行, 首先是 第一次的全量数据的快照 

从 CollectSinkFunction 这边从 buffer 中获取到两条记录, 大致可以看出是 第一条记录 和 第二条记录, 然后 sendBackResults 通过 tcp交互, 将这两条数据对应的 StreamRecord 传输回 flink sql-client 

0609d767ecca4bce9d4605071734d1f9.png

 

往前回溯, 看一下 真正执行查询的地方, 执行的是 "select * from test_user;" 

然后这里迭代会将 查询的记录封装成为 SourceRecord, 然后添加到 recorderMarker 的 bufferedRecordQueue 中 

92e1ba3608324a0b847d9289c3c34dc1.png

 

然后这个 bufferedRecordQueue 是一个队列, 会将消耗的元素调用 enqueueRecord 将数据放入到 records 中 

6b48eaeadc2549758874c0f7b5bfcf25.png

 

这里是更细节的 enqueueRecord 的执行流程, 比如这里 迭代的事 第一条记录 

1c3973e8afe84d86af26ef340e11bb32.png

 

然后接着是 更上一层 Engine 的业务流程, 他会将 SnapshotReader 这边读取的记录更新到 batch 中 

351bda6f26bf4455b4d2f1adfb10a528.png

 

然后就是 Engine 这边的任务的执行, 将数据经过 map, filter, NotNull, 等等相关处理 

最终到达 CollectSinkFunction 这边 

1cf7e36ff8344e1f9391efc607f06186.png

 

然后 CollectSinkFunction 这边将数据封装成 GenericRowData, 然后序列化, 放入 buffer 队列 

然后 最终就是 CollectSinkFunction 上面的流程, 将序列化之后的数据通过 CollectCoordinationResponse 回传给 flink sql-client 

b0fa1585ee8c4f99b1aff634cf651099.png

 

 

然后 flink sql-client 这边的处理如下  

将拿到的数据, 添加到 buffer  队列 

3a6a2ea3c8344bd1be5fa3723ecb62f7.png

dc17460a9c7e409b8701bcf524ac4b56.png

 

然后就是 flink sql-client 这边的主线程的处理了, 从 buffer 中迭代 记录出来, 然后 放到 materializedTable, 然后 之后 cli 这边获取表格数据的时候, 将其传输到 snapshot 中 

494c6dd347c84e6a9548d88ba2d01e01.png

c677e808922f43cb83bc423ff641252b.png

 

 

flink sql-client 这边的展示流程如下 

1b2cf8d16f3b4cf097c9df6b07ef4f66.png

 

然后 做具体的展示, 展示结果如下, 然后 随着之后的迭代 能够获取完整当前页的数据, 展示在 cli 中 

f90e68011bb843cb9196cf2ca7002bfa.png

eae2ea9b78e54ea9a484632a45c7ab34.png

 

 

 

 

flink-sql 中 select * from test_user 获取增量数据的调试

增量数据的获取, 来自于 BinlogClient 这边的获取, 连接 mysql 的服务 

发送获取 binlog 的命令, 然后 之后 mysql 这边有 binlog 的事件之后, 会将相关 事件传递到 BinlogClient 这边 

比如这里 执行了一个 ”insert into test_user (`name`, age) select max(age)+1, max(age+1) from test_user;”, 增加了一条记录 (3, 3) 

然后这边 反序列化之后, 读取到 WriteRowsEvent 数据为 (3, 3) 

5ef9dc67350d489b91903f2abb0547c0.png

 

然后就是 BinlogClient 的后续流程, 将数据使用 recordMarker 记录 

和上面 SnapshotReader 这边处理一样, recorderMarker 会将 SourceRecord记录 添加到 records 列表, 由外层 Engine 层轮询 records 将其进行任务的执行, 到后面的 CollectSinkFunction 传输给 flink sql-client 这边做数据增删改查, 以及展示 

e596147a59714192b4bafcfa5f73d28a.png

 

为记录 (3, 3) 生成 SourceRecord 并放到 records 队列 

34bbed10370743b695fd3af2c99174be.png

 

Engine 层的处理, 其他的这里就不细化了 

f08ed652eda44ad39fc545c7290248f3.png

 

 

 

 

flink mysql-cdc MysqlConnectorTask 的处理 

我们可以看到上面 全量读取使用了 快照读, 然后增量的部分使用基于 binlog 来进行处理 

那么这个 处理流程是在这里呢? 

9da3b393683b4988be75f76ab669e9ef.png

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/794831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络抓包专题

导航目录 HTTP 原理HTTPS 原理TLS 原理网络抓包原理一. 什么是抓包&#xff1f;二. 抓包的原理对HTTP请求进行抓包对HTTPS请求进行抓包 三. Android设备抓包问题Android6.0 及以下系统Android7.0 及以上系统方式一&#xff1a;方式二 HTTP 原理 HTTP 详解 点击跳转 HTTPS 原理…

【QT入门】 Qt代码创建布局综合运用:仿写腾讯会议登陆界面

往期回顾&#xff1a; 【QT入门】 Qt代码创建布局之水平布局、竖直布局详解-CSDN博客 【QT入门】 Qt代码创建布局之栅格布局详解-CSDN博客 【QT入门】 Qt代码创建布局之分裂器布局详解-CSDN博客 【QT入门】 Qt代码创建布局综合运用&#xff1a;仿写腾讯会议登陆界面 一、界面分…

Linux基础篇:文件系统介绍——根目录下文件夹含义与作用介绍

Linux文件系统介绍——文件夹含义与作用 Linux文件系统是一个组织和管理文件的层次结构。它包括了目录、子目录和文件&#xff0c;这些都是按照一定的规则和标准进行组织的。以下是Linux文件系统的一些关键组成部分&#xff1a; 1./bin&#xff1a; 该目录包含了系统启动和运…

Rust线程间通信通讯channel的理解和使用

Channel允许在Rust中创建一个消息传递渠道&#xff0c;它返回一个元组结构体&#xff0c;其中包含发送和接收端。发送端用于向通道发送数据&#xff0c;而接收端则用于从通道接收数据。不能使用可变变量的方式&#xff0c;线程外面修改了可变变量的值&#xff0c;线程里面是拿不…

C++设计模式:策略模式(二)

1、定义与动机 定义一系列算法&#xff0c;把它们一个个封装起来&#xff0c;并且使它们可互相替换&#xff08;变化&#xff09;&#xff0c;该模式使得算法可独立于使用它的客户程序&#xff08;稳定&#xff09;而变化&#xff08;扩展&#xff0c;子类化&#xff09; 在软…

Hadoop-MapReduce

一、MapReduce 概述 1.1 MapReduce 定义 MapReduce 是一个分布式运算程序的编程框架&#xff0c;是用户开发“基于 Hadoop 的数据分析应用”的核心框架。 MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序&#xff0c;并发运行在…

Linux:Centos9:配置固定ip

centos9的网卡位置移动到了 /etc/NetworkManager/system-connections/ 下面 查看网卡 ifconfig 当前有两块网卡&#xff0c;我要去配置ens160的一个固定的ip&#xff0c;让其ip为192.168.6.20/24&#xff0c;网关为192.168.6.254.dns为&#xff1a;1.1.1.1 vim /etc/Netwo…

CSS-概述

&#x1f4da;详见 W3scholl&#xff0c;本篇只做快速思维索引。 概述 CSS 是一种描述 HTML 文档样式的语言。 有三种插入样式表的方法&#xff1a; 外部 CSS内部 CSS行内 CSS &#x1f4c5; 外部 CSS 外部样式表存储在.css文件中。HTML 页面必须在 head 部分的<link&g…

基于JavaWeb实现的漫画网站前后台系统

一、项目简介 本项目是一套基于JavaWeb实现的漫画网站前后台系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#x…

云服务器ECS租用价格表报价——阿里云

阿里云服务器租用价格表2024年最新&#xff0c;云服务器ECS经济型e实例2核2G、3M固定带宽99元一年&#xff0c;轻量应用服务器2核2G3M带宽轻量服务器一年61元&#xff0c;ECS u1服务器2核4G5M固定带宽199元一年&#xff0c;2核4G4M带宽轻量服务器一年165元12个月&#xff0c;2核…

SRS 实时视频服务器搭建及使用

一、SRS 介绍 SRS是一个开源的&#xff08;MIT协议&#xff09;简单高效的实时视频服务器&#xff0c;支持RTMP、WebRTC、HLS、HTTP-FLV、SRT、MPEG-DASH和GB28181等协议。 SRS媒体服务器和FFmpeg、OBS、VLC、 WebRTC等客户端配合使用&#xff0c;提供流的接收和分发的能力&am…

DNS和HTTP

DNS应用层协议 域名解析系统 使用IP地址&#xff0c;来描述设备在网络上的位置 IP地址并不适合来进行传播网站&#xff0c;就采用了域名的方式来解决网站传播的问题。如www.baidu.com这样类似的就很容易让人记住。其域名就直接代表了这个网站。而且有一套自动的系统会将域名解…

YOLO火灾烟雾检测数据集:20000多张,yolo标注完整

YOLO火灾烟雾检测数据集&#xff1a;一共20859张图像&#xff0c;yolo标注完整&#xff0c;部分图像应用增强 适用于CV项目&#xff0c;毕设&#xff0c;科研&#xff0c;实验等 需要此数据集或其他任何数据集请私信

C++11多线程库重点接口

目录 一.thread构造函数 二.移动构造&#xff0c;移动赋值 小结 三.获取线程id的方法 四.thread与lambda表达式联用 五.Mutexs的总览 六.互斥锁 七.Locks的总览 八. 条件变量总览 九.条件变量的wait和notify 十.典型例题 十一.原子类 十二.智能指针和单例模式的线…

详解 Redis 在 Ubuntu 系统上的安装

在 Ubuntu 20.04 安装 Redis 1. 先切换到 root 用户 在 Ubuntu 20.04 中&#xff0c;可以通过以下步骤切换到 root 用户&#xff1a; 输入以下命令&#xff0c;以 root 用户身份登录&#xff1a; sudo su -按回车键&#xff0c;并输入当前用户的密码&#xff08;即具有 sudo…

【论文精读】Detecting Out-of-Distribution Examples with Gram Matrices 使用Gram矩阵检测分布外实例

文章目录 一、文章概览&#xff08;一&#xff09;Gram矩阵1、Gram&#xff08;格朗姆&#xff09;矩阵的定义2、Gram矩阵计算特征表示3、风格迁移中的Gram矩阵 &#xff08;二&#xff09;ood检测&#xff08;三&#xff09;核心思路&#xff1a;扩展 Gram 矩阵以进行分布外检…

2024最新在线工具箱/ 站长IT工具箱/网站系统源码下载

2024最新在线工具箱/ 站长IT工具箱/网站系统源码下载- 更多详情及下载地址请访问https://a5.org.cn/a5_ziyuan/39525.html 转载请注明出处!

SketchUp Pro 2024 for mac 草图大师 专业的3D建模软件

SketchUp Pro 2024 for Mac是一款功能强大的三维建模软件&#xff0c;适用于Mac电脑。其简洁易用的界面和强大的工具集使得用户可以轻松创建复杂的3D模型。 软件下载&#xff1a;SketchUp Pro 2024 for mac v24.0.483 激活版下载 SketchUp Pro 2024 for Mac支持导入和导出多种文…

软件杯 深度学习乳腺癌分类

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

性能测试,python 内存分析工具

Memray是一个由彭博社开发的、开源内存剖析器&#xff1b;开源一个多月&#xff0c;已经收获了超8.4k的star&#xff0c;是名副其实的明星项目。今天我们就给大家来推荐这款python内存分析神器。 Memray可以跟踪python代码、本机扩展模块和python解释器本身中内存分配&#xf…