MySQL 如何从 Binlog 找出变更记录并回滚

文章目录

    • 前言
    • 1. 案例模拟
      • 1.1 确认信息
      • 1.2 下载 Binlog
      • 1.3 准备环境
      • 1.4 注册 Binlog
      • 1.5 准备结构信息
      • 1.6 Python 订阅
      • 1.7 输出结果展示
    • 2. 原理解析
      • 2.1 程序设计
      • 2.2 模块版本
    • 总结

前言

最近有研发同学问我:有一个问题,想查一个 ID 为 xxxx 的 sku 什么时候被更新了吗?更新前的数据是什么?一般这么讲,可能是由于手动执行或者是代码 Bug 导致出现数据丢失或者数据误更新,需要确认订正,一般需要通过分析 Binlog 来解决,本篇文章将通过该案例介绍此类问题的处理思路。

1. 案例模拟

1.1 确认信息

当有需求需要从 Binlog 中查询变更记录或者需要闪回数据的时候,需要和研发确认 时间范围、涉及到的 环境信息、库名、表名 最好是可以提供 SQL 语句。在上述的案例中,研发提供的是 sku 的 ID 时间范围是 2024-02-22 18:01:42 ~ 18:03:42。

1.2 下载 Binlog

阿里云、腾讯云、华为云 的数据库服务 Binlog 都是支持直接下载的,按照研发提供的时间区间下载对应的 Binlog 日志。
在这里插入图片描述
如果是本地自建的 MySQL 数据库,是没用日志开始时间和日志结束时间的,需要先确认下时间。可参考下方文档。

推荐阅读:MySQL 查询 Binlog 生成时间

1.3 准备环境

Binlog 下载完成后,那我们想要的数据记录也在里面,接下来将介绍如何获得我们想要的记录,首先需要准备一台测试数据库(作为数据库管理人员,随身带一套 MySQL 测试环境不过分吧?)测试数据库的大版本需要和生产环境的版本大版本一致。

以下是我环境信息:

  • 生产环境 MySQL 5.7.18
  • 测试环境 MySQL 5.7.33 (单实例)

1.4 注册 Binlog

该步骤,需要把从云上下载的 Binlog 注册到我们的测试环境中,首先需要先清空测试环境中的 Binlog 日志。

reset master;

查询 Binlog 索引文件的位置:

show variables like 'log_bin_index';

将我们从生产环境下载的 Binlog 拷贝到测试环境 Binlog 目录,然后再按照 mysql-bin.index 文件中的格式,将 Binlog 写进去。

/data/mysql_57/logs/mysql-bin.000001
/data/mysql_57/logs/mysql-bin.000002
/data/mysql_57/logs/mysql-bin.000003
/data/mysql_57/logs/mysql-bin.000004
/data/mysql_57/logs/mysql-bin.000005

上面,是注册完成的 Binlog 索引文件信息,生产环境下载了 5 个 Binlog 他们分别是 008213、008214、008215、008216、008217,拷贝到测试环境后,我们将原来 Binlog 名字修改为从 000001 开始,并且是顺序的。注意给拷贝来的 Binlog 设置用户属组。

chown -R mysql:mysql mysql-bin.*

设置完成后,重启测试环境的数据库,注册阶段完成。

1.5 准备结构信息

该步骤,需要把生产环境的表结构 copy 一份到测试环境。不用全部 copy 只 copy 需要查询记录的表。例如上面的 case 我们要查 product 库下的 sku 表。就在测试环境创建一个 product 库,然后将生产环境 sku 的表结构 copy 到测试环境。

create database product;
use product;-- 不在此展示完成结构了,与生产环境保持一致就行
create table sku(.........)create table sku_price(.........)

需要查询到记录涉及到多少张表,那么就 copy 多少张表就行。这次案例涉及到 2 张表。

1.6 Python 订阅

