众所周知,MySQL 的用户群体很大,为了能够增强数据的实时性,很多解决方案会利用 binlog 将数据写入到 ClickHouse。
为了能够监听 binlog 事件,我们需要用到类似 canal 这样的第三方中间件,这无疑增加了系统的复杂度。
在不久的将来,这一现状可能会发生改观。因为目前,编号 10851 的 PR 进入了 review 阶段。
(https://github.com/ClickHouse/ClickHouse/pull/10851)
该 PR 将为 ClickHouse 带来原生消费 binlog 日志的能力,是不是王炸功能?
这次是新增了一个名为 MaterializeMySQL 的 database 引擎,该 database 能映射到 MySQL 中的某个 database,并自动在 ClickHouse 中创建对应的 ReplacingMergeTree。
MaterializeMySQL 同时支持全量和增量同步,在 database 创建之初会全量同步 MySQL 中的表和数据,之后则会通过 binlog 进行增量同步。
MaterializeMySQL database 为其所创建的每张 ReplacingMergeTree 自动增加了 _sign 和 _version 字段。
其中, _version 用作 ReplacingMergeTree 的 ver 版本参数,每当监听到 insert、update 和 delete 事件时,在 databse 内全局自增。而 _sign 则用于标记是否被删除,取值 1 或者 -1。
目前 MaterializeMySQL 支持如下几种 binlog 事件:
MYSQL_WRITE_ROWS_EVENT
_sign = 1,_version ++
MYSQL_DELETE_ROWS_EVENT
_sign = -1,_version ++
MYSQL_UPDATE_ROWS_EVENT
新数据 _sign = 1
MYSQL_QUERY_EVENT
支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE等。
虽然该 PR 目前还没有被 merge 到主线,但是我已忍不住想要尝鲜,接下来就让我们一睹它的芳容吧。
首先准备一个 MySQL 实例
#拉取镜像docker pull mysql:5.7.31#运行镜像docker run -p 3306:3306 --name mysql5.7 -v {your-path}/mysql/conf:/etc/mysql -v {your-path}/mysql/logs:/logs -v {your-path}/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=root -d mysql:5.7.31
确保 MySQL 开启了 binlog 功能,且格式为 ROW
#conf/my.cnfserver-id=1# Uncomment the following if you want to log updateslog-bin=mysql-bin# binary logging format - mixed recommended#binlog_format=mixedbinlog_format=ROW
现在开始测试,首先在 MySQL 中创建数据表并写入数据
CREATE TABLE `t_organization` (`id` int(11) NOT NULL AUTO_INCREMENT,`code` int NOT NULL,`name` text DEFAULT NULL,`updatetime` datetime DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY (`code`)) ENGINE=InnoDB;INSERT INTO t_organization (code, name,updatetime) VALUES(1000,'Realinsight',NOW());INSERT INTO t_organization (code, name,updatetime) VALUES(1001,'Realindex',NOW());INSERT INTO t_organization (code, name,updatetime) VALUES(1002,'EDT',NOW());
继续创建第二张表
CREATE TABLE `t_user` (`id` int(11) NOT NULL AUTO_INCREMENT,`code` int,PRIMARY KEY (`id`)) ENGINE=InnoDB;INSERT INTO t_test (code) VALUES(1);
现在轮到 ClickHouse 这边了,创建 MaterializeMySQL 数据库
CREATE DATABASE test_binlog ENGINE = MaterializeMySQL('127.0.0.1:3306','htap','root','root');
其中 4 个参数分别是 MySQL地址、databse、username 和 password。
执行之后可以观察一下它执行日志
2020.07.2020.07.29 01:29:53.571991 [ 868116 ] {} executeQuery: (internal) /*Materialize MySQL step 1: execute MySQL DDL for dump data*/ EXTERNAL DDL FROM MySQL(test_binlog, htap) CREATE TABLE `t_organization` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int(11) NOT NULL, `name` text, `updatetime` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `code` (`code`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf82020.07.29 01:29:53.577595 [ 868116 ] {} executeQuery: (internal) /* Rewritten MySQL DDL Query */ CREATE TABLE test_binlog.t_organization (`id` Int32, `code` Int32, `name` Nullable(String), `updatetime` Nullable(DateTime), `_sign` Int8, `_version` UInt64) ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 4294967) ORDER BY (code, id)
可以看到,ClickHouse dump 出了 MySQL 的表结构,并将其转换成了 ReplacingMergeTree。
在这个过程中,不仅各字段的数据类型进行了映射,还多出了 _sign 和 _version 两个字段。
MySQL 表的 PRIMARY KEY 作为了 ReplacingMergeTree 的 PARTITION BY,并且按照类型大小除以1000整除;
MySQL 表的 PRIMARY KEY 同时也作为了 ReplacingMergeTree 的 ORDER BY,如果 MySQL 表中还有 UNIQUE KEY (此例中的 code 字段),它也会一同被加入到 ORDER BY。
现在我们查询 ClickHouse 的 test_binlog 数据库
use test_binlog;show tables;┌─name───────────┐│ t_organization ││ t_user │└────────────────┘
MySQL 的表已经被同步过来了,接着试试查询数据
select * from t_organization;SELECT *FROM t_organization┌─id─┬─code─┬─name────────┬──────────updatetime─┐│ 1 │ 1000 │ Realinsight │ 2020-07-28 17:29:47 │└────┴──────┴─────────────┴─────────────────────┘┌─id─┬─code─┬─name──────┬──────────updatetime─┐│ 2 │ 1001 │ Realindex │ 2020-07-28 17:29:48 │└────┴──────┴───────────┴─────────────────────┘┌─id─┬─code─┬─name─┬──────────updatetime─┐│ 3 │ 1002 │ EDT │ 2020-07-28 17:29:49 │└────┴──────┴──────┴─────────────────────┘3 rows in set. Elapsed: 0.032 sec.
接下来进一步测试 binlog 的同步功能。
首先在 MySQL 中修改数据:
update t_organization set name = CONCAT(name,'-v1') where id = 1
查看 ClickHouse 日志会发现 binlog 监听事件:
=== UpdateRowsEventV2 ===Timestamp: 1595958048Event Type: 31Server ID: 1Event Size: 93Log Pos: 20454Flags: 0Schema: htapTable: t_organizationRow[0]: (1, 1000, 'Realinsight', 1595928587)Row[1]: (1, 1000, 'Realinsight-v1', 1595928587)
查询 ClickHouse 的数据表:
select * from t_organization;┌─id─┬─code─┬─name───────────┬──────────updatetime─┐│ 1 │ 1000 │ Realinsight-v1 │ 2020-07-28 17:29:47 │└────┴──────┴────────────────┴─────────────────────┘┌─id─┬─code─┬─name──────┬──────────updatetime─┐│ 2 │ 1001 │ Realindex │ 2020-07-28 17:29:48 │└────┴──────┴───────────┴─────────────────────┘┌─id─┬─code─┬─name─┬──────────updatetime─┐│ 3 │ 1002 │ EDT │ 2020-07-28 17:29:49 │└────┴──────┴──────┴─────────────────────┘
可以看到 id = 1的数据被修改了。
现在再次回到 MySQL,尝试删除数据:
DELETE FROM t_organization where id = 2
回到 ClicKHouse,同样会发现 DeleteRows 的 binlog 监听事件:
=== DeleteRowsEventV2 ===Timestamp: 1595958230Event Type: 32Server ID: 1Event Size: 60Log Pos: 20744Flags: 0Schema: htapTable: t_organizationRow[0]: (2, 1001, 'Realindex', 1595928588)
查询 ClickHouse的 t_organization:
select * from t_organization;SELECT *FROM t_organization┌─id─┬─code─┬─name───────────┬──────────updatetime─┐│ 1 │ 1000 │ Realinsight-v1 │ 2020-07-28 17:29:47 │└────┴──────┴────────────────┴─────────────────────┘┌─id─┬─code─┬─name─┬──────────updatetime─┐│ 3 │ 1002 │ EDT │ 2020-07-28 17:29:49 │└────┴──────┴──────┴─────────────────────┘
id = 2 的数据被删除了。
这是怎么实现的呢? 在刚才的查询中增加 _sign 和 _version 虚拟字段,一切将会真相大白。
select *,_sign,_version from t_organization order by _sign desc,_version descSELECT*,_sign,_versionFROM t_organizationORDER BY_sign DESC,_version DESC┌─id─┬─code─┬─name───────────┬──────────updatetime─┬─_sign─┬─_version─┐│ 1 │ 1000 │ Realinsight-v1 │ 2020-07-28 17:29:47 │ 1 │ 2 │└────┴──────┴────────────────┴─────────────────────┴───────┴──────────┘┌─id─┬─code─┬─name────────┬──────────updatetime─┬─_sign─┬─_version─┐│ 1 │ 1000 │ Realinsight │ 2020-07-28 17:29:47 │ 1 │ 1 ││ 2 │ 1001 │ Realindex │ 2020-07-28 17:29:48 │ 1 │ 1 ││ 3 │ 1002 │ EDT │ 2020-07-28 17:29:49 │ 1 │ 1 │└────┴──────┴─────────────┴─────────────────────┴───────┴──────────┘┌─id─┬─code─┬─name──────┬──────────updatetime─┬─_sign─┬─_version─┐│ 2 │ 1001 │ Realindex │ 2020-07-28 17:29:48 │ -1 │ 3 │└────┴──────┴───────────┴─────────────────────┴───────┴──────────┘5 rows in set. Elapsed: 0.048 sec.
在查询时,对于已经被删除的数据,ClickHouse 会自动重写 SQL,将 _sign = -1 的数据过滤掉;
对于修改的数据,则自动重写 SQL,为其增加 FINAL 修饰符。
select * from t_organization等同于select * from t_organization final where _sign = 1
在 20.5 版本中,final 查询已经支持多线程,性能有很大的提升。
大家应该会发现,目前在 ReplacingMergeTree 中被删除的数据只是被过滤掉了,并没有物理删除。经与作者大神 zhang2014 咨询,将来会通过类似 GC 的思路通过另外的线程定期删除 _sign = -1 的数据。
这项功能如果被 merge 进主线,无疑会增强 ClicKHouse 更加自动化的属性。
如果这篇文章对你有帮助,欢迎点赞、转发、在看三连击 :)
欢迎大家扫码关注我的
公众号和视频号
:
ClickHouse的秘密基地
nauu的奇思妙想
往期精彩推荐: