Flink CDC数据同步

背景

随着信息化程度的不断提高,企业内部系统的数量和复杂度不断增加,因此,数据库系统的同步问题已成为越来越重要的问题。

缓存失效

在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。

简化单体应用

许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。

共享数据库

当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。

数据集成

数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用数据同步工具加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。

命令查询职责分离

在命令查询职责分离 [Command Query Responsibility Separation (CQRS)]架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧(update-side),这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序(totally-ordered)处理。Debezium和CDC可以使这种方式更可行:写操作被正常记录,但是Debezium捕获数据更改,并且持久化到全序流里,然后供那些需要异步更新只读视图的服务消费。写侧(write-side)表可以表示面向领域的实体(domain-oriented entities),或者当CQRS和 Event Sourcing 结合的时候,写侧表仅仅用做追加操作命令事件的日志。

Flink CDC

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink ®的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

image-20230827094534219

数据抓取

FlinkCDC 使用 MySQL 的 binlog 技术进行数据抓取。binlog 是 MySQL 用于记录数据库变更操作的日志,包括对表的增删改操作。FlinkCDC 通过对 binlog 进行解析和读取,得到最新的增量数据,并将其转换为 Flink 支持的数据格式,如 Avro 或 JSON。

如下代码可以帮我们监听数据库的变更日志:

JdbcIncrementalSource<String> oracleChangeEventSource =new OracleSourceBuilder().hostname("host").port(1521).databaseList("XE").schemaList("DEBEZIUM").tableList("DEBEZIUM.PRODUCTS").username("username").password("password").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true) // output the schema changes as well.startupOptions(StartupOptions.initial()).debeziumProperties(debeziumProperties).splitSize(2).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000L);// set the source parallelism to 4env.fromSource(oracleChangeEventSource,WatermarkStrategy.noWatermarks(),"OracleParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print Oracle Snapshot + RedoLog");

数据同步

FlinkCDC 将抓取到的增量数据同步到 Flink 或者其他的计算引擎中进行处理。同步方式有两种:

pull 模式:FlinkCDC 在启动时会向 MySQL 中的某个位置开始读取 binlog,然后通过一个 HTTP 接口将增量数据暴露给 Flink。Flink 每隔一段时间就会调用该接口拉取增量数据。

push 模式:FlinkCDC 将增量数据通过一个 Kafka Topic 推送给 Flink。Flink 在消费 Kafka Topic 时,就可以直接消费到增量数据。

监听到数据变动,能拿到变更前后的数据对比,经过Sink数据转换成相应的INSERT、UPDATE、DELETE等相关SQL语句,并同步到目标数据库。

