canal框架
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
参阅: GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
原理
MySQL主备复制原理(借用)
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
架构图
Canal安装
docker search
pull canal/canal-server
挂载文件
[root@localhost software]# cd canal/
[root@localhost canal]# ls
conf
上传挂载文件到canal/conf
配置
canal.properties
instance.properties
master的位置
创建一个和canal交互的用户
创建容器
docker run \
--name canal \
--privileged \
-p 11111:11111 \
--network wn_docker_net \
--ip 172.18.12.66 \
-v /usr/local/software/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
-v /usr/local/software/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-d canal/canal-server
docker run \
--name canal \
-d canal/canal-server
docker logs canal
springboot整合canal
springboot整合canal
rabbitmq监听canal队列,获取canal转发过来的数据并进行json包装处理(将json字符串转换为对象)
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.lang.reflect.Field;@Service
@Slf4j
public class CanalServiceImpl implements ICanalService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@RabbitListener(queues = "canal.topic.queue")@Overridepublic void listenCanalMQ(Message message) throws InstantiationException, IllegalAccessException {String msg = new String(message.getBody());log.debug("来自与监听canal的队列信息_>{}", msg);//消息过滤JSONObject json = new JSONObject(msg);log.debug("type:{}", json.get("type"));log.debug("table:{}", json.get("table"));log.debug("data:{}", json.get("data"));log.debug("old:{}", json.get("old"));System.out.println("--->" + json.get("data").getClass());JSONArray jsonArray = (JSONArray) json.get("data");System.out.println("-x->" + jsonArray.get(0).getClass());JSONObject jo = (JSONObject) jsonArray.get(0);Class<Older> clazz = Older.class;Older older = clazz.newInstance();Field[] declaredFields = clazz.getDeclaredFields();for (Field field : declaredFields) {field.setAccessible(true);String name = field.getName();field.set(older, jo.get("older_"+name));}}
}
数据回来之后Json的json格式
{
"data": [
{
"older_id": "7",
"older_username": "zhuge",
"older_password": "123",
"older_name": "诸葛卧龙亮",
"older_create_time": null,
"older_create_by": null,
"older_update_time": null
}
],
"database": "ssc_db",
"es": 1700708544000,
"id": 4,
"isDdl": false,
"mysqlType": {
"older_id": "bigint",
"older_username": "varchar(50)",
"older_password": "varchar(50)",
"older_name": "varchar(50)",
"older_create_time": "datetime",
"older_create_by": "varchar(50)",
"older_update_time": "datetime"
},
"old": [
{
"older_name": "诸葛亮"
}
],
"pkNames": [
"older_id"
],
"sql": "",
"sqlType": {
"older_id": -5,
"older_username": 12,
"older_password": 12,
"older_name": 12,
"older_create_time": 93,
"older_create_by": 12,
"older_update_time": 93
},
"table": "older_tab",
"ts": 1700708545181,
"type": "UPDATE"
}