FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/55408.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【图论】(一)图论理论基础与岛屿问题

图论理论基础与岛屿问题 图论理论基础深度搜索&#xff08;dfs&#xff09;广度搜索&#xff08;bfs&#xff09;岛屿问题概述 岛屿数量岛屿数量-深搜版岛屿数量-广搜版 岛屿的最大面积孤岛的总面积沉没孤岛建造最大人工岛水流问题岛屿的周长 图论理论基础 这里仅对图论相关核…

java学习--集合(大写五.5)

5.collection子接口&#xff1a;Set 5.1Set及其实现类特点 java.util.Collection:存储一个一个的数据 子接口&#xff1a;Set:存储无序的、不可重复的数据(高中学习的集合) 1&#xff09;HashsSet主要实现类 底层使用的是HashMap&#xff0c;即使用数组单向链表红黑树结构进行…

netron安装(windows linux)

目录 netron简介 不同操作系统的安装方式 linux windows mac系统 netron简介 netron可视化工具&#xff0c;是一个可以清晰的看到神经网络模型的每一层的输入输出&#xff0c;网络总体的架构&#xff0c;而且支持各种不同网络框架&#xff0c;简单好用。 效果如下所示 不…

028 elasticsearch索引管理-ElasticsearchRestTemplate

文章目录 pom.xmlapplication.ymlCubemallSearchApplication.javaRestClientTest.java使用ElasticsearchRestTemplate对象Blog.javaRestTemplateTest.java pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-s…

.net core 3.0 与 6.0 有哪些不同

.NET Core 3.0 和 .NET 6.0&#xff08;注意&#xff0c;从 .NET 5.0 开始&#xff0c;微软将 .NET Core 和 .NET Framework 合并为一个统一的 .NET 平台&#xff09;之间有许多重要的区别。这些区别包括性能改进、新功能、API 的变化以及对不同平台的支持。下面是一些主要的区…

【视频笔记】408新增知识点信号——里昂视频

文章目录 **2.信号****3.信号的实现**4.信号的处理**①忽略信号****②执行信号的默认操作****③捕获井处理信号**几个Linux支持的典型信号&#xff1a; **5.信号的产生**① **通过终端按键(键盘)产生信号**例如&#xff0c;CtrlC发送2号信号SIGINT、Ctrl\发送3号信号SIGQUIT② …

大数据面试题整理——HDFS

大数据面试题整理 第一章 HDFS面试题 第二章 MapReduce面试题 文章目录 大数据面试题整理一、请简要介绍一下 HDFS。二、HDFS 的主要特点有哪些&#xff1f;三、说说 HDFS 的架构&#xff0c;以及 NameNode 和 DataNode 的作用。四、什么是心跳机制&#xff1f;五、解释一下 H…

详解SSH和bash

1. SSH&#xff08;Secure Shell&#xff09; SSH 是你在远程登录到Linux服务器时使用的工具。假设你有一台远程服务器&#xff0c;你想从自己的电脑登录到这台服务器进行操作&#xff0c;SSH 就是你使用的工具。 示例&#xff1a; 你在自己的电脑上打开终端&#xff0c;输入…

使用Python爬虫API,轻松获取电商商品SKU信息

在电子商务的复杂世界中&#xff0c;SKU&#xff08;Stock Keeping Unit&#xff0c;库存单位&#xff09;信息是连接供应商、库存、销售和客户服务的桥梁。它不仅包含了商品的规格、价格、库存等关键数据&#xff0c;还直接影响到库存管理、价格策略和市场分析等多个方面。在这…

爬虫逆向-js进阶

1.作用域和闭包 //作用域 // var a 3 // // function test(a){ // var a 1; // console.log(函数内部,a) // } // test(2) // // console.log(a)//闭包 // function jiami(){ // function encrypt(){ // console.log(在这里进行加密了) // } // p…

五个必备的高清无水印视频素材库推荐

做抖音、短视频创作的朋友都知道&#xff0c;优质的素材往往决定了作品能否获得更多关注。如果你还不知道在哪里下载高清无水印的视频素材&#xff0c;不用担心&#xff01;今天为你推荐5个高品质的视频素材库&#xff0c;助你轻松创作出爆款视频。 蛙学网 是国内领先的视频素材…

Mysql常用sql语句与刷题知识点

目录 1. 常用sql2. 刷题知识点 1. 常用sql #查询MySQL中所有的数据库 SHOW DATABASES; #查询当前正在使用的数据库 SELECT DATABASE();#普通创建&#xff08;创建已经存在的数据库会报错&#xff09; CREATE DATABASE 数据库名称; #创建并判断&#xff08;该数据库不存在才创建…

2.html编辑器介绍

html编辑器介绍 HTML 编辑器推荐 理论上我们可以使用记事本进行html编码和开发&#xff0c;但是在实际开发html页面的时候&#xff0c;使用一些专业的开发工具可以使我们更加快速和高效的进行开发&#xff0c;下面介绍几种开发工具&#xff1a; VS Code&#xff1a;https://…

006_django基于Python的二手房源信息爬取与分析2024_l77153d4

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

【ios】SwiftUI 混用 UIKit 的 Bug 解决:UITableView 无法滚动到底部

问题描述 在 SwiftUI 中嵌套使用 UIKit 的 UITableView 时&#xff0c;你可能会遇到一个常见的 Bug&#xff1a;UITableView 的高度没有正确设置&#xff0c;导致内容无法正常滚动&#xff0c;尤其是滚动到页面底部时。 核心问题在于 SwiftUI 和 UIKit 的布局机制不同。Swift…

DNS:互联网域名系统的核心

什么是 DNS&#xff1f; DNS&#xff08;Domain Name System&#xff0c;域名系统&#xff09;是互联网的一项基础服务&#xff0c;它负责将人类容易记忆的域名&#xff08;如 www.example.com&#xff09;转换成计算机可以识别的 IP 地址&#xff08;如 192.0.2.1&#xff09…

针对脚本爬虫攻击的防御策略与实现

随着互联网的发展&#xff0c;网站和应用程序面临着越来越多的自动化攻击&#xff0c;其中包括使用脚本进行的大规模数据抓取&#xff0c;即所谓的“爬虫攻击”。这类攻击不仅影响网站性能&#xff0c;还可能导致敏感数据泄露。本文将探讨如何识别爬虫攻击&#xff0c;并提供一…

【uniapp】实现触底加载数据

前言&#xff1a;实现界面触底数据加载。后端接口得支持翻页传参&#xff08;本案例使用django&#xff09; 1、后端接口 1.1 封装翻页公共方法standardPagination.py # -*- coding: utf-8 -*- # Time : 2024/10/15 13:15 # Author : super # File : standardPaginat…

全托自闭症教育,关注孩子每个细节

原文文章&#xff1a;http://www.zibizhengwang.com/page37.html 自闭症&#xff0c;这一复杂的神经发育障碍&#xff0c;影响着无数孩子的成长与未来。然而&#xff0c;在广州&#xff0c;有一座特别的灯塔——星贝育园自闭症儿童寄宿制学校&#xff0c;它不仅照亮了自闭症儿…

SpringBoot使用esayExcel根据模板导出excel

1、依赖 <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.1.3</version></dependency> 2、模板 3、实体类 package com.skybird.iot.addons.productionManagement.qualityTesting…