MySQL Binlog增量同步工具go-mysql-transfer实现详解

go-mysql-transfer产品手册:https://www.kancloud.cn/wj596/go-mysql-transfer/2111996

一、 概述

工作需要研究了下阿里开源的MySQL Binlog增量订阅消费组件canal,其功能强大、运行稳定,但是有些方面不是太符合需求,主要有如下三点:

  1. 需要自己编写客户端来消费canal解析到的数据
  2. server-client模式,需要同时部署server和client两个组件,我们的项目中有6个业务数据库要实时同步到redis,意味着要多部署12个组件,硬件和运维成本都会增加。
  3. 从server端到client端需要经过一次网络传输和序列化反序列化操作,然后再同步到接收端,感觉没有直接怼到接收端更高效。

go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具, 参考Canal但是规避了上述三点。旨在实现一个高性能、低延迟、简洁易用的Binlog增量数据同步管道, 具有如下特点:

  1. 不依赖其它组件,一键部署
  2. 集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ,不需要再编写客户端,开箱即用
  3. 内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑
  4. 支持监控告警,集成Prometheus客户端
  5. 高可用集群部署
  6. 数据同步失败重试
  7. 全量数据初始化

二、 与同类工具比较

三、 设计实现

1、实现原理

go-mysql-transfer将自己伪装成MySQL的Slave,向Master发送dump协议获取binlog,解析binlog并生成消息,实时发送给接收端。

2、数据转换规则

将从binlog解析出来的数据,经过简单的处理转换发送到接收端。使用内置丰富数数据转换规则,可完成大部分同步工作。

例如将表t_user同步到reids,配置如下规则:

rule:-schema: eseap #数据库名称table: t_user #表名称column_underscore_to_camel: true #列名称下划线转驼峰,默认为falsedatetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ssvalue_encoder: json  #值编码类型,支持json、kv-commas、v-commasredis_structure: string # redis数据类型。支持string、hash、list、set类型(与redis的数据类型一致)redis_key_prefix: USER_ #key前缀redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键

t_user表,数据如下:

同步到Redis后,数据如下:

更多规则配置和同步案例 请见后续的"使用说明"章节。

3、数据转换脚本

Lua 是一种轻量小巧的脚本语言, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握Lua的语法,照虎画猫写出可用的脚本。

基于Lua的高扩展性,可以实现更为复杂的数据解析、消息生成逻辑,定制需要的数据格式。

使用方式:

rule:-schema: eseaptable: t_userlua_file_path: lua/t_user_string.lua   #lua脚本文件

示例脚本:

local json = require("json")    -- 加载json模块
local ops = require("redisOps") -- 加载redis操作模块local row = ops.rawRow()  --当前变动的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库的操作事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local key = "user_"..id -- 定义keyif action == "delete" -- 删除事件
thenops.DEL(key)  -- 删除KEY
else local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result= {}  -- 定义结果result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源local val = json.encode(result) -- 将result转为jsonops.SET(key,val)  -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value
end

t_user表,数据如下:

同步到Redis后,数据如下:

更多Lua脚本使用说明 和同步案例 请见后续的"使用说明"章节。

4、监控告警

Prometheus是流行开源监控报警系统和TSDB,其指标采集组件被称作exporter。go-mysql-transfer本身就是一个exporter。向Prometheus提供应用状态、接收端状态、insert数量、update数量、delete数量、delay延时等指标。

go-mysql-transfer内置Prometheus exporter可以监控系统的运行状况,并进行健康告警。

相关配置:

enable_exporter: true #启用prometheus exporter,默认false
exporter_addr: 9595 #prometheus exporter端口,默认9595

直接访问127.0.0.1:9595可以看到导出的指标值,如何与Prometheus集成,请参见Prometheus相关教程。

指标说明:transfer_leader_state:当前节点是否为leader,0=否、1=是 transfer_destination_state:接收端状态, 0=掉线、1=正常 transfer_inserted_num:插入数据的数量 transfer_updated_num:修改数据的数量 transfer_deleted_num:删除数据的数量 transfer_delay:与MySQL Master的时延

5、高可用

