数据实时获取方案之Flink CDC

目录

  • 一、方案描述
  • 二、Flink CDC
    • 1.1 什么是CDC
    • 1.2 什么是Flink CDC
    • 1.3 其它CDC
    • 1.4 FlinkCDC所支持的数据库情况
  • 二、使用Pipeline连接器实时获取数据
    • 2.1 环境介绍
    • 2.2 相关版本信息
    • 2.3 详细步骤
      • 2.3.1 实时获取MySQL数据并发送到Kafka
      • 2.3.2 实时获取MySQL数据并同步到Doris数据库

一、方案描述

在这里插入图片描述

由Flink CDC来监测到源数据库数据变更并将其发送到Kafka或同步到目标数据库中,再由后续消费者或其它应用来使用数据。

二、Flink CDC

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 什么是Flink CDC

官方文档地址:[项目介绍 | Apache Flink CDC](Introduction | Apache Flink CDC)
官方描述:Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

1.3 其它CDC

在这里插入图片描述

1.4 FlinkCDC所支持的数据库情况

在这里插入图片描述

Flink CDC 提供了可用于 YAML 作业的 Pipeline Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在 YAML Pipeline 定义中指定所需的连接器。
在这里插入图片描述

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件(其中一些组件是基于Debezium来获取数据变更,它可以充分利用Debezium的能力)。使用这些组件可以通过Flink SQL或代码开发的方式获取目标数据库的全量数据和增量变更数据。
ConnectoryDatabaseDrivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

二、使用Pipeline连接器实时获取数据

2.1 环境介绍

我们下面将主要展示通过使用Pipeline连接器来获取实时数据的流程。
准备环境:

  • 单节点的standalone模式的Flink集群
  • Flink CDC
  • 单节点Kafka
  • Doris快速体验版数据库
  • Mysql测试数据库

2.2 相关版本信息

  • Flink 1.18
  • Flink CDC 3.11
  • Kafka 3.6.1
  • Doris doris-2.0.3-rc06

2.3 详细步骤

引入所需依赖包
将 flink-cdc-pipeline-connector-doris-3.1.1.jar flink-cdc-pipeline-connector-kafka-3.1.1.jar flink-cdc-pipeline-connector-mysql-3.1.1.jar放入flink cdc的lib文件夹下

2.3.1 实时获取MySQL数据并发送到Kafka

1.编写同步变更配置文件
将yaml文件放入到flink-cdc下的job文件夹中

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: kafkatopic: test003properties.bootstrap.servers: xxx.xxx.xxx.xxx:9092format: jsonpipeline:name: Sync MySQL Data to KAFKAparallelism: 2

2.启动Flink集群

# 在flink/bin下执行
./start-cluster.sh

3.启动Flink CDC 任务

# 在flink-cdc-3.1.1/bin下运行
./flink-cdc.sh ../job/mysql-to-kafka.yaml

启动成功
在这里插入图片描述

4.启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test003

5.在源数据库中修改数据并观察Kafka消费者
当在源数据库testfid表进行数据新增,删除或修改,Kafka消费者即能消费到对应数据
在这里插入图片描述

2.3.2 实时获取MySQL数据并同步到Doris数据库

1.编写同步变更配置文件

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2将yaml文件放入到flink-cdc下的job文件夹中

2.在Doris中创建数据库 doris_test

create database doris_test;

3.启动Flink CDC任务

# 在/app/path/flink-cdc-3.1.1/bin下执行
./flink-cdc.sh ../job/mysql-to-doris.yaml

4.进行数据变更并观察结果
先查看任务启动前源库MySQL和目标库Doris的数据情况,源库MySQL中共有两个表且表中已存在一些数据,Doris中没有表
在这里插入图片描述
在这里插入图片描述
启动任务后,两个表及数据都已同步到Doris中,当源表数据变更及表结构变更时,也都会实时同步到Doris中
在这里插入图片描述

5.进行路由变更后再进行测试并观察结果
Flink CDC Pipeline连接器也支持将两个同样表结构表的数据同步到目标数据库的一个表中

