Debezium 同步 MySQL 实时数据并解决数据重复消费问题

        我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL,代码网上基本都有,都是在引入 debezium-api,debezium-embedded 后写 Java 代码,做好了基本配置后启动程序,Debezium 会自动读取 MySQL 的实时 binlog,然后触发相应的事件让我们处理,我们就把事件里的数据读取出来,插入到目标库即可。我们的 MySQL 的版本是 5.7 。

        但我们在其中发现了一个很奇怪的问题,目标库存在多个相同的 sql ,我们以为是 Debezium 重复消费了 binlog 里的事件,就记录下每个事件的 position 并判重,但 sql 还是重复了,我们一开始觉得 MySQL 写的 binlog 肯定没问题,一个事务对应一个事件。之后我们使用 binlog2sql 这个 python 工具读取了已归档的 binlog 文件,发现里面没有重复的 sql ,这说明 MySQL binlog 还是没有问题的,问题在 Debezium,但 Debezium 作为一个成熟的 cdc 工具应该也不会有什么大的问题,可能是 Debezium 的配置问题,但检查了 Debezium 的所有配置后还是没发现有什么问题,配置改了后重新运行结果还是一样。 后面我们怀疑可能和 gtid 有关,我们发现 “Insert into xxx values (xxx) ” 会产生一个 binlog 事件,因为一个事务会产生一个 binlog 事件,但 “Insert into xxx values (xxx),(xxx),(xxx)...” 会产生多个事件,但这些事件的 gtid 还是同一个,事件里的 query 属性还是同一个,事件的 query 属性即原始 sql ,这就破案了,我们一直消费每个事件的query,但可能多个事件里的 query 属性是一样的,因为它们的 gtid 属性相同,它们属于同一个全局事务。后面我们使用 gtid 过滤相同属性就解决了数据重复问题。至于为什么一个批量插入会产生一个多个事件,并且多个事件的 gtid 是同一个,我们猜测 MySQL  的 binlog 就是这样写日志的,修改一行数据就产生一个事件,要是批量修改就产生多个事件,但这些批量事件同属于一个全局事务。

        怎么过滤重复 gtid 问题?因为 gtid 是递增的,相同的 gtid 都会一起出现,所以可以使用自动老元素的 Map,或是设置键过期的 redis,或是 带有 gtid 属性的数据库表,并设置它是唯一索引,或是插入数据之前先检查数据库里是否有本事件的gtid,有就跳过,没有就插入,并把这个过程加锁保证原子性。

        核心代码:

