Flink CDC数据同步

背景

随着信息化程度的不断提高,企业内部系统的数量和复杂度不断增加,因此,数据库系统的同步问题已成为越来越重要的问题。

缓存失效

在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。

简化单体应用

许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。

共享数据库

当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。

数据集成

数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用数据同步工具加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。

命令查询职责分离

在命令查询职责分离 [Command Query Responsibility Separation (CQRS)]架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧(update-side),这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序(totally-ordered)处理。Debezium和CDC可以使这种方式更可行:写操作被正常记录,但是Debezium捕获数据更改,并且持久化到全序流里,然后供那些需要异步更新只读视图的服务消费。写侧(write-side)表可以表示面向领域的实体(domain-oriented entities),或者当CQRS和 Event Sourcing 结合的时候,写侧表仅仅用做追加操作命令事件的日志。

Flink CDC

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink ®的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

image-20230827094534219

数据抓取

FlinkCDC 使用 MySQL 的 binlog 技术进行数据抓取。binlog 是 MySQL 用于记录数据库变更操作的日志,包括对表的增删改操作。FlinkCDC 通过对 binlog 进行解析和读取,得到最新的增量数据,并将其转换为 Flink 支持的数据格式,如 Avro 或 JSON。

如下代码可以帮我们监听数据库的变更日志:

JdbcIncrementalSource<String> oracleChangeEventSource =new OracleSourceBuilder().hostname("host").port(1521).databaseList("XE").schemaList("DEBEZIUM").tableList("DEBEZIUM.PRODUCTS").username("username").password("password").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true) // output the schema changes as well.startupOptions(StartupOptions.initial()).debeziumProperties(debeziumProperties).splitSize(2).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000L);// set the source parallelism to 4env.fromSource(oracleChangeEventSource,WatermarkStrategy.noWatermarks(),"OracleParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print Oracle Snapshot + RedoLog");

数据同步

FlinkCDC 将抓取到的增量数据同步到 Flink 或者其他的计算引擎中进行处理。同步方式有两种:

pull 模式:FlinkCDC 在启动时会向 MySQL 中的某个位置开始读取 binlog,然后通过一个 HTTP 接口将增量数据暴露给 Flink。Flink 每隔一段时间就会调用该接口拉取增量数据。

push 模式:FlinkCDC 将增量数据通过一个 Kafka Topic 推送给 Flink。Flink 在消费 Kafka Topic 时,就可以直接消费到增量数据。

监听到数据变动,能拿到变更前后的数据对比,经过Sink数据转换成相应的INSERT、UPDATE、DELETE等相关SQL语句,并同步到目标数据库。

