Maxwell 底层原理 详解

        Maxwell 是一个 MySQL 数据库的增量数据捕获(CDC, Change Data Capture)工具,它通过读取 MySQL 的 binlog(Binary Log)来捕获数据变化,并将这些变化实时地发送到如 Kafka、Kinesis、RabbitMQ 或其他输出端。Maxwell 允许用户捕捉到 INSERT、UPDATE、DELETE 等操作的记录,并将其以 JSON 格式发送到下游系统,用于数据同步、分析、实时监控等应用场景。

        要详细解释 Maxwell 的底层原理及源代码,我们需要从 MySQL binlog 的工作机制、Maxwell 如何解析 binlog、内部架构的各个核心组件、事件处理机制等多方面进行深入解析。

1. MySQL binlog 工作原理

        MySQL 的 binlog 是记录数据库事务性和非事务性数据变化的二进制日志文件,所有的 INSERT、UPDATE、DELETE 以及对表结构的更改操作(如 ALTER TABLE)都会写入 binlog 中。这使得 binlog 成为数据库增量数据捕获的重要来源。

binlog 具有两种格式:

  • ROW 格式:记录每一行的数据变化,捕捉到行级别的增删改操作。
  • STATEMENT 格式:记录 SQL 语句本身的执行。
  • MIXED 格式:结合了 ROW 和 STATEMENT 两种格式。

➢ 三种格式的区别:

◼ statement

语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如update test set create_date=now();如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。优点:  节省空间   缺点:  有可能造成数据不一致。

◼ row

行级,  binlog 会记录每次操作后每行记录的变化。优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。缺点:占用较大空间。

◼ mixed

混合级别,statement  的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。默认还是 statement,在某些情况下,譬如:当函数中包含  UUID()  时;包含  AUTO_INCREMENT  字段的表被更新时;执行  INSERT DELAYED  语句时;用  UDF  时;会按照  ROW 的方式进行处理  优点:节省空间,同时兼顾了一定的一致性。缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 监控的情况都不方便。

        Maxwell 依赖的是 ROW 格式,因为 ROW 格式可以直接获取到数据变化的细节,如具体哪一行数据发生了修改,这对于实时的数据同步和分析非常关键。

2. Maxwell 架构与工作流程

Maxwell 的架构可以概括为以下几个部分:

  1. Binlog Position 监控:Maxwell 会从 MySQL 的 binlog 文件中读取增量变化事件,且会记录当前读取到的 binlog 文件的位置(position),以保证在 Maxwell 重启后能够继续从上次的位置读取。
  2. Binlog 解析:Maxwell 通过解析 MySQL 的 binlog 文件来获取数据的变化详情(包括表名、列值、操作类型等)。
  3. 事件处理器(Event Processor):解析后的 binlog 数据会通过 Maxwell 的事件处理器进行处理,并转换为 JSON 格式。
  4. 输出适配器(Producer Adapter):Maxwell 支持将处理后的数据发送到多个目标输出(如 Kafka、Kinesis 等)。
2.1 核心组件

Maxwell 的底层工作机制由以下几个核心组件协同实现:

  1. BinlogConnectorReplicator

    • 负责与 MySQL 进行通信并获取 binlog 数据。
    • 使用 MySQL Binary Log Client Library 实现 binlog 的读取和消费。Maxwell 通过 BinlogConnectorReplicator 连接 MySQL,获取实时的 binlog 数据。
  2. BinlogParser

    • 负责将二进制格式的 binlog 转换为可理解的事件对象。
    • 它解析 ROW 格式的 binlog 并将其转换为 Maxwell 可以处理的内部事件对象(如 Insert、Update、Delete 事件)。
  3. MaxwellContext

    • 管理 Maxwell 的运行状态,包括当前的 binlog position、错误处理、断点续传等。
    • MaxwellContext 还负责维护 Maxwell 的元数据(如表结构缓存、上次处理的 binlog 位置等),以保证数据的一致性和容错性。
  4. MaxwellReplicator

    MaxwellReplicator 是系统的核心执行器,它从 BinlogConnectorReplicator 获取 binlog 数据,并通过 BinlogParser 解析这些数据,生成 RowMap 对象(用于描述数据变化)。
  5. RowMap

    RowMap 是 Maxwell 对数据变更的内部抽象,它将 binlog 中的行变化转化为键值对的形式,包含了表名、数据库名、操作类型(insert、update、delete)以及具体的行数据。
  6. Producer

    • Producer 是事件发布器,它负责将处理过的事件推送到外部系统(如 Kafka、Kinesis、文件等)。
    • Producer 将 RowMap 转换为 JSON 格式并将其发送至指定的输出端。