可以选择依赖zookeeper或者etcdr构建高可用集群,一个集群中只存在一个leader节点,其余皆为follower节点。只有leader节点响应binglog的dump事件,follower节点为蛰伏状态,不发送dump命令,因此多个follower也不会加重Master的负担。当leader节点出现故障,follower节点迅速替补上去,实现秒级故障切换。

相关配置:

cluster: # 集群配置name: myTransfer #集群名称,具有相同name的节点放入同一个集群# ZooKeeper地址,多个用逗号分隔zk_addrs: 192.168.1.10:2181,192.168.1.11:2182,192.168.1.12:2183#zk_authentication: 123456 #digest类型的访问秘钥,如:user:password,默认为空#etcd_addrs: 192.168.1.10:2379 #etcd连接地址,多个用逗号分隔#etcd_user: test #etcd用户名#etcd_password: 123456 #etcd密码

6、失败重试

网络抖动、接收方故障都会导致数据同步失败,需要有重试机制,才能保证不漏掉数据,使得每一条数据都能送达。

通常有两种重试实现方式,一种方式是记录下故障时刻binglog的position(位移),等故障恢复后,从position处重新dump 数据,发送给接收端。

一种方式是将同步失败的数据在本地落盘,形成队列。当探测到接收端可用时,逐条预出列尝试发送,发送成功最终出列。确保不丢数据,队列先进先出的特性也可保证数据顺序性,正确性。

go-mysql-transfer采用的是后者,目的是减少发送dump命令的次数,减轻Master的负担。因为binglog记录的整个Master数据库的日志,其增长速度很快。如果只需要拿几条数据,而dump很多数据,有点得不偿失。

7、全量数据初始化

如果数据库原本存在无法通过binlog进行增量同步的数据,可以使用命令行工具-stock完成始化同步。stock基于 SELECT * FROM {table}的方式分批查询出数据,根据规则或者Lua脚本生成指定格式的消息,批量发送到接收端。执行命令 go-mysql-transfer -stoc,在控制台可以直观的看到数据同步状态,如下:

四、安装

二进制安装包

直接下载编译好的安装包: https://github.com/wj596/go-mysql-transfer/releases

源码编译

1、依赖Golang 1.14 及以上版本 2、设置GO111MODULE=on 3、拉取源码 go get -d github.com/wj596/go-mysql-transfer 3、进入目录,执行 go build 编译

五、部署运行

开启MySQL的binlog

#Linux在my.cnf文件
#Windows在my.ini文件
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 go-mysql-transfer 的 slave_id 重复

命令行运行 1、修改app.yml 2、Windows直接运行 go-mysql-transfer.exe 3、Linux执行 nohup go-mysql-transfer &

docker运行

1、拉取源码 go get -d github.com/wj596/go-mysql-transfer 2、修改配置文件 app.yml 中相关配置 3、构建镜像 docker image build -t go-mysql-transfer -f Dockerfile . 4、运行 docker run -d --name go-mysql-transfer -p 9595:9595 go-mysql-transfer:latest

六、性能测试

1、测试环境

平台:虚拟机 CPU:E7-4890 4核8线程 内存:8G 硬盘:机械硬盘 OS:Windows Sever 2012 R2 MySQL: 5.5 Rides: 4.0.2

2、测试数据

t_user表,14个字段,1个字段包含中文,数据量527206条

3、测试配置

规则:

    schema: eseaptable: t_userorder_by_column: id #排序字段,全量数据初始化时不能为空#column_lower_case:false #列名称转为小写,默认为false#column_upper_case:false#列名称转为大写,默认为falsecolumn_underscore_to_camel: true #列名称下划线转驼峰,默认为false# 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列#include_column: ID,USER_NAME,PASSWORDdate_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dddatetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ssvalue_encoder: json  #值编码,支持json、kv-commas、v-commasredis_structure: string # 数据类型。支持string、hash、list、set类型(与redis的数据类型一直)redis_key_prefix: USER_ #key的前缀redis_key_column: ID #使用哪个列的值作为key,不填写默认使用主键

脚本:

local json = require("json")    -- 加载json模块
local ops = require("redisOps") -- 加载redis操作模块local row = ops.rawRow()  --当前变动的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库的操作事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local key = "user_"..id -- 定义keyif action == "delete" -- 删除事件
thenops.DEL(key)  -- 删除KEY
else local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result= {}  -- 定义结果result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源local val = json.encode(result) -- 将result转为jsonops.SET(key,val)  -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value
end