source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1# 将源库中testfid和testfid_copy1表的数据同步到目标库的表route_test中
route:- source-table: doris_test.testfidsink-table: doris_test.route_test- source-table: doris_test.testfid_copy1sink-table: doris_test.route_testpipeline:name: Sync MySQL Database to Dorisparallelism: 2

源数据
在这里插入图片描述
启动任务后Doris中的数据
在这里插入图片描述

源库中两个表的数据被合并同步到目标库的一个表中,但这只适用于相同表结构的合并,如果是不同表结构合并会造成数据错乱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/47013.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初识C++|模板初阶

🍬 mooridy-CSDN博客 🧁C专栏(更新中!) 目录 🍉1. 泛型编程 🍉2. 函数模板 🥝2.1 函数模板概念 🥝2.2 函数模板格式 🥝2.3 函数模板的原理 &#x1f95…

万界星空科技QMS系统:全面赋能企业质量管理的创新引擎

万界星空科技质量管理QMS系统(Quality Management System)是一套全面、高效的质量管理工具,旨在帮助企业提升产品质量、优化生产流程、降低质量成本。该系统集成了多个功能模块,以满足企业在质量管理方面的各种需求。以下是万界星…

【网络安全科普】勒索病毒 防护指南

勒索病毒简介 勒索病毒是一种恶意软件,也称为勒索软件(Ransomware),其主要目的是在感染计算机后加密用户文件,并要求用户支付赎金以获取解密密钥。这种类型的恶意软件通常通过电子邮件附件、恶意链接、下载的软件或漏洞…

JavaWeb JavaScript ① JS简介

目录 一、HTML&CSS&JavaScript的作用 二、前后端关联标签——表单标签 1.form标签 2.input标签 3.get/post提交的差异 4.表单项标签 5.布局相关标签 块元素——div 行内元素——span 三、CSS 1.CSS引入方式 方式1 行内式 方式2 内嵌式 方式3 外部样式表 2.CSS选择器 元…

Zabbix监控介绍与部署

目 录 一、zabbix介绍和架构 1.1 zabbix介绍 1.2 为什么需要监控 1.3 需要监控什么 二、zabbix使用场景与系统概述 2.1 zabbix的功能 2.2 zabbix架构 2.3 Zabbix术语 三、编译安装zabbix 3.1 安装依赖环境 3.2 建立管理用户 3.3 准备源码包,解压包 3.…

封装网络请求 鸿蒙APP HarmonyOS ArkTS

一、效果展示 通过在页面直接调用 userLogin(params) 方法,获取登录令牌 二、申请网络权限 访问网络时候首先需要申请网络权限,需要修改 src/main 目录下的 module.json5 文件,加入 requestPermissions 属性,详见官方文档 【声明权…

鸿蒙 next 5.0 版本页面跳转传参 接受参数 ,,接受的时候 要先定义接受参数的类型, 代码可以直接CV使用 [教程]