public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println("监听到活动数据:" + LocalDateTime.now() + value);JSONObject jsonObject = JSONObject.parseObject(value);User before = jsonObject.getObject("before", User.class);User after = jsonObject.getObject("after", User.class);try {String table = jsonObject.getJSONObject("source").getString("table");SqlParse sqlParse = new SqlParse();String executeSQL = "";if(before == null){// 插入executeSQL = sqlParse.getInsert(after,table);}else if(after == null){// 删除executeSQL = sqlParse.getDeleteSQL(before,table);}else{// 更新executeSQL = sqlParse.getUpdateSQL(before,after,table);}SpringJDBC.executeSQL(executeSQL);}catch (Exception e){System.out.println("执行错误");}}}

通用形SqlParse只能解析同构数据,异构数据需要单独处理。

增量数据的解析和处理

FlinkCDC 将抓取到的增量数据转换为 Flink 支持的数据格式后,交由 Flink 进行进一步的处理。Flink 可以对数据进行各种运算,如聚合、过滤、变换等,最终将处理结果输出到其他的存储介质中。

总的来说,FlinkCDC 的原理就是通过解析 MySQL 中的 binlog,抓取到最新的增量数据,并将其转换为 Flink 支持的数据格式,然后将增量数据同步到 Flink 或者其他的计算引擎中进行处理。通过 Flink 的强大计算能力,可以对增量数据进行各种计算,从而实现实时数据处理和分析的功能。

优缺点比较

优点:

  • 能监听多种数据源:MySQL、Oracle、PgSQL等;
  • 支持流式处理,可以实现数据的实时处理和分析;
  • 支持增量更新,可以实现数据的实时同步;
  • 支持容错处理,可以实现数据的高可靠性;

缺点:

  • 对Oracle支持不太友好,需要将开启归档日志,并且部分字段解析需要了解其语义;
  • 对于大表的查询性能较差;
  • 对于大规模数据的处理效率较低;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/55676.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序canvas type=2d生成海报保存到相册、文字换行溢出显示...、文字删除线、分享面板

一、简介 做个简单的生成二维码海报分享&#xff0c;我做的时候也找简单的方法看能不能实现页面直接截图那种生成图片&#xff0c;原生小程序不支持&#xff0c;不多介绍下面有全部代码有注释、参数自行替换运行看看&#xff0c;还有需要优化的地方&#xff0c;有问题可以咨询…

squid服务器

目录 squid初识 安装squid代理 常用命令 主要配置文件 正向代理 环境配置 linux服务器设置 windows客户端设置 反向代理 环境配置 在web服务器配置服务 linux服务器配置 squid初识 含义&#xff1a;squid cache是一个流行的自由软件&#xff08;GNU通用公共许可证…

低代码与低代码平台的概念解析

随着数字化转型和软件需求的不断增长&#xff0c;传统的手写代码开发方式已经无法满足迅速推出应用程序的需求。为了加快软件开发的速度并降低技术门槛&#xff0c;低代码开发模式应运而生。本文将介绍低代码的概念&#xff0c;探讨什么是低代码什么是低代码平台&#xff1f; 一…

poi带表头多sheet导出

导出工具类 package com.hieasy.comm.core.excel;import com.hieasy.comm.core.excel.fragment.ExcelFragment; import com.hieasy.comm.core.utils.mine.MineDateUtil; import org.apache.poi.hssf.usermodel.*; import org.apache.poi.ss.usermodel.*; import org.apache.po…

Git基本操作(Idea版)

第一次发布项目&#xff08;本地->远程&#xff09; 方式一 通过push的方式推送本地库到远程库&#xff08;远程已创建好仓库&#xff09; 这种方式需要提前创建好仓库。 右键点击项目&#xff0c;可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意&#xff1a…

CSDN编程题-每日一练(2023-08-27)

CSDN编程题-每日一练&#xff08;2023-08-27&#xff09; 一、题目名称&#xff1a;异或和二、题目名称&#xff1a;生命进化书三、题目名称&#xff1a;熊孩子拜访 一、题目名称&#xff1a;异或和 时间限制&#xff1a;1000ms内存限制&#xff1a;256M 题目描述&#xff1a; …

R包开发-2.1:在RStudio中使用Rcpp制作R-Package(更新于2023.8.23)

目录 0-前言 1-在RStudio中创建R包项目 2-创建R包 2.1通过R函数创建新包 2.2在RStudio通过菜单来创建一个新包 2.3关于R包创建的说明 3-添加R自定义函数 4-添加C函数 0-前言 目标&#xff1a;在RStudio中创建一个R包&#xff0c;这个R包中包含C函数&#xff0c;接口是Rc…

如何使用HTML5新增的标签来构建语义化的页面结构?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ <header>&#xff1a;⭐ <nav>&#xff1a;⭐ <main>&#xff1a;⭐ <section>&#xff1a;⭐ <article>&#xff1a;⭐ <aside>&#xff1a;⭐ <footer>&#xff1a;⭐ <figure> 和 &l…

登录校验-Filter-详解

目录 执行流程 拦截路径 过滤器链 小结 执行流程 过滤器Filter拦截到请求之后&#xff0c;首先执行方放行之前的逻辑&#xff0c;然后执行放行操作&#xff08;doFilter&#xff09;&#xff0c;然后会访问对应的Web资源&#xff08;对应的Controller类&#xff09;&#…

【图像分类】基于卷积神经网络和主动学习的高光谱图像分类(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Web应用登录验证的几种方式

一、SessionCookie登录 传统的sessioncookie登录是一种有状态 的登录 1、传统的sessioncookie 流程 浏览器登录发送账号和密码&#xff0c;服务端查找数据库验证用户验证成功后&#xff0c;服务端把用户状态&#xff08;登录状态&#xff0c;角色&#xff0c;权限等信息&…

【python】jupyter notebook导出pdf和pdf不显示中文问题

文章目录 写在前面1. 使用jupyter notebook导出pdf1.1 安装Pandoc1.2 安装MiKTex1.3 示例导出pdf 2. 中文显示问题2.1 显示中文问题示例2.2 解决办法1&#xff1a;修改tex2.3 解决办法2&#xff1a;修改内置文件 写在前面 使用jupyter notebook导出pdf时&#xff0c;出现了一些…

31、springboot 配置HTTP服务端口及如何通过WebServer实例动态获取项目中的HTTP端口

配置HTTP服务端口及如何通过WebServer实例动态获取项目中的HTTP端口 ★ 设置HTTP服务端口&#xff1a; - server.port或者SERVER_PORT环境变量——总结来说&#xff0c;其实就是要配置server.port外部配置属性。▲ 同样遵守如下优先级&#xff1a; 这些都是外部配置源&#x…

Win 11 电脑的 Win + E 快捷键失效

报的错误信息如下&#xff1a; 该文件没有与之关联的应用来执行该操作。请安装应用&#xff0c;若已经安装应用&#xff0c;请在”默认应用设置"页面中创建关联。 报错原因&#xff1a;系统注册表被改写了导致的出错 解决办法&#xff1a; 1、首先&#xff0c;按键盘上…

python编程环境使用技巧2-python环境迁移

Python环境迁移步骤 将Python环境从一个计算机迁移到另一个计算机可以按照以下步骤进行&#xff1a; 1-备份环境&#xff1a; 在源计算机上&#xff0c;使用pip工具备份当前Python环境的包列表到一个文本文件。在命令行终端中执行以下命令&#xff1a; pip freeze > requi…

【韩顺平 零基础30天学会Java】数组、排序和查找(2days)

数组、排序、查找和多维数组 数组可以存放多个同一类型的数据。数组也是一种数据类 型&#xff0c;是引用数据类型。 定义一个数组 double[] hens {3,5,1,3.4,2,50} 遍历数组得到数组所有元素的和 hens[下标]&#xff0c;下标是从0开始编号的。 可以通过数组名.lenght得到数组…

【数据库】使用ShardingSphere+Mybatis-Plus实现读写分离

书接上回&#xff1a;数据库调优方案中数据库主从复制&#xff0c;如何实现读写分离 ShardingSphere 实现读写分离的方式是通过配置数据源的方式&#xff0c;使得应用程序可以在执行读操作和写操作时分别访问不同的数据库实例。这样可以将读取操作分发到多个从库&#xff08;从…

详细手机代理IP配置

嗨&#xff0c;亲爱的朋友们&#xff01;作为一家代理产品供应商&#xff0c;我知道有很多小伙伴在使用手机进行网络爬虫和数据采集时&#xff0c;常常会遇到一些IP限制的问题。别担心&#xff01;今天我要给大家分享一下手机IP代理的设置方法&#xff0c;让你们轻松应对这些限…

vue2 路由入门

一、单页应用程序介绍 1.概念 单页应用程序&#xff1a;SPA【Single Page Application】是指所有的功能都在一个html页面上实现 2.具体示例 单页应用网站&#xff1a; 网易云音乐 https://music.163.com/ 多页应用网站&#xff1a;京东 https://jd.com/ 3.单页应用 VS 多页…

PostgreSQL命令行工具psql常用命令

1. 概述 通常情况下操作数据库使用图形化客户端工具&#xff0c;在实际工作中&#xff0c;生产环境是不允许直接连接数据库主机&#xff0c;只能在跳板机上登录到Linux服务器才能连接数据库服务器&#xff0c;此时就需要使用到命令行工具。psql是PostgreSQL中的一个命令行交互…