2.2 事件流处理流程

Maxwell 的数据流处理可以分为以下几个步骤:

  1. 读取 binlog:Maxwell 通过 BinlogConnectorReplicator 从 MySQL binlog 中读取最新的事件。
  2. 解析 binlogBinlogParser 将 binlog 的二进制数据解析为内部事件对象(如 InsertUpdateDelete 事件)。
  3. 生成事件对象:解析后的 binlog 事件会被封装为 RowMap 对象,RowMap 中包含了数据库名、表名、操作类型、变更的数据行内容。
  4. 事件发布:通过 Producer,Maxwell 将 RowMap 转换为 JSON 格式,并发送到外部系统,如 Kafka、Kinesis 等。

    格式数据举例


    json 字段的说明:

    字段

    解释

    database

    变更数据所属的数据库

    table

    表更数据所属的表

    type

    数据变更类型

    ts

    数据变更发生的时间

    xid

    事务id

    commit

    事务提交标志,可用于重新组装事务

    data

    对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据

    old

    对于update类型,表示修改之前的数据,只包含变更字段

3. 源代码分析

为了更详细地解释 Maxwell 的工作原理,接下来分析其核心类的部分源代码。

3.1 BinlogConnectorReplicator(binlog 读取器)

        BinlogConnectorReplicator 是 Maxwell 通过 binlog client 读取 MySQL binlog 数据的核心组件。它负责通过 MySQL Replication 协议从 MySQL 实例拉取 binlog 事件。

public class BinlogConnectorReplicator extends AbstractReplicator {private BinaryLogClient client;private MaxwellFilter filter;public BinlogConnectorReplicator(MaxwellContext context, Position startPosition) throws Exception {super(context);this.client = new BinaryLogClient(context.getConfig().mysqlHost,context.getConfig().mysqlPort,context.getConfig().mysqlUser,context.getConfig().mysqlPassword);// 设置监听器处理 binlog 事件client.registerEventListener(this::handleEvent);}public void start() throws IOException {// 启动客户端开始从 binlog 中获取数据client.connect();}private void handleEvent(Event event) {// 处理 binlog 事件// 将 event 转换为 Maxwell 的 RowMap 对象}
}

在上面的代码中:

  • BinaryLogClient 是用来与 MySQL binlog 进行通信的核心类,它会与 MySQL 建立连接并监听 binlog 的变化。
  • handleEvent 方法会被 MySQL binlog 的事件触发,当 binlog 中有新事件时,该方法会被调用,将事件处理并转换为 Maxwell 的内部对象。
3.2 BinlogParser(binlog 解析器)

        BinlogParser 负责将从 binlog 中获取的二进制事件解析为 Maxwell 可以理解的对象。对于每个 binlog 事件,都会转换为相应的 RowMap 对象。

public class BinlogParser {public RowMap parse(Event event) {EventType type = event.getHeader().getEventType();// 根据 binlog 事件类型处理不同的操作switch (type) {case WRITE_ROWS:return handleInsertEvent(event);case UPDATE_ROWS:return handleUpdateEvent(event);case DELETE_ROWS:return handleDeleteEvent(event);default:return null;}}private RowMap handleInsertEvent(Event event) {// 解析 insert 事件,将其封装为 RowMap}private RowMap handleUpdateEvent(Event event) {// 解析 update 事件,将其封装为 RowMap}private RowMap handleDeleteEvent(Event event) {// 解析 delete 事件,将其封装为 RowMap}
}

在 BinlogParser 中:

