MySQL Binlog 日志监听与 Spring 集成实战
binlog的三种模式
MySQL 的二进制日志(binlog)有三种常见的格式:Statement 模式、Row 模式和Mixed 模式。每种模式的设计目标不同,适用于不同的场景,以下是它们的详细对比和应用:
1. Statement 模式
在 Statement 模式下,MySQL 记录的是每个执行的 SQL 语句,而不是具体的数据变化。例如,执行一个 UPDATE
语句时,binlog 中记录的是该 SQL 语句,而不是更新后的数据。
优点:
- 日志文件小:仅记录 SQL 语句,较为轻量。
- 性能好:对于简单 SQL 操作非常高效。
缺点:
- 不确定性:对于非确定性 SQL(如包含
RAND()
、NOW()
等函数的语句),可能导致主从数据不一致。
2. Row 模式
在 Row 模式下,MySQL 记录每一行数据的变化。如果执行 UPDATE
语句,binlog 记录的是被更新的行的具体数据,而非 SQL 语句。
优点:
- 精确记录:每一行数据的变更都被完整记录,避免因 SQL 语句复杂性导致的不一致问题。
- 可靠性高:即使是非确定性的操作,也能保证数据一致性。
缺点:
- 日志文件大:每一行变化都要单独记录,可能导致日志文件急剧增大。
- 性能开销:尤其是大批量数据变更时,性能会受到影响。
3. Mixed 模式
Mixed 模式结合了 Statement 模式 和 Row 模式,根据具体 SQL 的类型动态选择记录方式。对于简单的 SQL 语句(如 INSERT
),MySQL 使用 Statement 模式;对于复杂的操作或涉及多行数据的 SQL 语句,则采用 Row 模式。
优点:
- 平衡性能与准确性:对于不同的操作选择最合适的记录方式。
- 灵活性高:在大多数应用场景下,Mixed 模式能提供较好的性能与数据一致性。
缺点:
- 配置复杂:需要理解 MySQL 如何选择使用不同模式,可能导致配置不当。
如何设置 Binlog 格式
可以通过修改 MySQL 配置文件来设置 binlog_format
参数,具体操作如下:
[mysqld]
binlog_format=mixed
其中,statement
、row
和 mixed
分别代表 Statement 模式、Row 模式和 Mixed 模式。选择适当的 binlog 模式取决于应用的特定需求和性能要求。不同的模式具有不同的优劣势,例如,Statement 模式可能会更轻量,而 Row 模式可能提供更详细的数据变化信息。
以Mixed 为例
查看 Binlog 是否开启
你可以通过以下 SQL 查询来检查 binlog 是否开启:
show variables like '%log_bin%'
启动springboot程序
新建数据库
这个事件是一个 binlog 事件,其内容表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:
- 事件类型 (***
eventType
***): 该事件的类型是QUERY
,表示这是一个 SQL 查询事件。 - 时间戳 (***
timestamp
***): 事件的时间戳为1700045267000
,表示事件发生的时间。 - 线程ID (***
threadId
***): 线程ID 是189
,表示执行这个查询的线程的标识符。 - 执行时间 (***
executionTime
***): 执行时间为0
,表示执行这个查询所花费的时间。 - 错误代码 (***
errorCode
***): 错误代码为0
,表示查询执行没有错误。 - 数据库 (***
database
***): 数据库为test2023
,表示这个查询发生在test2023
数据库中。 - SQL 查询 (***
sql
***): 实际的 SQL 查询为CREATE DATABASE
test2023CHARACTER SET utf8 COLLATE utf8_general_ci
,表示执行了创建数据库的操作。
这个事件的作用是在 test2023
数据库中执行了一个创建数据库的 SQL 查询。这是 binlog 中的一部分,用于记录数据库中的变化,以便进行数据备份、主从同步等操作。
新建表数据
CREATE TABLE `t_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`userName` varchar(100) NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;
这个事件也是一个 binlog 事件,表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:
- 事件类型 (***
eventType
***): 该事件的类型是QUERY
,表示这是一个 SQL 查询事件。 - 时间戳 (***
timestamp
***): 事件的时间戳为1700045422000
,表示事件发生的时间。 - 线程ID (***
threadId
***): 线程ID 是204
,表示执行这个查询的线程的标识符。 - 执行时间 (***
executionTime
***): 执行时间为0
,表示执行这个查询所花费的时间。 - 错误代码 (***
errorCode
***): 错误代码为0
,表示查询执行没有错误。 - 数据库 (***
database
***): 数据库为test2023
,表示这个查询发生在test2023
数据库中。 - SQL 查询 (***
sql
***): 实际的 SQL 查询为CREATE TABLE
t_user(
idbigint(20) NOT NULL AUTO_INCREMENT,
userNamevarchar(100) NOT NULL, PRIMARY KEY (
id) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
,表示执行了在test2023
数据库中创建名为t_user
的表的操作。
这个事件的作用是在 test2023
数据库中创建了一个名为 t_user
的表,该表包含 id
和 userName
两个字段,其中 id
是自增的主键。这种类型的事件常常用于记录数据库结构的变化,以便进行数据备份、迁移和版本控制等操作。
插入表数据
INSERT INTO `test2023`.`t_user` (`id`, `userName`)
VALUES("10086","用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。");
这个事件也是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:
- 事件类型 (***
eventType
***): 该事件的类型是QUERY
,表示这是一个 SQL 查询事件。 - 时间戳 (***
timestamp
***): 事件的时间戳为1700045547000
,表示事件发生的时间。 - 线程ID (***
threadId
***): 线程ID 是204
,表示执行这个查询的线程的标识符。 - 执行时间 (***
executionTime
***): 执行时间为0
,表示执行这个查询所花费的时间。 - 错误代码 (***
errorCode
***): 错误代码为0
,表示查询执行没有错误。 - 数据库 (***
database
***): 数据库为test2023
,表示这个查询发生在test2023
数据库中。 - SQL 查询 (***
sql
***): 实际的 SQL 查询为INSERT INTO
test2023.
t_user(
id,
userName) VALUES ( "10086", "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。"
,表示执行了向test2023
数据库的t_user
表中插入一行数据的操作。
这个事件的作用是向 t_user
表中插入了一行数据,包含了 id
和 userName
两个字段的值。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。
修改表数据修改表数据
修改表数据
UPDATE `test2023`.`t_user`
SET `id` = '10086',`userName` = '我的修改数据!!!'
WHERE(`id` = '10086');
这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:
- 事件类型 (***
eventType
***): 该事件的类型是QUERY
,表示这是一个 SQL 查询事件。 - 时间戳 (***
timestamp
***): 事件的时间戳为1700045675000
,表示事件发生的时间。 - 线程ID (***
threadId
***): 线程ID 是204
,表示执行这个查询的线程的标识符。 - 执行时间 (***
executionTime
***): 执行时间为0
,表示执行这个查询所花费的时间。 - 错误代码 (***
errorCode
***): 错误代码为0
,表示查询执行没有错误。 - 数据库 (***
database
***): 数据库为test2023
,表示这个查询发生在test2023
数据库中。 - SQL 查询 (***
sql
***): 实际的 SQL 查询为UPDATE
test2023.
t_userSET
id= '10086',
userName= '我的修改数据!!!' WHERE (
id= '10086')
,表示执行了更新test2023
数据库中的t_user
表中一行数据的操作。
这个事件的作用是将 t_user
表中 id
为 10086
的行的数据进行更新,将 id
修改为 10086
,userName
修改为 ‘我的修改数据!!!’。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。
删除表数据
DELETE
FROMt_user
WHEREid = '10086';
这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:
- 事件类型 (***
eventType
***): 该事件的类型是QUERY
,表示这是一个 SQL 查询事件。 - 时间戳 (***
timestamp
***): 事件的时间戳为1700045755000
,表示事件发生的时间。 - 线程ID (***
threadId
***): 线程ID 是204
,表示执行这个查询的线程的标识符。 - 执行时间 (***
executionTime
***): 执行时间为0
,表示执行这个查询所花费的时间。 - 错误代码 (***
errorCode
***): 错误代码为0
,表示查询执行没有错误。 - 数据库 (***
database
***): 数据库为test2023
,表示这个查询发生在test2023
数据库中。 - SQL 查询 (***
sql
***): 实际的 SQL 查询为DELETE FROM t_user WHERE id = '10086'
,表示执行了删除test2023
数据库中的t_user
表中一行数据的操作。
这个事件的作用是删除 t_user
表中 id
为 10086
的行。这种类型的事件通常用于记录数据的删除操作,以便进行数据备份、同步和迁移等操作。
总结: binlog_format 设置为 mixed 时,对于 INSERT、UPDATE 和 DELETE 操作,它们在 binlog 中的事件类型都会被表示为 QUERY 事件。这是因为在 mixed 模式下,MySQL 使用了不同的方式来记录不同类型的操作,但在 binlog 中,它们都被包装成了 QUERY 事件。
在 mixed 模式下:
- 对于某些语句级别的操作(例如非确定性的语句或不支持事务的存储引擎),会使用 STATEMENT 事件。
- 对于其他一些情况,会使用 ROW 事件,将变更的行作为事件的一部分进行记录。
这就是为什么看到的 INSERT、UPDATE 和 DELETE 操作的事件类型都是 QUERY。在处理这些事件时,需要根据具体的 SQL 查询语句或其他信息来确定操作的类型。
源码示例
pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version> <!-- 查看最新版本 --></dependency><dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.21.0</version></dependency>
Java示例
package com.example.demo.listener;import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.naming.AuthenticationException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeoutException;public class BinlogListenerMixed {private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);private static final String MYSQL_HOST = "8.130.74.105";private static final int MYSQL_PORT = 3306;private static final String MYSQL_USERNAME = "root";private static final String MYSQL_PASSWORD = "zhang.ting.123";public static void main(String[] args) {try {BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
// client.setBinlogFilename(null);
// client.setBinlogPosition(-1); // 或者设置为其他适当的初始位置
// client.setServerId(1);
// client.setBinlogFilename("mysql-bin.000005");
// client.setBinlogPosition(154);EventDeserializer eventDeserializer = new EventDeserializer();eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);client.setEventDeserializer(eventDeserializer);client.registerEventListener(BinlogListenerMixed::handleEvent);client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {@Overridepublic void onConnect(BinaryLogClient client) {logger.info("Connected to MySQL server");}@Overridepublic void onCommunicationFailure(BinaryLogClient client, Exception ex) {logger.error("Communication failure with MySQL server", ex);}@Overridepublic void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {logger.error("Event deserialization failure", ex);}@Overridepublic void onDisconnect(BinaryLogClient client) {logger.warn("Disconnected from MySQL server");// 在这里添加重新连接或其他处理逻辑}});client.connect();} catch (IOException e) {logger.error("@@ 连接到 MySQL 时发生错误", e);logger.error("@@ Error connecting to MySQL", e);}}private static void handleEvent(Event event) {logger.info("@@ 打印 event: {}", event);logger.info("@@ Received event type: {}", event.getHeader().getEventType());switch (event.getHeader().getEventType()) {case WRITE_ROWS:case EXT_WRITE_ROWS:handleWriteRowsEvent((WriteRowsEventData) event.getData());break;case QUERY:handleQueryEvent((QueryEventData) event.getData());break;case TABLE_MAP:handleTableMapEvent((TableMapEventData) event.getData());break;// 其他事件处理...}}private static void handleWriteRowsEvent(WriteRowsEventData eventData) {List<Serializable[]> rows = eventData.getRows();// 获取表名String tableName = getTableName(eventData);// 处理每一行数据for (Serializable[] row : rows) {// 根据需要调整以下代码以获取具体的列值String column1Value = row[0].toString();String column2Value = row[1].toString();// 将数据备份到另一个数据库backupToAnotherDatabase(tableName, column1Value, column2Value);}}private static void handleQueryEvent(QueryEventData eventData) {String sql = eventData.getSql();logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);// 解析SQL语句,根据需要处理// 例如,检查是否包含写入操作,然后执行相应的逻辑}private static void handleTableMapEvent(TableMapEventData eventData) {// 获取表映射信息,根据需要处理logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);}private static String getTableName(EventData eventData) {// 获取表名的逻辑,可以使用TableMapEventData等信息// 根据实际情况实现return "example_table";}private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {// 将数据备份到另一个数据库的逻辑logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);}
}
总结
选择合适的 binlog 模式对数据库的性能和数据一致性至关重要:
- Statement 模式适用于简单操作,能节省存储空间,但可能导致不一致。
- Row 模式能精确记录数据变化,适合对数据一致性要求较高的场景。
- Mixed 模式平衡了性能与准确性,适用于大多数应用场景。