Canal安装部署与测试

文章目录

  • 第一章 Canal概述
    • 1.1 简介
    • 1.2 工作原理
      • 1.2.1 MySQL主备复制原理
      • 1.2.2 canal 工作原理
    • 1.3 重要版本更新说明
    • 1.4 多语言
  • 第二章 Canal安装部署
    • 2.1 准备
    • 2.2 canal安装
  • 第三章 Canal和Kafka整合测试
  • 注意事项

第一章 Canal概述

Github地址:https://github.com/alibaba/canal

1.1 简介

image-20201118180854772

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 工作原理

1.2.1 MySQL主备复制原理

img

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

1.2.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流),再回放中继日志

1.3 重要版本更新说明

  1. canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
    • 整体性能测试&优化,提升了150%. #726 参考: Performance
    • 原生支持prometheus监控 #765 Prometheus QuickStart
    • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
    • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
    • 原生支持docker镜像 #801 参考: Docker QuickStart
  2. canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

1.4 多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal Python客户端: https://github.com/haozi3156666/canal-python

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力

  • 参考文档: Canal Kafka/RocketMQ QuickStart

第二章 Canal安装部署

2.1 准备

  • 查看mysql是否开启bin-log,查询语句如下:

    SHOW VARIABLES LIKE '%log_bin%'
    

    如查询出来log_bin的Value是OFF状态,则需要执行如下的开启bin-log步骤。

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复#注意,配置后需要重启msyql
    [root@hadoop01 ~]# systemctl restart mysqld
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant(如有授权用户可不执行如下命令)。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2.2 canal安装

https://github.com/alibaba/canal/releases 中查找 canal 最新deploy 版本的下载链接
1.下载压缩包:

[root@hadoop01 ~]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.解压

[root@hadoop01 ~]# mkdir /usr/local/canal
[root@hadoop01 ~]# tar -zxvf /home/canal.deployer-1.1.4.tar.gz -C /usr/local/canal

3.创建测试配置直接使用自带的样例进行更改,并复制出默认配置以备后期增加库时使用,这里的每一个文件夹就代表一个数据源,多个数据源就复制出多个文件夹后在 canal.properties 的 canal.destinations 里逗号分隔进行配置:

[root@hadoop01 canal]# cp -r ./conf/example/ ./conf/test

4.修改单个数据源的配置

[root@hadoop01 canal]# vi ./conf/test/instance.properties
#修改如下
canal.instance.master.address=127.0.0.1:3306 	# 数据库服务器地址
canal.instance.dbUsername=root 	# 数据库用户名
canal.instance.dbPassword=root 	# 数据库密码
#canal.instance.filter.regex=.*\\..*  # 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.instance.filter.regex=travel.dim_product1 	# 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.mq.topic=travel_dim_product1 		# 这个数据库存储进 Kafka 时使用的 topic

5.修改 Canal 全局设置

[root@hadoop01 canal]# vi conf/canal.properties#修改如下
canal.destinations = test 	# 这里配置开启的 instance,具体方法上面步骤有说明canal.serverMode = kafka 	# 更改模式,直接把数据扔进 Kafka
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092 	# Kafka 的地址
canal.mq.batchSize = 16384 # 这里没有更改,值应该小于 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里没设置,这里也就不更改了
#canal.mq.flatMessage = false
canal.mq.flatMessage = true 	# 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便
配置文件详解:
参数名字 参数说明 默认值
canal.destinations 当前server上部署的instance列表
canal.id 每个canal server实例的唯一标识,暂无实际意义 1
canal.ip canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 无
canal.port canal server提供socket服务的端口 11111
canal.zkServers canal server链接zookeeper集群的链接信息
例子:127.0.0.1:2181,127.0.0.1:2182 无canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
canal.file.data.dir canal持久化数据到file上的目录 …/conf (默认和instance.properties为同一目录,方便运维和备份)
canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 1234
canal.instance.master.address mysql主库链接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主库链接时起始的binlog文件 无
canal.instance.master.position mysql主库链接时起始的binlog偏移量 无
canal.instance.master.timestamp mysql主库链接时起始的binlog的时间戳 无
canal.instance.dbUsername mysql数据库帐号 canal
canal.instance.dbPassword mysql数据库密码 canal
canal.instance.defaultDatabaseName mysql链接时默认schema 
canal.instance.connectionCharset mysql 数据解析编码 UTF-8
canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)	.*\\..*

6.开启 Canal
bin/startup.sh

#因为 Canal 的脚本名称都太普通,所以没有添加到 PATH 里

[root@hadoop01 canal]# ./bin/startup.sh

7.查看日志是否有异常:

[root@hadoop01 canal]# tail -f ./logs/canal/canal.log   #server启动日志
[root@hadoop01 canal]# tail -f ./logs/test/test.log   #instance 的日志

第三章 Canal和Kafka整合测试

kafka的安装在这儿省略,大家自行安装即可…

  1. 测试,连上数据库尝试执行更改 SQL
