数据实时获取方案之Flink CDC

目录

  • 一、方案描述
  • 二、Flink CDC
    • 1.1 什么是CDC
    • 1.2 什么是Flink CDC
    • 1.3 其它CDC
    • 1.4 FlinkCDC所支持的数据库情况
  • 二、使用Pipeline连接器实时获取数据
    • 2.1 环境介绍
    • 2.2 相关版本信息
    • 2.3 详细步骤
      • 2.3.1 实时获取MySQL数据并发送到Kafka
      • 2.3.2 实时获取MySQL数据并同步到Doris数据库

一、方案描述

在这里插入图片描述

由Flink CDC来监测到源数据库数据变更并将其发送到Kafka或同步到目标数据库中,再由后续消费者或其它应用来使用数据。

二、Flink CDC

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 什么是Flink CDC

官方文档地址:[项目介绍 | Apache Flink CDC](Introduction | Apache Flink CDC)
官方描述:Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

1.3 其它CDC

在这里插入图片描述

1.4 FlinkCDC所支持的数据库情况

在这里插入图片描述

Flink CDC 提供了可用于 YAML 作业的 Pipeline Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在 YAML Pipeline 定义中指定所需的连接器。
在这里插入图片描述

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件(其中一些组件是基于Debezium来获取数据变更,它可以充分利用Debezium的能力)。使用这些组件可以通过Flink SQL或代码开发的方式获取目标数据库的全量数据和增量变更数据。
ConnectoryDatabaseDrivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

二、使用Pipeline连接器实时获取数据

2.1 环境介绍

我们下面将主要展示通过使用Pipeline连接器来获取实时数据的流程。
准备环境:

  • 单节点的standalone模式的Flink集群
  • Flink CDC
  • 单节点Kafka
  • Doris快速体验版数据库
  • Mysql测试数据库

2.2 相关版本信息

  • Flink 1.18
  • Flink CDC 3.11
  • Kafka 3.6.1
  • Doris doris-2.0.3-rc06

2.3 详细步骤

引入所需依赖包
将 flink-cdc-pipeline-connector-doris-3.1.1.jar flink-cdc-pipeline-connector-kafka-3.1.1.jar flink-cdc-pipeline-connector-mysql-3.1.1.jar放入flink cdc的lib文件夹下

2.3.1 实时获取MySQL数据并发送到Kafka

1.编写同步变更配置文件
将yaml文件放入到flink-cdc下的job文件夹中

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: kafkatopic: test003properties.bootstrap.servers: xxx.xxx.xxx.xxx:9092format: jsonpipeline:name: Sync MySQL Data to KAFKAparallelism: 2

2.启动Flink集群

# 在flink/bin下执行
./start-cluster.sh

3.启动Flink CDC 任务

# 在flink-cdc-3.1.1/bin下运行
./flink-cdc.sh ../job/mysql-to-kafka.yaml

启动成功
在这里插入图片描述

4.启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test003

5.在源数据库中修改数据并观察Kafka消费者
当在源数据库testfid表进行数据新增,删除或修改,Kafka消费者即能消费到对应数据
在这里插入图片描述

2.3.2 实时获取MySQL数据并同步到Doris数据库

1.编写同步变更配置文件

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2将yaml文件放入到flink-cdc下的job文件夹中

2.在Doris中创建数据库 doris_test

create database doris_test;

3.启动Flink CDC任务

# 在/app/path/flink-cdc-3.1.1/bin下执行
./flink-cdc.sh ../job/mysql-to-doris.yaml

4.进行数据变更并观察结果
先查看任务启动前源库MySQL和目标库Doris的数据情况,源库MySQL中共有两个表且表中已存在一些数据,Doris中没有表
在这里插入图片描述
在这里插入图片描述
启动任务后,两个表及数据都已同步到Doris中,当源表数据变更及表结构变更时,也都会实时同步到Doris中
在这里插入图片描述

5.进行路由变更后再进行测试并观察结果
Flink CDC Pipeline连接器也支持将两个同样表结构表的数据同步到目标数据库的一个表中

source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1# 将源库中testfid和testfid_copy1表的数据同步到目标库的表route_test中
route:- source-table: doris_test.testfidsink-table: doris_test.route_test- source-table: doris_test.testfid_copy1sink-table: doris_test.route_testpipeline:name: Sync MySQL Database to Dorisparallelism: 2

源数据
在这里插入图片描述
启动任务后Doris中的数据
在这里插入图片描述

源库中两个表的数据被合并同步到目标库的一个表中,但这只适用于相同表结构的合并,如果是不同表结构合并会造成数据错乱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/47013.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用fiddler 查看手机端数据包

要使用Fiddler查看手机端的数据包,可以按照以下步骤进行操作: 下载并安装Fiddler:首先需要在你的电脑上下载并安装Fiddler软件。可以在Fiddler官方网站(https://www.telerik.com/fiddler)上找到适合你操作系统的版本&a…

初识C++|模板初阶

🍬 mooridy-CSDN博客 🧁C专栏(更新中!) 目录 🍉1. 泛型编程 🍉2. 函数模板 🥝2.1 函数模板概念 🥝2.2 函数模板格式 🥝2.3 函数模板的原理 &#x1f95…

万界星空科技QMS系统:全面赋能企业质量管理的创新引擎

万界星空科技质量管理QMS系统(Quality Management System)是一套全面、高效的质量管理工具,旨在帮助企业提升产品质量、优化生产流程、降低质量成本。该系统集成了多个功能模块,以满足企业在质量管理方面的各种需求。以下是万界星…

【网络安全科普】勒索病毒 防护指南

勒索病毒简介 勒索病毒是一种恶意软件,也称为勒索软件(Ransomware),其主要目的是在感染计算机后加密用户文件,并要求用户支付赎金以获取解密密钥。这种类型的恶意软件通常通过电子邮件附件、恶意链接、下载的软件或漏洞…

Android --- Kotlin学习之路:协程的使用,什么是协程,为什么要用协程?(学习笔记)

Kotlin 协程(coroutine)学习 以下干货满满,掌握以下内容一定会对你在项目开发中有所帮助,记得收藏!!! 文章目录 什么是协程,为什么要用协程?挂起函数挂起的是什么&#…

Svelte与Vue:框架性能与设计理念的比较

Svelte 和 Vue.js 都是现代前端框架,旨在简化 Web 开发并提高应用程序的性能。虽然它们都提供了构建用户界面的工具,但在设计理念、编译过程、运行时开销和性能方面存在显著差异。 Svelte 框架的特点 Svelte 的核心理念是在构建阶段尽可能多地完成工作…

JavaWeb JavaScript ① JS简介

目录 一、HTML&CSS&JavaScript的作用 二、前后端关联标签——表单标签 1.form标签 2.input标签 3.get/post提交的差异 4.表单项标签 5.布局相关标签 块元素——div 行内元素——span 三、CSS 1.CSS引入方式 方式1 行内式 方式2 内嵌式 方式3 外部样式表 2.CSS选择器 元…

【c++】用c++类做一个猜数字游戏

目录 源码: 想法: 可以改进的地方: 源码: #include<iostream> #include<ctime> #include<cstdlib> #include<string>using std::cout; using std::endl; using std::cin;class player { private:int card;bool viewable; public:player(): card…

Java基础编程500题——String

&#x1f4a5; 该系列属于【Java基础编程500题】专栏&#xff0c;如您需查看Java基础的其他相关题目&#xff0c;请您点击左边的连接 目录 1. 将字符串"Hello World"中的所有小写字母转换成大写字母。 2. 将两个字符串"Hello"和"World"拼接。 …

Zabbix监控介绍与部署

目 录 一、zabbix介绍和架构 1.1 zabbix介绍 1.2 为什么需要监控 1.3 需要监控什么 二、zabbix使用场景与系统概述 2.1 zabbix的功能 2.2 zabbix架构 2.3 Zabbix术语 三、编译安装zabbix 3.1 安装依赖环境 3.2 建立管理用户 3.3 准备源码包&#xff0c;解压包 3.…

082、Python 读文本文件

在Python中读取txt文本文件可以通过内置的open()函数结合file对象的read(), readline(), readlines()等方法实现。 1. 使用read()方法 read()方法会读取文件的全部内容&#xff0c;并将其作为一个字符串返回。 # 打开文件并读取全部内容with open(Resources/雨巷.txt, r, en…

Vue中Key的作用

在Vue中&#xff0c;Key的作用主要体现在以下几个方面&#xff1a; 1. 唯一标识列表元素 Key是Vue中用于唯一标识列表元素的特殊属性。在使用v-for指令渲染列表时&#xff0c;每个列表项都应该拥有唯一的key&#xff0c;这样Vue就能准确地追踪每个节点的身份&#xff0c;从而…

linux中当前目录、上级目录、上上级目录表示方法

1&#xff0c;cd [目录名]&#xff1a;表示进入某个目录 . &#xff1a;代表当前的目录&#xff0c;也可以使用 ./ 来表示&#xff1b;(一个点表示当前目录) . . &#xff1a;代表上一层目录&#xff0c;也可以 ../ 来代表。(两个点表示当前目录的上一层目录) ../..…

封装网络请求 鸿蒙APP HarmonyOS ArkTS

一、效果展示 通过在页面直接调用 userLogin(params) 方法&#xff0c;获取登录令牌 二、申请网络权限 访问网络时候首先需要申请网络权限&#xff0c;需要修改 src/main 目录下的 module.json5 文件&#xff0c;加入 requestPermissions 属性&#xff0c;详见官方文档 【声明权…

鸿蒙 next 5.0 版本页面跳转传参 接受参数 ,,接受的时候 要先定义接受参数的类型, 代码可以直接CV使用 [教程]

1, 先看效果 2, 先准备好两个页面 index 页面 传递参数 import router from ohos.routerEntry Component struct Index {Statelist: string[] [星期一, 星期二,星期三, 星期四,星期五]StateactiveIndex: number 0build() {Row() {Column({ space: 10 }) {ForEach(this.list,…

【Git远程操作】向远程仓库推送 | 拉取远程仓库

目录 1.向远程仓库推送 ​1.1本地仓库的配置 1.2remote-gitcode本地仓库 1.3推送至远程仓库 2.拉取远程仓库 现阶段以下操作仅在master主分支上。 1.向远程仓库推送 工作区☞add☞暂存区☞commit☞本地仓库☞推送push☞远程仓库注意&#xff1a;本地仓库的某个分支 ☞推…

《Techporters架构搭建》-Day01 第一个RESTful API接口

微服务架构搭建 搭建微服务架构分析一下项目的build.gradle添加Demo接口 搭建微服务架构 首先搭建系统管理模块&#xff0c;模块结构如下 tps-cloud └── tps-system -- 系统管理模块└── tps-system-api -- 系统管理模块公共api模块└── tps-system-biz -- 系统管理模…

单片机设计_自行车码表(AT89C51, LCD1602, DS1302,霍尔传感器)

想要更多项目私wo!!! 一、电路设计 系统采用51单片机LCD1602液晶DS1302时钟模块霍尔传感器电机按键模块蜂鸣器报警模块设计而成。 产品自带单片机上电复位电路、手动复位电路&#xff08;复位按键&#xff09;、晶振电路&#xff08;给单片机提供时钟周期&#xff09;。 …

Zabbix介绍和架构

目录 一.Zabbix简介 1.为什么需要监控 2.需要监控什么 3.常见的监控工具 4.Zabbix使用场景及系统概述 5.Zabbix 架构 6.Zabbix工作流程 7.Zabbix 术语 二. 部署安装zabbix 三.zabbix 配置文件 一.Zabbix简介 1.为什么需要监控 运维行业有句话:“无监控、不运维”&am…

AGV平面坐标系变换公式及实例

1、AGV坐标系简介 如上图&#xff0c;小车前后对角是有激光雷达的&#xff0c;其坐标系称为激光坐标系&#xff0c;采用极坐标系体现。中间为车体坐标系&#xff0c;激光坐标系相对于车体坐标系关系不变&#xff1b;左下角是地图坐标系&#xff0c;小车扫图后&#xff0c;建立的…