3、测试用例一

使用规则,将52万条数据全量初始化同步到Redis,结果如下:

3次运行的中间值为4.6秒

4、测试用例二

使用Lua脚本,将52万条数据全量初始化同步到Redis,结果如下:

3次运行的中间值为9.5秒

5、测试用例三

使用规则,将binlog中52万条增量数据同步到Redis。结果如下:

每秒增量同步(TPS)32950条

6、测试用例四

使用Lua脚本,将binlog中52万条增量数据同步到Redis。结果如下:

每秒增量同步(TPS)15819条

7、测试用例五

100个线程不停向MySQL写数据,使用规则将数据实时增量同步到Redis,TPS保持在4000以上,资源占用情况如下:

100个线程不停向MySQL写数据,使用Lua脚本将数据实时增量同步到Redis,TPS保持在2000以上,资源占用情况如下:

以上测试结果,会随着测试环境的不同而改变,仅作为参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509667.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构实验之栈五:下一较大值(一)

题目描述 对于包含n&#xff08;1<n<1000&#xff09;个整数的序列&#xff0c;对于序列中的每一元素&#xff0c;在序列中查找其位置之后第一个大于它的值&#xff0c;如果找到&#xff0c;输出所找到的值&#xff0c;否则&#xff0c;输出-1。 输入 输入有多组&#xf…

与 C++11 多线程相关的头文件

C11 新标准中引入了四个头文件来支持多线程编程&#xff0c;他们分别是<atomic> ,<thread>,<mutex>,<condition_variable>和<future>。 <atomic>&#xff1a;该头文主要声明了两个类, std::atomic 和 std::atomic_flag&#xff0c;另外还…

数据结构实验之栈二:一般算术表达式转换成后缀式

题目描述 对于一个基于二元运算符的算术表达式&#xff0c;转换为对应的后缀式&#xff0c;并输出之。输入 输入一个算术表达式&#xff0c;以‘#’字符作为结束标志。输出 输出该表达式转换所得到的后缀式。示例输入 a*b(c-d/e)*f# 示例输出 ab*cde/-f* #include<stdio.h…

INFINI GATEWAY 极限网关初体验 ElasticSearch 两个集群数据同步

文章目录极限网关-配置说明配置文件日志、数据目录定义入口定义路由定义流程定义资源使用Demo写入两个ES集群极限网关-常见问题shutdown: ORM handler is not registered极限网关地址极限网关-配置说明 极限网关的大部分配置都可以通过 gateway.yml 来进行配置&#xff0c;配置…

std::thread详解

转自&#xff1a;http://www.cnblogs.com/haippy/p/3236136.html 上一篇博客《C11 并发指南一(C11 多线程初探)》中只是提到了 std::thread 的基本用法&#xff0c;并给出了一个最简单的例子&#xff0c;本文将稍微详细地介绍 std::thread 的用法。 std::thread 在 <thread&…

Kafka 详细配置参数说明

参数值参数文件描述auto.create.topics.enableserver.properties【说明】是否允许自动创建Topic&#xff0c;若是false&#xff0c;就需要通过命令创建Topic。【默认值】true【取值范围】true或falselog.cleaner.backoff.msserver.properties【说明】检查是否有日志需要清理的时…

数据结构实验之栈三:后缀式求值

题目描述 对于一个基于二元运算符的后缀表示式&#xff08;基本操作数都是一位正整数&#xff09;&#xff0c;求其代表的算术表达式的值。输入 输入一个算术表达式的后缀式字符串&#xff0c;以‘#’作为结束标志。输出 求该后缀式所对应的算术表达式的值&#xff0c;并输出之…

std::mutex详解

Mutex 又称互斥量&#xff0c;C 11中与 Mutex 相关的类&#xff08;包括锁类型&#xff09;和函数都声明在 <mutex> 头文件中&#xff0c;所以如果你需要使用 std::mutex&#xff0c;就必须包含 <mutex> 头文件。 <mutex> 头文件介绍 Mutex 系列类(四种) st…

java中stack集合框架