该步骤,要从 5 个 Binlog 文件中搜索到我们想要的记录,一个 Binlog 中可能有几十万个事务,这里我们通过编写 Python 脚本简化操作。我们要搜索的是 product 库下 sku、sku_price 表 sku_id = 810827 的变更记录,只需要按照下方代码注释修改即可。

在准备结构信息的步骤中,我们只在注册服务器中创建了需要的表,就起到了过滤表的作用,所以代码中不需要指定表名。

# -*- coding: utf-8 -*-
import sys
from datetime import datetime
from decimal import Decimal
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent
)
from pymysqlreplication.event import XidEvent, QueryEvent# 填写注册 MySQL 连接信息
mysql_settings = {'host': '172.16.104.56','port': 3306,'user': 'bing','password': 'abc123'
}stream = BinLogStreamReader(connection_settings=mysql_settings,server_id=8023,log_file='mysql-bin.000001',  # 从哪个 Binlog 开始扫描log_pos=4,only_schemas='product',  # 数据库名称only_events=[DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,]
)def simple_data_type(data_info: dict):"""直接打印结果会包含一些对象信息,在这里简化处理"""tem_data = {}for key, value in data_info.items():if isinstance(value, Decimal):tem_data[key] = float(value)elif isinstance(value, datetime):tem_data[key] = value.strftime('%Y-%m-%d %H:%M:%S')else:tem_data[key] = valuereturn tem_datasearch_file_name = Nonefor binlog_event in stream:if search_file_name == stream.log_file:passelif search_file_name != stream.log_file:search_file_name = stream.log_fileprint('正在扫描:', search_file_name)for row in binlog_event.rows:try:event_time = datetime.fromtimestamp(binlog_event.timestamp)except OSError:event_time = datetime(1980, 1, 1, 0, 0)if isinstance(binlog_event, DeleteRowsEvent):df = row["values"]# 这里条件,需要自己改if int(df['sku_id']) == 810887:print('-' * 160)print('操作类型: DELETE')print('时间: ', event_time)print('日志文件: ', stream.log_file)print('数据库名:', binlog_event.schema)print('表名:', binlog_event.table)print('Position: ', binlog_event.packet.log_pos)print(simple_data_type(df))print('-' * 160)elif isinstance(binlog_event, UpdateRowsEvent):df = row["before_values"]# 这里条件,需要自己改if int(df['sku_id']) == 810827:print('-' * 160)print('操作类型: UPDATE')print('时间: ', event_time)print('日志文件: ', stream.log_file)print('数据库名:', binlog_event.schema)print('表名:', binlog_event.table)print('Position: ', binlog_event.packet.log_pos)print('before_values: ', simple_data_type(row["before_values"]))print('after_values: ', simple_data_type(row["after_values"]))elif isinstance(binlog_event, WriteRowsEvent):df = row["values"]# 这里条件,需要自己改if int(df['sku_id']) == 810827:print('-' * 160)print('操作类型: INSERT')print('时间: ', event_time)print('日志文件: ', stream.log_file)print('数据库名:', binlog_event.schema)print('表名:', binlog_event.table)print('Position: ', binlog_event.packet.log_pos)print(simple_data_type(df))

1.7 输出结果展示

结果已脱敏,可以看出 boutique_price 从原来的 1058.46 被修改为 1614.0,需要注意的是 Binlog 中的 Event 只能精确到秒。