// 启动
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(config) .notifying(DataSync::handleChangeEvent).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);
private static void handleChangeEvent(ChangeEvent<String, String> event) {JSONObject valueJson = JSON.parseObject(event.value());if (valueJson != null) {JSONObject payload = valueJson.getJSONObject("payload");JSONObject source = payload.getJSONObject("after");// 原始sqlString query = source.getString("data_definition");// 对 sql 字符串进行美化query = query.replaceAll("[\\n\\r\\t\\s]+", " ");String database = source.getString("database");String table = source.getString("table_name");String gtid = source.getString("gtid");synchronized (lock) {// 查询数据库该 gtid 的数量long cnt = queryGtid(gtid);if (cnt == 0) {// 如果数据库不存在该 sql 就插入save(query, database, table, gtid);} else {System.out.println(gtid + " 有重复");}} }}

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/33947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

断言提供了哪些方法

Java中的断言机制主要用于开发和测试阶段&#xff0c;以确保代码符合预期的行为。 除了assert关键字&#xff0c;Java标准库中的java.util.Objects类提供了几个实用方法&#xff0c;可以辅助实现更复杂的断言逻辑&#xff1a; Objects.requireNonNull(Object obj, String mess…

【Python机器学习】聚类算法简述

聚类的应用和评估时一个非常定性的过程&#xff0c;通常在数据分析的探索阶段很有帮助。 主要有三种聚类算法&#xff1a;k均值、DBSCAN、凝聚聚类。 这三种算法都可以控制聚类的粒度&#xff1a;k均值和凝聚聚类允许指定想要的簇的数量&#xff0c;而DBSCAN允许你用eps参数定…

SQLAlchemy:原理与使用详解

文章目录 引言SQLAlchemy的原理1. 对象关系映射&#xff08;ORM&#xff09;2. 引擎、会话和元数据3. 查询构造 SQLAlchemy的使用1. 安装SQLAlchemy2. 创建数据库连接和引擎3. 定义模型4. 创建表5. 操作数据库6. 关闭会话 结论 引言 在Python编程环境中&#xff0c;数据库操作…

【复旦邱锡鹏教授《神经网络与深度学习公开课》笔记】卷积

卷积经常用在信号处理中&#xff0c;用于计算信号的延迟累积。假设一个信号发射器每个时刻 t t t产生一个信号 x t x_t xt​&#xff0c;其信息的衰减率为 w k w_k wk​&#xff0c;即在 k − 1 k-1 k−1个时间步长后&#xff0c;信息为原来的 w k w_k wk​倍&#xff0c;时刻 …

SpringBoot开启事务日志

一般框架开启日志的方式&#xff1a; 开启某个包下的日志就写该包路径&#xff0c;开启某个类下的日志就写该类路径。

【数据结构】栈的定义与实现(附完整运行代码)

目录 一、栈的定义 二、顺序栈 链栈比较 三、栈的实现&#xff08;顺序栈&#xff09; 3.1 ❥ 定义栈结构 3.2 ❥ 初始化 3.3 ❥ 销毁 3.4 ❥ 插入&#xff08;入栈&#xff09; 3.5 ❥ 删除 &#xff08;出栈&#xff09; 3.6 ❥ 获取栈顶元素 3.7 ❥ 判空 3.8 ❥…

【LeetCode】每日一题:最大子数组和

给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组是数组中的一个连续部分。 解题思路 要注意最小值是整个前缀&#xff0c;主要是cumsum然后按照买卖股票的思路做的&a…

【Android】创建一个可以在屏幕上拖动的悬浮窗

项目需求 在界面上创建一个悬浮窗&#xff0c;可以自由的移动这个悬浮窗 需求解决 1.添加权限 <uses-permission android:name"android.permission.SYSTEM_ALERT_WINDOW"/>2.请求权限 从 Android 6.0 (API 23) 开始&#xff0c;应用需要动态请求显示悬浮窗…

F5《企业DNS建设白皮书》中的DNS解析服务器最佳实践

在这个数字化转型加速的时代&#xff0c;DNS&#xff08;域名系统&#xff09;的重要性不言而喻。每一次重大事件都凸显了DNS的可靠性和安全性问题。对企业而言&#xff0c;它不仅关系到业务连续性&#xff0c;更是提供永续数字服务的关键。本文根据F5公司发布的《企业DNS建设白…

中国4个民族群体的全基因组DNA甲基化变异图谱首次发布

2023年4月&#xff0c;由西北工业大学联合复旦大学等院校在Science China Life Sciences上发表题为“Genome-wide DNA methylation landscape of four Chinese populations and epigenetic variation linked to Tibetan high altitude adaptation”的文章&#xff0c;该研究通过…

【AI编译器】triton学习:编程模型

介绍 动机 在过去十年里&#xff0c;深度神经网络 (DNNs) 已成为机器学习 (ML) 模型的一个重要分支&#xff0c;能够实现跨领域多种应用中的最佳性能。这些模型由一系列包括参数化&#xff08;如滤波器&#xff09;和非参数化&#xff08;如缩小值函数&#xff09;元件组成的…

Android | 性能优化 之 TraceView工具的使用

上代码&#xff01; 先加权限&#xff1a; <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE"/> <uses-permission android:name"android.permission.MOUNT_UNMOUNT_FILESYSTEMS"/> 选择跟踪范围,在开始追踪和结束…

axios常用的请求方式

Axios 是一个基于 Promise 的 HTTP 客户端&#xff0c;用于浏览器和 Node.js 环境。它允许开发者以简单和直观的方式发送HTTP 请求。 Axios 支持所有标准的 HTTP 请求方法&#xff0c;以下是一些常用的请求方式&#xff1a; GET&#xff1a;用于从服务器获取数据。POST&#x…

景联文科技构建高质量多轮对话数据库,赋能AI交互新飞跃

近年来&#xff0c;大语言模型的发展极大推动了自然语言处理领域的进步&#xff0c;大语言模型正引领智能对话领域进入一个全新时代&#xff0c;不仅提升了对话体验的自然度和效率&#xff0c;也为探索更加人性化、智能化的交互方式开辟了道路。 景联文科技作为大语言模型数据服…

node.js 离线实时语音识别

前言 在node.js实现语音实时转文字。获取麦克风实时语音转文字。 下面是用vosk的效果。注意踩坑要及时评论哦&#xff0c;坑还是挺多的。 在探索后发现本地模型对设备还是有一定要求的&#xff0c;最总无奈采用百度语音识别的方案。 探索结果分享给大家&#xff0c;希望能在项…

AI视频教程下载-定制GPT:使用您的数据创建一个定制聊天GPT

Custom GPTs_ Create a Custom ChatGPT with Your Data 构建一个定制的GPT&#xff0c;与您自己的数据进行聊天。添加文档&#xff0c;生成图像&#xff0c;并集成API和Zapier。 这门全面的Udemy课程专为那些渴望学习如何创建自己定制版ChatGPT的人设计&#xff0c;以满足他们…

GORM 使用指南

1. 介绍 1.1 什么是 GORM&#xff1f; GORM&#xff08;Go Object Relational Mapper&#xff09;是一个用于 Go 语言的 ORM 库&#xff0c;它允许开发者通过面向对象的方式操作数据库&#xff0c;而不必直接编写 SQL 查询语句。GORM 提供了简单易用的 API&#xff0c;使得在…

【五子棋game】

编写一个五子棋游戏程序可以分为几个步骤&#xff1a;设计棋盘、定义规则、实现人机交互、判断胜负。下面是一个简化的五子棋游戏程序示例&#xff0c;使用Python语言编写。 首先&#xff0c;我们需要一个棋盘。可以使用一个二维数组来表示棋盘&#xff0c;其中0表示空位&#…

jstack的火焰图使用说明

1、jstack的官方文档说明 How to use Flame Graph? - Fast thread 2、jstack的文件分析网站&#xff0c;可以关注cpu消耗比较高的线程和火焰图 GC log analysis error

AI人工智能发展的技术路线差异对比分析报告

1. 机器学习与深度学习 机器学习和深度学习是AI领域的两种主要技术路线。机器学习是一种数据驱动的方法,通过训练模型来识别模式和规律。深度学习是机器学习的一个子领域,主要关注神经网络模型,特别是深度神经网络。深度学习在图像识别、语音识别等领域取得了显著的成果,但…