栈(Stack):数据结构的一种,存储特点:Last In First Out. Stack 类表示后进先出&#xff08;LIFO&#xff09;的对象栈. 栈结构在生活中的体现: 1):QQ消息. A,B,C三个人先后发送消息,我们查看的时候发现最顶上的是最新的消息. 2):手枪弹夹的装和发射: 要来实现栈的存储,底层…

ElasticSearch Pipeline 为新增数据设置更新时间

文章目录模拟测试测试返回结果实际应用创建Pipeline查看创建Pipeline新增数据测试查看新增数据创建索引时直接设置Pipeline模拟测试 测试 POST _ingest/pipeline/_simulate {"pipeline": {"processors": [{"set": {"field": "t…

队列的基本操作

链式存储 typedef int QElemType; typedef int Status;//具体数据类型具体定义 typedef struct QNode//队列结点结构体 { QElemType data; QNode *next; } QNode, *Queueptr; typedef struct // 链队列类型 { Queueptr front; // 队头指针&#xff08;结构体类…

c++阻塞队列

基于C11的阻塞队列简单实现 转载请说明出处&#xff1a;http://blog.csdn.net/cywosp/article/details/9157379 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于&#xff0c;当队列为空时&#xff0c;从队列获取…

java中ArrayList类的操作

ArrayList类是Java集合框架出现之后用来取代Vector类的: 二者底层原理都是基于数组的算法,一模一样. 区别: Vector: 所有的方法都使用了synchronized修饰符. 线程安全但是性能较低. 适用于多线程环境. ArrayList:所有的方法都没有使用synchronized修饰符. 线程不安全但是性…

Elasticsearch Painless Script详解

文章目录1. Painless 简介Painless 的用途2. 参数3. 首选参数4. 简短脚本形式5. 通过 Painless 脚本访问字段6. 示例6.1 案例 1&#xff1a;Script Processsor6.2 案例 2&#xff1a;文档更新计数6.3 案例 3&#xff1a;搜索时的 Script 字段6.4 Script :Inline v.s Stored6.5 …

算术表达式的转换

题目描述 小明在学习了数据结构之后&#xff0c;突然想起了以前没有解决的算术表达式转化成后缀式的问题&#xff0c;今天他想解决一下。因为有了数据结构的基础小明很快就解出了这个问题&#xff0c;但是他突然想到怎么求出算术表达式的前缀式和中缀式呢&#xff1f;小明很困惑…

Reactor事件驱动的两种设计实现:面向对象 VS 函数式编程

内容目录&#xff1a; Reactor实现架构对比面向对象的Reactor方案设计函数式编程的Reactor设计示例对比两者的时序图对比结论 Reactor事件驱动的两种设计实现&#xff1a;面向对象 VS 函数式编程 这里的函数式编程的设计以muduo为例进行对比说明&#xff1b; Reactor实现架构对…

ElasticSearch 快照 备份、恢复数据

文章目录ElasticSearch 设置备份文件地址注册快照存储库查看快照存储库保存结果创建快照异步创建指定索引进行快照查看全部快照在服务器查看备份的数据恢复数据本机恢复其他服务器恢复常见问题报错 doesnt match any of the locations specified by path.repo because this set…

java中LinkedList类的操作

LinkedList类是双向链表,单向队列,双向队列,栈的实现类: LinkedList类实现单向队列和双向队列的接口,自身提高了栈操作的方法,链表操作的方法. 在LinkedList类中存在很多方法,但是功能都是相同的.LinkedList表示了多种数据结构的实现,每一种数据结构的操作名字不同. 面试题:编…

数据结构实验之栈七:出栈序列判定

题目描述 给一个初始的入栈序列&#xff0c;其次序即为元素的入栈次序&#xff0c;栈顶元素可以随时出栈&#xff0c;每个元素只能入栈依次。输入一个入栈序列&#xff0c;后面依次输入多个序列&#xff0c;请判断这些序列是否为所给入栈序列合法的出栈序列。 例如序列1&#x…

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

文章目录FileBeat Pipeline 解析日志 保存至ElasticSearch&#xff08;实战&#xff09;下载地址目的日志数据模拟Pipeline创建pipeline查看Pipeline是否创建成功创建FileBeat配置文件 filebeat.yml创建自定义字段 FileBeat fields.yml执行 FileBeatfilebeat 启动命令说明测试…