  • parse 方法会根据事件类型(如 WRITE_ROWSUPDATE_ROWSDELETE_ROWS)调用对应的处理方法,将事件转换为 RowMap
  • RowMap 是用于描述数据变化的核心对象,包含了具体的数据变化信息。
3.3 RowMap(事件描述对象)

        RowMap 是 Maxwell 中的核心数据结构,负责存储解析后的 binlog 数据。它包含了数据库名、表名、操作类型(如 insert、update、delete)以及具体的列值数据。

public class RowMap {private String database;private String table;private String type; // insert, update, deleteprivate Map<String, Object> data;public RowMap(String database, String table, String type) {this.database = database;this.table = table;this.type = type;this.data = new HashMap<>();}public void putData(String column, Object value) {data.put(column, value);}public String toJSON() {// 将 RowMap 转换为 JSON 字符串}
}

在 RowMap 中:

  • database 和 table 表示数据变更的数据库和表。
  • type 表示操作类型(INSERT、UPDATE、DELETE)。
  • data 是存储行变化数据的键值对映射(列名 -> 值)。
3.4 Producer(事件发布器)

        Producer 负责将处理好的事件(即 RowMap)发送到外部系统,如 Kafka 或 Kinesis。Maxwell 提供了多种 Producer 实现,用户可以选择适合自己需求的 Producer。

public class KafkaProducer extends AbstractProducer {private org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer;public KafkaProducer(MaxwellContext context) {Properties props = new Properties();props.put("bootstrap.servers", context.getConfig().kafkaBootstrapServers);this.kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}@Overridepublic void push(RowMap r) {String topic = getKafkaTopic(r);String key = r.getPrimaryKey();String value = r.toJSON();kafkaProducer.send(new ProducerRecord<>(topic, key, value));}
}

在 KafkaProducer 中:

  • push 方法将 RowMap 对象转换为 JSON 格式,并发送到指定的 Kafka topic。

4. Maxwell 高级特性

  1. Schema 变更捕获:Maxwell 也能够捕捉 MySQL 表结构的变化(如 ALTER TABLE),它维护了一份 schema 的缓存,以便解析 binlog 事件时能够正确映射列与值。

  2. 断点续传:Maxwell 记录并维护 binlog 的位置,当服务重启或崩溃时,能够从上次停止的位置继续读取,不会丢失任何数据。

  3. 过滤:Maxwell 支持基于数据库和表的过滤,用户可以通过配置文件或命令行参数来指定需要捕获或忽略的数据库和表。

  4. 事务处理:Maxwell 通过 binlog 的事务边界来确保事件的顺序性和一致性,保证在输出端(如 Kafka)消费时,数据的顺序与数据库中的顺序一致。

总结