1, 先看效果 2, 先准备好两个页面 index 页面 传递参数 import router from ohos.routerEntry Component struct Index {Statelist: string[] [星期一, 星期二,星期三, 星期四,星期五]StateactiveIndex: number 0build() {Row() {Column({ space: 10 }) {ForEach(this.list,…

【Git远程操作】向远程仓库推送 | 拉取远程仓库

目录 1.向远程仓库推送 ​1.1本地仓库的配置 1.2remote-gitcode本地仓库 1.3推送至远程仓库 2.拉取远程仓库 现阶段以下操作仅在master主分支上。 1.向远程仓库推送 工作区☞add☞暂存区☞commit☞本地仓库☞推送push☞远程仓库注意:本地仓库的某个分支 ☞推…

《Techporters架构搭建》-Day01 第一个RESTful API接口

微服务架构搭建 搭建微服务架构分析一下项目的build.gradle添加Demo接口 搭建微服务架构 首先搭建系统管理模块,模块结构如下 tps-cloud └── tps-system -- 系统管理模块└── tps-system-api -- 系统管理模块公共api模块└── tps-system-biz -- 系统管理模…

单片机设计_自行车码表(AT89C51, LCD1602, DS1302,霍尔传感器)

想要更多项目私wo!!! 一、电路设计 系统采用51单片机LCD1602液晶DS1302时钟模块霍尔传感器电机按键模块蜂鸣器报警模块设计而成。 产品自带单片机上电复位电路、手动复位电路(复位按键)、晶振电路(给单片机提供时钟周期)。 …

Zabbix介绍和架构

目录 一.Zabbix简介 1.为什么需要监控 2.需要监控什么 3.常见的监控工具 4.Zabbix使用场景及系统概述 5.Zabbix 架构 6.Zabbix工作流程 7.Zabbix 术语 二. 部署安装zabbix 三.zabbix 配置文件 一.Zabbix简介 1.为什么需要监控 运维行业有句话:“无监控、不运维”&am…

AGV平面坐标系变换公式及实例

1、AGV坐标系简介 如上图,小车前后对角是有激光雷达的,其坐标系称为激光坐标系,采用极坐标系体现。中间为车体坐标系,激光坐标系相对于车体坐标系关系不变;左下角是地图坐标系,小车扫图后,建立的…

探索智慧职校教职工管理的教师信息功能

在智慧职校的教职工管理体系中,教师信息管理犹如教师职业生涯的数字罗盘,引领着教师个人成长与学校教学质量的双轨并进。这一模块的核心精髓在于对教师基本信息的精细捕捉与维护,确保每位教师的个人资料,诸如姓名、性别、出生日期…

RK3588核心板怎么选?为项目挑选合适核心板的五大建议

在信息爆炸的互联网海洋中,面对琳琅满目的RK3588核心板产品,您是否也曾感到眼花缭乱,难以抉择?究竟哪一款能够完美契合您的智能设备开发项目,让您在最短时间内找到最合适的伙伴,减少研发试错,加…

Day58:并查集 108.冗余连接 109.冗余连接II

108. 冗余连接 时间限制:1.000S 空间限制:256MB 题目描述 树可以看成是一个图(拥有 n 个节点和 n - 1 条边的连通无环无向图)。 现给定一个拥有 n 个节点(节点标号是从 1 到 n)和 n 条边的连通无向图&…

wandb本地部署

pip install --upgrade wandbwandb server start(如果失败了) docker pull wandb/local:latest启动 docker run --rm -d -v wandb:/vol -p 8080:8080 --name wandb-local wandb/local:latest打开http://localhost:8080/signup (有可能失败&…

LeetCode热题100刷题16:74. 搜索二维矩阵、33. 搜索旋转排序数组、153. 寻找旋转排序数组中的最小值、98. 验证二叉搜索树

74. 搜索二维矩阵 class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {int row matrix.size();int col matrix[0].size();for(int i0;i<row;i) {//先排除一下不存在的情况if(i>0&&matrix[i][0]>target…

Qt Style Sheets-入门

Qt 样式表是一种强大的机制&#xff0c;允许您自定义小部件的外观&#xff0c;这是在通过子类化QStyle已经可行的基础上的补充。Qt 样式表的概念、术语和语法在很大程度上受到 HTML级联样式表 (CSS)的启发&#xff0c;但适用于小部件的世界。 概述 样式表是文本规范&#xff0…

教室管理系统的开发与实现(Java+MySQL)

引言 教室管理系统是学校和培训机构日常运营中不可或缺的工具。本文将介绍如何使用Java、Swing GUI、MySQL和JDBC开发一个简单而有效的教室管理系统&#xff0c;并涵盖系统的登录认证、教室管理、查询、启用、暂停和排课管理功能。 技术栈介绍 Java&#xff1a;作为主要编程…

实战项目:仿muduo库实现并发服务器

目录 项目初始与项目演示HTTP服务器基础认识Reactor模式基础认识单Reactor单线程模式认识单Reactor多线程模式认识多Reactor多线程模式认识 目标定位总体大模块划分server模块的管理思想Buffer子模块Socket子模块Channel子模块Connection子模块Acceptor子模块TimerQueue子模块P…