public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println("监听到活动数据:" + LocalDateTime.now() + value);JSONObject jsonObject = JSONObject.parseObject(value);User before = jsonObject.getObject("before", User.class);User after = jsonObject.getObject("after", User.class);try {String table = jsonObject.getJSONObject("source").getString("table");SqlParse sqlParse = new SqlParse();String executeSQL = "";if(before == null){// 插入executeSQL = sqlParse.getInsert(after,table);}else if(after == null){// 删除executeSQL = sqlParse.getDeleteSQL(before,table);}else{// 更新executeSQL = sqlParse.getUpdateSQL(before,after,table);}SpringJDBC.executeSQL(executeSQL);}catch (Exception e){System.out.println("执行错误");}}}

通用形SqlParse只能解析同构数据,异构数据需要单独处理。

增量数据的解析和处理

FlinkCDC 将抓取到的增量数据转换为 Flink 支持的数据格式后,交由 Flink 进行进一步的处理。Flink 可以对数据进行各种运算,如聚合、过滤、变换等,最终将处理结果输出到其他的存储介质中。

总的来说,FlinkCDC 的原理就是通过解析 MySQL 中的 binlog,抓取到最新的增量数据,并将其转换为 Flink 支持的数据格式,然后将增量数据同步到 Flink 或者其他的计算引擎中进行处理。通过 Flink 的强大计算能力,可以对增量数据进行各种计算,从而实现实时数据处理和分析的功能。

优缺点比较

优点:

  • 能监听多种数据源:MySQL、Oracle、PgSQL等;
  • 支持流式处理,可以实现数据的实时处理和分析;
  • 支持增量更新,可以实现数据的实时同步;
  • 支持容错处理,可以实现数据的高可靠性;

缺点:

  • 对Oracle支持不太友好,需要将开启归档日志,并且部分字段解析需要了解其语义;
  • 对于大表的查询性能较差;
  • 对于大规模数据的处理效率较低;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/55676.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

37.RESTful

RESTful RESTful简介 REST&#xff1a;Representational State Transfer&#xff0c;表现层资源状态转移 资源&#xff1a;资源是一种看待服务器的方式&#xff0c;即——将服务器看作是由很多离散的资源组成。每个资源是服务器上一个可命名的抽象概念。因为资源是一个抽象的…

Springboot使用kafka事务-生产者方

前言 在上一篇文章中&#xff0c;我们使用了springboot的AOP功能实现了kafka的分布式事务&#xff0c;但是那样实现的kafka事务是不完美的&#xff0c;因为请求进来之后分配的是不同线程&#xff0c;但不同线程使用的kafka事务却是同一个&#xff0c;这样会造成多请求情况下的…

QT ListQvector at赋值出错以及解决办法 QT基础入门【QT存储结构】

1、问题 error: passing const QString as this argument discards qualifiers error: assignment of read-only location vec.QVector<int>::at(0) 在Qt中QList,Qvector一般获取元素都是通过at(index)来获取,但是at()的返回是一个const & 常引用,也就是元素不支…

[shell] $@ 与 eval

$ 与 $* $的用法以及与$*的区别可以参考 What does $ mean in a shell script? 测试脚本如下 #!/bin/bashset -e set -xbash -c $ bash -c "$" bash -c $* bash -c "$*"一般情况下&#xff0c;我们可能会这样执行脚本 ./test.sh Y324 X1 echo hello输…

微信小程序canvas type=2d生成海报保存到相册、文字换行溢出显示...、文字删除线、分享面板

一、简介 做个简单的生成二维码海报分享&#xff0c;我做的时候也找简单的方法看能不能实现页面直接截图那种生成图片&#xff0c;原生小程序不支持&#xff0c;不多介绍下面有全部代码有注释、参数自行替换运行看看&#xff0c;还有需要优化的地方&#xff0c;有问题可以咨询…

squid服务器

目录 squid初识 安装squid代理 常用命令 主要配置文件 正向代理 环境配置 linux服务器设置 windows客户端设置 反向代理 环境配置 在web服务器配置服务 linux服务器配置 squid初识 含义&#xff1a;squid cache是一个流行的自由软件&#xff08;GNU通用公共许可证…

低代码与低代码平台的概念解析

随着数字化转型和软件需求的不断增长&#xff0c;传统的手写代码开发方式已经无法满足迅速推出应用程序的需求。为了加快软件开发的速度并降低技术门槛&#xff0c;低代码开发模式应运而生。本文将介绍低代码的概念&#xff0c;探讨什么是低代码什么是低代码平台&#xff1f; 一…

poi带表头多sheet导出

导出工具类 package com.hieasy.comm.core.excel;import com.hieasy.comm.core.excel.fragment.ExcelFragment; import com.hieasy.comm.core.utils.mine.MineDateUtil; import org.apache.poi.hssf.usermodel.*; import org.apache.poi.ss.usermodel.*; import org.apache.po…

Git基本操作(Idea版)

第一次发布项目&#xff08;本地->远程&#xff09; 方式一 通过push的方式推送本地库到远程库&#xff08;远程已创建好仓库&#xff09; 这种方式需要提前创建好仓库。 右键点击项目&#xff0c;可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意&#xff1a…

Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别,以及优缺点

文章目录 一、Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别&#xff1f;二、优点缺点1.优点2.缺点 一、Readl DOM (真实DOM) 和 Virtual DOM (虚拟DOM)之间的区别&#xff1f; 两者之间的区别&#xff1a; 1、真实DOM&#xff1a; 真实DOM是浏览器中的实际DOM结构…

CSDN编程题-每日一练(2023-08-27)

CSDN编程题-每日一练&#xff08;2023-08-27&#xff09; 一、题目名称&#xff1a;异或和二、题目名称&#xff1a;生命进化书三、题目名称&#xff1a;熊孩子拜访 一、题目名称&#xff1a;异或和 时间限制&#xff1a;1000ms内存限制&#xff1a;256M 题目描述&#xff1a; …

css3滤镜属性filter让网页变黑白

今天是特殊的日子&#xff0c;抗击疫情全国哀悼日&#xff0c;向英雄们致敬&#xff0c;一路走好&#xff01;应该发现了今天很多网站页面都是黑白色的&#xff0c;我的博客今天都是黑白色&#xff0c;用css3滤镜属性filter让网页马上变黑白&#xff0c;一行代码就搞定。 在你…

R包开发-2.1:在RStudio中使用Rcpp制作R-Package(更新于2023.8.23)

目录 0-前言 1-在RStudio中创建R包项目 2-创建R包 2.1通过R函数创建新包 2.2在RStudio通过菜单来创建一个新包 2.3关于R包创建的说明 3-添加R自定义函数 4-添加C函数 0-前言 目标&#xff1a;在RStudio中创建一个R包&#xff0c;这个R包中包含C函数&#xff0c;接口是Rc…

c++中的const与constexpt的区别

c中的const与constexpr的区别 const const 是一种修饰符&#xff0c;用于声明一个只读的常量。它可以用于变量、函数参数和函数返回类型。声明为 const 的变量的值在初始化后就不能再改变。 适用场景&#xff1a; const 适用于声明运行时常量&#xff0c;在编译时无法确定值…

如何使用HTML5新增的标签来构建语义化的页面结构?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ <header>&#xff1a;⭐ <nav>&#xff1a;⭐ <main>&#xff1a;⭐ <section>&#xff1a;⭐ <article>&#xff1a;⭐ <aside>&#xff1a;⭐ <footer>&#xff1a;⭐ <figure> 和 &l…

剑指Offer --- 字符串篇

剑指Offer — 字符串篇 — 剑指的题解K神已经写的已经非常详细了&#xff0c;并且Github上开源的电子书目前热度也非常高&#xff0c;这个12天12个模块系列就当作自己的秋招刷题汇总了&#xff0c;欢迎大家交流。 剑指 Offer 05. 替换空格 思路 **(线性扫描) ** O(n) 这个…

登录校验-Filter-详解

目录 执行流程 拦截路径 过滤器链 小结 执行流程 过滤器Filter拦截到请求之后&#xff0c;首先执行方放行之前的逻辑&#xff0c;然后执行放行操作&#xff08;doFilter&#xff09;&#xff0c;然后会访问对应的Web资源&#xff08;对应的Controller类&#xff09;&#…

【图像分类】基于卷积神经网络和主动学习的高光谱图像分类(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Web应用登录验证的几种方式

一、SessionCookie登录 传统的sessioncookie登录是一种有状态 的登录 1、传统的sessioncookie 流程 浏览器登录发送账号和密码&#xff0c;服务端查找数据库验证用户验证成功后&#xff0c;服务端把用户状态&#xff08;登录状态&#xff0c;角色&#xff0c;权限等信息&…

【python】jupyter notebook导出pdf和pdf不显示中文问题

文章目录 写在前面1. 使用jupyter notebook导出pdf1.1 安装Pandoc1.2 安装MiKTex1.3 示例导出pdf 2. 中文显示问题2.1 显示中文问题示例2.2 解决办法1&#xff1a;修改tex2.3 解决办法2&#xff1a;修改内置文件 写在前面 使用jupyter notebook导出pdf时&#xff0c;出现了一些…