        Maxwell 是一个轻量级的 MySQL binlog 解析工具,它通过 BinlogConnectorReplicator 连接 MySQL 并获取 binlog 数据,利用 BinlogParser 解析这些二进制日志,将其转化为易于处理的 RowMap 对象,并通过 Producer 发送到外部系统。Maxwell 提供了灵活的输出方式和良好的容错机制,适用于实时数据同步和流式数据处理场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882379.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0x3D service

0x3D service 1. 概念2. Request message 数据格式3. Respone message 数据格式3.1 正响应格式3.2 negative respone codes(NRC)4. 示例4.1 正响应示例:4.2 NRC 示例1. 概念 UDS(统一诊断服务)中的0x3D服务,即Write Memory By Address(按地址写内存)服务,允许客户端向服…

2024年中国工业大模型行业发展研究报告|附43页PDF文件下载

工业大模型伴随着大模型技术的发展&#xff0c;逐渐渗透至工业&#xff0c;处于萌芽阶段。 就大模型的本质而言&#xff0c;是由一系列参数化的数学函数组成的计算系统&#xff0c;且是一个概率模型&#xff0c;其工作机制是基于概率和统计推动进行的&#xff0c;而非真正的理解…

aardio 中最重要的控件:自定义控件使用指南

aardio虽然是个小众编程语言&#xff0c;但其在windows下做个小软件生成exe文件&#xff0c;确实方便。只是这个编程语言的生态圈小&#xff0c;文档的详细程度也完全无法和大的编程语言相提并论。今天介绍一下&#xff0c;aardio中的自定义控件如何使用。 这里我们只介绍如何做…

python 作业1

任务1: python为主的工作是很少的 学习的python的优势在于制作工具&#xff0c;制作合适的工具可以提高我们在工作中的工作效率的工具 提高我们的竞争优势。 任务2: 不换行 换行 任务3: 安装pycharm 进入相应网站Download PyCharm: The Python IDE for data science and we…

AnaTraf | TCP重传的工作原理与优化方法

目录 什么是TCP重传&#xff1f; TCP重传的常见触发原因 TCP重传对网络性能的影响 1. 高延迟与重传 2. 吞吐量的下降 如何优化和减少TCP重传 1. 优化网络设备配置 2. 优化网络链路 3. 网络带宽的合理规划 4. 部署CDN和缓存策略 结语 AnaTraf 网络性能监控系统NPM | …

餐饮店怎么标注地图位置信息?

随着市场竞争的日益激烈&#xff0c;商家若想在竞争中脱颖而出&#xff0c;就必须想方设法去提高自身的曝光度和知名度&#xff0c;为店铺带来更多的客流量。其中&#xff0c;地图标注便是一种简单却极为有效的方法。通过在地图平台上添加店铺位置信息&#xff0c;不仅可以方便…

Qt-系统文件相关介绍使用(61)

目录 描述 输⼊输出设备类 打开/读/写/关闭 使用 先初始化&#xff0c;创建出大致的样貌 输入框设置 绑定槽函数 保存文件 打开文件 提取文件属性 描述 在C/C Linux 中我们都接触过关于文件的操作&#xff0c;当然 Qt 也会有对应的文件操作的 ⽂件操作是应⽤程序必不…

【C语言】文件操作(1)(文件打开关闭和顺序读写函数的万字笔记)

文章目录 一、什么是文件1.程序文件2.数据文件 二、数据文件1.文件名2.数据文件的分类文本文件二进制文件 三、文件的打开和关闭1.流和标准流流标准流 2.文件指针3.文件的打开和关闭文件的打开文件的关闭 四、文件的顺序读写1.fgetc函数2.fputc函数3.fgets函数4.fputs函数5.fsc…

微信小程序上传组件封装uploadHelper2.0使用整理

一、uploadHelper2.0使用步骤说明 uploadHelper.js ---上传代码封装库 cos-wx-sdk-v5.min.js---腾讯云&#xff0c;对象存储封装库 第一步&#xff0c;下载组件代码&#xff0c;放置到自己的小程序项目中 第二步、 创建上传对象&#xff0c;执行选择图片/视频 var _this th…

npm install进度卡在 idealTree:node_global: sill idealTree buildDeps

ping一下源&#xff1a;ping http://registry.npm.taobao.org/ ping不通&#xff0c;原因&#xff1a;原淘宝npm永久停止服务&#xff0c;已更新新域名~~震惊&#xff01;&#xff01;&#xff01; 重新安装&#xff1a;npm config set registry https://registry.npmmirror.c…

推荐?还是踩雷?3款中英互译软件大盘点,你真的选对了吗?

作为一个爱到处跑的人&#xff0c;我特别明白旅行的时候能说会道有多重要。不管是跟当地人聊天&#xff0c;还是看路标、菜单&#xff0c;有个好用的翻译软件是肯定少不了的。今天&#xff0c;我打算给你们介绍3款中英文互译的翻译工具&#xff0c;帮你挑出最适合自己的那一个。…

机器学习:opencv--人脸检测以及微笑检测

目录 前言 一、人脸检测的原理 1.特征提取 2.分类器 二、代码实现 1.图片预处理 2.加载分类器 3.进行人脸识别 4.标注人脸及显示 三、微笑检测 前言 人脸检测是计算机视觉中的一个重要任务&#xff0c;旨在自动识别图像或视频中的人脸。它可以用于多种应用&#xff0…

Python和MATLAB锂电铅蓄电化学微分模型和等效电路

&#x1f3af;要点 对比三种电化学颗粒模型&#xff1a;电化学的锂离子电池模型、单粒子模型和带电解质的单粒子模型。求解粒子域内边界通量与局部电流密度有关的扩散方程。扩展为两个相的负或正电极复合电极粒子模型。模拟四种耦合机制下活性物质损失情况。模拟锂离子电池三参…

【PhpSpreadsheet】ThinkPHP5+PhpSpreadsheet实现批量导出数据

目录 前言 一、安装 二、API使用 三、完整实例 四、效果图 前言 为什么使用PhpSpreadsheet&#xff1f; 由于PHPExcel不再维护&#xff0c;所以建议使用PhpSpreadsheet来导出exlcel&#xff0c;但是PhpSpreadsheet由于是个新的类库&#xff0c;所以只支持PHP7.1及以上的版…

服务器数据恢复—RAID5阵列上层Linux操作系统中节点损坏的数据恢复案例

服务器数据恢复环境&#xff1a; 一台服务器上有一组由5块硬盘&#xff08;4块数据盘1块热备盘&#xff09;组建的raid5阵列。服务器安装Linux Redhat操作系统&#xff0c;运行一套基于oracle数据库的OA系统。 服务器故障&#xff1a; 这组raid5阵列中一块磁盘离线&#xff0c…

观测云 AI 助手上线:智能运维,从此触手可及!

在当前的云原生时代&#xff0c;运维的复杂性和数据的爆炸式增长给企业带来了前所未有的挑战。为了帮助企业高效应对这些挑战&#xff0c;观测云自豪地推出了 AI 助手——智能化的运维助手&#xff0c;让每位用户都能轻松驾驭复杂的可观测性场景。 01 你身边的 PE 助手&#x…

《重置MobaXterm密码并连接Linux虚拟机的完整操作指南》

目录 引言 一、双击MobaXterm_Personal_24.2进入&#xff0c;但是忘记密码。 那么接下来请跟着我操作。 二、点击此链接&#xff0c;重设密码。 三、下载完成后&#xff0c;现在把这个exe文件解压。注意解压要与MobaXterm_Personal_24.2.exe在同一目录下哦&#xff0c;不然…

vim编辑器交换文件的产生与处理方法

文章目录 问题附图交换文件的作用和产生原因报错信息解读解决方法恢复文件使用命令行删除在文件管理器中删除在文本编辑器中处理 问题附图 简要分析 这个报错信息是由 vim 编辑器产生的&#xff0c;它表明在你尝试打开文件 /opt/software/openGauss/clusterconfig.xml 时&#…

MyBatis实践:提高持久层数据处理效率

文章目录 1 Mybatis简介1.1 简介1.2 持久层框架对比 2 快速入门2.1 准备数据库2.2 项目搭建2.3 依赖导入2.4 准备MyBatis配置文件2.5 实体类准备2.6 准备Mapper接口和MapperXML文件2.7 运行和测试 3. 核心配置文件4. MyBatis进阶使用4.0 以包为单位&#xff0c;引入所有的映射文…

一次性入门三款分布式定时任务调度框架:Quartz、ElasticJob3.0、xxl-job

分布式定时任务调度框架&#xff08;文末有源码&#xff09; 前言1、Quartz1.1 数据库1.2 maven依赖1.3 代码实现1.3.1 创建一个job1.3.1 为job设置trigger 1.4 配置文件1.5 启动、测试1.1 单机1.2 集群 2、ElasticJob2.1 下载zk2.2 新建三个类型的作业2.3 配置文件2.4 启动项目…