1、Canal介绍
Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。
2、Mysql 主从数据同步
1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。
2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。
3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。
4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。
3、Canal 原理
1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。
2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。
4、实现过程
下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压
修改Canal的配置文件(conf/canal.properties)
指定模式canal.serverMode = rabbitMQ# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2canal.destinations = example# rabbitmq 服务端 iprabbitmq.host = 你的ip(注意不要加端口号哦)# rabbitmq 虚拟主机rabbitmq.virtual.host = /# rabbitmq 交换机rabbitmq.exchange = canal.exchange (这是本例子用的交换机)# rabbitmq 用户名rabbitmq.username = 你的用户名# rabbitmq 密码rabbitmq.password = 你的密码rabbitmq.deliveryMode =
修改实例配置文件 conf/example/instance.properties。
#配置 slaveId,自定义,不等于 mysql 的 server Id 即可canal.instance.mysql.slaveId=10# 数据库地址:配置自己的ip和端口canal.instance.master.address=你的IP:端口号# 数据库用户名和密码canal.instance.dbUsername=用户名canal.instance.dbPassword=密码# 指定库和表canal.instance.filter.regex=.*\..* # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库# mq config# rabbitmq 的 routing keycanal.mq.topic=canal.routing.key(这是本例子用的key)
然后重启 canal 服务。
2 、搭建RabbitMq并且 配置Exchange、Routing key。
3、Spring Boot 实现代码
引入依赖
<!--引入rabbitmq集成的依赖--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
编辑配置文件
spring:#rabbitmqrabbitmq:# host: 192.168.31.189host: 192.168.1.12port: 5672virtual-host: /username: adminpassword: adminlistener:simple: #手动确认消息 ackacknowledge-mode: manualretry:enabled: truemax-attempts: 3max-interval: 1000ms
server:port: 8888
定义队列的消息接受的数据对象
@Data
public class CanalMessage<T> {private String type;private String table;private List<T> data;private String database;private Long es;private Integer id;private Boolean isDdl;private List<T> old;private List<String> pkNames;private String sql;private Long ts;
}
定义rabbitmq的配置,代码如下
@Configuration
@Slf4j
public class RabbitConfig {/*** 消息序列化配置*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {// SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。// 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接factory.setConnectionFactory(connectionFactory);//使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化factory.setMessageConverter( new Jackson2JsonMessageConverter());return factory;}
}
消费的代码
@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {@RabbitHandlerpublic void receive(CanalMessage CanalMessage) {//进行同步业务处理。。。 }
}