CREATE TABLE `yq` (`dt` varchar(25) NOT NULL,`province` varchar(255) NOT NULL,`adds` int(11) DEFAULT '0',`possibles` int(11) DEFAULT '0',PRIMARY KEY (`dt`,`province`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
**查看 Kafka 中的数据,列出所有 topic:
kafka-topics.sh --list --zookeeper localhost:2181消费其中的数据:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning此时应该已经可以列出创建表时的语句后期 Bug 修复:消费端 Kafka 不进数据,Canal 日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制修改 Kafka 配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka修改 Canal 配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal查看 Canal 日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动 Canal 即可**
  1. 对yq表进行update、insert、delete操作
{"data":[{"dt":"2020-7-1","province":"beijing","adds":"3","possibles":"12"}],"database":"test","es":1603647506000,"id":1,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"6"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647506282,"type":"UPDATE"}
{"data":[{"dt":"2020-7-2","province":"beijing","adds":"10","possibles":"6"}],"database":"test","es":1603647607000,"id":2,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"5"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647607348,"type":"UPDATE"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647656000,"id":3,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647656168,"type":"INSERT"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647707000,"id":4,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647707172,"type":"DELETE"}
  1. 关闭Canal
[root@hadoop01 canal]# ./bin/stop.sh关闭之后,如果mysql的表数据有修改,当重启canal以后,将会把变更数据投递到kafka中!!!

注意事项

1、bin-log的存储三种Format:

mysql复制主要有三种方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。① STATEMENT模式(SBR)每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)② ROW模式(RBR)不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。③ MIXED模式(MBR)以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2、canal会丢失数据吗?

建议将增量数据投递到MQ中,如果canal有异常,当恢复时候会重读中继日志

3、canal的下游怎么处理?

存储可以进行实时读写处理中---流式计算 ---> 追加到流表中、选择支持upsert 模式流表中、或者选择自己维护状态
离线,可以使用canal周期落地 --- 离线

4、canal对接kafka,kafka多分区怎么办

1、单分区能保证增量数据顺序---但是丢失并行度
2、按照维度得主键进行分区  --- 增加并行处理能力kafka是3个分区
k-0
k-1
k-2mysql
111
112
113111 zs k-0
111 ls k-1
113 k-2flink消费:
3个并行度消费
k-1 先消费
k-0 消费

5、mysql多少节点、canal多少节点

如果维度表数量在50个一下,则构建3-5个canal集群即可。
50-100个维度表,则构建7-11个左右canal即可

6、canal保留原始数据和增量数据?

ES或者HBASE:
eid
主键_version 其它字段 是否有效 起始时间 生效时间redis:
主键 [{},{},{}] 索引+1就是版本信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/8653.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用goldengate 迁移Oracle到postgresql

环境: --源端: IP:10.0.4.16 hostname:tencent Oracle数据库版本:12.2.0.1.0 ogg for oracle版本:19.1.0.0.4 SID:orcl --目标端: IP:10.0.4.16 hostname&#…

ES6基础知识三:对象新增了哪些扩展?

一、属性的简写 ES6中,当对象键名与对应值名相等的时候,可以进行简写 const baz {foo:foo}// 等同于 const baz {foo}方法也能够进行简写 const o {method() {return "Hello!";} };// 等同于const o {method: function() {return "…

【MATLAB绘图】

MATLAB绘图函数:Plot函数详解 介绍 MATLAB是一种常用的科学计算和数据可视化工具,它提供了强大的绘图函数,使用户能够创建各种类型的图表和图形。 基本语法 plot函数的基本语法如下: plot(x, y)其中,x和y是长度相…

第八次CCF计算机软件能力认证

第一题:最大波动 小明正在利用股票的波动程度来研究股票。 小明拿到了一只股票每天收盘时的价格,他想知道,这只股票连续几天的最大波动值是多少,即在这几天中某天收盘价格与前一天收盘价格之差的绝对值最大是多少。 输入格式 输入…

子网划分和计网解题方法

子网的基本概念 子网是计算机网络中的一个逻辑单元,是由多个IP地址组成的网络。在计算机网络中,IP地址是一个32位的二进制数,用于标识网络上的设备。子网划分是将一个大型的IP地址网络划分为多个小的IP地址网络,每个小的IP地址网…

uni-app:实现账号密码登录,并且实现当页面登录过该账号在下次登录时无需再输入账号密码(本地缓存实现)

效果 前端代码 一、完整代码 <template><view><view class"all"><view class"title"><image :src"title_login" alt"图片损坏" /></view><form class"login-form" submit"fo…

TSDB - VictoriaMetrics 技术原理浅析

一、前言 在监控领域&#xff0c;通常需要指标存储组件TSDB&#xff0c;目前开源的TSDB组件比较多&#xff0c;各个组件性能、高可用性、维护成本等等各有差异。本文不分析选型问题&#xff0c;重点讲解VictoriaMetrics&#xff08;后面简称为vm&#xff09;。 有兴趣的朋友建议…

吉林大学计算机软件考研经验贴

文章目录 简介政治英语数学专业课 简介 本人23考研&#xff0c;一战上岸吉林大学软件工程专硕&#xff0c;政治72分&#xff0c;英一71分&#xff0c;数二144分&#xff0c;专业课967综合146分&#xff0c;总分433分&#xff0c;上图&#xff1a; 如果学弟学妹需要专业课资料…

Mysql执行计划字段解释

文章目录 一、前言二、如何查看执行计划三、执行计划各字段解释四、select_type4.1、SIMPLE&#xff08;简单查询&#xff09;4.1.1、简单的单表查询4.1.2、多表连接查询 4.2、PRIMARY&#xff08;主查询&#xff09;4.2.1、包含复杂子查询的外层查询4.2.2、UNION语句中的第一个…

自动化测试框架unittest与pytest的区别!

引言 前面文章已经介绍了python单元测试框架&#xff0c;大家平时经常使用的是unittest&#xff0c;因为它比较基础&#xff0c;并且可以进行二次开发&#xff0c;如果你的开发水平很高&#xff0c;集成开发自动化测试平台也是可以的。而这篇文章主要讲unittest与pytest的区别&…

可解释的 AI:在transformer中可视化注意力

Visualizing Attention in Transformers | Generative AI (medium.com) 一、说明 在本文中&#xff0c;我们将探讨可视化变压器架构核心区别特征的最流行的工具之一&#xff1a;注意力机制。继续阅读以了解有关BertViz的更多信息&#xff0c;以及如何将此注意力可视化工具整合到…

Debian12中为python3配置虚拟环境及在Pycharm中使用虚拟环境

在Debian 12中&#xff0c;python默认为python 3.11。 基于应用&#xff0c;现需设置虚拟环境。 1.安装venv模块 从python3.3开始&#xff0c;配置python虚拟环境&#xff0c;可用venv模块&#xff0c;更加方便了。 执行命令&#xff1a; #apt install python3.11-venv 2.…

Java虚拟机——前端编译优化

Java的编译期是有上下文语境影响的&#xff0c;不同语境下可以指不同的过程&#xff1a; 可以是前端编译器&#xff0c;把*.java文件转变成*.class文件的过程。 JDK的Javac、Eclipse JDT中的增量式编译器 可以指Java虚拟机的即时编译器&#xff08;JIT编译器&#xff09;在运…

【算法基础:搜索与图论】3.6 二分图(染色法判定二分图匈牙利算法)

文章目录 二分图介绍染色法判定二分图例题&#xff1a;860. 染色法判定二分图 匈牙利匹配二分图最大匹配匈牙利匹配算法思想例题&#xff1a;861. 二分图的最大匹配 二分图介绍 https://oi-wiki.org/graph/bi-graph/ 二分图是图论中的一个概念&#xff0c;它的所有节点可以被…

FPGA驱动SPI屏幕(附完整工程)

一. 简介 相信大家都玩过屏幕&#xff0c;在FPGA上使用最多的就是VGA/HDMI接口的显示器了&#xff0c;这两种显示器的优点就不用说了&#xff0c;缺点就是体积比较大&#xff0c;而且价格比较贵&#xff0c;对于追求便携/价格低的我来说&#xff0c;SPI接口的屏幕才是我的首要…

更新合集 | 七月功能上新记

点击链接了解详情 七月来临&#xff0c;正式开启 2023 下半年的新征途&#xff01;这个盛夏&#xff0c;腾讯云 CODING 上线了微信扫码注册、微信通知、Go 制品管理等重点能力&#xff0c;为企业及团队研发管理带来更多便利&#xff01;以下是 CODING 新功能速递&#xff0c;快…

登录页的具体实现 (小兔鲜儿)【Vue3】

登录页 整体认识和路由配置 整体认识 登录页面的主要功能就是表单校验和登录登出业务 准备模板 <script setup></script><template><div><header class"login-header"><div class"container m-top-20"><h1 cl…

详解go的hex.Encode原理

简言 今天看nsq的messageID生成的时候&#xff0c;发现它使用了hex.Encode函数来产生编码&#xff0c;那就顺道研究一下这个编码方式。 原理 hex是16进制的意思&#xff0c;encode是进行编码的意思&#xff0c;内部实现也很简单&#xff0c;就是 每4位计算出十六进制的值&a…

基于Python机器学习、深度学习在气象、海洋、水文等技能提升教程

详情点击链接&#xff1a;基于Python机器学习、深度学习技术提升气象、海洋、水文领域实践应用 前言 Python是功能强大、免费、开源&#xff0c;实现面向对象的编程语言&#xff0c;能够在不同操作系统和平台使用&#xff0c;简洁的语法和解释性语言使其成为理想的脚本语言。…

数据可视化(1)

使用python带的matplotlib库进行简单的绘图。使用之前先进行安装&#xff0c;pip install matplotlib。如果安装了Anaconda,则无需安装matplotlib。 1.简单折线图 #绘制简单图表 import matplotlib.pyplot as plt plt.plot([1,2,3,4,5]) plt.show() import matplotlib.pyp…