操作类型: UPDATE
时间:  2024-02-22 18:02:42
日志文件:  mysql-bin.000003
数据库名: product
表名: sku
Position:  65716973
before_values:  {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-21 04:10:41', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1058.46}
after_values:  {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-22 18:02:42', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1614.0}

将结果交给研发,任务就算完成了。

2. 原理解析

2.1 程序设计

这里用到了一个模块 pymysqlreplication 它可以伪装成一个 IO 复制线程,从 MySQL 服务器中拉取 Binlog Event 并支持解析。

为什么直接解析 Binlog?因为 Binlog 中没用表字段名信息,直接解析比较难做一些过滤操作。先将表结构和 Binlog 注册到一台测试 MySQL 服务器,然后通过伪装 IO 复制线程拉取 Event 过滤找到我们想要的记录。

2.2 模块版本

模块代码库:python-mysql-replication

# 本次实验使用的版本
mysql-replication==0.13

安装方法:

pip3 install mysql-replication

总结

本篇文章介绍了如何从 Binlog 中定位记录,需要有一点 Python 基础,但注册 Binlog 思路可应用多个场景,例如使用它恢复增量日志等。得到记录结果后,如果要回滚,那么可以依靠上面的字典中的信息,翻译成 SQL 语句即可,目前程序还没有实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/709940.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

经典DP-最长单调子序列

最长递增子序列 思路 定义状态: 我们定义一个数组 dp,其中 dp[i] 表示以 nums[i] 结尾的最长递增子序列的长度。初始化状态: 对于数组中的每个元素 nums[i],初始时都可以被视为一个长度为1的递增子序列,因此 dp[i] 的…

常用的桌面端自动化测试工具

桌面端软件相比Web端软件而言,因为界面元素的多种形态,导致其定位更难,目前市面上常见的定位方式一般是两种,一种是通过控件本身的属性定位,第二种是通过图片识别来定位,市面上常用的可以支持不同操作系统的…

Qt6内嵌CEF

一、下载CEF CEF下载地址:https://cef-builds.spotifycdn.com/index.html 或https://bitbucket.org/chromiumembedded/cef/src/master/ 选择对应系统的版本(本教程选择的是116.0.19) CMake下载地址:https://cmake.org/download…

thefour--Love is like a tide

最后一部分了,要开始进行我们的训练了。 先上代码: import os import numpy as np from tqdm import tqdm import tensorflow as tf from thetwo import NeuralStyleTransferModel import theone import thethree #创建模型 modelNeuralStyleTransferM…

代码随想录训练营第31天 | 理论基础、LeetCode 455.分发饼干、

目录 理论基础 视频讲解:手把手带你学会操作链表 | 贪心算法理论基础!_哔哩哔哩_bilibili LeetCode 455.分发饼干 文章讲解:代码随想录(programmercarl.com) 视频讲解:贪心算法,你想先喂哪个小孩?| Le…

【GameFramework框架内置模块】7、事件(Event)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址 大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录: https://blog.csdn.net/q7…

【Vue】路由

📝个人主页:五敷有你 🔥系列专栏:Vue ⛺️稳中求进,晒太阳 目录 路由 单页应用程序 总结: VueRouter 核心步骤: 组件存放目录的问题 路由的封装 声明式导航 声明式导航 - 导航链…

Go语言必知必会100问题-11 使用选项模式

使用选项模式 在设计API时,可能会遇到一个问题:如何处理可选配置?有效的解决可选配置问题可以提高API的灵活性。本文通过一个具体示例说明处理可选配置的一些方法。该示例的要求是设计一个对外提供创建HTTP服务器的库函数。函数定义如下&…

服了,阿里云服务器和腾讯云服务器价格差不多怎么选择?

2024年阿里云服务器和腾讯云服务器价格战已经打响,阿里云服务器优惠61元一年起,腾讯云服务器62元一年,2核2G3M、2核4G、4核8G、8核16G、16核32G、16核64G等配置价格对比,阿腾云atengyun.com整理阿里云和腾讯云服务器详细配置价格表…

高级语言期末2011级B卷(计算机学院)

1.编写函数&#xff0c;实现按照如下公式计算的功能&#xff0c;其中n为自然数 #include <stdio.h>int fac(int n) {if(n0)return 1;elsereturn n*fac(n-1); }float fun(int n) {float flag;float sum0;for(int i0; i<n; i) {flagi/((i1)*fac(i2));sumflag;}return su…

重推请求之curl和fiddler

在实际的项目中会有出现问题&#xff0c;想重现的场景&#xff0c;比较重新调用一个服务&#xff0c;那么如何进行快速的重推请求呢&#xff0c;记录下来&#xff0c;方便备查。 主要有curl和fiddler两种方式&#xff0c;下面详细说。 方式一、curl 命令 curl 是一个利用URL规…

云上攻防-云服务篇弹性计算服务器云数据库实例元数据控制角色AK控制台接管

知识点: 1、云服务-弹性计算服务器-元数据&SSRF&AK 2、云服务-云数据库-外部连接&权限提升 章节点&#xff1a; 云场景攻防&#xff1a;公有云&#xff0c;私有云&#xff0c;混合云&#xff0c;虚拟化集群&#xff0c;云桌面等 云厂商攻防&#xff1a;阿里云&am…

租赁小程序|租赁系统|租赁软件开发带来高效运营

随着社会的不断发展和科技的不断进步&#xff0c;越来越多的企业开始关注设备租赁业务。设备租赁作为一种短期使用设备的方式&#xff0c;为企业提供了灵活和成本节约的优势。针对设备租赁业务的管理和提升企业竞争力的需求&#xff0c;很多企业选择定制开发设备租赁系统。本文…

js 面试 1判断变量是否是数组 2 检测数据类型方法

1 是否是数组 1) typeof 检测数据类型运算符 优点&#xff1a;使用简单 缺点&#xff1a;只能检测基本类型&#xff08;除null外&#xff09; console.log(typeof(10)) //Number console.log(typeof(false)) //boolean console.log(typeof(hello)) //string console.log(typeof…

vue使用gitshot生成gif

vue使用gitshot生成gif 问题背景 本文将介绍vue中使用gitshot生成gif。 问题分析 解决思路&#xff1a; 使用input组件上传一个视频&#xff0c;获取视频文件后用一个video组件进行播放&#xff0c;播放过程进行截图生成图片数组。 demo演示上传一个视频&#xff0c;然后生…

如何使用Docker部署IT-Tools并结合内网穿透实现公网访问本地工具箱服务

作为程序员&#xff0c;在日常工作中&#xff0c;需要借助一些工具来提高我们工作效率&#xff0c;IT-Tools是为开发人员度身打造的一套便捷在线工具。它提供全面功能&#xff0c;使开发者能以更高效方式完成任务。经由IT-Tools&#xff0c;开发人员能轻松应对各类技术挑战&…

C++之数组

1&#xff0c;概述 所谓数组&#xff0c;就是一个集合&#xff0c;里面存放了相同类型的数据元素 特点1&#xff1a;数组中没干过数据元素都是相同的数据类型 特点2&#xff1a;数组都是连续存放位置组成的 2&#xff0c;一维数组 2.1 一维数组的定义 一维数组定义有三种…

Leetcode583. 两个字符串的删除操作 -代码随想录

题目&#xff1a; 代码(首刷自解 2024年2月29日&#xff09;&#xff1a; class Solution { public:// 动态规划 好像和找最长公共子序列一样&#xff1f;int minDistance(string word1, string word2) {int sz1 word1.size();int sz2 word2.size();// dp initvector<vec…

SD-WAN技术:优化国内外服务器访问的关键

在全球化的商业环境中&#xff0c;企业经常需要在国内访问国外的服务器。然而&#xff0c;由于地理位置和网络架构的限制&#xff0c;这种跨国访问往往会遇到速度慢、延迟高等问题。SD-WAN&#xff08;软件定义广域网&#xff09;技术的兴起&#xff0c;为企业提供了一种新的解…

八股文打卡day24——数据库(1)

面试题&#xff1a;左连接和右连接的区别&#xff1f; 我的回答&#xff1a; 左连接的SQL语句是&#xff1a;左表 left join 右表 on 连接条件&#xff0c;表示以左表为基础&#xff0c;将左表的的所有记录与右表进行连接。即使右表中没有与左表匹配的记录&#xff0c;左连接…